initial multiplexor

This commit is contained in:
Toshit Chawda 2024-01-09 08:20:14 -08:00
parent 85cf164de0
commit d508f90a62
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
15 changed files with 1253 additions and 257 deletions

151
client/src/lib.rs Normal file
View file

@ -0,0 +1,151 @@
#[macro_use]
mod utils;
mod tokioio;
mod wsstreamwrapper;
use tokioio::TokioIo;
use wsstreamwrapper::WsStreamWrapper;
use std::sync::Arc;
use bytes::Bytes;
use http::{uri, Request};
use hyper::{body::Incoming, client::conn as hyper_conn};
use js_sys::Object;
use penguin_mux_wasm::{Multiplexor, MuxStream, Role};
use tokio_rustls::{client::TlsStream, rustls, rustls::RootCertStore, TlsConnector};
use wasm_bindgen::prelude::*;
type MuxIo = TokioIo<MuxStream<WsStreamWrapper>>;
type MuxRustlsIo = TokioIo<TlsStream<MuxStream<WsStreamWrapper>>>;
type HttpBody = http_body_util::Full<Bytes>;
#[wasm_bindgen(start)]
async fn start() {
utils::set_panic_hook();
}
#[wasm_bindgen]
pub struct WsTcpWorker {
rustls_config: Arc<rustls::ClientConfig>,
mux: Multiplexor<WsStreamWrapper>,
}
#[wasm_bindgen]
impl WsTcpWorker {
#[wasm_bindgen(constructor)]
pub async fn new(ws_url: String) -> Result<WsTcpWorker, JsValue> {
let ws_uri = ws_url
.parse::<uri::Uri>()
.expect_throw("Failed to parse websocket URL");
let ws_uri_scheme = ws_uri
.scheme_str()
.expect_throw("Websocket URL must have a scheme");
if ws_uri_scheme != "ws" && ws_uri_scheme != "wss" {
return Err("Scheme must be either `ws` or `wss`".into());
}
debug!("connecting to ws {:?}", ws_url);
let (ws, wsmeta) = WsStreamWrapper::connect(ws_url, None)
.await
.expect_throw("Failed to connect to websocket");
debug!("connected!");
let mux = Multiplexor::new(ws, Role::Client, None, None);
debug!("wsmeta ready state: {:?}", wsmeta.ready_state());
let mut certstore = RootCertStore::empty();
certstore.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let rustls_config = Arc::new(
rustls::ClientConfig::builder()
.with_root_certificates(certstore)
.with_no_client_auth(),
);
Ok(WsTcpWorker { mux, rustls_config })
}
pub async fn fetch(&self, url: String, options: Object) -> Result<(), JsValue> {
let uri = url.parse::<uri::Uri>().expect_throw("Failed to parse URL");
let uri_scheme = uri.scheme().expect_throw("URL must have a scheme");
if *uri_scheme != uri::Scheme::HTTP && *uri_scheme != uri::Scheme::HTTPS {
return Err("Scheme must be either `http` or `https`".into());
}
let uri_host = uri.host().expect_throw("URL must have a host");
let uri_port = if let Some(port) = uri.port() {
port.as_u16()
} else {
// can't use match, compiler error
// error: to use a constant of type `Scheme` in a pattern, `Scheme` must be annotated with `#[derive(PartialEq, Eq)]`
if *uri_scheme == uri::Scheme::HTTP {
80
} else if *uri_scheme == uri::Scheme::HTTPS {
443
} else {
return Err("Failed to coerce port from scheme".into());
}
};
let channel = self
.mux
.client_new_stream_channel(uri_host.as_bytes(), uri_port)
.await
.expect_throw("Failed to create multiplexor channel");
let request = Request::builder()
.header("Host", uri_host)
.header("Connection", "close")
.method("GET")
.body(HttpBody::new(Bytes::new()))
.expect_throw("Failed to create request");
let resp: hyper::Response<Incoming>;
if *uri_scheme == uri::Scheme::HTTPS {
let cloned_uri = uri_host.to_string().clone();
let connector = TlsConnector::from(self.rustls_config.clone());
let io = connector
.connect(
cloned_uri
.try_into()
.expect_throw("Failed to parse URL (rustls)"),
channel,
)
.await
.expect_throw("Failed to perform TLS handshake");
let io = TokioIo::new(io);
let (mut req_sender, conn) = hyper_conn::http1::handshake::<MuxRustlsIo, HttpBody>(io)
.await
.expect_throw("Failed to connect to host");
wasm_bindgen_futures::spawn_local(async move {
if let Err(e) = conn.await {
error!("wstcp: error in muxed hyper connection! {:?}", e);
}
});
debug!("sending req tls");
resp = req_sender.send_request(request).await.expect_throw("Failed to send request");
debug!("recieved resp");
} else {
let io = TokioIo::new(channel);
let (mut req_sender, conn) = hyper_conn::http1::handshake::<MuxIo, HttpBody>(io)
.await
.expect_throw("Failed to connect to host");
wasm_bindgen_futures::spawn_local(async move {
if let Err(e) = conn.await {
error!("err in conn: {:?}", e);
}
});
debug!("sending req");
resp = req_sender.send_request(request).await.expect_throw("Failed to send request");
debug!("recieved resp");
}
log!("{:?}", resp);
Ok(())
}
}

