diff --git a/client/demo.js b/client/demo.js index 231842a..30379b8 100644 --- a/client/demo.js +++ b/client/demo.js @@ -12,6 +12,7 @@ const should_multiperf_test = params.has("multi_perf_test"); const should_perf_test = params.has("perf_test"); const should_ws_test = params.has("ws_test"); + const should_tls_test = params.has("rawtls_test"); const log = (str) => { let el = document.createElement("div"); @@ -160,6 +161,16 @@ await ws.send("data"); await (new Promise((res, _) => setTimeout(res, 100))); } + } else if (should_tls_test) { + let decoder = new TextDecoder(); + let ws = await epoxy_client.connect_tls( + () => console.log("opened"), + () => console.log("closed"), + err => console.error(err), + msg => { console.log(msg); console.log(decoder.decode(msg)) }, + "alicesworld.tech:443", + ); + await ws.send("GET / HTTP 1.1\r\nHost: alicesworld.tech\r\nConnection: close\r\n\r\n"); } else { let resp = await epoxy_client.fetch("https://httpbin.org/get"); console.warn(resp, Object.fromEntries(resp.headers)); diff --git a/client/src/lib.rs b/client/src/lib.rs index 0ea0467..afd7b20 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,10 +1,12 @@ #![feature(let_chains)] #[macro_use] mod utils; +mod tls_stream; mod tokioio; mod websocket; mod wrappers; +use tls_stream::EpxTlsStream; use tokioio::TokioIo; use utils::{ReplaceErr, UriExt}; use websocket::EpxWebSocket; @@ -15,10 +17,7 @@ use std::sync::Arc; use async_compression::tokio::bufread as async_comp; use async_io_stream::IoStream; use bytes::Bytes; -use futures_util::{ - stream::SplitSink, - StreamExt, -}; +use futures_util::{stream::SplitSink, StreamExt}; use http::{uri, HeaderName, HeaderValue, Request, Response}; use hyper::{body::Incoming, client::conn::http1::Builder, Uri}; use js_sys::{Array, Function, Object, Reflect, Uint8Array}; @@ -30,7 +29,7 @@ use tokio_util::{ use wasm_bindgen::prelude::*; use web_sys::TextEncoder; use wisp_mux::{ClientMux, MuxStreamIo, StreamType}; -use ws_stream_wasm::{WsMeta, WsStream, WsMessage}; +use ws_stream_wasm::{WsMessage, WsMeta, WsStream}; type HttpBody = http_body_util::Full; @@ -45,14 +44,14 @@ enum EpxCompression { Gzip, } -type EpxTlsStream = TlsStream>>; -type EpxUnencryptedStream = IoStream>; -type EpxStream = Either; +type EpxIoTlsStream = TlsStream>>; +type EpxIoUnencryptedStream = IoStream>; +type EpxIoStream = Either; async fn send_req( req: http::Request, should_redirect: bool, - io: EpxStream, + io: EpxIoStream, ) -> Result { let (mut req_sender, conn) = Builder::new() .title_case_headers(true) @@ -175,10 +174,7 @@ impl EpoxyClient { }) } - async fn get_http_io(&self, url: &Uri) -> Result { - let url_host = url.host().replace_err("URL must have a host")?; - let url_port = utils::get_url_port(url)?; - debug!("making channel"); + async fn get_tls_io(&self, url_host: &str, url_port: u16) -> Result { let channel = self .mux .client_new_stream(StreamType::Tcp, url_host.to_string(), url_port) @@ -186,26 +182,42 @@ impl EpoxyClient { .replace_err("Failed to create multiplexor channel")? .into_io() .into_asyncrw(); + let cloned_uri = url_host.to_string().clone(); + let connector = TlsConnector::from(self.rustls_config.clone()); + debug!("connecting channel"); + let io = connector + .connect( + cloned_uri + .try_into() + .replace_err("Failed to parse URL (rustls)")?, + channel, + ) + .await + .replace_err("Failed to perform TLS handshake")?; + debug!("connected channel"); + Ok(io) + } + + async fn get_http_io(&self, url: &Uri) -> Result { + let url_host = url.host().replace_err("URL must have a host")?; + let url_port = utils::get_url_port(url)?; if utils::get_is_secure(url)? { - let cloned_uri = url_host.to_string().clone(); - let connector = TlsConnector::from(self.rustls_config.clone()); - debug!("connecting channel"); - let io = connector - .connect( - cloned_uri - .try_into() - .replace_err("Failed to parse URL (rustls)")?, - channel, - ) - .await - .replace_err("Failed to perform TLS handshake")?; - debug!("connected channel"); - Ok(EpxStream::Left(io)) + Ok(EpxIoStream::Left( + self.get_tls_io(url_host, url_port).await?, + )) } else { + debug!("making channel"); + let channel = self + .mux + .client_new_stream(StreamType::Tcp, url_host.to_string(), url_port) + .await + .replace_err("Failed to create multiplexor channel")? + .into_io() + .into_asyncrw(); debug!("connecting channel"); debug!("connected channel"); - Ok(EpxStream::Right(channel)) + Ok(EpxIoStream::Right(channel)) } } @@ -253,11 +265,18 @@ impl EpoxyClient { .await } - pub async fn fetch( + pub async fn connect_tls( &self, + onopen: Function, + onclose: Function, + onerror: Function, + onmessage: Function, url: String, - options: Object, - ) -> Result { + ) -> Result { + EpxTlsStream::connect(self, onopen, onclose, onerror, onmessage, url).await + } + + pub async fn fetch(&self, url: String, options: Object) -> Result { let uri = url.parse::().replace_err("Failed to parse URL")?; let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?; if *uri_scheme != uri::Scheme::HTTP && *uri_scheme != uri::Scheme::HTTPS { diff --git a/client/src/tls_stream.rs b/client/src/tls_stream.rs new file mode 100644 index 0000000..97e61a7 --- /dev/null +++ b/client/src/tls_stream.rs @@ -0,0 +1,82 @@ +use crate::*; + +use js_sys::Function; +use tokio::io::{split, AsyncWriteExt, WriteHalf}; +use tokio_util::io::ReaderStream; + +#[wasm_bindgen] +pub struct EpxTlsStream { + tx: WriteHalf, + onerror: Function, +} + +#[wasm_bindgen] +impl EpxTlsStream { + #[wasm_bindgen(constructor)] + pub fn new() -> Result { + Err(jerr!("Use EpoxyClient.connect_tls() instead.")) + } + + // shut up + #[allow(clippy::too_many_arguments)] + pub async fn connect( + tcp: &EpoxyClient, + onopen: Function, + onclose: Function, + onerror: Function, + onmessage: Function, + url: String, + ) -> Result { + let onerr = onerror.clone(); + let ret: Result = async move { + let url = Uri::try_from(url).replace_err("Failed to parse URL")?; + let url_host = url.host().replace_err("URL must have a host")?; + let url_port = url.port().replace_err("URL must have a port")?.into(); + + let io = tcp.get_tls_io(url_host, url_port).await?; + let (rx, tx) = split(io); + let mut rx = ReaderStream::new(rx); + + wasm_bindgen_futures::spawn_local(async move { + while let Some(Ok(data)) = rx.next().await { + let _ = onmessage.call1( + &JsValue::null(), + &jval!(Uint8Array::from(data.to_vec().as_slice())), + ); + } + let _ = onclose.call0(&JsValue::null()); + }); + + onopen + .call0(&Object::default()) + .replace_err("Failed to call onopen")?; + + Ok(Self { tx, onerror }) + } + .await; + if let Err(ret) = ret { + let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone())); + Err(ret) + } else { + ret + } + } + + #[wasm_bindgen] + pub async fn send(&mut self, payload: Uint8Array) -> Result<(), JsError> { + let onerr = self.onerror.clone(); + let ret = self.tx.write_all(&payload.to_vec()).await; + if let Err(ret) = ret { + let _ = onerr.call1(&JsValue::null(), &jval!(format!("{}", ret))); + Err(ret.into()) + } else { + Ok(ret?) + } + } + + #[wasm_bindgen] + pub async fn close(&mut self) -> Result<(), JsError> { + self.tx.shutdown().await?; + Ok(()) + } +} diff --git a/client/src/websocket.rs b/client/src/websocket.rs index addae2c..a83b755 100644 --- a/client/src/websocket.rs +++ b/client/src/websocket.rs @@ -68,7 +68,7 @@ impl EpxWebSocket { let (mut sender, conn) = Builder::new() .title_case_headers(true) .preserve_header_case(true) - .handshake::, Empty>(TokioIo::new(stream)) + .handshake::, Empty>(TokioIo::new(stream)) .await?; wasm_bindgen_futures::spawn_local(async move {