From bfee4dc078c61eb9cd1e0b261cd35cdfb7d6d86e Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Wed, 16 Oct 2024 21:24:04 -0700 Subject: [PATCH] use a separate hyper connection for websockets --- client/src/lib.rs | 27 +++++++++++++++++++------- client/src/utils.rs | 10 ++++++++++ client/src/websocket.rs | 42 ++++++++++++++++++++++++++++++++++------- 3 files changed, 65 insertions(+), 14 deletions(-) diff --git a/client/src/lib.rs b/client/src/lib.rs index 62a1ee3..187678c 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -238,9 +238,11 @@ cfg_if! { pub wisp_v2: bool, pub udp_extension_required: bool, pub title_case_headers: bool, + pub ws_title_case_headers: bool, #[wasm_bindgen(getter_with_clone)] pub websocket_protocols: Vec, pub redirect_limit: usize, + pub header_limit: usize, #[wasm_bindgen(getter_with_clone)] pub user_agent: String, #[wasm_bindgen(getter_with_clone)] @@ -257,6 +259,7 @@ cfg_if! { #[wasm_bindgen(getter_with_clone)] pub websocket_protocols: Vec, pub redirect_limit: usize, + pub header_limit: usize, #[wasm_bindgen(getter_with_clone)] pub user_agent: String, pub disable_certificate_validation: bool, @@ -276,12 +279,15 @@ impl EpoxyClientOptions { impl Default for EpoxyClientOptions { fn default() -> Self { Self { - wisp_v2: false, - udp_extension_required: false, + wisp_v2: false, + udp_extension_required: false, title_case_headers: false, - websocket_protocols: Vec::new(), - redirect_limit: 10, - user_agent: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36".to_string(), + #[cfg(feature = "full")] + ws_title_case_headers: true, + websocket_protocols: Vec::new(), + redirect_limit: 10, + header_limit: 200, + user_agent: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36".to_string(), #[cfg(feature = "full")] pem_files: Vec::new(), disable_certificate_validation: false, @@ -325,6 +331,9 @@ pub struct EpoxyClient { certs_tampered: bool, pub redirect_limit: usize, + header_limit: usize, + #[cfg(feature = "full")] + ws_title_case_headers: bool, #[wasm_bindgen(getter_with_clone)] pub user_agent: String, pub buffer_size: usize, @@ -417,19 +426,23 @@ impl EpoxyClient { let client = Client::builder(WasmExecutor) .http09_responses(true) .http1_title_case_headers(options.title_case_headers) - .http1_max_headers(200) + .http1_max_headers(options.header_limit) .build(service); Ok(Self { stream_provider, client, redirect_limit: options.redirect_limit, + header_limit: options.header_limit, user_agent: options.user_agent, + buffer_size: options.buffer_size, + + #[cfg(feature = "full")] + ws_title_case_headers: options.ws_title_case_headers, #[cfg(feature = "full")] certs_tampered: options.disable_certificate_validation || !options.pem_files.is_empty(), #[cfg(not(feature = "full"))] certs_tampered: options.disable_certificate_validation, - buffer_size: options.buffer_size, }) } diff --git a/client/src/utils.rs b/client/src/utils.rs index d7d836c..b57c152 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -37,6 +37,9 @@ use crate::{stream_provider::ProviderUnencryptedAsyncRW, EpoxyError}; extern "C" { #[wasm_bindgen(js_namespace = console, js_name = log)] pub fn js_console_log(s: &str); + + #[wasm_bindgen(js_namespace = console, js_name = error)] + pub fn js_console_error(s: &str); } #[macro_export] @@ -46,6 +49,13 @@ macro_rules! console_log { }; } +#[macro_export] +macro_rules! console_error { + ($($expr:expr),*) => { + $crate::utils::js_console_error(&format!($($expr),*)); + }; +} + pub trait UriExt { fn get_redirect(&self, location: &HeaderValue) -> Result; } diff --git a/client/src/websocket.rs b/client/src/websocket.rs index 80a0f66..b1f9067 100644 --- a/client/src/websocket.rs +++ b/client/src/websocket.rs @@ -12,20 +12,25 @@ use http::{ }, Method, Request, Response, StatusCode, Uri, }; +use http_body_util::Empty; use hyper::{ body::Incoming, + client::conn::http1, upgrade::{self, Upgraded}, }; +use hyper_util_wasm::client::legacy::connect::ConnectSvc; use js_sys::{ArrayBuffer, Function, Uint8Array}; use tokio::io::WriteHalf; use wasm_bindgen::{prelude::*, JsError, JsValue}; use wasm_bindgen_futures::spawn_local; use crate::{ + console_error, + stream_provider::StreamProviderService, tokioio::TokioIo, utils::{entries_of_object, from_entries, ws_key}, EpoxyClient, EpoxyError, EpoxyHandlers, EpoxyUrlInput, EpoxyWebSocketHeadersInput, - EpoxyWebSocketInput, HttpBody, + EpoxyWebSocketInput, }; #[wasm_bindgen] @@ -56,7 +61,7 @@ impl EpoxyWebSocket { let url: Uri = url.try_into()?; let host = url.host().ok_or(EpoxyError::NoUrlHost)?; - let mut request = Request::builder() + let mut req = Request::builder() .method(Method::GET) .uri(url.clone()) .header(HOST, host) @@ -67,24 +72,24 @@ impl EpoxyWebSocket { .header(USER_AGENT, user_agent); if !protocols.is_empty() { - request = request.header(SEC_WEBSOCKET_PROTOCOL, protocols.join(",")); + req = req.header(SEC_WEBSOCKET_PROTOCOL, protocols.join(",")); } if web_sys::Headers::instanceof(&headers) && let Ok(entries) = from_entries(&headers) { for header in entries_of_object(&entries) { - request = request.header(&header[0], &header[1]); + req = req.header(&header[0], &header[1]); } } else if headers.is_truthy() { for header in entries_of_object(&headers.into()) { - request = request.header(&header[0], &header[1]); + req = req.header(&header[0], &header[1]); } } - let request = request.body(HttpBody::new(Bytes::new()))?; + let req = req.body(Empty::new())?; - let mut response = client.client.request(request).await?; + let mut response = request(req, client).await?; verify(&response)?; let websocket = WebSocket::after_handshake( @@ -207,6 +212,29 @@ impl EpoxyWebSocket { } } +async fn request( + req: Request>, + client: &EpoxyClient, +) -> Result, EpoxyError> { + let stream = StreamProviderService(client.stream_provider.clone()) + .connect(req.uri().clone()) + .await?; + + let (mut sender, conn) = http1::Builder::new() + .title_case_headers(client.ws_title_case_headers) + .max_headers(client.header_limit) + .handshake(stream) + .await?; + + spawn_local(async move { + if let Err(err) = conn.with_upgrades().await { + console_error!("websocket connection future failed: {:?}", err); + } + }); + + Ok(sender.send_request(req).await?) +} + // https://github.com/snapview/tungstenite-rs/blob/314feea3055a93e585882fb769854a912a7e6dae/src/handshake/client.rs#L189 fn verify(response: &Response) -> Result<(), EpoxyError> { if response.status() != StatusCode::SWITCHING_PROTOCOLS {