171
client/src/tokioio.rs Normal file
View file

@ -0,0 +1,171 @@
#![allow(dead_code)]
// Taken from https://github.com/hyperium/hyper-util/blob/master/src/rt/tokio.rs
// hyper-util fails to compile on WASM as it has a dependency on socket2, but I only need
// hyper-util for TokioIo.
use std::{
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
pin_project! {
/// A wrapping implementing hyper IO traits for a type that
/// implements Tokio's IO traits.
#[derive(Debug)]
pub struct TokioIo<T> {
#[pin]
inner: T,
}
}
impl<T> TokioIo<T> {
/// Wrap a type implementing Tokio's IO traits.
pub fn new(inner: T) -> Self {
Self { inner }
}
/// Borrow the inner type.
pub fn inner(&self) -> &T {
&self.inner
}
/// Mut borrow the inner type.
pub fn inner_mut(&mut self) -> &mut T {
&mut self.inner
}
/// Consume this wrapper and get the inner type.
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T> hyper::rt::Read for TokioIo<T>
where
T: tokio::io::AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: hyper::rt::ReadBufCursor<'_>,
) -> Poll<Result<(), std::io::Error>> {
let n = unsafe {
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
Poll::Ready(Ok(())) => tbuf.filled().len(),
other => return other,
}
};
unsafe {
buf.advance(n);
}
Poll::Ready(Ok(()))
}
}
impl<T> hyper::rt::Write for TokioIo<T>
where
T: tokio::io::AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
}
fn is_write_vectored(&self) -> bool {
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
}
}
impl<T> tokio::io::AsyncRead for TokioIo<T>
where
T: hyper::rt::Read,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
tbuf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<Result<(), std::io::Error>> {
//let init = tbuf.initialized().len();
let filled = tbuf.filled().len();
let sub_filled = unsafe {
let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
Poll::Ready(Ok(())) => buf.filled().len(),
other => return other,
}
};
let n_filled = filled + sub_filled;
// At least sub_filled bytes had to have been initialized.
let n_init = sub_filled;
unsafe {
tbuf.assume_init(n_init);
tbuf.set_filled(n_filled);
}
Poll::Ready(Ok(()))
}
}
impl<T> tokio::io::AsyncWrite for TokioIo<T>
where
T: hyper::rt::Write,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
hyper::rt::Write::poll_write(self.project().inner, cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
hyper::rt::Write::poll_flush(self.project().inner, cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
hyper::rt::Write::poll_shutdown(self.project().inner, cx)
}
fn is_write_vectored(&self) -> bool {
hyper::rt::Write::is_write_vectored(&self.inner)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
}
}

29
client/src/utils.rs Normal file
View file

@ -0,0 +1,29 @@
use wasm_bindgen::prelude::*;
pub fn set_panic_hook() {
#[cfg(feature = "console_error_panic_hook")]
console_error_panic_hook::set_once();
}
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = console, js_name = debug)]
pub fn console_debug(s: &str);
#[wasm_bindgen(js_namespace = console, js_name = log)]
pub fn console_log(s: &str);
#[wasm_bindgen(js_namespace = console, js_name = error)]
pub fn console_error(s: &str);
}
macro_rules! debug {
($($t:tt)*) => (utils::console_debug(&format_args!($($t)*).to_string()))
}
macro_rules! log {
($($t:tt)*) => (utils::console_log(&format_args!($($t)*).to_string()))
}
macro_rules! error {
($($t:tt)*) => (utils::console_error(&format_args!($($t)*).to_string()))
}

