remove unnecessary mut self references

This commit is contained in:
Toshit Chawda 2024-01-31 08:13:06 -08:00
parent 619a2a61c7
commit fa2b84d646
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
6 changed files with 55 additions and 19 deletions

View file

@ -5,6 +5,7 @@
);
const should_feature_test = (new URL(window.location.href)).searchParams.has("feature_test");
const should_parallel_test = (new URL(window.location.href)).searchParams.has("parallel_test");
const should_perf_test = (new URL(window.location.href)).searchParams.has("perf_test");
const should_ws_test = (new URL(window.location.href)).searchParams.has("ws_test");
@ -36,6 +37,40 @@
console.warn(url, resp, Object.fromEntries(resp.headers));
console.warn(await resp.text());
}
} else if (should_parallel_test) {
const test_mux = async (url) => {
const t0 = performance.now();
await epoxy_client.fetch(url);
const t1 = performance.now();
return t1 - t0;
};
const test_native = async (url) => {
const t0 = performance.now();
await fetch(url);
const t1 = performance.now();
return t1 - t0;
};
const num_tests = 10;
let total_mux = 0;
await Promise.all([...Array(num_tests).keys()].map(async i=>{
log(`running mux test ${i}`);
return await test_mux("https://httpbin.org/get");
})).then((vals)=>{total_mux = vals.reduce((acc, x) => acc + x, 0)});
total_mux = total_mux / num_tests;
let total_native = 0;
await Promise.all([...Array(num_tests).keys()].map(async i=>{
log(`running native test ${i}`);
return await test_native("https://httpbin.org/get");
})).then((vals)=>{total_native = vals.reduce((acc, x) => acc + x, 0)});
total_native = total_native / num_tests;
log(`avg mux (10) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (10) took ${total_native} ms or ${total_native / 1000} s`);
log(`mux - native: ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
} else if (should_perf_test) {
const test_mux = async (url) => {
const t0 = performance.now();

View file

@ -173,7 +173,7 @@ impl EpoxyClient {
})
}
async fn get_http_io(&mut self, url: &Uri) -> Result<EpxStream, JsError> {
async fn get_http_io(&self, url: &Uri) -> Result<EpxStream, JsError> {
let url_host = url.host().replace_err("URL must have a host")?;
let url_port = utils::get_url_port(url)?;
let channel = self
@ -203,7 +203,7 @@ impl EpoxyClient {
}
async fn send_req(
&mut self,
&self,
req: http::Request<HttpBody>,
should_redirect: bool,
) -> Result<(hyper::Response<Incoming>, Uri, bool), JsError> {
@ -231,7 +231,7 @@ impl EpoxyClient {
// shut up
#[allow(clippy::too_many_arguments)]
pub async fn connect_ws(
&mut self,
&self,
onopen: Function,
onclose: Function,
onerror: Function,
@ -247,7 +247,7 @@ impl EpoxyClient {
}
pub async fn fetch(
&mut self,
&self,
url: String,
options: Object,
) -> Result<web_sys::Response, JsError> {

View file

@ -30,7 +30,7 @@ impl EpxWebSocket {
// shut up
#[allow(clippy::too_many_arguments)]
pub async fn connect(
tcp: &mut EpoxyClient,
tcp: &EpoxyClient,
onopen: Function,
onclose: Function,
onerror: Function,

View file

@ -117,6 +117,7 @@ async fn handle_mux(
loop {
tokio::select! {
event = stream.read() => {
println!("ws rx");
match event {
Some(event) => match event {
WsEvent::Send(data) => {
@ -128,6 +129,7 @@ async fn handle_mux(
}
},
event = tcp_stream_framed.next() => {
println!("tcp rx");
match event.and_then(|x| x.ok()) {
Some(event) => stream.write(event.into()).await?,
None => return Ok(true),
@ -175,8 +177,8 @@ async fn accept_ws(
println!("{:?}: connected", addr);
ServerMux::handle(rx, tx, &mut |packet, stream| async move {
let mut close_err = stream.get_close_handle();
let mut close_ok = stream.get_close_handle();
let close_err = stream.get_close_handle();
let close_ok = stream.get_close_handle();
tokio::spawn(async move {
let _ = handle_mux(packet, stream)
.or_else(|err| async move {

View file

@ -10,7 +10,7 @@ pub use crate::packet::*;
pub use crate::stream::*;
use dashmap::DashMap;
use futures::{channel::mpsc, Future, StreamExt};
use futures::{channel::mpsc, Future, FutureExt, StreamExt};
use std::sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
@ -109,11 +109,10 @@ impl<W: ws::WebSocketWrite + Send + 'static> ServerMux<W> {
R: ws::WebSocketRead,
FR: std::future::Future<Output = Result<(), WispError>>,
{
futures::try_join! {
self.server_close_loop(close_rx, self.stream_map.clone(), self.tx.clone()),
self.server_msg_loop(rx, handler_fn)
futures::select! {
x = self.server_close_loop(close_rx, self.stream_map.clone(), self.tx.clone()).fuse() => x,
x = self.server_msg_loop(rx, handler_fn).fuse() => x
}
.map(|_| ())
}
async fn server_close_loop(
@ -299,7 +298,7 @@ impl<W: ws::WebSocketWrite + Send + 'static> ClientMux<W> {
}
pub async fn client_new_stream(
&mut self,
&self,
stream_type: StreamType,
host: String,
port: u16,

View file

@ -69,7 +69,7 @@ where
}
impl<W: crate::ws::WebSocketWrite + Send + 'static> MuxStreamWrite<W> {
pub async fn write(&mut self, data: Bytes) -> Result<(), crate::WispError> {
pub async fn write(&self, data: Bytes) -> Result<(), crate::WispError> {
if self.is_closed.load(Ordering::Acquire) {
return Err(crate::WispError::StreamAlreadyClosed);
}
@ -86,7 +86,7 @@ impl<W: crate::ws::WebSocketWrite + Send + 'static> MuxStreamWrite<W> {
}
}
pub async fn close(&mut self, reason: u8) -> Result<(), crate::WispError> {
pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> {
if self.is_closed.load(Ordering::Acquire) {
return Err(crate::WispError::StreamAlreadyClosed);
}
@ -102,7 +102,7 @@ impl<W: crate::ws::WebSocketWrite + Send + 'static> MuxStreamWrite<W> {
}
pub(crate) fn into_sink(self) -> Pin<Box<dyn Sink<Bytes, Error = crate::WispError> + Send>> {
Box::pin(sink::unfold(self, |mut tx, data| async move {
Box::pin(sink::unfold(self, |tx, data| async move {
tx.write(data).await?;
Ok(tx)
}))
@ -155,7 +155,7 @@ impl<W: crate::ws::WebSocketWrite + Send + 'static> MuxStream<W> {
self.rx.read().await
}
pub async fn write(&mut self, data: Bytes) -> Result<(), crate::WispError> {
pub async fn write(&self, data: Bytes) -> Result<(), crate::WispError> {
self.tx.write(data).await
}
@ -163,7 +163,7 @@ impl<W: crate::ws::WebSocketWrite + Send + 'static> MuxStream<W> {
self.tx.get_close_handle()
}
pub async fn close(&mut self, reason: u8) -> Result<(), crate::WispError> {
pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> {
self.tx.close(reason).await
}
@ -186,7 +186,7 @@ pub struct MuxStreamCloser {
}
impl MuxStreamCloser {
pub async fn close(&mut self, reason: u8) -> Result<(), crate::WispError> {
pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> {
if self.is_closed.load(Ordering::Acquire) {
return Err(crate::WispError::StreamAlreadyClosed);
}