configurable buffer size

This commit is contained in:
Toshit Chawda 2024-10-11 08:42:29 -07:00
parent 24f891141e
commit da541e7330
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
5 changed files with 29 additions and 18 deletions

View file

@ -53,10 +53,10 @@ fn create_iostream(
JsValue::from(out).into() JsValue::from(out).into()
} }
pub fn iostream_from_asyncrw(asyncrw: ProviderAsyncRW) -> EpoxyIoStream { pub fn iostream_from_asyncrw(asyncrw: ProviderAsyncRW, buffer_size: usize) -> EpoxyIoStream {
let (rx, tx) = asyncrw.split(); let (rx, tx) = asyncrw.split();
create_iostream( create_iostream(
Box::pin(ReaderStream::new(Box::pin(rx)).map_err(EpoxyError::Io)), Box::pin(ReaderStream::new(Box::pin(rx), buffer_size).map_err(EpoxyError::Io)),
Box::pin(tx.into_sink().sink_map_err(EpoxyError::Io)), Box::pin(tx.into_sink().sink_map_err(EpoxyError::Io)),
) )
} }

View file

@ -26,13 +26,12 @@ use send_wrapper::SendWrapper;
use stream_provider::{StreamProvider, StreamProviderService}; use stream_provider::{StreamProvider, StreamProviderService};
use thiserror::Error; use thiserror::Error;
use utils::{ use utils::{
asyncread_to_readablestream_stream, bind_ws_connect, convert_body, entries_of_object, asyncread_to_readablestream, bind_ws_connect, convert_body, entries_of_object,
from_entries, is_null_body, is_redirect, object_get, object_set, object_truthy, IncomingBody, from_entries, is_null_body, is_redirect, object_get, object_set, object_truthy, IncomingBody,
UriExt, WasmExecutor, WispTransportRead, WispTransportWrite, UriExt, WasmExecutor, WispTransportRead, WispTransportWrite,
}; };
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture; use wasm_bindgen_futures::JsFuture;
use wasm_streams::ReadableStream;
use web_sys::{ResponseInit, WritableStream}; use web_sys::{ResponseInit, WritableStream};
#[cfg(feature = "full")] #[cfg(feature = "full")]
use websocket::EpoxyWebSocket; use websocket::EpoxyWebSocket;
@ -201,6 +200,7 @@ cfg_if! {
#[wasm_bindgen(getter_with_clone)] #[wasm_bindgen(getter_with_clone)]
pub pem_files: Vec<String>, pub pem_files: Vec<String>,
pub disable_certificate_validation: bool, pub disable_certificate_validation: bool,
pub buffer_size: usize,
} }
} else { } else {
#[wasm_bindgen] #[wasm_bindgen]
@ -214,6 +214,7 @@ cfg_if! {
#[wasm_bindgen(getter_with_clone)] #[wasm_bindgen(getter_with_clone)]
pub user_agent: String, pub user_agent: String,
pub disable_certificate_validation: bool, pub disable_certificate_validation: bool,
pub buffer_size: usize,
} }
} }
} }
@ -238,6 +239,7 @@ impl Default for EpoxyClientOptions {
#[cfg(feature = "full")] #[cfg(feature = "full")]
pem_files: Vec::new(), pem_files: Vec::new(),
disable_certificate_validation: false, disable_certificate_validation: false,
buffer_size: 16384,
} }
} }
} }
@ -326,6 +328,7 @@ pub struct EpoxyClient {
pub redirect_limit: usize, pub redirect_limit: usize,
#[wasm_bindgen(getter_with_clone)] #[wasm_bindgen(getter_with_clone)]
pub user_agent: String, pub user_agent: String,
pub buffer_size: usize,
} }
#[wasm_bindgen] #[wasm_bindgen]
@ -364,6 +367,7 @@ impl EpoxyClient {
certs_tampered: options.disable_certificate_validation || !options.pem_files.is_empty(), certs_tampered: options.disable_certificate_validation || !options.pem_files.is_empty(),
#[cfg(not(feature = "full"))] #[cfg(not(feature = "full"))]
certs_tampered: options.disable_certificate_validation, certs_tampered: options.disable_certificate_validation,
buffer_size: options.buffer_size,
}) })
} }
@ -391,7 +395,7 @@ impl EpoxyClient {
.stream_provider .stream_provider
.get_asyncread(StreamType::Tcp, host.to_string(), port) .get_asyncread(StreamType::Tcp, host.to_string(), port)
.await?; .await?;
Ok(iostream_from_asyncrw(Either::Right(stream))) Ok(iostream_from_asyncrw(Either::Right(stream), self.buffer_size))
} }
#[cfg(feature = "full")] #[cfg(feature = "full")]
@ -403,7 +407,7 @@ impl EpoxyClient {
.stream_provider .stream_provider
.get_tls_stream(host.to_string(), port) .get_tls_stream(host.to_string(), port)
.await?; .await?;
Ok(iostream_from_asyncrw(Either::Left(stream))) Ok(iostream_from_asyncrw(Either::Left(stream), self.buffer_size))
} }
#[cfg(feature = "full")] #[cfg(feature = "full")]
@ -624,14 +628,14 @@ impl EpoxyClient {
}, },
None => Either::Right(response_body), None => Either::Right(response_body),
}; };
Some(ReadableStream::from_stream(asyncread_to_readablestream_stream(decompressed_body)).into_raw()) Some(asyncread_to_readablestream(Box::pin(decompressed_body), self.buffer_size))
} else { } else {
None None
}; };
} else { } else {
let response_stream = if !is_null_body(response.status().as_u16()) { let response_stream = if !is_null_body(response.status().as_u16()) {
let response_body = IncomingBody::new(response.into_body()).into_async_read(); let response_body = IncomingBody::new(response.into_body()).into_async_read();
Some(ReadableStream::from_stream(asyncread_to_readablestream_stream(response_body)).into_raw()) Some(asyncread_to_readablestream(Box::pin(response_body)))
} else { } else {
None None
}; };

View file

@ -24,7 +24,7 @@ use rustls_pki_types::{CertificateDer, ServerName, UnixTime};
use send_wrapper::SendWrapper; use send_wrapper::SendWrapper;
use wasm_bindgen::{prelude::*, JsCast, JsValue}; use wasm_bindgen::{prelude::*, JsCast, JsValue};
use wasm_bindgen_futures::JsFuture; use wasm_bindgen_futures::JsFuture;
use wasm_streams::readable::IntoStream; use wasm_streams::{readable::IntoStream, ReadableStream};
use web_sys::WritableStreamDefaultWriter; use web_sys::WritableStreamDefaultWriter;
use wisp_mux::{ use wisp_mux::{
ws::{Frame, LockedWebSocketWrite, Payload, WebSocketRead, WebSocketWrite}, ws::{Frame, LockedWebSocketWrite, Payload, WebSocketRead, WebSocketWrite},
@ -119,11 +119,11 @@ pin_project! {
} }
impl<R: AsyncRead> ReaderStream<R> { impl<R: AsyncRead> ReaderStream<R> {
pub fn new(reader: R) -> Self { pub fn new(reader: R, capacity: usize) -> Self {
ReaderStream { ReaderStream {
reader: Some(reader), reader: Some(reader),
buf: BytesMut::new(), buf: BytesMut::new(),
capacity: 4096, capacity,
} }
} }
} }
@ -502,12 +502,16 @@ pub fn entries_of_object(obj: &Object) -> Vec<Vec<String>> {
.collect() .collect()
} }
pub fn asyncread_to_readablestream_stream<R: AsyncRead>( pub fn asyncread_to_readablestream(
read: R, read: Pin<Box<dyn AsyncRead>>,
) -> impl Stream<Item = Result<JsValue, JsValue>> { buffer_size: usize,
ReaderStream::new(read) ) -> web_sys::ReadableStream {
.map_ok(|x| Uint8Array::from(x.as_ref()).into()) ReadableStream::from_stream(
.map_err(|x| EpoxyError::from(x).into()) ReaderStream::new(read, buffer_size)
.map_ok(|x| Uint8Array::from(x.as_ref()).into())
.map_err(|x| EpoxyError::from(x).into()),
)
.into_raw()
} }
pub fn object_truthy(val: JsValue) -> Option<JsValue> { pub fn object_truthy(val: JsValue) -> Option<JsValue> {

View file

@ -176,6 +176,8 @@ pub struct WispConfig {
pub struct StreamConfig { pub struct StreamConfig {
/// Whether or not to enable TCP nodelay. /// Whether or not to enable TCP nodelay.
pub tcp_nodelay: bool, pub tcp_nodelay: bool,
/// Buffer size of reads from TCP sockets.
pub buffer_size: usize,
/// Whether or not to allow Wisp clients to create UDP streams. /// Whether or not to allow Wisp clients to create UDP streams.
pub allow_udp: bool, pub allow_udp: bool,
@ -393,6 +395,7 @@ impl Default for StreamConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
tcp_nodelay: false, tcp_nodelay: false,
buffer_size: 16384,
allow_udp: true, allow_udp: true,
allow_wsproxy_udp: false, allow_wsproxy_udp: false,

View file

@ -52,7 +52,7 @@ async fn copy_read_fast(
} }
async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow::Result<()> { async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow::Result<()> {
let mut tcprx = BufReader::new(tcprx); let mut tcprx = BufReader::with_capacity(CONFIG.stream.buffer_size, tcprx);
loop { loop {
let buf = tcprx.fill_buf().await?; let buf = tcprx.fill_buf().await?;