From 6ca14ad26a86513da6a301ccc6b48b44008734b3 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Mon, 5 Feb 2024 09:08:34 -0800 Subject: [PATCH] partially implement tower trait --- Cargo.lock | 26 +++++++ Cargo.toml | 2 +- client/build.sh | 2 +- wisp/Cargo.toml | 4 ++ wisp/src/lib.rs | 5 ++ wisp/src/tokioio.rs | 171 ++++++++++++++++++++++++++++++++++++++++++++ wisp/src/tower.rs | 13 ++++ 7 files changed, 221 insertions(+), 2 deletions(-) create mode 100644 wisp/src/tokioio.rs create mode 100644 wisp/src/tower.rs diff --git a/Cargo.lock b/Cargo.lock index 40758a6..eed8bfa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1154,12 +1154,36 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-core", ] @@ -1469,8 +1493,10 @@ dependencies = [ "fastwebsockets", "futures", "futures-util", + "hyper", "pin-project-lite", "tokio", + "tower", "ws_stream_wasm", ] diff --git a/Cargo.toml b/Cargo.toml index 2e8971b..2fcf5e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,5 +7,5 @@ rustls-pki-types = { git = "https://github.com/r58Playz/rustls-pki-types" } [profile.release] lto = true -opt-level = 3 +opt-level = 'z' codegen-units = 1 diff --git a/client/build.sh b/client/build.sh index 7c2725b..7f402f0 100755 --- a/client/build.sh +++ b/client/build.sh @@ -11,7 +11,7 @@ wasm-bindgen --weak-refs --target no-modules --no-modules-global epoxy --out-dir echo "[ws] wasm-bindgen finished" mv out/epoxy_client_bg.wasm out/epoxy_client_unoptimized.wasm -time wasm-opt -O4 --vacuum --dce --enable-threads --enable-bulk-memory --enable-simd out/epoxy_client_unoptimized.wasm -o out/epoxy_client_bg.wasm +time wasm-opt -Oz --vacuum --dce --enable-threads --enable-bulk-memory --enable-simd out/epoxy_client_unoptimized.wasm -o out/epoxy_client_bg.wasm echo "[ws] wasm-opt finished" AUTOGENERATED_SOURCE=$(<"out/epoxy_client.js") diff --git a/wisp/Cargo.toml b/wisp/Cargo.toml index fc834d0..9448613 100644 --- a/wisp/Cargo.toml +++ b/wisp/Cargo.toml @@ -9,11 +9,15 @@ bytes = "1.5.0" fastwebsockets = { version = "0.6.0", features = ["unstable-split"], optional = true } futures = "0.3.30" futures-util = "0.3.30" +hyper = { version = "1.1.0", optional = true } pin-project-lite = "0.2.13" tokio = { version = "1.35.1", optional = true } +tower = { version = "0.4.13", optional = true } ws_stream_wasm = { version = "0.7.4", optional = true } [features] fastwebsockets = ["dep:fastwebsockets", "dep:tokio"] ws_stream_wasm = ["dep:ws_stream_wasm"] tokio_io = ["async_io_stream/tokio_io"] +hyper_tower = ["dep:tower", "dep:hyper", "dep:tokio"] + diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index 37b81c5..e211f13 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(impl_trait_in_assoc_type)] #[cfg(feature = "fastwebsockets")] mod fastwebsockets; mod packet; @@ -5,6 +6,10 @@ mod stream; pub mod ws; #[cfg(feature = "ws_stream_wasm")] mod ws_stream_wasm; +#[cfg(feature = "hyper_tower")] +pub mod tokioio; +#[cfg(feature = "hyper_tower")] +pub mod tower; pub use crate::packet::*; pub use crate::stream::*; diff --git a/wisp/src/tokioio.rs b/wisp/src/tokioio.rs new file mode 100644 index 0000000..7d6acc0 --- /dev/null +++ b/wisp/src/tokioio.rs @@ -0,0 +1,171 @@ +#![allow(dead_code)] +// Taken from https://github.com/hyperium/hyper-util/blob/master/src/rt/tokio.rs +// hyper-util fails to compile on WASM as it has a dependency on socket2, but I only need +// hyper-util for TokioIo. + +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; + +pin_project! { + /// A wrapping implementing hyper IO traits for a type that + /// implements Tokio's IO traits. + #[derive(Debug)] + pub struct TokioIo { + #[pin] + inner: T, + } +} + +impl TokioIo { + /// Wrap a type implementing Tokio's IO traits. + pub fn new(inner: T) -> Self { + Self { inner } + } + + /// Borrow the inner type. + pub fn inner(&self) -> &T { + &self.inner + } + + /// Mut borrow the inner type. + pub fn inner_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consume this wrapper and get the inner type. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl hyper::rt::Read for TokioIo +where + T: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +impl hyper::rt::Write for TokioIo +where + T: tokio::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +impl tokio::io::AsyncRead for TokioIo +where + T: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + //let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +impl tokio::io::AsyncWrite for TokioIo +where + T: hyper::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +} diff --git a/wisp/src/tower.rs b/wisp/src/tower.rs new file mode 100644 index 0000000..6bf635c --- /dev/null +++ b/wisp/src/tower.rs @@ -0,0 +1,13 @@ +use futures::{Future, task::{Poll, Context}}; + +impl tower::Service for crate::ClientMux { + type Response = crate::tokioio::TokioIo>; + type Error = crate::WispError; + type Future = impl Future>; + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: hyper::Uri) -> Self::Future { + + } +}