From f035e5125620f8f065852076aa61091cd4846966 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Sun, 14 Jan 2024 21:44:27 -0800 Subject: [PATCH] minor changes --- client/src/lib.rs | 30 ++++--- client/src/web/index.js | 18 ++-- client/src/websocket.rs | 189 +++++++++++++++++++++++----------------- 3 files changed, 140 insertions(+), 97 deletions(-) diff --git a/client/src/lib.rs b/client/src/lib.rs index d1fc2b2..74248d7 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -2,12 +2,13 @@ #[macro_use] mod utils; mod tokioio; -mod wrappers; mod websocket; +mod wrappers; use tokioio::TokioIo; use utils::{ReplaceErr, UriExt}; use wrappers::{IncomingBody, WsStreamWrapper}; +use websocket::EpxWebSocket; use std::sync::Arc; @@ -15,12 +16,8 @@ use async_compression::tokio::bufread as async_comp; use bytes::Bytes; use futures_util::StreamExt; use http::{uri, HeaderName, HeaderValue, Request, Response}; -use hyper::{ - body::Incoming, - client::conn::http1::Builder, - Uri, -}; -use js_sys::{Array, Object, Reflect, Uint8Array}; +use hyper::{body::Incoming, client::conn::http1::Builder, Uri}; +use js_sys::{Function, Array, Object, Reflect, Uint8Array}; use penguin_mux_wasm::{Multiplexor, MuxStream}; use tokio_rustls::{client::TlsStream, rustls, rustls::RootCertStore, TlsConnector}; use tokio_util::{ @@ -163,10 +160,6 @@ impl EpoxyClient { redirect_limit, }) } - #[wasm_bindgen] - pub fn ptr(&mut self) -> *mut EpoxyClient { - self as *mut Self - } async fn get_http_io(&self, url: &Uri) -> Result { let url_host = url.host().replace_err("URL must have a host")?; @@ -221,6 +214,21 @@ impl EpoxyClient { } } + // shut up + #[allow(clippy::too_many_arguments)] + pub async fn connect_ws( + &self, + onopen: Function, + onclose: Function, + onerror: Function, + onmessage: Function, + url: String, + protocols: Vec, + origin: String, + ) -> Result { + EpxWebSocket::connect(self, onopen, onclose, onerror, onmessage, url, protocols, origin).await + } + pub async fn fetch(&self, url: String, options: Object) -> Result { let uri = url.parse::().replace_err("Failed to parse URL")?; let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?; diff --git a/client/src/web/index.js b/client/src/web/index.js index 6530f89..3744986 100644 --- a/client/src/web/index.js +++ b/client/src/web/index.js @@ -12,7 +12,7 @@ const tconn0 = performance.now(); // args: websocket url, user agent, redirect limit - let wstcp = await new wasm_bindgen.WsTcp("wss://localhost:4000", navigator.userAgent, 10); + let epoxy = await new wasm_bindgen.EpoxyClient("wss://localhost:4000", navigator.userAgent, 10); const tconn1 = performance.now(); console.warn(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`); @@ -25,14 +25,14 @@ ["https://httpbin.org/redirect/11", {}], ["https://httpbin.org/redirect/1", { redirect: "manual" }] ]) { - let resp = await wstcp.fetch(url[0], url[1]); + let resp = await epoxy.fetch(url[0], url[1]); console.warn(url, resp, Object.fromEntries(resp.headers)); console.warn(await resp.text()); } } else if (should_perf_test) { const test_mux = async (url) => { const t0 = performance.now(); - await wstcp.fetch(url); + await epoxy.fetch(url); const t1 = performance.now(); return t1 - t0; }; @@ -62,10 +62,18 @@ console.warn(`avg native (10) took ${total_native} ms or ${total_native / 1000} s`); console.warn(`mux - native: ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); } else if (should_ws_test) { - let ws = await new wasm_bindgen.WsWebSocket(() => console.log("opened"), () => console.log("closed"), msg => console.log(msg), wstcp, "ws://localhost:9000", [], "localhost"); + let ws = await epoxy.connect_ws( + () => console.log("opened"), + () => console.log("closed"), + err => console.error(err), + msg => console.log(msg), + "ws://localhost:9000", + [], + "localhost" + ); await ws.send("data"); } else { - let resp = await wstcp.fetch("https://httpbin.org/get"); + let resp = await epoxy.fetch("https://httpbin.org/get"); console.warn(resp, Object.fromEntries(resp.headers)); console.warn(await resp.text()); } diff --git a/client/src/websocket.rs b/client/src/websocket.rs index 2329c1d..b06dd7c 100644 --- a/client/src/websocket.rs +++ b/client/src/websocket.rs @@ -20,123 +20,150 @@ enum EpxMsg { #[wasm_bindgen] pub struct EpxWebSocket { msg_sender: mpsc::Sender, + onerror: Function, } #[wasm_bindgen] impl EpxWebSocket { #[wasm_bindgen(constructor)] + pub fn new() -> Result { + Err(jerr!("Use EpoxyClient.connect_ws() instead.")) + } + + + // shut up + #[allow(clippy::too_many_arguments)] pub async fn connect( + tcp: &EpoxyClient, onopen: Function, onclose: Function, + onerror: Function, onmessage: Function, - tcp: &EpoxyClient, url: String, protocols: Vec, origin: String, ) -> Result { - let url = Uri::try_from(url).replace_err("Failed to parse URL")?; - let host = url.host().replace_err("URL must have a host")?; + let onerr = onerror.clone(); + let ret: Result = async move { + let url = Uri::try_from(url).replace_err("Failed to parse URL")?; + let host = url.host().replace_err("URL must have a host")?; - let rand: [u8; 16] = rand::random(); - let key = STANDARD.encode(rand); + let rand: [u8; 16] = rand::random(); + let key = STANDARD.encode(rand); - let mut builder = Request::builder() - .method("GET") - .uri(url.clone()) - .header("Host", host) - .header("Origin", origin) - .header(UPGRADE, "websocket") - .header(CONNECTION, "upgrade") - .header("Sec-WebSocket-Key", key) - .header("Sec-WebSocket-Version", "13"); + let mut builder = Request::builder() + .method("GET") + .uri(url.clone()) + .header("Host", host) + .header("Origin", origin) + .header(UPGRADE, "websocket") + .header(CONNECTION, "upgrade") + .header("Sec-WebSocket-Key", key) + .header("Sec-WebSocket-Version", "13"); - if !protocols.is_empty() { - builder = builder.header("Sec-WebSocket-Protocol", protocols.join(", ")); - } - - let req = builder.body(Empty::::new())?; - - let stream = tcp.get_http_io(&url).await?; - - let (mut sender, conn) = - hyper_conn::handshake::, Empty>(TokioIo::new(stream)) - .await?; - - wasm_bindgen_futures::spawn_local(async move { - if let Err(e) = conn.with_upgrades().await { - error!("wstcp: error in muxed hyper connection (ws)! {:?}", e); + if !protocols.is_empty() { + builder = builder.header("Sec-WebSocket-Protocol", protocols.join(", ")); } - }); - let mut response = sender.send_request(req).await?; - verify(&response)?; + let req = builder.body(Empty::::new())?; - let mut ws = WebSocket::after_handshake( - TokioIo::new(hyper::upgrade::on(&mut response).await?), - Role::Client, - ); + let stream = tcp.get_http_io(&url).await?; - let (msg_sender, mut rx) = mpsc::channel(1); + let (mut sender, conn) = + hyper_conn::handshake::, Empty>(TokioIo::new(stream)) + .await?; - wasm_bindgen_futures::spawn_local(async move { - loop { - tokio::select! { - frame = ws.read_frame() => { - if let Ok(frame) = frame { - error!("hiii"); - match frame.opcode { - OpCode::Text => { - if let Ok(str) = from_utf8(&frame.payload) { - let _ = onmessage.call1(&JsValue::null(), &jval!(str)); + wasm_bindgen_futures::spawn_local(async move { + if let Err(e) = conn.with_upgrades().await { + error!("wstcp: error in muxed hyper connection (ws)! {:?}", e); + } + }); + + let mut response = sender.send_request(req).await?; + verify(&response)?; + + let mut ws = WebSocket::after_handshake( + TokioIo::new(hyper::upgrade::on(&mut response).await?), + Role::Client, + ); + + let (msg_sender, mut rx) = mpsc::channel(1); + + wasm_bindgen_futures::spawn_local(async move { + loop { + tokio::select! { + frame = ws.read_frame() => { + if let Ok(frame) = frame { + match frame.opcode { + OpCode::Text => { + if let Ok(str) = from_utf8(&frame.payload) { + let _ = onmessage.call1(&JsValue::null(), &jval!(str)); + } } + OpCode::Binary => { + let _ = onmessage.call1( + &JsValue::null(), + &jval!(Uint8Array::from(frame.payload.to_vec().as_slice())), + ); + } + OpCode::Close => { + let _ = onclose.call0(&JsValue::null()); + break; + } + _ => panic!("unknown opcode {:?}", frame.opcode), } - OpCode::Binary => { - let _ = onmessage.call1( - &JsValue::null(), - &jval!(Uint8Array::from(frame.payload.to_vec().as_slice())), - ); - } - OpCode::Close => { - let _ = onclose.call0(&JsValue::null()); - break; - } - _ => panic!("unknown opcode {:?}", frame.opcode), } } - } - msg = rx.recv() => { - if let Some(msg) = msg { - match msg { - EpxMsg::SendText(payload, err) => { - let _ = err.send(ws.write_frame(Frame::text( - Payload::Owned(payload.as_bytes().to_vec()), - )) - .await); + msg = rx.recv() => { + if let Some(msg) = msg { + match msg { + EpxMsg::SendText(payload, err) => { + let _ = err.send(ws.write_frame(Frame::text( + Payload::Owned(payload.as_bytes().to_vec()), + )) + .await); + } + EpxMsg::Close => break, } - EpxMsg::Close => break, + } else { + break; } - } else { - break; } } } - } - let _ = ws.write_frame(Frame::close(CloseCode::Normal.into(), b"")) - .await; - }); + let _ = ws + .write_frame(Frame::close(CloseCode::Normal.into(), b"")) + .await; + }); - onopen - .call0(&Object::default()) - .replace_err("Failed to call onopen")?; + onopen + .call0(&Object::default()) + .replace_err("Failed to call onopen")?; - Ok(Self { msg_sender }) + Ok(Self { msg_sender, onerror }) + }.await; + if let Err(ret) = ret { + let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone())); + Err(ret) + } else { + ret + } } #[wasm_bindgen] pub async fn send(&mut self, payload: String) -> Result<(), JsError> { - let (tx, rx) = oneshot::channel(); - self.msg_sender.send(EpxMsg::SendText(payload, tx)).await?; - Ok(rx.await??) + let onerr = self.onerror.clone(); + let ret: Result<(), JsError> = async move { + let (tx, rx) = oneshot::channel(); + self.msg_sender.send(EpxMsg::SendText(payload, tx)).await?; + Ok(rx.await??) + }.await; + if let Err(ret) = ret { + let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone())); + Err(ret) + } else { + ret + } } #[wasm_bindgen]