add udp sockets

This commit is contained in:
Toshit Chawda 2024-03-03 10:47:10 -08:00
parent 75c48ccded
commit 5a24f53454
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
9 changed files with 146 additions and 10 deletions

2
Cargo.lock generated
View file

@ -357,7 +357,7 @@ dependencies = [
[[package]]
name = "epoxy-client"
version = "1.1.1"
version = "1.2.0"
dependencies = [
"async-compression",
"async_io_stream",

View file

@ -1,6 +1,6 @@
[package]
name = "epoxy-client"
version = "1.1.1"
version = "1.2.0"
edition = "2021"
license = "LGPL-3.0-only"

View file

@ -2,7 +2,16 @@ import epoxy from "./pkg/epoxy-module-bundled.js";
onmessage = async (msg) => {
console.debug("recieved demo:", msg);
let [should_feature_test, should_multiparallel_test, should_parallel_test, should_multiperf_test, should_perf_test, should_ws_test, should_tls_test] = msg.data;
let [
should_feature_test,
should_multiparallel_test,
should_parallel_test,
should_multiperf_test,
should_perf_test,
should_ws_test,
should_tls_test,
should_udp_test
] = msg.data;
console.log(
"%cWASM is significantly slower with DevTools open!",
"color:red;font-size:3rem;font-weight:bold"
@ -168,10 +177,23 @@ onmessage = async (msg) => {
() => log("opened"),
() => log("closed"),
err => console.error(err),
msg => { console.log(msg); console.log(decoder.decode(msg)) },
msg => { console.log(msg); log(decoder.decode(msg)) },
"alicesworld.tech:443",
);
await ws.send("GET / HTTP 1.1\r\nHost: alicesworld.tech\r\nConnection: close\r\n\r\n");
await (new Promise((res, _) => setTimeout(res, 500)));
await ws.close();
} else if (should_udp_test) {
let decoder = new TextDecoder();
// nc -ulp 5000
let ws = await epoxy_client.connect_udp(
() => log("opened"),
() => log("closed"),
err => console.error(err),
msg => { console.log(msg); log(decoder.decode(msg)) },
"127.0.0.1:5000",
);
await (new Promise((res, _) => setTimeout(res, 5000)));
await ws.close();
} else {
let resp = await epoxy_client.fetch("https://httpbin.org/get");

View file

@ -16,14 +16,24 @@
const should_perf_test = params.has("perf_test");
const should_ws_test = params.has("ws_test");
const should_tls_test = params.has("rawtls_test");
const should_udp_test = params.has("udp_test");
const worker = new Worker("demo.js", {type:'module'});
worker.onmessage = (msg) => {
let el = document.createElement("pre");
el.innerHTML = msg.data;
el.textContent = msg.data;
document.getElementById("logs").appendChild(el);
window.scrollTo(0, document.body.scrollHeight);
};
worker.postMessage([should_feature_test, should_multiparallel_test, should_parallel_test, should_multiperf_test, should_perf_test, should_ws_test, should_tls_test]);
worker.postMessage([
should_feature_test,
should_multiparallel_test,
should_parallel_test,
should_multiperf_test,
should_perf_test,
should_ws_test,
should_tls_test,
should_udp_test
]);
</script>
</head>

View file

@ -1,6 +1,6 @@
{
"name": "@mercuryworkshop/epoxy-tls",
"version": "1.1.1",
"version": "1.2.0",
"description": "A wasm library for using raw encrypted tls/ssl/https/websocket streams on the browser",
"scripts": {
"build": "./build.sh"

View file

@ -2,10 +2,12 @@
#[macro_use]
mod utils;
mod tls_stream;
mod udp_stream;
mod websocket;
mod wrappers;
use tls_stream::EpxTlsStream;
use udp_stream::EpxUdpStream;
use utils::{Boolinator, ReplaceErr, UriExt};
use websocket::EpxWebSocket;
use wrappers::{IncomingBody, TlsWispService, WebSocketWrapper};
@ -247,6 +249,17 @@ impl EpoxyClient {
EpxTlsStream::connect(self, onopen, onclose, onerror, onmessage, url).await
}
pub async fn connect_udp(
&self,
onopen: Function,
onclose: Function,
onerror: Function,
onmessage: Function,
url: String,
) -> Result<EpxUdpStream, JsError> {
EpxUdpStream::connect(self, onopen, onclose, onerror, onmessage, url).await
}
pub async fn fetch(&self, url: String, options: Object) -> Result<web_sys::Response, JsError> {
let uri = url.parse::<uri::Uri>().replace_err("Failed to parse URL")?;
let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?;

View file

@ -19,8 +19,6 @@ impl EpxTlsStream {
Err(jerr!("Use EpoxyClient.connect_tls() instead."))
}
// shut up
#[allow(clippy::too_many_arguments)]
pub async fn connect(
tcp: &EpoxyClient,
onopen: Function,

89
client/src/udp_stream.rs Normal file
View file

@ -0,0 +1,89 @@
use crate::*;
use futures_util::{stream::SplitSink, SinkExt};
use js_sys::Function;
#[wasm_bindgen(inspectable)]
pub struct EpxUdpStream {
tx: SplitSink<MuxStreamIo, Vec<u8>>,
onerror: Function,
#[wasm_bindgen(readonly, getter_with_clone)]
pub url: String,
}
#[wasm_bindgen]
impl EpxUdpStream {
#[wasm_bindgen(constructor)]
pub fn new() -> Result<EpxUdpStream, JsError> {
Err(jerr!("Use EpoxyClient.connect_udp() instead."))
}
pub async fn connect(
tcp: &EpoxyClient,
onopen: Function,
onclose: Function,
onerror: Function,
onmessage: Function,
url: String,
) -> Result<EpxUdpStream, JsError> {
let onerr = onerror.clone();
let ret: Result<EpxUdpStream, JsError> = async move {
let url = Uri::try_from(url).replace_err("Failed to parse URL")?;
let url_host = url.host().replace_err("URL must have a host")?;
let url_port = url.port().replace_err("URL must have a port")?.into();
let io = tcp
.mux
.client_new_stream(StreamType::Udp, url_host.to_string(), url_port)
.await
.replace_err("Failed to open multiplexor channel")?
.into_io();
let (tx, mut rx) = io.split();
wasm_bindgen_futures::spawn_local(async move {
while let Some(Ok(data)) = rx.next().await {
let _ = onmessage.call1(
&JsValue::null(),
&jval!(Uint8Array::from(data.to_vec().as_slice())),
);
}
let _ = onclose.call0(&JsValue::null());
});
onopen
.call0(&Object::default())
.replace_err("Failed to call onopen")?;
Ok(Self {
tx,
onerror,
url: url.to_string(),
})
}
.await;
if let Err(ret) = ret {
let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone()));
Err(ret)
} else {
ret
}
}
#[wasm_bindgen]
pub async fn send(&mut self, payload: Uint8Array) -> Result<(), JsError> {
let onerr = self.onerror.clone();
let ret = self.tx.send(payload.to_vec()).await;
if let Err(ret) = ret {
let _ = onerr.call1(&JsValue::null(), &jval!(format!("{}", ret)));
Err(ret.into())
} else {
Ok(ret?)
}
}
#[wasm_bindgen]
pub async fn close(&mut self) -> Result<(), JsError> {
self.tx.close().await?;
Ok(())
}
}

View file

@ -108,7 +108,11 @@ async fn handle_mux(
.map_err(|x| WispError::Other(Box::new(x)))?;
}
StreamType::Udp => {
let udp_socket = UdpSocket::bind(uri)
let udp_socket = UdpSocket::bind("0.0.0.0:0")
.await
.map_err(|x| WispError::Other(Box::new(x)))?;
udp_socket
.connect(uri)
.await
.map_err(|x| WispError::Other(Box::new(x)))?;
let mut data = vec![0u8; 65507]; // udp standard max datagram size