optimizations and more deadlock fixes

This commit is contained in:
Toshit Chawda 2024-02-03 22:46:19 -08:00
parent be340c0f82
commit ac39d82a53
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
8 changed files with 253 additions and 49 deletions

39
Cargo.lock generated
View file

@ -727,6 +727,16 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.9"
@ -987,12 +997,35 @@ dependencies = [
"digest",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
[[package]]
name = "simdutf8"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
[[package]]
name = "simple-wisp-client"
version = "0.1.0"
dependencies = [
"bytes",
"fastwebsockets",
"futures",
"http-body-util",
"hyper",
"tokio",
"tokio-native-tls",
"wisp-mux",
]
[[package]]
name = "slab"
version = "0.4.9"
@ -1076,16 +1109,18 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.35.1"
version = "1.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104"
checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",

View file

@ -1,10 +1,11 @@
[workspace]
resolver = "2"
members = ["server", "client", "wisp"]
members = ["server", "client", "wisp", "simple-wisp-client"]
[patch.crates-io]
rustls-pki-types = { git = "https://github.com/r58Playz/rustls-pki-types" }
[profile.release]
lto = true
opt-level = 3
opt-level = 'z'
codegen-units = 1

View file

@ -11,7 +11,7 @@ wasm-bindgen --weak-refs --target no-modules --no-modules-global epoxy --out-dir
echo "[ws] bindgen finished"
mv out/epoxy_client_bg.wasm out/epoxy_client_unoptimized.wasm
time wasm-opt -O4 out/epoxy_client_unoptimized.wasm -o out/epoxy_client_bg.wasm
time wasm-opt -Oz --vacuum --dce out/epoxy_client_unoptimized.wasm -o out/epoxy_client_bg.wasm
echo "[ws] optimized"
AUTOGENERATED_SOURCE=$(<"out/epoxy_client.js")

View file

@ -4,16 +4,21 @@
"color:red;font-size:3rem;font-weight:bold"
);
const should_feature_test = (new URL(window.location.href)).searchParams.has("feature_test");
const should_parallel_test = (new URL(window.location.href)).searchParams.has("parallel_test");
const should_perf_test = (new URL(window.location.href)).searchParams.has("perf_test");
const should_ws_test = (new URL(window.location.href)).searchParams.has("ws_test");
const params = (new URL(window.location.href)).searchParams;
const should_feature_test = params.has("feature_test");
const should_multiparallel_test = params.has("multi_parallel_test");
const should_parallel_test = params.has("parallel_test");
const should_multiperf_test = params.has("multi_perf_test");
const should_perf_test = params.has("perf_test");
const should_ws_test = params.has("ws_test");
const log = (str) => {
let el = document.createElement("div");
el.innerText = str;
document.getElementById("logs").appendChild(el);
console.warn(str);
window.scrollTo(0, document.body.scrollHeight);
}
let { EpoxyClient } = await epoxy();
@ -24,6 +29,19 @@
const tconn1 = performance.now();
log(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`);
const test_mux = async (url) => {
const t0 = performance.now();
await epoxy_client.fetch(url);
const t1 = performance.now();
return t1 - t0;
};
const test_native = async (url) => {
const t0 = performance.now();
await fetch(url, { cache: "no-store" });
const t1 = performance.now();
return t1 - t0;
};
if (should_feature_test) {
for (const url of [
@ -37,55 +55,78 @@
console.warn(url, resp, Object.fromEntries(resp.headers));
console.warn(await resp.text());
}
} else if (should_multiparallel_test) {
const num_tests = 10;
let total_mux_minus_native = 0;
for (const _ of Array(num_tests).keys()) {
let total_mux = 0;
await Promise.all([...Array(num_tests).keys()].map(async i => {
log(`running mux test ${i}`);
return await test_mux("https://httpbin.org/get");
})).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) });
total_mux = total_mux / num_tests;
let total_native = 0;
await Promise.all([...Array(num_tests).keys()].map(async i => {
log(`running native test ${i}`);
return await test_native("https://httpbin.org/get");
})).then((vals) => { total_native = vals.reduce((acc, x) => acc + x, 0) });
total_native = total_native / num_tests;
log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`);
log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
total_mux_minus_native += total_mux - total_native;
}
total_mux_minus_native = total_mux_minus_native / num_tests;
log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`);
} else if (should_parallel_test) {
const test_mux = async (url) => {
const t0 = performance.now();
await epoxy_client.fetch(url);
const t1 = performance.now();
return t1 - t0;
};
const test_native = async (url) => {
const t0 = performance.now();
await fetch(url);
const t1 = performance.now();
return t1 - t0;
};
const num_tests = 10;
let total_mux = 0;
await Promise.all([...Array(num_tests).keys()].map(async i=>{
await Promise.all([...Array(num_tests).keys()].map(async i => {
log(`running mux test ${i}`);
return await test_mux("https://httpbin.org/get");
})).then((vals)=>{total_mux = vals.reduce((acc, x) => acc + x, 0)});
})).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) });
total_mux = total_mux / num_tests;
let total_native = 0;
await Promise.all([...Array(num_tests).keys()].map(async i=>{
await Promise.all([...Array(num_tests).keys()].map(async i => {
log(`running native test ${i}`);
return await test_native("https://httpbin.org/get");
})).then((vals)=>{total_native = vals.reduce((acc, x) => acc + x, 0)});
})).then((vals) => { total_native = vals.reduce((acc, x) => acc + x, 0) });
total_native = total_native / num_tests;
log(`avg mux (10) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (10) took ${total_native} ms or ${total_native / 1000} s`);
log(`mux - native: ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`);
log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
} else if (should_multiperf_test) {
const num_tests = 10;
let total_mux_minus_native = 0;
for (const _ of Array(num_tests).keys()) {
let total_mux = 0;
for (const i of Array(num_tests).keys()) {
log(`running mux test ${i}`);
total_mux += await test_mux("https://httpbin.org/get");
}
total_mux = total_mux / num_tests;
let total_native = 0;
for (const i of Array(num_tests).keys()) {
log(`running native test ${i}`);
total_native += await test_native("https://httpbin.org/get");
}
total_native = total_native / num_tests;
log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`);
log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
total_mux_minus_native += total_mux - total_native;
}
total_mux_minus_native = total_mux_minus_native / num_tests;
log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`);
} else if (should_perf_test) {
const test_mux = async (url) => {
const t0 = performance.now();
await epoxy_client.fetch(url);
const t1 = performance.now();
return t1 - t0;
};
const test_native = async (url) => {
const t0 = performance.now();
await fetch(url);
const t1 = performance.now();
return t1 - t0;
};
const num_tests = 10;
let total_mux = 0;
@ -102,9 +143,9 @@
}
total_native = total_native / num_tests;
log(`avg mux (10) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (10) took ${total_native} ms or ${total_native / 1000} s`);
log(`mux - native: ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`);
log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
} else if (should_ws_test) {
let ws = await epoxy_client.connect_ws(
() => console.log("opened"),

View file

@ -73,10 +73,12 @@ async fn send_req(
None
};
debug!("sending req");
let res = req_sender
.send_request(req)
.await
.replace_err("Failed to send request");
debug!("recieved res");
match res {
Ok(res) => {
if utils::is_redirect(res.status().as_u16())
@ -176,6 +178,7 @@ impl EpoxyClient {
async fn get_http_io(&self, url: &Uri) -> Result<EpxStream, JsError> {
let url_host = url.host().replace_err("URL must have a host")?;
let url_port = utils::get_url_port(url)?;
debug!("making channel");
let channel = self
.mux
.client_new_stream(StreamType::Tcp, url_host.to_string(), url_port)
@ -187,6 +190,7 @@ impl EpoxyClient {
if utils::get_is_secure(url)? {
let cloned_uri = url_host.to_string().clone();
let connector = TlsConnector::from(self.rustls_config.clone());
debug!("connecting channel");
let io = connector
.connect(
cloned_uri
@ -196,8 +200,11 @@ impl EpoxyClient {
)
.await
.replace_err("Failed to perform TLS handshake")?;
debug!("connected channel");
Ok(EpxStream::Left(io))
} else {
debug!("connecting channel");
debug!("connected channel");
Ok(EpxStream::Right(channel))
}
}

View file

@ -0,0 +1,15 @@
[package]
name = "simple-wisp-client"
version = "0.1.0"
edition = "2021"
[dependencies]
bytes = "1.5.0"
fastwebsockets = { version = "0.6.0", features = ["unstable-split", "upgrade"] }
futures = "0.3.30"
http-body-util = "0.1.0"
hyper = { version = "1.1.0", features = ["http1", "client"] }
tokio = { version = "1.36.0", features = ["full"] }
tokio-native-tls = "0.3.1"
wisp-mux = { path = "../wisp", features = ["fastwebsockets"]}

View file

@ -0,0 +1,105 @@
use bytes::Bytes;
use fastwebsockets::{handshake, FragmentCollectorRead};
use futures::io::AsyncWriteExt;
use http_body_util::Empty;
use hyper::{
header::{CONNECTION, UPGRADE},
Request,
};
use std::{error::Error, future::Future};
use tokio::net::TcpStream;
use tokio_native_tls::{native_tls, TlsConnector};
use wisp_mux::{ClientMux, StreamType};
#[derive(Debug)]
struct StrError(String);
impl StrError {
pub fn new(str: &str) -> Self {
Self(str.to_string())
}
}
impl std::fmt::Display for StrError {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
write!(fmt, "{}", self.0)
}
}
impl Error for StrError {}
struct SpawnExecutor;
impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
fn execute(&self, fut: Fut) {
tokio::task::spawn(fut);
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let addr = std::env::args()
.nth(1)
.ok_or(StrError::new("no src addr"))?;
let addr_port: u16 = std::env::args()
.nth(2)
.ok_or(StrError::new("no src port"))?
.parse()?;
let addr_dest = std::env::args()
.nth(3)
.ok_or(StrError::new("no dest addr"))?;
let addr_dest_port: u16 = std::env::args()
.nth(4)
.ok_or(StrError::new("no dest port"))?
.parse()?;
let socket = TcpStream::connect(format!("{}:{}", &addr, addr_port)).await?;
let cx = TlsConnector::from(native_tls::TlsConnector::builder().build()?);
let socket = cx.connect(&addr, socket).await?;
let req = Request::builder()
.method("GET")
.uri(format!("wss://{}:{}/", &addr, addr_port))
.header("Host", &addr)
.header(UPGRADE, "websocket")
.header(CONNECTION, "upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
)
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Protocol", "wisp-v1")
.body(Empty::<Bytes>::new())?;
let (ws, _) = handshake::client(&SpawnExecutor, req, socket).await?;
let (rx, tx) = ws.split(tokio::io::split);
let rx = FragmentCollectorRead::new(rx);
let (mux, fut) = ClientMux::new(rx, tx);
tokio::task::spawn(fut);
let mut hi: u64 = 0;
loop {
let mut channel = mux
.client_new_stream(StreamType::Tcp, addr_dest.clone(), addr_dest_port)
.await?
.into_io()
.into_asyncrw();
for _ in 0..10 {
channel.write_all(b"hiiiiiiii").await?;
hi += 1;
println!("said hi {}", hi);
}
}
#[allow(unreachable_code)]
Ok(())
}

View file

@ -168,8 +168,8 @@ impl<W: ws::WebSocketWrite + Send + 'static> ServerMuxInner<W> {
Close(inner_packet) => {
if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) {
let _ = stream.unbounded_send(WsEvent::Close(inner_packet));
self.stream_map.lock().await.remove(&packet.stream_id);
}
self.stream_map.lock().await.remove(&packet.stream_id);
}
}
} else {
@ -276,8 +276,8 @@ impl<W: ws::WebSocketWrite + Send> ClientMuxInner<W> {
Close(inner_packet) => {
if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) {
let _ = stream.unbounded_send(WsEvent::Close(inner_packet));
self.stream_map.lock().await.remove(&packet.stream_id);
}
self.stream_map.lock().await.remove(&packet.stream_id);
}
}
}