workaround safari not supporting readable byte streams

This commit is contained in:
Toshit Chawda 2024-07-08 21:35:28 -07:00
parent 7edb4ad76c
commit 1916a8e7c8
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
5 changed files with 173 additions and 91 deletions

2
Cargo.lock generated
View file

@ -511,7 +511,7 @@ checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
[[package]] [[package]]
name = "epoxy-client" name = "epoxy-client"
version = "2.0.4" version = "2.0.5"
dependencies = [ dependencies = [
"async-compression", "async-compression",
"async-trait", "async-trait",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "epoxy-client" name = "epoxy-client"
version = "2.0.4" version = "2.0.5"
edition = "2021" edition = "2021"
[lib] [lib]

View file

@ -1,6 +1,6 @@
{ {
"name": "@mercuryworkshop/epoxy-tls", "name": "@mercuryworkshop/epoxy-tls",
"version": "2.0.4-2", "version": "2.0.5-1",
"description": "A wasm library for using raw encrypted tls/ssl/https/websocket streams on the browser", "description": "A wasm library for using raw encrypted tls/ssl/https/websocket streams on the browser",
"scripts": { "scripts": {
"build": "./build.sh" "build": "./build.sh"

View file

@ -22,8 +22,7 @@ use js_sys::{Array, Function, Object, Reflect};
use stream_provider::{StreamProvider, StreamProviderService}; use stream_provider::{StreamProvider, StreamProviderService};
use thiserror::Error; use thiserror::Error;
use utils::{ use utils::{
convert_body, entries_of_object, is_null_body, is_redirect, object_get, object_set, asyncread_to_readablestream_stream, convert_body, entries_of_object, is_null_body, is_redirect, object_get, object_set, IncomingBody, UriExt, WasmExecutor
IncomingBody, UriExt, WasmExecutor,
}; };
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use wasm_streams::ReadableStream; use wasm_streams::ReadableStream;
@ -529,14 +528,14 @@ impl EpoxyClient {
}, },
None => Either::Right(response_body), None => Either::Right(response_body),
}; };
Some(ReadableStream::from_async_read(decompressed_body, 1024).into_raw()) Some(ReadableStream::from_stream(asyncread_to_readablestream_stream(decompressed_body)).into_raw())
} else { } else {
None None
}; };
} else { } else {
let response_stream = if !is_null_body(response.status().as_u16()) { let response_stream = if !is_null_body(response.status().as_u16()) {
let response_body = IncomingBody::new(response.into_body()).into_async_read(); let response_body = IncomingBody::new(response.into_body()).into_async_read();
Some(ReadableStream::from_async_read(response_body, 1024).into_raw()) Some(ReadableStream::from_stream(asyncread_to_readablestream_stream(response_body)).into_raw())
} else { } else {
None None
}; };

View file

@ -1,10 +1,10 @@
use std::{ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use bytes::Bytes; use bytes::{buf::UninitSlice, BufMut, Bytes, BytesMut};
use futures_util::{Future, Stream}; use futures_util::{ready, AsyncRead, Future, Stream, TryStreamExt};
use http::{HeaderValue, Uri}; use http::{HeaderValue, Uri};
use hyper::{body::Body, rt::Executor}; use hyper::{body::Body, rt::Executor};
use js_sys::{Array, ArrayBuffer, Object, Reflect, Uint8Array}; use js_sys::{Array, ArrayBuffer, Object, Reflect, Uint8Array};
@ -15,22 +15,22 @@ use wasm_bindgen_futures::JsFuture;
use crate::EpoxyError; use crate::EpoxyError;
pub trait UriExt { pub trait UriExt {
fn get_redirect(&self, location: &HeaderValue) -> Result<Uri, EpoxyError>; fn get_redirect(&self, location: &HeaderValue) -> Result<Uri, EpoxyError>;
} }
impl UriExt for Uri { impl UriExt for Uri {
fn get_redirect(&self, location: &HeaderValue) -> Result<Uri, EpoxyError> { fn get_redirect(&self, location: &HeaderValue) -> Result<Uri, EpoxyError> {
let new_uri = location.to_str()?.parse::<hyper::Uri>()?; let new_uri = location.to_str()?.parse::<hyper::Uri>()?;
let mut new_parts: http::uri::Parts = new_uri.into(); let mut new_parts: http::uri::Parts = new_uri.into();
if new_parts.scheme.is_none() { if new_parts.scheme.is_none() {
new_parts.scheme = self.scheme().cloned(); new_parts.scheme = self.scheme().cloned();
} }
if new_parts.authority.is_none() { if new_parts.authority.is_none() {
new_parts.authority = self.authority().cloned(); new_parts.authority = self.authority().cloned();
} }
Ok(Uri::from_parts(new_parts)?) Ok(Uri::from_parts(new_parts)?)
} }
} }
#[derive(Clone)] #[derive(Clone)]
@ -38,103 +38,186 @@ pub struct WasmExecutor;
impl<F> Executor<F> for WasmExecutor impl<F> Executor<F> for WasmExecutor
where where
F: Future + Send + 'static, F: Future + Send + 'static,
F::Output: Send + 'static, F::Output: Send + 'static,
{ {
fn execute(&self, future: F) { fn execute(&self, future: F) {
wasm_bindgen_futures::spawn_local(async move { wasm_bindgen_futures::spawn_local(async move {
let _ = future.await; let _ = future.await;
}); });
} }
} }
pin_project! { pin_project! {
pub struct IncomingBody { pub struct IncomingBody {
#[pin] #[pin]
incoming: hyper::body::Incoming, incoming: hyper::body::Incoming,
} }
} }
impl IncomingBody { impl IncomingBody {
pub fn new(incoming: hyper::body::Incoming) -> IncomingBody { pub fn new(incoming: hyper::body::Incoming) -> IncomingBody {
IncomingBody { incoming } IncomingBody { incoming }
} }
} }
impl Stream for IncomingBody { impl Stream for IncomingBody {
type Item = std::io::Result<Bytes>; type Item = std::io::Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project(); let this = self.project();
let ret = this.incoming.poll_frame(cx); let ret = this.incoming.poll_frame(cx);
match ret { match ret {
Poll::Ready(item) => Poll::<Option<Self::Item>>::Ready(match item { Poll::Ready(item) => Poll::<Option<Self::Item>>::Ready(match item {
Some(frame) => frame Some(frame) => frame
.map(|x| { .map(|x| {
x.into_data() x.into_data()
.map_err(|_| std::io::Error::other("not data frame")) .map_err(|_| std::io::Error::other("not data frame"))
}) })
.ok(), .ok(),
None => None, None => None,
}), }),
Poll::Pending => Poll::<Option<Self::Item>>::Pending, Poll::Pending => Poll::<Option<Self::Item>>::Pending,
} }
} }
}
pin_project! {
#[derive(Debug)]
pub struct ReaderStream<R> {
#[pin]
reader: Option<R>,
buf: BytesMut,
capacity: usize,
}
}
impl<R: AsyncRead> ReaderStream<R> {
pub fn new(reader: R) -> Self {
ReaderStream {
reader: Some(reader),
buf: BytesMut::new(),
capacity: 4096,
}
}
}
pub fn poll_read_buf<T: AsyncRead + ?Sized, B: BufMut>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<std::io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}
let n = {
let dst = buf.chunk_mut();
let dst = unsafe { std::mem::transmute::<&mut UninitSlice, &mut [u8]>(dst) };
ready!(io.poll_read(cx, dst)?)
};
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
let reader = match this.reader.as_pin_mut() {
Some(r) => r,
None => return Poll::Ready(None),
};
if this.buf.capacity() == 0 {
this.buf.reserve(*this.capacity);
}
match poll_read_buf(reader, cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Poll::Ready(Some(Err(err)))
}
Poll::Ready(Ok(0)) => {
self.project().reader.set(None);
Poll::Ready(None)
}
Poll::Ready(Ok(_)) => {
let chunk = this.buf.split();
Poll::Ready(Some(Ok(chunk.freeze())))
}
}
}
} }
pub fn is_redirect(code: u16) -> bool { pub fn is_redirect(code: u16) -> bool {
[301, 302, 303, 307, 308].contains(&code) [301, 302, 303, 307, 308].contains(&code)
} }
pub fn is_null_body(code: u16) -> bool { pub fn is_null_body(code: u16) -> bool {
[101, 204, 205, 304].contains(&code) [101, 204, 205, 304].contains(&code)
} }
pub fn object_get(obj: &Object, key: &str) -> Option<JsValue> { pub fn object_get(obj: &Object, key: &str) -> Option<JsValue> {
Reflect::get(obj, &key.into()).ok() Reflect::get(obj, &key.into()).ok()
} }
pub fn object_set(obj: &Object, key: &JsValue, value: &JsValue) -> Result<(), EpoxyError> { pub fn object_set(obj: &Object, key: &JsValue, value: &JsValue) -> Result<(), EpoxyError> {
if Reflect::set(obj, key, value).map_err(|_| EpoxyError::RawHeaderSetFailed)? { if Reflect::set(obj, key, value).map_err(|_| EpoxyError::RawHeaderSetFailed)? {
Ok(()) Ok(())
} else { } else {
Err(EpoxyError::RawHeaderSetFailed) Err(EpoxyError::RawHeaderSetFailed)
} }
} }
pub async fn convert_body(val: JsValue) -> Result<(Uint8Array, web_sys::Request), JsValue> { pub async fn convert_body(val: JsValue) -> Result<(Uint8Array, web_sys::Request), JsValue> {
let mut request_init = web_sys::RequestInit::new(); let mut request_init = web_sys::RequestInit::new();
request_init.method("POST").body(Some(&val)); request_init.method("POST").body(Some(&val));
object_set(&request_init, &"duplex".into(), &"half".into())?; object_set(&request_init, &"duplex".into(), &"half".into())?;
let req = web_sys::Request::new_with_str_and_init("/", &request_init)?; let req = web_sys::Request::new_with_str_and_init("/", &request_init)?;
Ok(( Ok((
JsFuture::from(req.array_buffer()?) JsFuture::from(req.array_buffer()?)
.await? .await?
.dyn_into::<ArrayBuffer>() .dyn_into::<ArrayBuffer>()
.map(|x| Uint8Array::new(&x))?, .map(|x| Uint8Array::new(&x))?,
req, req,
)) ))
} }
pub fn entries_of_object(obj: &Object) -> Vec<Vec<String>> { pub fn entries_of_object(obj: &Object) -> Vec<Vec<String>> {
Object::entries(obj) Object::entries(obj)
.to_vec() .to_vec()
.iter() .iter()
.filter_map(|val| { .filter_map(|val| {
Array::from(val) Array::from(val)
.to_vec() .to_vec()
.iter() .iter()
.map(|val| val.as_string()) .map(|val| val.as_string())
.collect::<Option<Vec<_>>>() .collect::<Option<Vec<_>>>()
}) })
.collect::<Vec<Vec<_>>>() .collect::<Vec<Vec<_>>>()
} }
pub fn define_property_obj(value: JsValue, writable: bool) -> Result<Object, JsValue> { pub fn define_property_obj(value: JsValue, writable: bool) -> Result<Object, JsValue> {
let entries: Array = [ let entries: Array = [
Array::of2(&"value".into(), &value), Array::of2(&"value".into(), &value),
Array::of2(&"writable".into(), &writable.into()), Array::of2(&"writable".into(), &writable.into()),
] ]
.iter() .iter()
.collect::<Array>(); .collect::<Array>();
Object::from_entries(&entries) Object::from_entries(&entries)
}
pub fn asyncread_to_readablestream_stream<R: AsyncRead>(
read: R,
) -> impl Stream<Item = Result<JsValue, JsValue>> {
ReaderStream::new(read)
.map_ok(|x| Uint8Array::from(x.as_ref()).into())
.map_err(|x| EpoxyError::from(x).into())
} }