diff --git a/client/demo.js b/client/demo.js index 1a37d88..38a7dfd 100644 --- a/client/demo.js +++ b/client/demo.js @@ -16,6 +16,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox const test_url = params.get("url"); const wisp_url = params.get("wisp") || "ws://localhost:4000/"; const wisp_v1 = params.has("v1"); + const wisp_udp = params.has("udp_extension"); console.log( "%cWASM is significantly slower with DevTools open!", "color:red;font-size:3rem;font-weight:bold" @@ -33,6 +34,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox let epoxy_client_options = new EpoxyClientOptions(); epoxy_client_options.user_agent = navigator.userAgent; epoxy_client_options.wisp_v2 = !wisp_v1; + epoxy_client_options.udp_extension_required = wisp_udp; let epoxy_client; diff --git a/client/src/lib.rs b/client/src/lib.rs index 5c6dc03..568a003 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -27,8 +27,8 @@ use stream_provider::{StreamProvider, StreamProviderService}; use thiserror::Error; use utils::{ asyncread_to_readablestream, convert_body, entries_of_object, from_entries, is_null_body, - is_redirect, object_get, object_set, object_truthy, IncomingBody, UriExt, WasmExecutor, - WispTransportRead, WispTransportWrite, + is_redirect, object_get, object_set, object_truthy, ws_protocol, IncomingBody, UriExt, + WasmExecutor, WispTransportRead, WispTransportWrite, }; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; @@ -118,8 +118,8 @@ pub enum EpoxyError { #[error("Webpki: {0:?} ({0})")] Webpki(#[from] webpki::Error), - #[error("Wisp WebSocket failed to connect")] - WebSocketConnectFailed, + #[error("Wisp WebSocket failed to connect: {0}")] + WebSocketConnectFailed(String), #[error("Custom Wisp transport: {0}")] WispTransport(String), @@ -357,14 +357,20 @@ impl EpoxyClient { let ws_protocols = options.websocket_protocols.clone(); Arc::new(StreamProvider::new( - Box::new(move || { + Box::new(move |wisp_v2| { let wisp_url = wisp_url.clone(); - let ws_protocols = ws_protocols.clone(); + let mut ws_protocols = ws_protocols.clone(); + if wisp_v2 { + // send some random data to ask the server for v2 + ws_protocols.push(ws_protocol()); + } Box::pin(async move { let (write, read) = WebSocketWrapper::connect(&wisp_url, &ws_protocols)?; if !write.wait_for_open().await { - return Err(EpoxyError::WebSocketConnectFailed); + return Err(EpoxyError::WebSocketConnectFailed( + "websocket did not open".to_string(), + )); } Ok(( Box::new(read) as Box, @@ -377,7 +383,7 @@ impl EpoxyClient { } else if let Some(wisp_transport) = transport.dyn_ref::() { let wisp_transport = SendWrapper::new(wisp_transport.clone()); Arc::new(StreamProvider::new( - Box::new(move || { + Box::new(move |_| { let wisp_transport = wisp_transport.clone(); Box::pin(SendWrapper::new(async move { let transport = wisp_transport @@ -431,10 +437,8 @@ impl EpoxyClient { .http1_max_headers(options.header_limit); #[cfg(feature = "full")] - builder - .http2_max_concurrent_reset_streams(10); // set back to default, on wasm it is 0 - let client = builder - .build(service); + builder.http2_max_concurrent_reset_streams(10); // set back to default, on wasm it is 0 + let client = builder.build(service); Ok(Self { stream_provider, diff --git a/client/src/stream_provider.rs b/client/src/stream_provider.rs index 5ffed81..abc7131 100644 --- a/client/src/stream_provider.rs +++ b/client/src/stream_provider.rs @@ -31,7 +31,7 @@ pub type ProviderUnencryptedAsyncRW = MuxStreamAsyncRW; pub type ProviderTlsAsyncRW = IgnoreCloseNotify; pub type ProviderAsyncRW = Either; pub type ProviderWispTransportGenerator = Box< - dyn Fn() -> Pin< + dyn Fn(bool) -> Pin< Box< dyn Future< Output = Result< @@ -124,7 +124,7 @@ impl StreamProvider { None }; - let (read, write) = (self.wisp_generator)().await?; + let (read, write) = (self.wisp_generator)(self.wisp_v2).await?; let client = ClientMux::create(read, write, extensions).await?; let (mux, fut) = if self.udp_extension { diff --git a/client/src/utils.rs b/client/src/utils.rs index a79d2f5..bf27ec3 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -417,6 +417,13 @@ export function ws_key() { return btoa(String.fromCharCode.apply(null, key)); } +export function ws_protocol() { + return ( + [1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, + c => (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16) + ); +} + export function from_entries(entries){ var ret = {}; for(var i = 0; i < entries.length; i++) ret[entries[i][0]] = entries[i][1]; @@ -433,6 +440,7 @@ extern "C" { fn entries_of_object_inner(obj: &Object) -> Vec; pub fn define_property(obj: &Object, key: &str, val: JsValue); pub fn ws_key() -> String; + pub fn ws_protocol() -> String; #[wasm_bindgen(catch)] pub fn from_entries(iterable: &JsValue) -> Result; diff --git a/client/src/ws_wrapper.rs b/client/src/ws_wrapper.rs index 52aeb0f..607df83 100644 --- a/client/src/ws_wrapper.rs +++ b/client/src/ws_wrapper.rs @@ -153,7 +153,7 @@ impl WebSocketWrapper { .into(), ) } - .map_err(|_| EpoxyError::WebSocketConnectFailed)?; + .map_err(|x| EpoxyError::WebSocketConnectFailed(format!("{:?}", x)))?; ws.set_binary_type(BinaryType::Arraybuffer); ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); ws.set_onopen(Some(onopen.as_ref().unchecked_ref())); diff --git a/server/src/handle/wisp/mod.rs b/server/src/handle/wisp/mod.rs index 446e0d3..d3eac0d 100644 --- a/server/src/handle/wisp/mod.rs +++ b/server/src/handle/wisp/mod.rs @@ -211,7 +211,7 @@ async fn handle_stream( } } -pub async fn handle_wisp(stream: WispResult, id: String) -> anyhow::Result<()> { +pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow::Result<()> { let (read, write) = stream; cfg_if! { if #[cfg(feature = "twisp")] { @@ -232,11 +232,16 @@ pub async fn handle_wisp(stream: WispResult, id: String) -> anyhow::Result<()> { } } - let (mux, fut) = ServerMux::create(read, write, buffer_size, extensions) - .await - .context("failed to create server multiplexor")? - .with_required_extensions(&required_extensions) - .await?; + let (mux, fut) = ServerMux::create( + read, + write, + buffer_size, + if is_v2 { extensions } else { None }, + ) + .await + .context("failed to create server multiplexor")? + .with_required_extensions(&required_extensions) + .await?; let mux = Arc::new(mux); debug!( diff --git a/server/src/main.rs b/server/src/main.rs index 8a75189..a98b122 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -208,7 +208,7 @@ fn handle_stream(stream: ServerRouteResult, id: String) { tokio::spawn(async move { CLIENTS.insert(id.clone(), (DashMap::new(), false)); let res = match stream { - ServerRouteResult::Wisp(stream) => handle_wisp(stream, id.clone()).await, + ServerRouteResult::Wisp(stream, is_v2) => handle_wisp(stream, is_v2, id.clone()).await, ServerRouteResult::WsProxy(ws, path, udp) => { handle_wsproxy(ws, id.clone(), path, udp).await } diff --git a/server/src/route.rs b/server/src/route.rs index 5f910e4..c6fb601 100644 --- a/server/src/route.rs +++ b/server/src/route.rs @@ -5,8 +5,8 @@ use bytes::Bytes; use fastwebsockets::{upgrade::UpgradeFut, FragmentCollector}; use http_body_util::Full; use hyper::{ - body::Incoming, server::conn::http1::Builder, service::service_fn, HeaderMap, Request, - Response, StatusCode, + body::Incoming, header::SEC_WEBSOCKET_PROTOCOL, server::conn::http1::Builder, + service::service_fn, HeaderMap, Request, Response, StatusCode, }; use hyper_util::rt::TokioIo; use log::{debug, error, trace}; @@ -25,6 +25,25 @@ use crate::{ CONFIG, }; +pub type WispResult = ( + Box, + Box, +); + +pub enum ServerRouteResult { + Wisp(WispResult, bool), + WsProxy(WebSocketStreamWrapper, String, bool), +} + +impl Display for ServerRouteResult { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Wisp(..) => write!(f, "Wisp"), + Self::WsProxy(_, path, udp) => write!(f, "WsProxy path {:?} udp {:?}", path, udp), + } + } +} + type Body = Full; fn non_ws_resp() -> anyhow::Result> { Ok(Response::builder() @@ -57,7 +76,7 @@ fn get_header(headers: &HeaderMap, header: &str) -> Option { } enum HttpUpgradeResult { - Wisp, + Wisp(bool), WsProxy(String, bool), } @@ -90,7 +109,7 @@ where let (resp, fut) = fastwebsockets::upgrade::upgrade(&mut req)?; // replace body of Empty with Full - let resp = Response::from_parts(resp.into_parts().0, Body::new(Bytes::new())); + let mut resp = Response::from_parts(resp.into_parts().0, Body::new(Bytes::new())); let headers = req.headers(); let ip_header = if CONFIG.server.use_real_ip_headers { @@ -99,25 +118,27 @@ where None }; - if req - .uri() - .path() - .starts_with(&(CONFIG.wisp.prefix.clone() + "/")) - { + let ws_protocol = headers.get(SEC_WEBSOCKET_PROTOCOL); + let req_path = req.uri().path().to_string(); + + if req_path.starts_with(&(CONFIG.wisp.prefix.clone() + "/")) { + let has_ws_protocol = ws_protocol.is_some(); tokio::spawn(async move { - if let Err(err) = (callback)(fut, HttpUpgradeResult::Wisp, ip_header).await { + if let Err(err) = + (callback)(fut, HttpUpgradeResult::Wisp(has_ws_protocol), ip_header).await + { error!("error while serving client: {:?}", err); } }); + if let Some(protocol) = ws_protocol { + resp.headers_mut() + .append(SEC_WEBSOCKET_PROTOCOL, protocol.clone()); + } } else if CONFIG.wisp.allow_wsproxy { let udp = req.uri().query().unwrap_or_default() == "?udp"; tokio::spawn(async move { - if let Err(err) = (callback)( - fut, - HttpUpgradeResult::WsProxy(req.uri().path().to_string(), udp), - ip_header, - ) - .await + if let Err(err) = + (callback)(fut, HttpUpgradeResult::WsProxy(req_path, udp), ip_header).await { error!("error while serving client: {:?}", err); } @@ -162,7 +183,7 @@ pub async fn route( ws.set_auto_pong(false); match res { - HttpUpgradeResult::Wisp => { + HttpUpgradeResult::Wisp(is_v2) => { let (read, write) = ws.split(|x| { let parts = x .into_inner() @@ -173,10 +194,10 @@ pub async fn route( }); (callback)( - ServerRouteResult::Wisp(( - Box::new(read), - Box::new(write), - )), + ServerRouteResult::Wisp( + (Box::new(read), Box::new(write)), + is_v2, + ), maybe_ip, ) } @@ -208,29 +229,10 @@ pub async fn route( let write = GenericWebSocketWrite::new(FramedWrite::new(write, codec)); (callback)( - ServerRouteResult::Wisp((Box::new(read), Box::new(write))), + ServerRouteResult::Wisp((Box::new(read), Box::new(write)), true), None, ); } } Ok(()) } - -pub type WispResult = ( - Box, - Box, -); - -pub enum ServerRouteResult { - Wisp(WispResult), - WsProxy(WebSocketStreamWrapper, String, bool), -} - -impl Display for ServerRouteResult { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Wisp(_) => write!(f, "Wisp"), - Self::WsProxy(_, path, udp) => write!(f, "WsProxy path {:?} udp {:?}", path, udp), - } - } -}