diff --git a/Cargo.lock b/Cargo.lock index 94e2ea0..2879ef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -511,7 +511,7 @@ checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" [[package]] name = "epoxy-client" -version = "2.0.4" +version = "2.0.5" dependencies = [ "async-compression", "async-trait", diff --git a/client/Cargo.toml b/client/Cargo.toml index 01ea5c2..c46fbc3 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "epoxy-client" -version = "2.0.4" +version = "2.0.5" edition = "2021" [lib] diff --git a/client/package.json b/client/package.json index cde81a8..706456f 100644 --- a/client/package.json +++ b/client/package.json @@ -1,6 +1,6 @@ { "name": "@mercuryworkshop/epoxy-tls", - "version": "2.0.4-2", + "version": "2.0.5-1", "description": "A wasm library for using raw encrypted tls/ssl/https/websocket streams on the browser", "scripts": { "build": "./build.sh" diff --git a/client/src/lib.rs b/client/src/lib.rs index 255e36b..f97b77c 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -22,8 +22,7 @@ use js_sys::{Array, Function, Object, Reflect}; use stream_provider::{StreamProvider, StreamProviderService}; use thiserror::Error; use utils::{ - convert_body, entries_of_object, is_null_body, is_redirect, object_get, object_set, - IncomingBody, UriExt, WasmExecutor, + asyncread_to_readablestream_stream, convert_body, entries_of_object, is_null_body, is_redirect, object_get, object_set, IncomingBody, UriExt, WasmExecutor }; use wasm_bindgen::prelude::*; use wasm_streams::ReadableStream; @@ -529,14 +528,14 @@ impl EpoxyClient { }, None => Either::Right(response_body), }; - Some(ReadableStream::from_async_read(decompressed_body, 1024).into_raw()) + Some(ReadableStream::from_stream(asyncread_to_readablestream_stream(decompressed_body)).into_raw()) } else { None }; } else { let response_stream = if !is_null_body(response.status().as_u16()) { let response_body = IncomingBody::new(response.into_body()).into_async_read(); - Some(ReadableStream::from_async_read(response_body, 1024).into_raw()) + Some(ReadableStream::from_stream(asyncread_to_readablestream_stream(response_body)).into_raw()) } else { None }; diff --git a/client/src/utils.rs b/client/src/utils.rs index e841a14..f3dbb77 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -1,10 +1,10 @@ use std::{ - pin::Pin, - task::{Context, Poll}, + pin::Pin, + task::{Context, Poll}, }; -use bytes::Bytes; -use futures_util::{Future, Stream}; +use bytes::{buf::UninitSlice, BufMut, Bytes, BytesMut}; +use futures_util::{ready, AsyncRead, Future, Stream, TryStreamExt}; use http::{HeaderValue, Uri}; use hyper::{body::Body, rt::Executor}; use js_sys::{Array, ArrayBuffer, Object, Reflect, Uint8Array}; @@ -15,22 +15,22 @@ use wasm_bindgen_futures::JsFuture; use crate::EpoxyError; pub trait UriExt { - fn get_redirect(&self, location: &HeaderValue) -> Result; + fn get_redirect(&self, location: &HeaderValue) -> Result; } impl UriExt for Uri { - fn get_redirect(&self, location: &HeaderValue) -> Result { - let new_uri = location.to_str()?.parse::()?; - let mut new_parts: http::uri::Parts = new_uri.into(); - if new_parts.scheme.is_none() { - new_parts.scheme = self.scheme().cloned(); - } - if new_parts.authority.is_none() { - new_parts.authority = self.authority().cloned(); - } + fn get_redirect(&self, location: &HeaderValue) -> Result { + let new_uri = location.to_str()?.parse::()?; + let mut new_parts: http::uri::Parts = new_uri.into(); + if new_parts.scheme.is_none() { + new_parts.scheme = self.scheme().cloned(); + } + if new_parts.authority.is_none() { + new_parts.authority = self.authority().cloned(); + } - Ok(Uri::from_parts(new_parts)?) - } + Ok(Uri::from_parts(new_parts)?) + } } #[derive(Clone)] @@ -38,103 +38,186 @@ pub struct WasmExecutor; impl Executor for WasmExecutor where - F: Future + Send + 'static, - F::Output: Send + 'static, + F: Future + Send + 'static, + F::Output: Send + 'static, { - fn execute(&self, future: F) { - wasm_bindgen_futures::spawn_local(async move { - let _ = future.await; - }); - } + fn execute(&self, future: F) { + wasm_bindgen_futures::spawn_local(async move { + let _ = future.await; + }); + } } pin_project! { - pub struct IncomingBody { - #[pin] - incoming: hyper::body::Incoming, - } + pub struct IncomingBody { + #[pin] + incoming: hyper::body::Incoming, + } } impl IncomingBody { - pub fn new(incoming: hyper::body::Incoming) -> IncomingBody { - IncomingBody { incoming } - } + pub fn new(incoming: hyper::body::Incoming) -> IncomingBody { + IncomingBody { incoming } + } } impl Stream for IncomingBody { - type Item = std::io::Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let ret = this.incoming.poll_frame(cx); - match ret { - Poll::Ready(item) => Poll::>::Ready(match item { - Some(frame) => frame - .map(|x| { - x.into_data() - .map_err(|_| std::io::Error::other("not data frame")) - }) - .ok(), - None => None, - }), - Poll::Pending => Poll::>::Pending, - } - } + type Item = std::io::Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let ret = this.incoming.poll_frame(cx); + match ret { + Poll::Ready(item) => Poll::>::Ready(match item { + Some(frame) => frame + .map(|x| { + x.into_data() + .map_err(|_| std::io::Error::other("not data frame")) + }) + .ok(), + None => None, + }), + Poll::Pending => Poll::>::Pending, + } + } +} + +pin_project! { + #[derive(Debug)] + pub struct ReaderStream { + #[pin] + reader: Option, + buf: BytesMut, + capacity: usize, + } +} + +impl ReaderStream { + pub fn new(reader: R) -> Self { + ReaderStream { + reader: Some(reader), + buf: BytesMut::new(), + capacity: 4096, + } + } +} + +pub fn poll_read_buf( + io: Pin<&mut T>, + cx: &mut Context<'_>, + buf: &mut B, +) -> Poll> { + if !buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } + + let n = { + let dst = buf.chunk_mut(); + + let dst = unsafe { std::mem::transmute::<&mut UninitSlice, &mut [u8]>(dst) }; + ready!(io.poll_read(cx, dst)?) + }; + + unsafe { + buf.advance_mut(n); + } + + Poll::Ready(Ok(n)) +} + +impl Stream for ReaderStream { + type Item = std::io::Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + + let reader = match this.reader.as_pin_mut() { + Some(r) => r, + None => return Poll::Ready(None), + }; + + if this.buf.capacity() == 0 { + this.buf.reserve(*this.capacity); + } + + match poll_read_buf(reader, cx, &mut this.buf) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => { + self.project().reader.set(None); + Poll::Ready(Some(Err(err))) + } + Poll::Ready(Ok(0)) => { + self.project().reader.set(None); + Poll::Ready(None) + } + Poll::Ready(Ok(_)) => { + let chunk = this.buf.split(); + Poll::Ready(Some(Ok(chunk.freeze()))) + } + } + } } pub fn is_redirect(code: u16) -> bool { - [301, 302, 303, 307, 308].contains(&code) + [301, 302, 303, 307, 308].contains(&code) } pub fn is_null_body(code: u16) -> bool { - [101, 204, 205, 304].contains(&code) + [101, 204, 205, 304].contains(&code) } pub fn object_get(obj: &Object, key: &str) -> Option { - Reflect::get(obj, &key.into()).ok() + Reflect::get(obj, &key.into()).ok() } pub fn object_set(obj: &Object, key: &JsValue, value: &JsValue) -> Result<(), EpoxyError> { - if Reflect::set(obj, key, value).map_err(|_| EpoxyError::RawHeaderSetFailed)? { - Ok(()) - } else { - Err(EpoxyError::RawHeaderSetFailed) - } + if Reflect::set(obj, key, value).map_err(|_| EpoxyError::RawHeaderSetFailed)? { + Ok(()) + } else { + Err(EpoxyError::RawHeaderSetFailed) + } } pub async fn convert_body(val: JsValue) -> Result<(Uint8Array, web_sys::Request), JsValue> { - let mut request_init = web_sys::RequestInit::new(); - request_init.method("POST").body(Some(&val)); - object_set(&request_init, &"duplex".into(), &"half".into())?; - let req = web_sys::Request::new_with_str_and_init("/", &request_init)?; - Ok(( - JsFuture::from(req.array_buffer()?) - .await? - .dyn_into::() - .map(|x| Uint8Array::new(&x))?, - req, - )) + let mut request_init = web_sys::RequestInit::new(); + request_init.method("POST").body(Some(&val)); + object_set(&request_init, &"duplex".into(), &"half".into())?; + let req = web_sys::Request::new_with_str_and_init("/", &request_init)?; + Ok(( + JsFuture::from(req.array_buffer()?) + .await? + .dyn_into::() + .map(|x| Uint8Array::new(&x))?, + req, + )) } pub fn entries_of_object(obj: &Object) -> Vec> { - Object::entries(obj) - .to_vec() - .iter() - .filter_map(|val| { - Array::from(val) - .to_vec() - .iter() - .map(|val| val.as_string()) - .collect::>>() - }) - .collect::>>() + Object::entries(obj) + .to_vec() + .iter() + .filter_map(|val| { + Array::from(val) + .to_vec() + .iter() + .map(|val| val.as_string()) + .collect::>>() + }) + .collect::>>() } pub fn define_property_obj(value: JsValue, writable: bool) -> Result { - let entries: Array = [ - Array::of2(&"value".into(), &value), - Array::of2(&"writable".into(), &writable.into()), - ] - .iter() - .collect::(); - Object::from_entries(&entries) + let entries: Array = [ + Array::of2(&"value".into(), &value), + Array::of2(&"writable".into(), &writable.into()), + ] + .iter() + .collect::(); + Object::from_entries(&entries) +} + +pub fn asyncread_to_readablestream_stream( + read: R, +) -> impl Stream> { + ReaderStream::new(read) + .map_ok(|x| Uint8Array::from(x.as_ref()).into()) + .map_err(|x| EpoxyError::from(x).into()) }