raw tls sockets

This commit is contained in:
Toshit Chawda 2024-02-04 00:02:21 -08:00
parent ac39d82a53
commit 54011e1f8a
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
4 changed files with 144 additions and 32 deletions

View file

@ -12,6 +12,7 @@
const should_multiperf_test = params.has("multi_perf_test"); const should_multiperf_test = params.has("multi_perf_test");
const should_perf_test = params.has("perf_test"); const should_perf_test = params.has("perf_test");
const should_ws_test = params.has("ws_test"); const should_ws_test = params.has("ws_test");
const should_tls_test = params.has("rawtls_test");
const log = (str) => { const log = (str) => {
let el = document.createElement("div"); let el = document.createElement("div");
@ -160,6 +161,16 @@
await ws.send("data"); await ws.send("data");
await (new Promise((res, _) => setTimeout(res, 100))); await (new Promise((res, _) => setTimeout(res, 100)));
} }
} else if (should_tls_test) {
let decoder = new TextDecoder();
let ws = await epoxy_client.connect_tls(
() => console.log("opened"),
() => console.log("closed"),
err => console.error(err),
msg => { console.log(msg); console.log(decoder.decode(msg)) },
"alicesworld.tech:443",
);
await ws.send("GET / HTTP 1.1\r\nHost: alicesworld.tech\r\nConnection: close\r\n\r\n");
} else { } else {
let resp = await epoxy_client.fetch("https://httpbin.org/get"); let resp = await epoxy_client.fetch("https://httpbin.org/get");
console.warn(resp, Object.fromEntries(resp.headers)); console.warn(resp, Object.fromEntries(resp.headers));

View file

@ -1,10 +1,12 @@
#![feature(let_chains)] #![feature(let_chains)]
#[macro_use] #[macro_use]
mod utils; mod utils;
mod tls_stream;
mod tokioio; mod tokioio;
mod websocket; mod websocket;
mod wrappers; mod wrappers;
use tls_stream::EpxTlsStream;
use tokioio::TokioIo; use tokioio::TokioIo;
use utils::{ReplaceErr, UriExt}; use utils::{ReplaceErr, UriExt};
use websocket::EpxWebSocket; use websocket::EpxWebSocket;
@ -15,10 +17,7 @@ use std::sync::Arc;
use async_compression::tokio::bufread as async_comp; use async_compression::tokio::bufread as async_comp;
use async_io_stream::IoStream; use async_io_stream::IoStream;
use bytes::Bytes; use bytes::Bytes;
use futures_util::{ use futures_util::{stream::SplitSink, StreamExt};
stream::SplitSink,
StreamExt,
};
use http::{uri, HeaderName, HeaderValue, Request, Response}; use http::{uri, HeaderName, HeaderValue, Request, Response};
use hyper::{body::Incoming, client::conn::http1::Builder, Uri}; use hyper::{body::Incoming, client::conn::http1::Builder, Uri};
use js_sys::{Array, Function, Object, Reflect, Uint8Array}; use js_sys::{Array, Function, Object, Reflect, Uint8Array};
@ -30,7 +29,7 @@ use tokio_util::{
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use web_sys::TextEncoder; use web_sys::TextEncoder;
use wisp_mux::{ClientMux, MuxStreamIo, StreamType}; use wisp_mux::{ClientMux, MuxStreamIo, StreamType};
use ws_stream_wasm::{WsMeta, WsStream, WsMessage}; use ws_stream_wasm::{WsMessage, WsMeta, WsStream};
type HttpBody = http_body_util::Full<Bytes>; type HttpBody = http_body_util::Full<Bytes>;
@ -45,14 +44,14 @@ enum EpxCompression {
Gzip, Gzip,
} }
type EpxTlsStream = TlsStream<IoStream<MuxStreamIo, Vec<u8>>>; type EpxIoTlsStream = TlsStream<IoStream<MuxStreamIo, Vec<u8>>>;
type EpxUnencryptedStream = IoStream<MuxStreamIo, Vec<u8>>; type EpxIoUnencryptedStream = IoStream<MuxStreamIo, Vec<u8>>;
type EpxStream = Either<EpxTlsStream, EpxUnencryptedStream>; type EpxIoStream = Either<EpxIoTlsStream, EpxIoUnencryptedStream>;
async fn send_req( async fn send_req(
req: http::Request<HttpBody>, req: http::Request<HttpBody>,
should_redirect: bool, should_redirect: bool,
io: EpxStream, io: EpxIoStream,
) -> Result<EpxResponse, JsError> { ) -> Result<EpxResponse, JsError> {
let (mut req_sender, conn) = Builder::new() let (mut req_sender, conn) = Builder::new()
.title_case_headers(true) .title_case_headers(true)
@ -175,10 +174,7 @@ impl EpoxyClient {
}) })
} }
async fn get_http_io(&self, url: &Uri) -> Result<EpxStream, JsError> { async fn get_tls_io(&self, url_host: &str, url_port: u16) -> Result<EpxIoTlsStream, JsError> {
let url_host = url.host().replace_err("URL must have a host")?;
let url_port = utils::get_url_port(url)?;
debug!("making channel");
let channel = self let channel = self
.mux .mux
.client_new_stream(StreamType::Tcp, url_host.to_string(), url_port) .client_new_stream(StreamType::Tcp, url_host.to_string(), url_port)
@ -186,8 +182,6 @@ impl EpoxyClient {
.replace_err("Failed to create multiplexor channel")? .replace_err("Failed to create multiplexor channel")?
.into_io() .into_io()
.into_asyncrw(); .into_asyncrw();
if utils::get_is_secure(url)? {
let cloned_uri = url_host.to_string().clone(); let cloned_uri = url_host.to_string().clone();
let connector = TlsConnector::from(self.rustls_config.clone()); let connector = TlsConnector::from(self.rustls_config.clone());
debug!("connecting channel"); debug!("connecting channel");
@ -201,11 +195,29 @@ impl EpoxyClient {
.await .await
.replace_err("Failed to perform TLS handshake")?; .replace_err("Failed to perform TLS handshake")?;
debug!("connected channel"); debug!("connected channel");
Ok(EpxStream::Left(io)) Ok(io)
}
async fn get_http_io(&self, url: &Uri) -> Result<EpxIoStream, JsError> {
let url_host = url.host().replace_err("URL must have a host")?;
let url_port = utils::get_url_port(url)?;
if utils::get_is_secure(url)? {
Ok(EpxIoStream::Left(
self.get_tls_io(url_host, url_port).await?,
))
} else { } else {
debug!("making channel");
let channel = self
.mux
.client_new_stream(StreamType::Tcp, url_host.to_string(), url_port)
.await
.replace_err("Failed to create multiplexor channel")?
.into_io()
.into_asyncrw();
debug!("connecting channel"); debug!("connecting channel");
debug!("connected channel"); debug!("connected channel");
Ok(EpxStream::Right(channel)) Ok(EpxIoStream::Right(channel))
} }
} }
@ -253,11 +265,18 @@ impl EpoxyClient {
.await .await
} }
pub async fn fetch( pub async fn connect_tls(
&self, &self,
onopen: Function,
onclose: Function,
onerror: Function,
onmessage: Function,
url: String, url: String,
options: Object, ) -> Result<EpxTlsStream, JsError> {
) -> Result<web_sys::Response, JsError> { EpxTlsStream::connect(self, onopen, onclose, onerror, onmessage, url).await
}
pub async fn fetch(&self, url: String, options: Object) -> Result<web_sys::Response, JsError> {
let uri = url.parse::<uri::Uri>().replace_err("Failed to parse URL")?; let uri = url.parse::<uri::Uri>().replace_err("Failed to parse URL")?;
let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?; let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?;
if *uri_scheme != uri::Scheme::HTTP && *uri_scheme != uri::Scheme::HTTPS { if *uri_scheme != uri::Scheme::HTTP && *uri_scheme != uri::Scheme::HTTPS {

82
client/src/tls_stream.rs Normal file
View file

@ -0,0 +1,82 @@
use crate::*;
use js_sys::Function;
use tokio::io::{split, AsyncWriteExt, WriteHalf};
use tokio_util::io::ReaderStream;
#[wasm_bindgen]
pub struct EpxTlsStream {
tx: WriteHalf<EpxIoTlsStream>,
onerror: Function,
}
#[wasm_bindgen]
impl EpxTlsStream {
#[wasm_bindgen(constructor)]
pub fn new() -> Result<EpxTlsStream, JsError> {
Err(jerr!("Use EpoxyClient.connect_tls() instead."))
}
// shut up
#[allow(clippy::too_many_arguments)]
pub async fn connect(
tcp: &EpoxyClient,
onopen: Function,
onclose: Function,
onerror: Function,
onmessage: Function,
url: String,
) -> Result<EpxTlsStream, JsError> {
let onerr = onerror.clone();
let ret: Result<EpxTlsStream, JsError> = async move {
let url = Uri::try_from(url).replace_err("Failed to parse URL")?;
let url_host = url.host().replace_err("URL must have a host")?;
let url_port = url.port().replace_err("URL must have a port")?.into();
let io = tcp.get_tls_io(url_host, url_port).await?;
let (rx, tx) = split(io);
let mut rx = ReaderStream::new(rx);
wasm_bindgen_futures::spawn_local(async move {
while let Some(Ok(data)) = rx.next().await {
let _ = onmessage.call1(
&JsValue::null(),
&jval!(Uint8Array::from(data.to_vec().as_slice())),
);
}
let _ = onclose.call0(&JsValue::null());
});
onopen
.call0(&Object::default())
.replace_err("Failed to call onopen")?;
Ok(Self { tx, onerror })
}
.await;
if let Err(ret) = ret {
let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone()));
Err(ret)
} else {
ret
}
}
#[wasm_bindgen]
pub async fn send(&mut self, payload: Uint8Array) -> Result<(), JsError> {
let onerr = self.onerror.clone();
let ret = self.tx.write_all(&payload.to_vec()).await;
if let Err(ret) = ret {
let _ = onerr.call1(&JsValue::null(), &jval!(format!("{}", ret)));
Err(ret.into())
} else {
Ok(ret?)
}
}
#[wasm_bindgen]
pub async fn close(&mut self) -> Result<(), JsError> {
self.tx.shutdown().await?;
Ok(())
}
}

View file

@ -68,7 +68,7 @@ impl EpxWebSocket {
let (mut sender, conn) = Builder::new() let (mut sender, conn) = Builder::new()
.title_case_headers(true) .title_case_headers(true)
.preserve_header_case(true) .preserve_header_case(true)
.handshake::<TokioIo<EpxStream>, Empty<Bytes>>(TokioIo::new(stream)) .handshake::<TokioIo<EpxIoStream>, Empty<Bytes>>(TokioIo::new(stream))
.await?; .await?;
wasm_bindgen_futures::spawn_local(async move { wasm_bindgen_futures::spawn_local(async move {