diff --git a/Cargo.lock b/Cargo.lock index 9555a12..26f9aec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -357,7 +357,7 @@ dependencies = [ [[package]] name = "epoxy-client" -version = "1.1.1" +version = "1.2.0" dependencies = [ "async-compression", "async_io_stream", diff --git a/client/Cargo.toml b/client/Cargo.toml index 877caec..e9d5e6f 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "epoxy-client" -version = "1.1.1" +version = "1.2.0" edition = "2021" license = "LGPL-3.0-only" diff --git a/client/demo.js b/client/demo.js index 8fcbe48..8858e13 100644 --- a/client/demo.js +++ b/client/demo.js @@ -2,7 +2,16 @@ import epoxy from "./pkg/epoxy-module-bundled.js"; onmessage = async (msg) => { console.debug("recieved demo:", msg); - let [should_feature_test, should_multiparallel_test, should_parallel_test, should_multiperf_test, should_perf_test, should_ws_test, should_tls_test] = msg.data; + let [ + should_feature_test, + should_multiparallel_test, + should_parallel_test, + should_multiperf_test, + should_perf_test, + should_ws_test, + should_tls_test, + should_udp_test + ] = msg.data; console.log( "%cWASM is significantly slower with DevTools open!", "color:red;font-size:3rem;font-weight:bold" @@ -168,10 +177,23 @@ onmessage = async (msg) => { () => log("opened"), () => log("closed"), err => console.error(err), - msg => { console.log(msg); console.log(decoder.decode(msg)) }, + msg => { console.log(msg); log(decoder.decode(msg)) }, "alicesworld.tech:443", ); await ws.send("GET / HTTP 1.1\r\nHost: alicesworld.tech\r\nConnection: close\r\n\r\n"); + await (new Promise((res, _) => setTimeout(res, 500))); + await ws.close(); + } else if (should_udp_test) { + let decoder = new TextDecoder(); + // nc -ulp 5000 + let ws = await epoxy_client.connect_udp( + () => log("opened"), + () => log("closed"), + err => console.error(err), + msg => { console.log(msg); log(decoder.decode(msg)) }, + "127.0.0.1:5000", + ); + await (new Promise((res, _) => setTimeout(res, 5000))); await ws.close(); } else { let resp = await epoxy_client.fetch("https://httpbin.org/get"); diff --git a/client/index.html b/client/index.html index 6dda924..c013161 100644 --- a/client/index.html +++ b/client/index.html @@ -16,14 +16,24 @@ const should_perf_test = params.has("perf_test"); const should_ws_test = params.has("ws_test"); const should_tls_test = params.has("rawtls_test"); + const should_udp_test = params.has("udp_test"); const worker = new Worker("demo.js", {type:'module'}); worker.onmessage = (msg) => { let el = document.createElement("pre"); - el.innerHTML = msg.data; + el.textContent = msg.data; document.getElementById("logs").appendChild(el); window.scrollTo(0, document.body.scrollHeight); }; - worker.postMessage([should_feature_test, should_multiparallel_test, should_parallel_test, should_multiperf_test, should_perf_test, should_ws_test, should_tls_test]); + worker.postMessage([ + should_feature_test, + should_multiparallel_test, + should_parallel_test, + should_multiperf_test, + should_perf_test, + should_ws_test, + should_tls_test, + should_udp_test + ]); diff --git a/client/package.json b/client/package.json index 316b643..3c3a2c6 100644 --- a/client/package.json +++ b/client/package.json @@ -1,6 +1,6 @@ { "name": "@mercuryworkshop/epoxy-tls", - "version": "1.1.1", + "version": "1.2.0", "description": "A wasm library for using raw encrypted tls/ssl/https/websocket streams on the browser", "scripts": { "build": "./build.sh" diff --git a/client/src/lib.rs b/client/src/lib.rs index cd26347..ce8e495 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -2,10 +2,12 @@ #[macro_use] mod utils; mod tls_stream; +mod udp_stream; mod websocket; mod wrappers; use tls_stream::EpxTlsStream; +use udp_stream::EpxUdpStream; use utils::{Boolinator, ReplaceErr, UriExt}; use websocket::EpxWebSocket; use wrappers::{IncomingBody, TlsWispService, WebSocketWrapper}; @@ -247,6 +249,17 @@ impl EpoxyClient { EpxTlsStream::connect(self, onopen, onclose, onerror, onmessage, url).await } + pub async fn connect_udp( + &self, + onopen: Function, + onclose: Function, + onerror: Function, + onmessage: Function, + url: String, + ) -> Result { + EpxUdpStream::connect(self, onopen, onclose, onerror, onmessage, url).await + } + pub async fn fetch(&self, url: String, options: Object) -> Result { let uri = url.parse::().replace_err("Failed to parse URL")?; let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?; diff --git a/client/src/tls_stream.rs b/client/src/tls_stream.rs index e9b229e..3975982 100644 --- a/client/src/tls_stream.rs +++ b/client/src/tls_stream.rs @@ -19,8 +19,6 @@ impl EpxTlsStream { Err(jerr!("Use EpoxyClient.connect_tls() instead.")) } - // shut up - #[allow(clippy::too_many_arguments)] pub async fn connect( tcp: &EpoxyClient, onopen: Function, diff --git a/client/src/udp_stream.rs b/client/src/udp_stream.rs new file mode 100644 index 0000000..721fe48 --- /dev/null +++ b/client/src/udp_stream.rs @@ -0,0 +1,89 @@ +use crate::*; + +use futures_util::{stream::SplitSink, SinkExt}; +use js_sys::Function; + +#[wasm_bindgen(inspectable)] +pub struct EpxUdpStream { + tx: SplitSink>, + onerror: Function, + #[wasm_bindgen(readonly, getter_with_clone)] + pub url: String, +} + +#[wasm_bindgen] +impl EpxUdpStream { + #[wasm_bindgen(constructor)] + pub fn new() -> Result { + Err(jerr!("Use EpoxyClient.connect_udp() instead.")) + } + + pub async fn connect( + tcp: &EpoxyClient, + onopen: Function, + onclose: Function, + onerror: Function, + onmessage: Function, + url: String, + ) -> Result { + let onerr = onerror.clone(); + let ret: Result = async move { + let url = Uri::try_from(url).replace_err("Failed to parse URL")?; + let url_host = url.host().replace_err("URL must have a host")?; + let url_port = url.port().replace_err("URL must have a port")?.into(); + + let io = tcp + .mux + .client_new_stream(StreamType::Udp, url_host.to_string(), url_port) + .await + .replace_err("Failed to open multiplexor channel")? + .into_io(); + let (tx, mut rx) = io.split(); + + wasm_bindgen_futures::spawn_local(async move { + while let Some(Ok(data)) = rx.next().await { + let _ = onmessage.call1( + &JsValue::null(), + &jval!(Uint8Array::from(data.to_vec().as_slice())), + ); + } + let _ = onclose.call0(&JsValue::null()); + }); + + onopen + .call0(&Object::default()) + .replace_err("Failed to call onopen")?; + + Ok(Self { + tx, + onerror, + url: url.to_string(), + }) + } + .await; + if let Err(ret) = ret { + let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone())); + Err(ret) + } else { + ret + } + } + + #[wasm_bindgen] + pub async fn send(&mut self, payload: Uint8Array) -> Result<(), JsError> { + let onerr = self.onerror.clone(); + let ret = self.tx.send(payload.to_vec()).await; + if let Err(ret) = ret { + let _ = onerr.call1(&JsValue::null(), &jval!(format!("{}", ret))); + Err(ret.into()) + } else { + Ok(ret?) + } + } + + #[wasm_bindgen] + pub async fn close(&mut self) -> Result<(), JsError> { + self.tx.close().await?; + Ok(()) + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 3411905..dd215e7 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -108,7 +108,11 @@ async fn handle_mux( .map_err(|x| WispError::Other(Box::new(x)))?; } StreamType::Udp => { - let udp_socket = UdpSocket::bind(uri) + let udp_socket = UdpSocket::bind("0.0.0.0:0") + .await + .map_err(|x| WispError::Other(Box::new(x)))?; + udp_socket + .connect(uri) .await .map_err(|x| WispError::Other(Box::new(x)))?; let mut data = vec![0u8; 65507]; // udp standard max datagram size