diff --git a/client/demo.js b/client/demo.js index 0a39700..8f27fa6 100644 --- a/client/demo.js +++ b/client/demo.js @@ -5,6 +5,7 @@ ); const should_feature_test = (new URL(window.location.href)).searchParams.has("feature_test"); + const should_parallel_test = (new URL(window.location.href)).searchParams.has("parallel_test"); const should_perf_test = (new URL(window.location.href)).searchParams.has("perf_test"); const should_ws_test = (new URL(window.location.href)).searchParams.has("ws_test"); @@ -36,6 +37,40 @@ console.warn(url, resp, Object.fromEntries(resp.headers)); console.warn(await resp.text()); } + } else if (should_parallel_test) { + const test_mux = async (url) => { + const t0 = performance.now(); + await epoxy_client.fetch(url); + const t1 = performance.now(); + return t1 - t0; + }; + + const test_native = async (url) => { + const t0 = performance.now(); + await fetch(url); + const t1 = performance.now(); + return t1 - t0; + }; + + const num_tests = 10; + + let total_mux = 0; + await Promise.all([...Array(num_tests).keys()].map(async i=>{ + log(`running mux test ${i}`); + return await test_mux("https://httpbin.org/get"); + })).then((vals)=>{total_mux = vals.reduce((acc, x) => acc + x, 0)}); + total_mux = total_mux / num_tests; + + let total_native = 0; + await Promise.all([...Array(num_tests).keys()].map(async i=>{ + log(`running native test ${i}`); + return await test_native("https://httpbin.org/get"); + })).then((vals)=>{total_native = vals.reduce((acc, x) => acc + x, 0)}); + total_native = total_native / num_tests; + + log(`avg mux (10) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (10) took ${total_native} ms or ${total_native / 1000} s`); + log(`mux - native: ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); } else if (should_perf_test) { const test_mux = async (url) => { const t0 = performance.now(); diff --git a/client/src/lib.rs b/client/src/lib.rs index 80b9c58..dabadf2 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -173,7 +173,7 @@ impl EpoxyClient { }) } - async fn get_http_io(&mut self, url: &Uri) -> Result { + async fn get_http_io(&self, url: &Uri) -> Result { let url_host = url.host().replace_err("URL must have a host")?; let url_port = utils::get_url_port(url)?; let channel = self @@ -203,7 +203,7 @@ impl EpoxyClient { } async fn send_req( - &mut self, + &self, req: http::Request, should_redirect: bool, ) -> Result<(hyper::Response, Uri, bool), JsError> { @@ -231,7 +231,7 @@ impl EpoxyClient { // shut up #[allow(clippy::too_many_arguments)] pub async fn connect_ws( - &mut self, + &self, onopen: Function, onclose: Function, onerror: Function, @@ -247,7 +247,7 @@ impl EpoxyClient { } pub async fn fetch( - &mut self, + &self, url: String, options: Object, ) -> Result { diff --git a/client/src/websocket.rs b/client/src/websocket.rs index 2ce9149..addae2c 100644 --- a/client/src/websocket.rs +++ b/client/src/websocket.rs @@ -30,7 +30,7 @@ impl EpxWebSocket { // shut up #[allow(clippy::too_many_arguments)] pub async fn connect( - tcp: &mut EpoxyClient, + tcp: &EpoxyClient, onopen: Function, onclose: Function, onerror: Function, diff --git a/server/src/main.rs b/server/src/main.rs index 7b0b35c..0aa7194 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -117,6 +117,7 @@ async fn handle_mux( loop { tokio::select! { event = stream.read() => { + println!("ws rx"); match event { Some(event) => match event { WsEvent::Send(data) => { @@ -128,6 +129,7 @@ async fn handle_mux( } }, event = tcp_stream_framed.next() => { + println!("tcp rx"); match event.and_then(|x| x.ok()) { Some(event) => stream.write(event.into()).await?, None => return Ok(true), @@ -175,8 +177,8 @@ async fn accept_ws( println!("{:?}: connected", addr); ServerMux::handle(rx, tx, &mut |packet, stream| async move { - let mut close_err = stream.get_close_handle(); - let mut close_ok = stream.get_close_handle(); + let close_err = stream.get_close_handle(); + let close_ok = stream.get_close_handle(); tokio::spawn(async move { let _ = handle_mux(packet, stream) .or_else(|err| async move { diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index d4f843e..0e75e7d 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -10,7 +10,7 @@ pub use crate::packet::*; pub use crate::stream::*; use dashmap::DashMap; -use futures::{channel::mpsc, Future, StreamExt}; +use futures::{channel::mpsc, Future, FutureExt, StreamExt}; use std::sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, Arc, @@ -109,11 +109,10 @@ impl ServerMux { R: ws::WebSocketRead, FR: std::future::Future>, { - futures::try_join! { - self.server_close_loop(close_rx, self.stream_map.clone(), self.tx.clone()), - self.server_msg_loop(rx, handler_fn) + futures::select! { + x = self.server_close_loop(close_rx, self.stream_map.clone(), self.tx.clone()).fuse() => x, + x = self.server_msg_loop(rx, handler_fn).fuse() => x } - .map(|_| ()) } async fn server_close_loop( @@ -299,7 +298,7 @@ impl ClientMux { } pub async fn client_new_stream( - &mut self, + &self, stream_type: StreamType, host: String, port: u16, diff --git a/wisp/src/stream.rs b/wisp/src/stream.rs index cd9daab..3998c9d 100644 --- a/wisp/src/stream.rs +++ b/wisp/src/stream.rs @@ -69,7 +69,7 @@ where } impl MuxStreamWrite { - pub async fn write(&mut self, data: Bytes) -> Result<(), crate::WispError> { + pub async fn write(&self, data: Bytes) -> Result<(), crate::WispError> { if self.is_closed.load(Ordering::Acquire) { return Err(crate::WispError::StreamAlreadyClosed); } @@ -86,7 +86,7 @@ impl MuxStreamWrite { } } - pub async fn close(&mut self, reason: u8) -> Result<(), crate::WispError> { + pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> { if self.is_closed.load(Ordering::Acquire) { return Err(crate::WispError::StreamAlreadyClosed); } @@ -102,7 +102,7 @@ impl MuxStreamWrite { } pub(crate) fn into_sink(self) -> Pin + Send>> { - Box::pin(sink::unfold(self, |mut tx, data| async move { + Box::pin(sink::unfold(self, |tx, data| async move { tx.write(data).await?; Ok(tx) })) @@ -155,7 +155,7 @@ impl MuxStream { self.rx.read().await } - pub async fn write(&mut self, data: Bytes) -> Result<(), crate::WispError> { + pub async fn write(&self, data: Bytes) -> Result<(), crate::WispError> { self.tx.write(data).await } @@ -163,7 +163,7 @@ impl MuxStream { self.tx.get_close_handle() } - pub async fn close(&mut self, reason: u8) -> Result<(), crate::WispError> { + pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> { self.tx.close(reason).await } @@ -186,7 +186,7 @@ pub struct MuxStreamCloser { } impl MuxStreamCloser { - pub async fn close(&mut self, reason: u8) -> Result<(), crate::WispError> { + pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> { if self.is_closed.load(Ordering::Acquire) { return Err(crate::WispError::StreamAlreadyClosed); }