From da541e7330f5d49193454e2e64ad693fe9122093 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Fri, 11 Oct 2024 08:42:29 -0700 Subject: [PATCH] configurable buffer size --- client/src/io_stream.rs | 4 ++-- client/src/lib.rs | 16 ++++++++++------ client/src/utils.rs | 22 +++++++++++++--------- server/src/config.rs | 3 +++ server/src/handle/wisp/mod.rs | 2 +- 5 files changed, 29 insertions(+), 18 deletions(-) diff --git a/client/src/io_stream.rs b/client/src/io_stream.rs index 5cc79e5..17a3aca 100644 --- a/client/src/io_stream.rs +++ b/client/src/io_stream.rs @@ -53,10 +53,10 @@ fn create_iostream( JsValue::from(out).into() } -pub fn iostream_from_asyncrw(asyncrw: ProviderAsyncRW) -> EpoxyIoStream { +pub fn iostream_from_asyncrw(asyncrw: ProviderAsyncRW, buffer_size: usize) -> EpoxyIoStream { let (rx, tx) = asyncrw.split(); create_iostream( - Box::pin(ReaderStream::new(Box::pin(rx)).map_err(EpoxyError::Io)), + Box::pin(ReaderStream::new(Box::pin(rx), buffer_size).map_err(EpoxyError::Io)), Box::pin(tx.into_sink().sink_map_err(EpoxyError::Io)), ) } diff --git a/client/src/lib.rs b/client/src/lib.rs index eb80325..2952289 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -26,13 +26,12 @@ use send_wrapper::SendWrapper; use stream_provider::{StreamProvider, StreamProviderService}; use thiserror::Error; use utils::{ - asyncread_to_readablestream_stream, bind_ws_connect, convert_body, entries_of_object, + asyncread_to_readablestream, bind_ws_connect, convert_body, entries_of_object, from_entries, is_null_body, is_redirect, object_get, object_set, object_truthy, IncomingBody, UriExt, WasmExecutor, WispTransportRead, WispTransportWrite, }; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; -use wasm_streams::ReadableStream; use web_sys::{ResponseInit, WritableStream}; #[cfg(feature = "full")] use websocket::EpoxyWebSocket; @@ -201,6 +200,7 @@ cfg_if! { #[wasm_bindgen(getter_with_clone)] pub pem_files: Vec, pub disable_certificate_validation: bool, + pub buffer_size: usize, } } else { #[wasm_bindgen] @@ -214,6 +214,7 @@ cfg_if! { #[wasm_bindgen(getter_with_clone)] pub user_agent: String, pub disable_certificate_validation: bool, + pub buffer_size: usize, } } } @@ -238,6 +239,7 @@ impl Default for EpoxyClientOptions { #[cfg(feature = "full")] pem_files: Vec::new(), disable_certificate_validation: false, + buffer_size: 16384, } } } @@ -326,6 +328,7 @@ pub struct EpoxyClient { pub redirect_limit: usize, #[wasm_bindgen(getter_with_clone)] pub user_agent: String, + pub buffer_size: usize, } #[wasm_bindgen] @@ -364,6 +367,7 @@ impl EpoxyClient { certs_tampered: options.disable_certificate_validation || !options.pem_files.is_empty(), #[cfg(not(feature = "full"))] certs_tampered: options.disable_certificate_validation, + buffer_size: options.buffer_size, }) } @@ -391,7 +395,7 @@ impl EpoxyClient { .stream_provider .get_asyncread(StreamType::Tcp, host.to_string(), port) .await?; - Ok(iostream_from_asyncrw(Either::Right(stream))) + Ok(iostream_from_asyncrw(Either::Right(stream), self.buffer_size)) } #[cfg(feature = "full")] @@ -403,7 +407,7 @@ impl EpoxyClient { .stream_provider .get_tls_stream(host.to_string(), port) .await?; - Ok(iostream_from_asyncrw(Either::Left(stream))) + Ok(iostream_from_asyncrw(Either::Left(stream), self.buffer_size)) } #[cfg(feature = "full")] @@ -624,14 +628,14 @@ impl EpoxyClient { }, None => Either::Right(response_body), }; - Some(ReadableStream::from_stream(asyncread_to_readablestream_stream(decompressed_body)).into_raw()) + Some(asyncread_to_readablestream(Box::pin(decompressed_body), self.buffer_size)) } else { None }; } else { let response_stream = if !is_null_body(response.status().as_u16()) { let response_body = IncomingBody::new(response.into_body()).into_async_read(); - Some(ReadableStream::from_stream(asyncread_to_readablestream_stream(response_body)).into_raw()) + Some(asyncread_to_readablestream(Box::pin(response_body))) } else { None }; diff --git a/client/src/utils.rs b/client/src/utils.rs index 52bbb16..b2ad1e7 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -24,7 +24,7 @@ use rustls_pki_types::{CertificateDer, ServerName, UnixTime}; use send_wrapper::SendWrapper; use wasm_bindgen::{prelude::*, JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; -use wasm_streams::readable::IntoStream; +use wasm_streams::{readable::IntoStream, ReadableStream}; use web_sys::WritableStreamDefaultWriter; use wisp_mux::{ ws::{Frame, LockedWebSocketWrite, Payload, WebSocketRead, WebSocketWrite}, @@ -119,11 +119,11 @@ pin_project! { } impl ReaderStream { - pub fn new(reader: R) -> Self { + pub fn new(reader: R, capacity: usize) -> Self { ReaderStream { reader: Some(reader), buf: BytesMut::new(), - capacity: 4096, + capacity, } } } @@ -502,12 +502,16 @@ pub fn entries_of_object(obj: &Object) -> Vec> { .collect() } -pub fn asyncread_to_readablestream_stream( - read: R, -) -> impl Stream> { - ReaderStream::new(read) - .map_ok(|x| Uint8Array::from(x.as_ref()).into()) - .map_err(|x| EpoxyError::from(x).into()) +pub fn asyncread_to_readablestream( + read: Pin>, + buffer_size: usize, +) -> web_sys::ReadableStream { + ReadableStream::from_stream( + ReaderStream::new(read, buffer_size) + .map_ok(|x| Uint8Array::from(x.as_ref()).into()) + .map_err(|x| EpoxyError::from(x).into()), + ) + .into_raw() } pub fn object_truthy(val: JsValue) -> Option { diff --git a/server/src/config.rs b/server/src/config.rs index dc8dfa9..ea73aa2 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -176,6 +176,8 @@ pub struct WispConfig { pub struct StreamConfig { /// Whether or not to enable TCP nodelay. pub tcp_nodelay: bool, + /// Buffer size of reads from TCP sockets. + pub buffer_size: usize, /// Whether or not to allow Wisp clients to create UDP streams. pub allow_udp: bool, @@ -393,6 +395,7 @@ impl Default for StreamConfig { fn default() -> Self { Self { tcp_nodelay: false, + buffer_size: 16384, allow_udp: true, allow_wsproxy_udp: false, diff --git a/server/src/handle/wisp/mod.rs b/server/src/handle/wisp/mod.rs index 37dbda3..446e0d3 100644 --- a/server/src/handle/wisp/mod.rs +++ b/server/src/handle/wisp/mod.rs @@ -52,7 +52,7 @@ async fn copy_read_fast( } async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow::Result<()> { - let mut tcprx = BufReader::new(tcprx); + let mut tcprx = BufReader::with_capacity(CONFIG.stream.buffer_size, tcprx); loop { let buf = tcprx.fill_buf().await?;