View file

@ -0,0 +1,9 @@
<html>
<head>
<title>wstcp</title>
<script src="wstcp_client.js"></script>
<script src="index.js"></script>
</head>
<body>
</body>
</html>

5
client/src/web/index.js Normal file
View file

@ -0,0 +1,5 @@
(async () => {
await wasm_bindgen("./wstcp_client_bg.wasm");
let wstcp = await new wasm_bindgen.WsTcpWorker("wss://localhost:4000");
await wstcp.fetch("https://alicesworld.tech");
})();

View file

@ -0,0 +1,121 @@
use crate::*;
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures_util::{Sink, Stream};
use penguin_mux_wasm::ws;
use pin_project_lite::pin_project;
use ws_stream_wasm::{WsErr, WsMessage, WsMeta, WsStream};
pin_project! {
pub struct WsStreamWrapper {
#[pin]
ws: WsStream,
}
}
impl WsStreamWrapper {
pub async fn connect(
url: impl AsRef<str>,
protocols: impl Into<Option<Vec<&str>>>,
) -> Result<(Self, WsMeta), WsErr> {
let (wsmeta, wsstream) = WsMeta::connect(url, protocols).await?;
debug!("readystate {:?}", wsstream.ready_state());
Ok((WsStreamWrapper { ws: wsstream }, wsmeta))
}
}
impl Stream for WsStreamWrapper {
type Item = Result<ws::Message, ws::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
debug!("poll_next: {:?}", cx);
let this = self.project();
let ret = this.ws.poll_next(cx);
match ret {
Poll::Ready(item) => Poll::<Option<Self::Item>>::Ready(item.map(|x| {
Ok(match x {
WsMessage::Text(txt) => ws::Message::Text(txt),
WsMessage::Binary(bin) => ws::Message::Binary(bin),
})
})),
Poll::Pending => Poll::<Option<Self::Item>>::Pending,
}
}
}
fn wserr_to_ws_err(err: WsErr) -> ws::Error {
debug!("err: {:?}", err);
match err {
WsErr::ConnectionNotOpen => ws::Error::AlreadyClosed,
_ => ws::Error::Io(std::io::Error::other(format!("{:?}", err))),
}
}
impl Sink<ws::Message> for WsStreamWrapper {
type Error = ws::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
debug!("poll_ready: {:?}", cx);
let this = self.project();
let ret = this.ws.poll_ready(cx);
match ret {
Poll::Ready(item) => Poll::<Result<(), Self::Error>>::Ready(match item {
Ok(_) => Ok(()),
Err(err) => Err(wserr_to_ws_err(err)),
}),
Poll::Pending => Poll::<Result<(), Self::Error>>::Pending,
}
}
fn start_send(self: Pin<&mut Self>, item: ws::Message) -> Result<(), Self::Error> {
debug!("start_send: {:?}", item);
use ws::Message::*;
let item = match item {
Text(txt) => WsMessage::Text(txt),
Binary(bin) => WsMessage::Binary(bin),
Close(_) => {
debug!("closing");
return match self.ws.wrapped().close() {
Ok(_) => Ok(()),
Err(err) => Err(ws::Error::Io(std::io::Error::other(format!(
"ws close err: {:?}",
err
)))),
}
}
Ping(_) | Pong(_) | Frame(_) => return Ok(()),
};
let this = self.project();
let ret = this.ws.start_send(item);
match ret {
Ok(_) => Ok(()),
Err(err) => Err(wserr_to_ws_err(err)),
}
}
// no point wrapping this as it's not going to do anything
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
debug!("poll closing {:?}", cx);
let this = self.project();
let ret = this.ws.poll_close(cx);
match ret {
Poll::Ready(item) => Poll::<Result<(), Self::Error>>::Ready(match item {
Ok(_) => Ok(()),
Err(err) => Err(wserr_to_ws_err(err)),
}),
Poll::Pending => Poll::<Result<(), Self::Error>>::Pending,
}
}
}
impl ws::WebSocketStream for WsStreamWrapper {
fn ping_auto_pong(&self) -> bool {
true
}
}