split the ws

This commit is contained in:
Toshit Chawda 2024-01-15 14:25:41 -08:00
parent 2c54b54b4f
commit 0d6bb2e8ba
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
3 changed files with 59 additions and 73 deletions

View file

@ -30,7 +30,7 @@ wasm-streams = "0.4.0"
either = "1.9.0" either = "1.9.0"
tokio-util = { version = "0.7.10", features = ["io"] } tokio-util = { version = "0.7.10", features = ["io"] }
async-compression = { version = "0.4.5", features = ["tokio", "gzip", "brotli"] } async-compression = { version = "0.4.5", features = ["tokio", "gzip", "brotli"] }
fastwebsockets = { version = "0.6.0" } fastwebsockets = { version = "0.6.0", features = ["simdutf8", "unstable-split"] }
rand = "0.8.5" rand = "0.8.5"
base64 = "0.21.7" base64 = "0.21.7"

View file

@ -73,6 +73,7 @@
); );
while (true) { while (true) {
await ws.send("data"); await ws.send("data");
await (new Promise((res, _) => setTimeout(res, 100)));
} }
} else { } else {
let resp = await epoxy_client.fetch("https://httpbin.org/get"); let resp = await epoxy_client.fetch("https://httpbin.org/get");

View file

@ -1,25 +1,22 @@
use crate::*; use crate::*;
use base64::{engine::general_purpose::STANDARD, Engine}; use base64::{engine::general_purpose::STANDARD, Engine};
use fastwebsockets::{CloseCode, Frame, OpCode, Payload, Role, WebSocket, WebSocketError}; use fastwebsockets::{
CloseCode, FragmentCollectorRead, Frame, OpCode, Payload, Role, WebSocket, WebSocketWrite,
};
use http_body_util::Empty; use http_body_util::Empty;
use hyper::{ use hyper::{
client::conn::http1 as hyper_conn,
header::{CONNECTION, UPGRADE}, header::{CONNECTION, UPGRADE},
upgrade::Upgraded,
StatusCode, StatusCode,
}; };
use js_sys::Function; use js_sys::Function;
use std::str::from_utf8; use std::str::from_utf8;
use tokio::sync::{mpsc, oneshot}; use tokio::io::WriteHalf;
enum EpxMsg {
SendText(String, oneshot::Sender<Result<(), WebSocketError>>),
Close,
}
#[wasm_bindgen] #[wasm_bindgen]
pub struct EpxWebSocket { pub struct EpxWebSocket {
msg_sender: mpsc::Sender<EpxMsg>, tx: WebSocketWrite<WriteHalf<TokioIo<Upgraded>>>,
onerror: Function, onerror: Function,
} }
@ -50,7 +47,6 @@ impl EpxWebSocket {
let rand: [u8; 16] = rand::random(); let rand: [u8; 16] = rand::random();
let key = STANDARD.encode(rand); let key = STANDARD.encode(rand);
let mut builder = Request::builder() let mut builder = Request::builder()
.method("GET") .method("GET")
.uri(url.clone()) .uri(url.clone())
@ -71,7 +67,8 @@ impl EpxWebSocket {
let (mut sender, conn) = Builder::new() let (mut sender, conn) = Builder::new()
.title_case_headers(true) .title_case_headers(true)
.preserve_header_case(true).handshake::<TokioIo<EpxStream>, Empty<Bytes>>(TokioIo::new(stream)) .preserve_header_case(true)
.handshake::<TokioIo<EpxStream>, Empty<Bytes>>(TokioIo::new(stream))
.await?; .await?;
wasm_bindgen_futures::spawn_local(async move { wasm_bindgen_futures::spawn_local(async move {
@ -83,18 +80,26 @@ impl EpxWebSocket {
let mut response = sender.send_request(req).await?; let mut response = sender.send_request(req).await?;
verify(&response)?; verify(&response)?;
let mut ws = WebSocket::after_handshake( let ws = WebSocket::after_handshake(
TokioIo::new(hyper::upgrade::on(&mut response).await?), TokioIo::new(hyper::upgrade::on(&mut response).await?),
Role::Client, Role::Client,
); );
let (msg_sender, mut rx) = mpsc::channel(1); let (rx, tx) = ws.split(tokio::io::split);
let mut rx = FragmentCollectorRead::new(rx);
wasm_bindgen_futures::spawn_local(async move { wasm_bindgen_futures::spawn_local(async move {
loop { while let Ok(frame) = rx
tokio::select! { .read_frame(&mut |arg| async move {
frame = ws.read_frame() => { error!(
if let Ok(frame) = frame { "wtf is an obligated write {:?}, {:?}, {:?}",
arg.fin, arg.opcode, arg.payload
);
Ok::<(), std::io::Error>(())
})
.await
{
match frame.opcode { match frame.opcode {
OpCode::Text => { OpCode::Text => {
if let Ok(str) = from_utf8(&frame.payload) { if let Ok(str) = from_utf8(&frame.payload) {
@ -114,35 +119,15 @@ impl EpxWebSocket {
_ => panic!("unknown opcode {:?}", frame.opcode), _ => panic!("unknown opcode {:?}", frame.opcode),
} }
} }
}
msg = rx.recv() => {
if let Some(msg) = msg {
match msg {
EpxMsg::SendText(payload, err) => {
let _ = err.send(ws.write_frame(Frame::text(
Payload::Owned(payload.as_bytes().to_vec()),
))
.await);
}
EpxMsg::Close => break,
}
} else {
break;
}
}
}
}
let _ = ws
.write_frame(Frame::close(CloseCode::Normal.into(), b""))
.await;
}); });
onopen onopen
.call0(&Object::default()) .call0(&Object::default())
.replace_err("Failed to call onopen")?; .replace_err("Failed to call onopen")?;
Ok(Self { msg_sender, onerror }) Ok(Self { tx, onerror })
}.await; }
.await;
if let Err(ret) = ret { if let Err(ret) = ret {
let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone())); let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone()));
Err(ret) Err(ret)
@ -154,23 +139,23 @@ impl EpxWebSocket {
#[wasm_bindgen] #[wasm_bindgen]
pub async fn send(&mut self, payload: String) -> Result<(), JsError> { pub async fn send(&mut self, payload: String) -> Result<(), JsError> {
let onerr = self.onerror.clone(); let onerr = self.onerror.clone();
let ret: Result<(), JsError> = async move { let ret = self
let (tx, rx) = oneshot::channel(); .tx
self.msg_sender.send(EpxMsg::SendText(payload, tx)).await?; .write_frame(Frame::text(Payload::Owned(payload.as_bytes().to_vec())))
Ok(rx.await??)
}
.await; .await;
if let Err(ret) = ret { if let Err(ret) = ret {
let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone())); let _ = onerr.call1(&JsValue::null(), &jval!(format!("{}", ret)));
Err(ret) Err(ret.into())
} else { } else {
ret Ok(ret?)
} }
} }
#[wasm_bindgen] #[wasm_bindgen]
pub async fn close(&mut self) -> Result<(), JsError> { pub async fn close(&mut self) -> Result<(), JsError> {
self.msg_sender.send(EpxMsg::Close).await?; self.tx
.write_frame(Frame::close(CloseCode::Normal.into(), b""))
.await?;
Ok(()) Ok(())
} }
} }