From 0d6bb2e8ba1a12e36c4e1d764fedd1210e392d9a Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Mon, 15 Jan 2024 14:25:41 -0800 Subject: [PATCH] split the ws --- client/Cargo.toml | 2 +- client/demo.js | 1 + client/src/websocket.rs | 129 ++++++++++++++++++---------------------- 3 files changed, 59 insertions(+), 73 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index 376cbc8..9fc6149 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -30,7 +30,7 @@ wasm-streams = "0.4.0" either = "1.9.0" tokio-util = { version = "0.7.10", features = ["io"] } async-compression = { version = "0.4.5", features = ["tokio", "gzip", "brotli"] } -fastwebsockets = { version = "0.6.0" } +fastwebsockets = { version = "0.6.0", features = ["simdutf8", "unstable-split"] } rand = "0.8.5" base64 = "0.21.7" diff --git a/client/demo.js b/client/demo.js index e757b51..da2d24d 100644 --- a/client/demo.js +++ b/client/demo.js @@ -73,6 +73,7 @@ ); while (true) { await ws.send("data"); + await (new Promise((res, _) => setTimeout(res, 100))); } } else { let resp = await epoxy_client.fetch("https://httpbin.org/get"); diff --git a/client/src/websocket.rs b/client/src/websocket.rs index d480a3c..addae2c 100644 --- a/client/src/websocket.rs +++ b/client/src/websocket.rs @@ -1,25 +1,22 @@ use crate::*; use base64::{engine::general_purpose::STANDARD, Engine}; -use fastwebsockets::{CloseCode, Frame, OpCode, Payload, Role, WebSocket, WebSocketError}; +use fastwebsockets::{ + CloseCode, FragmentCollectorRead, Frame, OpCode, Payload, Role, WebSocket, WebSocketWrite, +}; use http_body_util::Empty; use hyper::{ - client::conn::http1 as hyper_conn, header::{CONNECTION, UPGRADE}, + upgrade::Upgraded, StatusCode, }; use js_sys::Function; use std::str::from_utf8; -use tokio::sync::{mpsc, oneshot}; - -enum EpxMsg { - SendText(String, oneshot::Sender>), - Close, -} +use tokio::io::WriteHalf; #[wasm_bindgen] pub struct EpxWebSocket { - msg_sender: mpsc::Sender, + tx: WebSocketWrite>>, onerror: Function, } @@ -43,14 +40,13 @@ impl EpxWebSocket { origin: String, ) -> Result { let onerr = onerror.clone(); - let ret: Result = async move { + let ret: Result = async move { let url = Uri::try_from(url).replace_err("Failed to parse URL")?; let host = url.host().replace_err("URL must have a host")?; let rand: [u8; 16] = rand::random(); let key = STANDARD.encode(rand); - let mut builder = Request::builder() .method("GET") .uri(url.clone()) @@ -71,8 +67,9 @@ impl EpxWebSocket { let (mut sender, conn) = Builder::new() .title_case_headers(true) - .preserve_header_case(true).handshake::, Empty>(TokioIo::new(stream)) - .await?; + .preserve_header_case(true) + .handshake::, Empty>(TokioIo::new(stream)) + .await?; wasm_bindgen_futures::spawn_local(async move { if let Err(e) = conn.with_upgrades().await { @@ -83,81 +80,52 @@ impl EpxWebSocket { let mut response = sender.send_request(req).await?; verify(&response)?; - let mut ws = WebSocket::after_handshake( + let ws = WebSocket::after_handshake( TokioIo::new(hyper::upgrade::on(&mut response).await?), Role::Client, ); - let (msg_sender, mut rx) = mpsc::channel(1); + let (rx, tx) = ws.split(tokio::io::split); + + let mut rx = FragmentCollectorRead::new(rx); wasm_bindgen_futures::spawn_local(async move { - loop { - tokio::select! { - frame = ws.read_frame() => { - if let Ok(frame) = frame { - match frame.opcode { - OpCode::Text => { - if let Ok(str) = from_utf8(&frame.payload) { - let _ = onmessage.call1(&JsValue::null(), &jval!(str)); - } - } - OpCode::Binary => { - let _ = onmessage.call1( - &JsValue::null(), - &jval!(Uint8Array::from(frame.payload.to_vec().as_slice())), - ); - } - OpCode::Close => { - let _ = onclose.call0(&JsValue::null()); - break; - } - _ => panic!("unknown opcode {:?}", frame.opcode), - } + while let Ok(frame) = rx + .read_frame(&mut |arg| async move { + error!( + "wtf is an obligated write {:?}, {:?}, {:?}", + arg.fin, arg.opcode, arg.payload + ); + Ok::<(), std::io::Error>(()) + }) + .await + { + match frame.opcode { + OpCode::Text => { + if let Ok(str) = from_utf8(&frame.payload) { + let _ = onmessage.call1(&JsValue::null(), &jval!(str)); } } - msg = rx.recv() => { - if let Some(msg) = msg { - match msg { - EpxMsg::SendText(payload, err) => { - let _ = err.send(ws.write_frame(Frame::text( - Payload::Owned(payload.as_bytes().to_vec()), - )) - .await); - } - EpxMsg::Close => break, - } - } else { - break; - } + OpCode::Binary => { + let _ = onmessage.call1( + &JsValue::null(), + &jval!(Uint8Array::from(frame.payload.to_vec().as_slice())), + ); } + OpCode::Close => { + let _ = onclose.call0(&JsValue::null()); + break; + } + _ => panic!("unknown opcode {:?}", frame.opcode), } } - let _ = ws - .write_frame(Frame::close(CloseCode::Normal.into(), b"")) - .await; }); onopen .call0(&Object::default()) .replace_err("Failed to call onopen")?; - Ok(Self { msg_sender, onerror }) - }.await; - if let Err(ret) = ret { - let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone())); - Err(ret) - } else { - ret - } - } - - #[wasm_bindgen] - pub async fn send(&mut self, payload: String) -> Result<(), JsError> { - let onerr = self.onerror.clone(); - let ret: Result<(), JsError> = async move { - let (tx, rx) = oneshot::channel(); - self.msg_sender.send(EpxMsg::SendText(payload, tx)).await?; - Ok(rx.await??) + Ok(Self { tx, onerror }) } .await; if let Err(ret) = ret { @@ -168,9 +136,26 @@ impl EpxWebSocket { } } + #[wasm_bindgen] + pub async fn send(&mut self, payload: String) -> Result<(), JsError> { + let onerr = self.onerror.clone(); + let ret = self + .tx + .write_frame(Frame::text(Payload::Owned(payload.as_bytes().to_vec()))) + .await; + if let Err(ret) = ret { + let _ = onerr.call1(&JsValue::null(), &jval!(format!("{}", ret))); + Err(ret.into()) + } else { + Ok(ret?) + } + } + #[wasm_bindgen] pub async fn close(&mut self) -> Result<(), JsError> { - self.msg_sender.send(EpxMsg::Close).await?; + self.tx + .write_frame(Frame::close(CloseCode::Normal.into(), b"")) + .await?; Ok(()) } }