diff --git a/Cargo.lock b/Cargo.lock index 45c3c42..ab3968a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1248,6 +1248,19 @@ version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" +[[package]] +name = "wasm-streams" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.66" @@ -1437,6 +1450,7 @@ dependencies = [ "tokio-rustls", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots", "ws_stream_wasm", diff --git a/client/Cargo.toml b/client/Cargo.toml index bf266b8..3e722dd 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -25,7 +25,8 @@ futures-util = "0.3.30" js-sys = "0.3.66" webpki-roots = "0.26.0" tokio-rustls = "0.25.0" -web-sys = { version = "0.3.66", features = ["TextEncoder", "Navigator"] } +web-sys = { version = "0.3.66", features = ["TextEncoder", "Navigator", "Response", "ResponseInit"] } +wasm-streams = "0.4.0" [dependencies.getrandom] features = ["js"] diff --git a/client/src/lib.rs b/client/src/lib.rs index 0d04a23..b85780e 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,19 +1,18 @@ #[macro_use] mod utils; mod tokioio; -mod wsstreamwrapper; +mod wrappers; use tokioio::TokioIo; use utils::ReplaceErr; -use wsstreamwrapper::WsStreamWrapper; +use wrappers::{IncomingBody, WsStreamWrapper}; use std::sync::Arc; use bytes::Bytes; use http::{uri, HeaderName, HeaderValue, Request, Response}; -use http_body_util::BodyExt; use hyper::{body::Incoming, client::conn as hyper_conn}; -use js_sys::{Object, Reflect, Uint8Array}; +use js_sys::{Array, Object, Reflect, Uint8Array}; use penguin_mux_wasm::{Multiplexor, Role}; use tokio_rustls::{rustls, rustls::RootCertStore, TlsConnector}; use wasm_bindgen::prelude::*; @@ -35,7 +34,6 @@ where } }); - debug!("sending req"); req_sender .send_request(req) .await @@ -48,16 +46,16 @@ async fn start() { } #[wasm_bindgen] -pub struct WsTcpWorker { +pub struct WsTcp { rustls_config: Arc, mux: Multiplexor, useragent: String, } #[wasm_bindgen] -impl WsTcpWorker { +impl WsTcp { #[wasm_bindgen(constructor)] - pub async fn new(ws_url: String, useragent: String) -> Result { + pub async fn new(ws_url: String, useragent: String) -> Result { let ws_uri = ws_url .parse::() .replace_err("Failed to parse websocket url")?; @@ -70,14 +68,12 @@ impl WsTcpWorker { } debug!("connecting to ws {:?}", ws_url); - let (ws, wsmeta) = WsStreamWrapper::connect(ws_url, None) + let ws = WsStreamWrapper::connect(ws_url, None) .await .replace_err("Failed to connect to websocket")?; debug!("connected!"); let mux = Multiplexor::new(ws, Role::Client, None, None); - debug!("wsmeta ready state: {:?}", wsmeta.ready_state()); - let mut certstore = RootCertStore::empty(); certstore.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); @@ -87,18 +83,18 @@ impl WsTcpWorker { .with_no_client_auth(), ); - Ok(WsTcpWorker { + Ok(WsTcp { mux, rustls_config, useragent, }) } - pub async fn fetch(&self, url: String, options: Object) -> Result<(), JsError> { + pub async fn fetch(&self, url: String, options: Object) -> Result { let uri = url.parse::().replace_err("Failed to parse URL")?; let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?; if *uri_scheme != uri::Scheme::HTTP && *uri_scheme != uri::Scheme::HTTPS { - return Err(nerr!("Scheme must be either `http` or `https`")); + return Err(jerr!("Scheme must be either `http` or `https`")); } let uri_host = uri.host().replace_err("URL must have a host")?; let uri_port = if let Some(port) = uri.port() { @@ -111,20 +107,19 @@ impl WsTcpWorker { } else if *uri_scheme == uri::Scheme::HTTPS { 443 } else { - return Err(nerr!("Failed to coerce port from scheme")); + return Err(jerr!("Failed to coerce port from scheme")); } }; - let req_method_string: String = match Reflect::get(&options, &JsValue::from_str("method")) { + let req_method_string: String = match Reflect::get(&options, &jval!("method")) { Ok(val) => val.as_string().unwrap_or("GET".to_string()), Err(_) => "GET".to_string(), }; - debug!("method {:?}", req_method_string); let req_method: http::Method = http::Method::try_from(>::as_ref(&req_method_string)) .replace_err("Invalid http method")?; - let body_jsvalue: Option = Reflect::get(&options, &JsValue::from_str("body")).ok(); + let body_jsvalue: Option = Reflect::get(&options, &jval!("body")).ok(); let body = if let Some(val) = body_jsvalue { if val.is_string() { let str = val @@ -146,16 +141,15 @@ impl WsTcpWorker { None => Bytes::new(), }; - let headers: Option>> = - Reflect::get(&options, &JsValue::from_str("headers")) - .map(|val| { - if val.is_truthy() { - Some(utils::entries_of_object(&Object::from(val))) - } else { - None - } - }) - .unwrap_or(None); + let headers: Option>> = Reflect::get(&options, &jval!("headers")) + .map(|val| { + if val.is_truthy() { + Some(utils::entries_of_object(&Object::from(val))) + } else { + None + } + }) + .unwrap_or(None); let mut builder = Request::builder().uri(uri.clone()).method(req_method); @@ -186,7 +180,7 @@ impl WsTcpWorker { .await .replace_err("Failed to create multiplexor channel")?; - let mut resp: hyper::Response; + let resp: hyper::Response; if *uri_scheme == uri::Scheme::HTTPS { let cloned_uri = uri_host.to_string().clone(); @@ -205,11 +199,42 @@ impl WsTcpWorker { resp = send_req(request, TokioIo::new(channel)).await?; } - log!("{:?}", resp); - let body = resp.body_mut().collect(); - let body_bytes = body.await.replace_err("Failed to get body")?.to_bytes(); - log!("{}", std::str::from_utf8(&body_bytes).replace_err("e")?); + let resp_headers_jsarray = resp + .headers() + .iter() + .filter_map(|val| { + Some(Array::of2( + &jval!(val.0.as_str()), + &jval!(val.1.to_str().ok()?), + )) + }) + .collect::(); - Ok(()) + let resp_headers = Object::from_entries(&resp_headers_jsarray) + .replace_err("Failed to create response headers object")?; + + let mut respinit = web_sys::ResponseInit::new(); + respinit + .headers(&resp_headers) + .status(resp.status().as_u16()) + .status_text(resp.status().canonical_reason().unwrap_or_default()); + + let body = IncomingBody::new(resp.into_body()); + let stream = wasm_streams::ReadableStream::from_stream(body); + + let resp = web_sys::Response::new_with_opt_readable_stream_and_init( + Some(&stream.into_raw()), + &respinit, + ) + .replace_err("Failed to make response")?; + + Object::define_property( + &resp, + &jval!("url"), + &utils::define_property_obj(jval!(url), false) + .replace_err("Failed to make define_property object for url")?, + ); + + Ok(resp) } } diff --git a/client/src/utils.rs b/client/src/utils.rs index 4e86392..f49d142 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -17,50 +17,51 @@ extern "C" { pub fn console_error(s: &str); } +#[allow(unused_macros)] macro_rules! debug { ($($t:tt)*) => (utils::console_debug(&format_args!($($t)*).to_string())) } +#[allow(unused_macros)] macro_rules! log { ($($t:tt)*) => (utils::console_log(&format_args!($($t)*).to_string())) } +#[allow(unused_macros)] macro_rules! error { ($($t:tt)*) => (utils::console_error(&format_args!($($t)*).to_string())) } -macro_rules! nerr { - ($expr:expr) => (JsError::new($expr)) +#[allow(unused_macros)] +macro_rules! jerr { + ($expr:expr) => { + JsError::new($expr) + }; } -pub fn entries_of_object(obj: &Object) -> Vec> { - js_sys::Object::entries(obj) - .to_vec() - .iter() - .map(|val| { - Array::from(val) - .to_vec() - .iter() - .map(|val| { - val.as_string() - .expect_throw("failed to get string from object entry") - }) - .collect() - }) - .collect::>>() +#[allow(unused_macros)] +macro_rules! jval { + ($expr:expr) => { + JsValue::from($expr) + }; } pub trait ReplaceErr { type Ok; fn replace_err(self, err: &str) -> Result; + fn replace_err_jv(self, err: &str) -> Result; } impl ReplaceErr for Result { type Ok = T; fn replace_err(self, err: &str) -> Result<::Ok, JsError> { - self.map_err(|_| JsError::new(err)) + self.map_err(|_| jerr!(err)) + } + + fn replace_err_jv(self, err: &str) -> Result<::Ok, JsValue> { + self.map_err(|_| jval!(err)) } } @@ -68,6 +69,33 @@ impl ReplaceErr for Option { type Ok = T; fn replace_err(self, err: &str) -> Result<::Ok, JsError> { - self.ok_or_else(|| JsError::new(err)) + self.ok_or_else(|| jerr!(err)) + } + + fn replace_err_jv(self, err: &str) -> Result<::Ok, JsValue> { + self.ok_or_else(|| jval!(err)) } } + +pub fn entries_of_object(obj: &Object) -> Vec> { + js_sys::Object::entries(obj) + .to_vec() + .iter() + .filter_map(|val| { + Array::from(val) + .to_vec() + .iter() + .map(|val| val.as_string()) + .collect::>>() + }) + .collect::>>() +} + +pub fn define_property_obj(value: JsValue, writable: bool) -> Result { + let entries: Array = vec![ + Array::of2(&jval!("value"), &jval!(value)), + Array::of2(&jval!("writable"), &jval!(writable)), + ] + .iter().collect::(); + Object::from_entries(&entries) +} diff --git a/client/src/web/index.js b/client/src/web/index.js index 9a12175..acccbbb 100644 --- a/client/src/web/index.js +++ b/client/src/web/index.js @@ -5,23 +5,15 @@ ); await wasm_bindgen("./wstcp_client_bg.wasm"); const tconn0 = performance.now(); - let wstcp = await new wasm_bindgen.WsTcpWorker("wss://localhost:4000", navigator.userAgent); + let wstcp = await new wasm_bindgen.WsTcp("wss://localhost:4000", navigator.userAgent); const tconn1 = performance.now(); console.warn(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`); const t0 = performance.now(); - await wstcp.fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } }); + let resp = await wstcp.fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } }); const t1 = performance.now(); + + console.warn(resp); + console.warn(Object.fromEntries(resp.headers)); + console.warn(await resp.text()); console.warn(`mux 1 took ${t1 - t0} ms or ${(t1 - t0) / 1000} s`); - - const t2 = performance.now(); - await wstcp.fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } }); - const t3 = performance.now(); - console.warn(`mux 2 took ${t3 - t2} ms or ${(t3 - t2) / 1000} s`); - - const t4 = performance.now(); - await fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } }); - const t5 = performance.now(); - console.warn(`native took ${t5 - t4} ms or ${(t5 - t4) / 1000} s`); - - alert(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s\nmux 1 took ${t1 - t0} ms or ${(t1 - t0) / 1000} s\nmux 2 took ${t3 - t2} ms or ${(t3 - t2) / 1000} s\nnative took ${t5 - t4} ms or ${(t5 - t4) / 1000} s`) })(); diff --git a/client/src/wsstreamwrapper.rs b/client/src/wrappers.rs similarity index 74% rename from client/src/wsstreamwrapper.rs rename to client/src/wrappers.rs index c41c4c4..01fa61e 100644 --- a/client/src/wsstreamwrapper.rs +++ b/client/src/wrappers.rs @@ -5,6 +5,7 @@ use std::{ }; use futures_util::{Sink, Stream}; +use hyper::body::Body; use penguin_mux_wasm::ws; use pin_project_lite::pin_project; use ws_stream_wasm::{WsErr, WsMessage, WsMeta, WsStream}; @@ -20,9 +21,9 @@ impl WsStreamWrapper { pub async fn connect( url: impl AsRef, protocols: impl Into>>, - ) -> Result<(Self, WsMeta), WsErr> { - let (wsmeta, wsstream) = WsMeta::connect(url, protocols).await?; - Ok((WsStreamWrapper { ws: wsstream }, wsmeta)) + ) -> Result { + let (_, wsstream) = WsMeta::connect(url, protocols).await?; + Ok(WsStreamWrapper { ws: wsstream }) } } @@ -79,7 +80,7 @@ impl Sink for WsStreamWrapper { "ws close err: {:?}", err )))), - } + }; } Ping(_) | Pong(_) | Frame(_) => return Ok(()), }; @@ -114,3 +115,37 @@ impl ws::WebSocketStream for WsStreamWrapper { true } } + +pin_project! { + pub struct IncomingBody { + #[pin] + incoming: hyper::body::Incoming, + } +} + +impl IncomingBody { + pub fn new(incoming: hyper::body::Incoming) -> IncomingBody { + IncomingBody { incoming } + } +} + +impl Stream for IncomingBody { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let ret = this.incoming.poll_frame(cx); + match ret { + Poll::Ready(item) => Poll::>::Ready(match item { + Some(frame) => frame + .map(|x| { + x.into_data() + .map(|x| Uint8Array::from(x.as_ref()).into()) + .replace_err_jv("Error creating uint8array from http frame") + }) + .ok(), + None => None, + }), + Poll::Pending => Poll::>::Pending, + } + } +}