From 31b9f1c455f16bec61734d61d23d348a5ab0c168 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Thu, 11 Jul 2024 16:15:37 -0700 Subject: [PATCH] fix muxstreamasyncread --- Cargo.lock | 28 +----- client/Cargo.toml | 2 +- client/package.json | 2 +- client/src/stream_provider.rs | 5 +- server/src/main.rs | 13 +-- wisp/Cargo.toml | 1 - wisp/src/lib.rs | 3 +- wisp/src/stream.rs | 182 ++++++++++++++++++++++++++++++++-- 8 files changed, 187 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0be46bc..b125e37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -143,16 +143,6 @@ dependencies = [ "syn", ] -[[package]] -name = "async_io_stream" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" -dependencies = [ - "futures", - "rustc_version", -] - [[package]] name = "atomic-counter" version = "1.0.1" @@ -521,7 +511,7 @@ checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" [[package]] name = "epoxy-client" -version = "2.0.6" +version = "2.0.7" dependencies = [ "async-compression", "async-trait", @@ -1557,15 +1547,6 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" -[[package]] -name = "rustc_version" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" -dependencies = [ - "semver", -] - [[package]] name = "rustix" version = "0.38.34" @@ -1672,12 +1653,6 @@ dependencies = [ "libc", ] -[[package]] -name = "semver" -version = "1.0.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" - [[package]] name = "send_wrapper" version = "0.4.0" @@ -2487,7 +2462,6 @@ name = "wisp-mux" version = "5.0.0" dependencies = [ "async-trait", - "async_io_stream", "bytes", "dashmap", "event-listener", diff --git a/client/Cargo.toml b/client/Cargo.toml index b2e5d1b..fcaa57e 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "epoxy-client" -version = "2.0.6" +version = "2.0.7" edition = "2021" [lib] diff --git a/client/package.json b/client/package.json index 6e74d8b..35792a5 100644 --- a/client/package.json +++ b/client/package.json @@ -1,6 +1,6 @@ { "name": "@mercuryworkshop/epoxy-tls", - "version": "2.0.6-1", + "version": "2.0.7-1", "description": "A wasm library for using raw encrypted tls/ssl/https/websocket streams on the browser", "scripts": { "build": "./build.sh" diff --git a/client/src/stream_provider.rs b/client/src/stream_provider.rs index a75821c..d2e209f 100644 --- a/client/src/stream_provider.rs +++ b/client/src/stream_provider.rs @@ -1,6 +1,5 @@ use std::{pin::Pin, sync::Arc, task::Poll}; -use bytes::Bytes; use futures_rustls::{ rustls::{ClientConfig, RootCertStore}, TlsConnector, TlsStream, @@ -17,7 +16,7 @@ use wasm_bindgen::{JsCast, JsValue}; use wasm_bindgen_futures::spawn_local; use wisp_mux::{ extensions::{udp::UdpProtocolExtensionBuilder, ProtocolExtensionBuilder}, - ClientMux, IoStream, MuxStreamIo, StreamType, + ClientMux, MuxStreamAsyncRW, MuxStreamIo, StreamType, }; use crate::{ws_wrapper::WebSocketWrapper, EpoxyClientOptions, EpoxyError}; @@ -49,7 +48,7 @@ pub struct StreamProvider { } pub type ProviderUnencryptedStream = MuxStreamIo; -pub type ProviderUnencryptedAsyncRW = IoStream; +pub type ProviderUnencryptedAsyncRW = MuxStreamAsyncRW; pub type ProviderTlsAsyncRW = TlsStream; pub type ProviderAsyncRW = Either; diff --git a/server/src/main.rs b/server/src/main.rs index ceedff0..d44da1c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -17,7 +17,7 @@ use hyper_util::rt::TokioIo; #[cfg(unix)] use tokio::net::{UnixListener, UnixStream}; use tokio::{ - io::{copy, copy_bidirectional, AsyncBufReadExt, AsyncWriteExt}, + io::{copy, AsyncBufReadExt, AsyncWriteExt}, net::{lookup_host, TcpListener, TcpStream, UdpSocket}, select, }; @@ -34,7 +34,7 @@ use wisp_mux::{ udp::UdpProtocolExtensionBuilder, ProtocolExtensionBuilder, }, - CloseReason, ConnectPacket, MuxStream, IoStream, ServerMux, StreamType, WispError, + CloseReason, ConnectPacket, MuxStream, MuxStreamAsyncRW, ServerMux, StreamType, WispError, }; type HttpBody = http_body_util::Full; @@ -269,8 +269,6 @@ async fn accept_http( } } -// re-enable once MuxStreamAsyncRW is fixed -/* async fn copy_buf(mux: MuxStreamAsyncRW, tcp: TcpStream) -> std::io::Result<()> { let (muxrx, muxtx) = mux.into_split(); let mut muxrx = muxrx.compat(); @@ -302,7 +300,6 @@ async fn copy_buf(mux: MuxStreamAsyncRW, tcp: TcpStream) -> std::io::Result<()> x = slow_fut => x.map(|_| ()), } } -*/ async fn handle_mux( packet: ConnectPacket, @@ -314,9 +311,9 @@ async fn handle_mux( ); match packet.stream_type { StreamType::Tcp => { - let mut tcp_stream = TcpStream::connect(uri).await?; - let mut mux = stream.into_io().into_asyncrw().compat(); - copy_bidirectional(&mut mux, &mut tcp_stream).await?; + let tcp_stream = TcpStream::connect(uri).await?; + let mux = stream.into_io().into_asyncrw(); + copy_buf(mux, tcp_stream).await?; } StreamType::Udp => { let uri = lookup_host(uri) diff --git a/wisp/Cargo.toml b/wisp/Cargo.toml index 1578b1f..875d448 100644 --- a/wisp/Cargo.toml +++ b/wisp/Cargo.toml @@ -10,7 +10,6 @@ edition = "2021" [dependencies] async-trait = "0.1.79" -async_io_stream = "0.3.3" bytes = "1.5.0" dashmap = { version = "5.5.3", features = ["inline"] } event-listener = "5.0.0" diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index 90a3f35..ca33c3d 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -1,4 +1,4 @@ -#![deny(missing_docs, warnings)] +#![deny(missing_docs)] #![cfg_attr(docsrs, feature(doc_cfg))] //! A library for easily creating [Wisp] clients and servers. //! @@ -14,7 +14,6 @@ mod stream; pub mod ws; pub use crate::{packet::*, stream::*}; -pub use async_io_stream::IoStream; use bytes::Bytes; use dashmap::DashMap; diff --git a/wisp/src/stream.rs b/wisp/src/stream.rs index ff65ade..5d5c115 100644 --- a/wisp/src/stream.rs +++ b/wisp/src/stream.rs @@ -4,15 +4,14 @@ use crate::{ CloseReason, Packet, Role, StreamType, WispError, }; -use async_io_stream::IoStream; use bytes::{BufMut, Bytes, BytesMut}; use event_listener::Event; use flume as mpsc; use futures::{ channel::oneshot, - select, stream, - task::{Context, Poll}, - FutureExt, Sink, Stream, + ready, select, stream::{self, IntoAsyncRead}, + task::{noop_waker_ref, Context, Poll}, + AsyncBufRead, AsyncRead, AsyncWrite, FutureExt, Sink, Stream, TryStreamExt, }; use pin_project_lite::pin_project; use std::{ @@ -372,8 +371,11 @@ pin_project! { impl MuxStreamIo { /// Turn the stream into one that implements futures `AsyncRead + AsyncBufRead + AsyncWrite`. - pub fn into_asyncrw(self) -> IoStream { - IoStream::new(self) + pub fn into_asyncrw(self) -> MuxStreamAsyncRW { + MuxStreamAsyncRW { + rx: self.rx.into_asyncread(), + tx: self.tx.into_asyncwrite(), + } } /// Split the stream into read and write parts, consuming it. @@ -413,6 +415,13 @@ pin_project! { } } +impl MuxStreamIoStream { + /// Turn the stream into one that implements futures `AsyncRead + AsyncBufRead`. + pub fn into_asyncread(self) -> MuxStreamAsyncRead { + MuxStreamAsyncRead::new(self) + } +} + impl Stream for MuxStreamIoStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -428,6 +437,13 @@ pin_project! { } } +impl MuxStreamIoSink { + /// Turn the sink into one that implements futures `AsyncWrite`. + pub fn into_asyncwrite(self) -> MuxStreamAsyncWrite { + MuxStreamAsyncWrite::new(self) + } +} + impl Sink for MuxStreamIoSink { type Error = std::io::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -455,3 +471,157 @@ impl Sink for MuxStreamIoSink { .map_err(std::io::Error::other) } } + +pin_project! { + /// Multiplexor stream that implements futures `AsyncRead + AsyncBufRead + AsyncWrite`. + pub struct MuxStreamAsyncRW { + #[pin] + rx: MuxStreamAsyncRead, + #[pin] + tx: MuxStreamAsyncWrite, + } +} + +impl MuxStreamAsyncRW { + /// Split the stream into read and write parts, consuming it. + pub fn into_split(self) -> (MuxStreamAsyncRead, MuxStreamAsyncWrite) { + (self.rx, self.tx) + } +} + +impl AsyncRead for MuxStreamAsyncRW { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.project().rx.poll_read(cx, buf) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [std::io::IoSliceMut<'_>], + ) -> Poll> { + self.project().rx.poll_read_vectored(cx, bufs) + } +} + +impl AsyncBufRead for MuxStreamAsyncRW { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().rx.poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().rx.consume(amt) + } +} + +impl AsyncWrite for MuxStreamAsyncRW { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().tx.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_close(cx) + } +} + +pin_project! { + /// Read side of a multiplexor stream that implements futures `AsyncRead + AsyncBufRead`. + pub struct MuxStreamAsyncRead { + #[pin] + rx: IntoAsyncRead, + // state: Option + } +} + +impl MuxStreamAsyncRead { + pub(crate) fn new(stream: MuxStreamIoStream) -> Self { + Self { + rx: stream.into_async_read(), + // state: None, + } + } +} + +impl AsyncRead for MuxStreamAsyncRead { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.project().rx.poll_read(cx, buf) + } +} +impl AsyncBufRead for MuxStreamAsyncRead { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().rx.poll_fill_buf(cx) + } + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().rx.consume(amt) + } +} + +pin_project! { + /// Write side of a multiplexor stream that implements futures `AsyncWrite`. + pub struct MuxStreamAsyncWrite { + #[pin] + tx: MuxStreamIoSink, + error: Option + } +} + +impl MuxStreamAsyncWrite { + pub(crate) fn new(sink: MuxStreamIoSink) -> Self { + Self { tx: sink, error: None } + } +} + +impl AsyncWrite for MuxStreamAsyncWrite { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if let Some(err) = self.error.take() { + return Poll::Ready(Err(err)); + } + + let mut this = self.as_mut().project(); + + ready!(this.tx.as_mut().poll_ready(cx))?; + match this.tx.as_mut().start_send(Bytes::copy_from_slice(buf)) { + Ok(()) => { + let mut cx = Context::from_waker(noop_waker_ref()); + let cx = &mut cx; + + match this.tx.poll_flush(cx) { + Poll::Ready(Err(err)) => { + self.error = Some(err); + } + Poll::Ready(Ok(_)) | Poll::Pending => {} + } + + Poll::Ready(Ok(buf.len())) + } + Err(e) => Poll::Ready(Err(e)), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_close(cx) + } +}