diff --git a/Cargo.lock b/Cargo.lock index eefc704..0d1f3d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -727,6 +727,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + [[package]] name = "parking_lot_core" version = "0.9.9" @@ -987,12 +997,35 @@ dependencies = [ "digest", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "simdutf8" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +[[package]] +name = "simple-wisp-client" +version = "0.1.0" +dependencies = [ + "bytes", + "fastwebsockets", + "futures", + "http-body-util", + "hyper", + "tokio", + "tokio-native-tls", + "wisp-mux", +] + [[package]] name = "slab" version = "0.4.9" @@ -1076,16 +1109,18 @@ dependencies = [ [[package]] name = "tokio" -version = "1.35.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", diff --git a/Cargo.toml b/Cargo.toml index 032dadf..2fcf5e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,11 @@ [workspace] resolver = "2" -members = ["server", "client", "wisp"] +members = ["server", "client", "wisp", "simple-wisp-client"] [patch.crates-io] rustls-pki-types = { git = "https://github.com/r58Playz/rustls-pki-types" } [profile.release] lto = true -opt-level = 3 +opt-level = 'z' +codegen-units = 1 diff --git a/client/build.sh b/client/build.sh index 5439060..9d5a889 100755 --- a/client/build.sh +++ b/client/build.sh @@ -11,7 +11,7 @@ wasm-bindgen --weak-refs --target no-modules --no-modules-global epoxy --out-dir echo "[ws] bindgen finished" mv out/epoxy_client_bg.wasm out/epoxy_client_unoptimized.wasm -time wasm-opt -O4 out/epoxy_client_unoptimized.wasm -o out/epoxy_client_bg.wasm +time wasm-opt -Oz --vacuum --dce out/epoxy_client_unoptimized.wasm -o out/epoxy_client_bg.wasm echo "[ws] optimized" AUTOGENERATED_SOURCE=$(<"out/epoxy_client.js") diff --git a/client/demo.js b/client/demo.js index 8f27fa6..231842a 100644 --- a/client/demo.js +++ b/client/demo.js @@ -4,16 +4,21 @@ "color:red;font-size:3rem;font-weight:bold" ); - const should_feature_test = (new URL(window.location.href)).searchParams.has("feature_test"); - const should_parallel_test = (new URL(window.location.href)).searchParams.has("parallel_test"); - const should_perf_test = (new URL(window.location.href)).searchParams.has("perf_test"); - const should_ws_test = (new URL(window.location.href)).searchParams.has("ws_test"); + const params = (new URL(window.location.href)).searchParams; + + const should_feature_test = params.has("feature_test"); + const should_multiparallel_test = params.has("multi_parallel_test"); + const should_parallel_test = params.has("parallel_test"); + const should_multiperf_test = params.has("multi_perf_test"); + const should_perf_test = params.has("perf_test"); + const should_ws_test = params.has("ws_test"); const log = (str) => { let el = document.createElement("div"); el.innerText = str; document.getElementById("logs").appendChild(el); console.warn(str); + window.scrollTo(0, document.body.scrollHeight); } let { EpoxyClient } = await epoxy(); @@ -24,6 +29,19 @@ const tconn1 = performance.now(); log(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`); + const test_mux = async (url) => { + const t0 = performance.now(); + await epoxy_client.fetch(url); + const t1 = performance.now(); + return t1 - t0; + }; + + const test_native = async (url) => { + const t0 = performance.now(); + await fetch(url, { cache: "no-store" }); + const t1 = performance.now(); + return t1 - t0; + }; if (should_feature_test) { for (const url of [ @@ -37,55 +55,78 @@ console.warn(url, resp, Object.fromEntries(resp.headers)); console.warn(await resp.text()); } + } else if (should_multiparallel_test) { + const num_tests = 10; + let total_mux_minus_native = 0; + for (const _ of Array(num_tests).keys()) { + let total_mux = 0; + await Promise.all([...Array(num_tests).keys()].map(async i => { + log(`running mux test ${i}`); + return await test_mux("https://httpbin.org/get"); + })).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) }); + total_mux = total_mux / num_tests; + + let total_native = 0; + await Promise.all([...Array(num_tests).keys()].map(async i => { + log(`running native test ${i}`); + return await test_native("https://httpbin.org/get"); + })).then((vals) => { total_native = vals.reduce((acc, x) => acc + x, 0) }); + total_native = total_native / num_tests; + + log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); + log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + total_mux_minus_native += total_mux - total_native; + } + total_mux_minus_native = total_mux_minus_native / num_tests; + log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`); } else if (should_parallel_test) { - const test_mux = async (url) => { - const t0 = performance.now(); - await epoxy_client.fetch(url); - const t1 = performance.now(); - return t1 - t0; - }; - - const test_native = async (url) => { - const t0 = performance.now(); - await fetch(url); - const t1 = performance.now(); - return t1 - t0; - }; - const num_tests = 10; let total_mux = 0; - await Promise.all([...Array(num_tests).keys()].map(async i=>{ + await Promise.all([...Array(num_tests).keys()].map(async i => { log(`running mux test ${i}`); return await test_mux("https://httpbin.org/get"); - })).then((vals)=>{total_mux = vals.reduce((acc, x) => acc + x, 0)}); + })).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) }); total_mux = total_mux / num_tests; let total_native = 0; - await Promise.all([...Array(num_tests).keys()].map(async i=>{ + await Promise.all([...Array(num_tests).keys()].map(async i => { log(`running native test ${i}`); return await test_native("https://httpbin.org/get"); - })).then((vals)=>{total_native = vals.reduce((acc, x) => acc + x, 0)}); + })).then((vals) => { total_native = vals.reduce((acc, x) => acc + x, 0) }); total_native = total_native / num_tests; - log(`avg mux (10) took ${total_mux} ms or ${total_mux / 1000} s`); - log(`avg native (10) took ${total_native} ms or ${total_native / 1000} s`); - log(`mux - native: ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); + log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + } else if (should_multiperf_test) { + const num_tests = 10; + let total_mux_minus_native = 0; + for (const _ of Array(num_tests).keys()) { + let total_mux = 0; + for (const i of Array(num_tests).keys()) { + log(`running mux test ${i}`); + total_mux += await test_mux("https://httpbin.org/get"); + } + total_mux = total_mux / num_tests; + + let total_native = 0; + for (const i of Array(num_tests).keys()) { + log(`running native test ${i}`); + total_native += await test_native("https://httpbin.org/get"); + } + total_native = total_native / num_tests; + + log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); + log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + total_mux_minus_native += total_mux - total_native; + } + total_mux_minus_native = total_mux_minus_native / num_tests; + log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`); + } else if (should_perf_test) { - const test_mux = async (url) => { - const t0 = performance.now(); - await epoxy_client.fetch(url); - const t1 = performance.now(); - return t1 - t0; - }; - - const test_native = async (url) => { - const t0 = performance.now(); - await fetch(url); - const t1 = performance.now(); - return t1 - t0; - }; - const num_tests = 10; let total_mux = 0; @@ -102,9 +143,9 @@ } total_native = total_native / num_tests; - log(`avg mux (10) took ${total_mux} ms or ${total_mux / 1000} s`); - log(`avg native (10) took ${total_native} ms or ${total_native / 1000} s`); - log(`mux - native: ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); + log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); } else if (should_ws_test) { let ws = await epoxy_client.connect_ws( () => console.log("opened"), diff --git a/client/src/lib.rs b/client/src/lib.rs index dabadf2..0ea0467 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -73,10 +73,12 @@ async fn send_req( None }; + debug!("sending req"); let res = req_sender .send_request(req) .await .replace_err("Failed to send request"); + debug!("recieved res"); match res { Ok(res) => { if utils::is_redirect(res.status().as_u16()) @@ -176,6 +178,7 @@ impl EpoxyClient { async fn get_http_io(&self, url: &Uri) -> Result { let url_host = url.host().replace_err("URL must have a host")?; let url_port = utils::get_url_port(url)?; + debug!("making channel"); let channel = self .mux .client_new_stream(StreamType::Tcp, url_host.to_string(), url_port) @@ -187,6 +190,7 @@ impl EpoxyClient { if utils::get_is_secure(url)? { let cloned_uri = url_host.to_string().clone(); let connector = TlsConnector::from(self.rustls_config.clone()); + debug!("connecting channel"); let io = connector .connect( cloned_uri @@ -196,8 +200,11 @@ impl EpoxyClient { ) .await .replace_err("Failed to perform TLS handshake")?; + debug!("connected channel"); Ok(EpxStream::Left(io)) } else { + debug!("connecting channel"); + debug!("connected channel"); Ok(EpxStream::Right(channel)) } } diff --git a/simple-wisp-client/Cargo.toml b/simple-wisp-client/Cargo.toml new file mode 100644 index 0000000..972d5da --- /dev/null +++ b/simple-wisp-client/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "simple-wisp-client" +version = "0.1.0" +edition = "2021" + +[dependencies] +bytes = "1.5.0" +fastwebsockets = { version = "0.6.0", features = ["unstable-split", "upgrade"] } +futures = "0.3.30" +http-body-util = "0.1.0" +hyper = { version = "1.1.0", features = ["http1", "client"] } +tokio = { version = "1.36.0", features = ["full"] } +tokio-native-tls = "0.3.1" +wisp-mux = { path = "../wisp", features = ["fastwebsockets"]} + diff --git a/simple-wisp-client/src/main.rs b/simple-wisp-client/src/main.rs new file mode 100644 index 0000000..db49784 --- /dev/null +++ b/simple-wisp-client/src/main.rs @@ -0,0 +1,105 @@ +use bytes::Bytes; +use fastwebsockets::{handshake, FragmentCollectorRead}; +use futures::io::AsyncWriteExt; +use http_body_util::Empty; +use hyper::{ + header::{CONNECTION, UPGRADE}, + Request, +}; +use std::{error::Error, future::Future}; +use tokio::net::TcpStream; +use tokio_native_tls::{native_tls, TlsConnector}; +use wisp_mux::{ClientMux, StreamType}; + +#[derive(Debug)] +struct StrError(String); + +impl StrError { + pub fn new(str: &str) -> Self { + Self(str.to_string()) + } +} + +impl std::fmt::Display for StrError { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + write!(fmt, "{}", self.0) + } +} + +impl Error for StrError {} + +struct SpawnExecutor; + +impl hyper::rt::Executor for SpawnExecutor +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn(fut); + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = std::env::args() + .nth(1) + .ok_or(StrError::new("no src addr"))?; + + let addr_port: u16 = std::env::args() + .nth(2) + .ok_or(StrError::new("no src port"))? + .parse()?; + + let addr_dest = std::env::args() + .nth(3) + .ok_or(StrError::new("no dest addr"))?; + + let addr_dest_port: u16 = std::env::args() + .nth(4) + .ok_or(StrError::new("no dest port"))? + .parse()?; + + let socket = TcpStream::connect(format!("{}:{}", &addr, addr_port)).await?; + let cx = TlsConnector::from(native_tls::TlsConnector::builder().build()?); + let socket = cx.connect(&addr, socket).await?; + let req = Request::builder() + .method("GET") + .uri(format!("wss://{}:{}/", &addr, addr_port)) + .header("Host", &addr) + .header(UPGRADE, "websocket") + .header(CONNECTION, "upgrade") + .header( + "Sec-WebSocket-Key", + fastwebsockets::handshake::generate_key(), + ) + .header("Sec-WebSocket-Version", "13") + .header("Sec-WebSocket-Protocol", "wisp-v1") + .body(Empty::::new())?; + + let (ws, _) = handshake::client(&SpawnExecutor, req, socket).await?; + + let (rx, tx) = ws.split(tokio::io::split); + let rx = FragmentCollectorRead::new(rx); + + let (mux, fut) = ClientMux::new(rx, tx); + + tokio::task::spawn(fut); + + let mut hi: u64 = 0; + loop { + let mut channel = mux + .client_new_stream(StreamType::Tcp, addr_dest.clone(), addr_dest_port) + .await? + .into_io() + .into_asyncrw(); + for _ in 0..10 { + channel.write_all(b"hiiiiiiii").await?; + hi += 1; + println!("said hi {}", hi); + } + } + + #[allow(unreachable_code)] + Ok(()) +} diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index 9326ece..37b81c5 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -168,8 +168,8 @@ impl ServerMuxInner { Close(inner_packet) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { let _ = stream.unbounded_send(WsEvent::Close(inner_packet)); - self.stream_map.lock().await.remove(&packet.stream_id); } + self.stream_map.lock().await.remove(&packet.stream_id); } } } else { @@ -276,8 +276,8 @@ impl ClientMuxInner { Close(inner_packet) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { let _ = stream.unbounded_send(WsEvent::Close(inner_packet)); - self.stream_map.lock().await.remove(&packet.stream_id); } + self.stream_map.lock().await.remove(&packet.stream_id); } } }