diff --git a/client/Cargo.toml b/client/Cargo.toml index 4260dd6..bf266b8 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -25,7 +25,7 @@ futures-util = "0.3.30" js-sys = "0.3.66" webpki-roots = "0.26.0" tokio-rustls = "0.25.0" -web-sys = { version = "0.3.66", features = ["TextEncoder"] } +web-sys = { version = "0.3.66", features = ["TextEncoder", "Navigator"] } [dependencies.getrandom] features = ["js"] diff --git a/client/src/lib.rs b/client/src/lib.rs index ef89788..7d6acd2 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -9,18 +9,38 @@ use wsstreamwrapper::WsStreamWrapper; use std::sync::Arc; use bytes::Bytes; -use http::{uri, Request}; +use http::{uri, HeaderName, HeaderValue, Request, Response}; +use http_body_util::BodyExt; use hyper::{body::Incoming, client::conn as hyper_conn}; -use web_sys::TextEncoder; use js_sys::{Object, Reflect, Uint8Array}; -use penguin_mux_wasm::{Multiplexor, MuxStream, Role}; -use tokio_rustls::{client::TlsStream, rustls, rustls::RootCertStore, TlsConnector}; +use penguin_mux_wasm::{Multiplexor, Role}; +use tokio_rustls::{rustls, rustls::RootCertStore, TlsConnector}; use wasm_bindgen::prelude::*; +use web_sys::TextEncoder; -type MuxIo = TokioIo>; -type MuxRustlsIo = TokioIo>>; type HttpBody = http_body_util::Full; +async fn send_req(req: http::Request, io: T) -> Response +where + T: hyper::rt::Read + hyper::rt::Write + std::marker::Unpin + 'static, +{ + let (mut req_sender, conn) = hyper_conn::http1::handshake::(io) + .await + .expect_throw("Failed to connect to host"); + + wasm_bindgen_futures::spawn_local(async move { + if let Err(e) = conn.await { + error!("wstcp: error in muxed hyper connection! {:?}", e); + } + }); + + debug!("sending req"); + req_sender + .send_request(req) + .await + .expect_throw("Failed to send request") +} + #[wasm_bindgen(start)] async fn start() { utils::set_panic_hook(); @@ -30,12 +50,13 @@ async fn start() { pub struct WsTcpWorker { rustls_config: Arc, mux: Multiplexor, + useragent: String, } #[wasm_bindgen] impl WsTcpWorker { #[wasm_bindgen(constructor)] - pub async fn new(ws_url: String) -> Result { + pub async fn new(ws_url: String, useragent: String) -> Result { let ws_uri = ws_url .parse::() .expect_throw("Failed to parse websocket URL"); @@ -64,7 +85,11 @@ impl WsTcpWorker { .with_no_client_auth(), ); - Ok(WsTcpWorker { mux, rustls_config }) + Ok(WsTcpWorker { + mux, + rustls_config, + useragent, + }) } pub async fn fetch(&self, url: String, options: Object) -> Result<(), JsValue> { @@ -92,40 +117,73 @@ impl WsTcpWorker { Ok(val) => val.as_string().unwrap_or("GET".to_string()), Err(_) => "GET".to_string(), }; + debug!("method {:?}", req_method_string); let req_method: http::Method = http::Method::try_from(>::as_ref(&req_method_string)) .expect_throw("Invalid http method"); - let body: Option> = Reflect::get(&options, &JsValue::from_str("body")).map(|val| { - if val.is_string() { - let str = val.as_string().expect_throw("Failed to encode body into uint8array"); - let encoder = TextEncoder::new().expect_throw("Failed to encode body into uint8array"); - let encoded = encoder.encode_with_input(str.as_ref()); - Some(encoded) - } else { - Some(Uint8Array::new(&val).to_vec()) - } - }).unwrap_or(None); + let body: Option> = Reflect::get(&options, &JsValue::from_str("body")) + .map(|val| { + if val.is_string() { + let str = val + .as_string() + .expect_throw("Failed to encode body into uint8array"); + let encoder = + TextEncoder::new().expect_throw("Failed to encode body into uint8array"); + let encoded = encoder.encode_with_input(str.as_ref()); + Some(encoded) + } else { + Some(Uint8Array::new(&val).to_vec()) + } + }) + .unwrap_or(None); let body_bytes: Bytes = match body { Some(vec) => Bytes::from(vec), - None => Bytes::new() + None => Bytes::new(), }; + let headers: Option>> = + Reflect::get(&options, &JsValue::from_str("headers")) + .map(|val| { + if val.is_truthy() { + Some(utils::entries_of_object(&Object::from(val))) + } else { + None + } + }) + .unwrap_or(None); + + let mut builder = Request::builder().uri(uri.clone()).method(req_method); + + if let Some(headers) = headers { + let headers_map = builder.headers_mut().expect_throw("failed to get headers"); + for hdr in headers { + headers_map.insert( + HeaderName::from_bytes(hdr[0].as_bytes()) + .expect_throw("failed to get hdr name"), + HeaderValue::from_str(hdr[1].clone().as_ref()) + .expect_throw("failed to get hdr value"), + ); + } + } + + builder = builder + .header("Host", uri_host) + .header("Connection", "close") + .header("User-Agent", self.useragent.clone()); + + let request = builder + .body(HttpBody::new(body_bytes)) + .expect_throw("Failed to make request"); + let channel = self .mux .client_new_stream_channel(uri_host.as_bytes(), uri_port) .await .expect_throw("Failed to create multiplexor channel"); - let request = Request::builder() - .header("Host", uri_host) - .header("Connection", "close") - .method(req_method) - .body(HttpBody::new(body_bytes)) - .expect_throw("Failed to create request"); - - let resp: hyper::Response; + let mut resp: hyper::Response; if *uri_scheme == uri::Scheme::HTTPS { let cloned_uri = uri_host.to_string().clone(); @@ -139,43 +197,15 @@ impl WsTcpWorker { ) .await .expect_throw("Failed to perform TLS handshake"); - let io = TokioIo::new(io); - let (mut req_sender, conn) = hyper_conn::http1::handshake::(io) - .await - .expect_throw("Failed to connect to host"); - - wasm_bindgen_futures::spawn_local(async move { - if let Err(e) = conn.await { - error!("wstcp: error in muxed hyper connection! {:?}", e); - } - }); - - debug!("sending req tls"); - resp = req_sender - .send_request(request) - .await - .expect_throw("Failed to send request"); - debug!("recieved resp"); + resp = send_req(request, TokioIo::new(io)).await; } else { - let io = TokioIo::new(channel); - let (mut req_sender, conn) = hyper_conn::http1::handshake::(io) - .await - .expect_throw("Failed to connect to host"); - - wasm_bindgen_futures::spawn_local(async move { - if let Err(e) = conn.await { - error!("err in conn: {:?}", e); - } - }); - debug!("sending req"); - resp = req_sender - .send_request(request) - .await - .expect_throw("Failed to send request"); - debug!("recieved resp"); + resp = send_req(request, TokioIo::new(channel)).await; } log!("{:?}", resp); + let body = resp.body_mut().collect(); + let body_bytes = body.await.expect_throw("Failed to get body").to_bytes(); + log!("{}", std::str::from_utf8(&body_bytes).expect_throw("e")); Ok(()) } diff --git a/client/src/utils.rs b/client/src/utils.rs index 6f96576..d8cc3a4 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -1,5 +1,7 @@ use wasm_bindgen::prelude::*; +use js_sys::{Array, Object}; + pub fn set_panic_hook() { #[cfg(feature = "console_error_panic_hook")] console_error_panic_hook::set_once(); @@ -15,7 +17,6 @@ extern "C" { pub fn console_error(s: &str); } - macro_rules! debug { ($($t:tt)*) => (utils::console_debug(&format_args!($($t)*).to_string())) } @@ -27,3 +28,20 @@ macro_rules! log { macro_rules! error { ($($t:tt)*) => (utils::console_error(&format_args!($($t)*).to_string())) } + +pub fn entries_of_object(obj: &Object) -> Vec> { + js_sys::Object::entries(obj) + .to_vec() + .iter() + .map(|val| { + Array::from(val) + .to_vec() + .iter() + .map(|val| { + val.as_string() + .expect_throw("failed to get string from object entry") + }) + .collect() + }) + .collect::>>() +} diff --git a/client/src/web/index.js b/client/src/web/index.js index b79f2eb..b41d1fa 100644 --- a/client/src/web/index.js +++ b/client/src/web/index.js @@ -1,5 +1,19 @@ (async () => { await wasm_bindgen("./wstcp_client_bg.wasm"); - let wstcp = await new wasm_bindgen.WsTcpWorker("wss://localhost:4000"); - await wstcp.fetch("https://alicesworld.tech"); + let wstcp = await new wasm_bindgen.WsTcpWorker("wss://localhost:4000", navigator.userAgent); + const t0 = performance.now(); + await wstcp.fetch("https://httpbin.org/post", {method: "POST", body: "test", headers: {"X-Header-One":"one","x-header-one":"One","X-Header-Two":"two"}}); + const t1 = performance.now(); + console.warn(`mux 1 took ${t1 - t0} ms or ${(t1 - t0)/1000} s`); + const t2 = performance.now(); + await wstcp.fetch("https://httpbin.org/post", {method: "POST", body: "test", headers: {"X-Header-One":"one","x-header-one":"One","X-Header-Two":"two"}}); + const t3 = performance.now(); + console.warn(`mux 2 took ${t3 - t2} ms or ${(t3 - t2)/1000} s`); + + const t4 = performance.now(); + await fetch("https://httpbin.org/post", {method: "POST", body: "test", headers: {"X-Header-One":"one","x-header-one":"One","X-Header-Two":"two"}}); + const t5 = performance.now(); + console.warn(`native took ${t5 - t4} ms or ${(t5 - t4)/1000} s`); + + alert() })(); diff --git a/client/src/wsstreamwrapper.rs b/client/src/wsstreamwrapper.rs index ed20095..c41c4c4 100644 --- a/client/src/wsstreamwrapper.rs +++ b/client/src/wsstreamwrapper.rs @@ -22,7 +22,6 @@ impl WsStreamWrapper { protocols: impl Into>>, ) -> Result<(Self, WsMeta), WsErr> { let (wsmeta, wsstream) = WsMeta::connect(url, protocols).await?; - debug!("readystate {:?}", wsstream.ready_state()); Ok((WsStreamWrapper { ws: wsstream }, wsmeta)) } } @@ -30,7 +29,6 @@ impl WsStreamWrapper { impl Stream for WsStreamWrapper { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - debug!("poll_next: {:?}", cx); let this = self.project(); let ret = this.ws.poll_next(cx); match ret { @@ -57,7 +55,6 @@ impl Sink for WsStreamWrapper { type Error = ws::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - debug!("poll_ready: {:?}", cx); let this = self.project(); let ret = this.ws.poll_ready(cx); match ret { @@ -70,7 +67,6 @@ impl Sink for WsStreamWrapper { } fn start_send(self: Pin<&mut Self>, item: ws::Message) -> Result<(), Self::Error> { - debug!("start_send: {:?}", item); use ws::Message::*; let item = match item { Text(txt) => WsMessage::Text(txt), @@ -101,7 +97,6 @@ impl Sink for WsStreamWrapper { } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - debug!("poll closing {:?}", cx); let this = self.project(); let ret = this.ws.poll_close(cx); match ret {