From fc632983970d150d56177c7a8424907b7308aaaa Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Wed, 23 Oct 2024 23:07:55 -0700 Subject: [PATCH] separate stream into separate files --- wisp/src/lib.rs | 1 - wisp/src/mux/mod.rs | 2 +- wisp/src/stream/compat.rs | 335 ++++++++++++++++++++++++++ wisp/src/{stream.rs => stream/mod.rs} | 332 +------------------------ wisp/src/{ => stream}/sink_unfold.rs | 0 5 files changed, 341 insertions(+), 329 deletions(-) create mode 100644 wisp/src/stream/compat.rs rename wisp/src/{stream.rs => stream/mod.rs} (57%) rename wisp/src/{ => stream}/sink_unfold.rs (100%) diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index 73a8c90..f9dce1c 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -14,7 +14,6 @@ pub mod generic; mod inner; mod mux; mod packet; -mod sink_unfold; mod stream; pub mod ws; diff --git a/wisp/src/mux/mod.rs b/wisp/src/mux/mod.rs index 8e9c765..63a0a61 100644 --- a/wisp/src/mux/mod.rs +++ b/wisp/src/mux/mod.rs @@ -2,7 +2,7 @@ mod client; mod server; use std::{future::Future, pin::Pin, time::Duration}; -pub use client::ClientMux; +pub use client::{ClientMux, ClientMuxResult}; use futures::{select, FutureExt}; use futures_timer::Delay; pub use server::{ServerMux, ServerMuxResult}; diff --git a/wisp/src/stream/compat.rs b/wisp/src/stream/compat.rs new file mode 100644 index 0000000..508ef9a --- /dev/null +++ b/wisp/src/stream/compat.rs @@ -0,0 +1,335 @@ +use std::{ + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; + +use bytes::{Bytes, BytesMut}; +use futures::{ + ready, stream::IntoAsyncRead, task::noop_waker_ref, AsyncBufRead, AsyncRead, AsyncWrite, Sink, + Stream, TryStreamExt, +}; +use pin_project_lite::pin_project; + +use crate::{ws::Payload, AtomicCloseReason, CloseReason, WispError}; + +pin_project! { + /// Multiplexor stream that implements futures `Stream + Sink`. + pub struct MuxStreamIo { + #[pin] + pub(crate) rx: MuxStreamIoStream, + #[pin] + pub(crate) tx: MuxStreamIoSink, + } +} + +impl MuxStreamIo { + /// Turn the stream into one that implements futures `AsyncRead + AsyncBufRead + AsyncWrite`. + pub fn into_asyncrw(self) -> MuxStreamAsyncRW { + MuxStreamAsyncRW { + rx: self.rx.into_asyncread(), + tx: self.tx.into_asyncwrite(), + } + } + + /// Get the stream's close reason, if it was closed. + pub fn get_close_reason(&self) -> Option { + self.rx.get_close_reason() + } + + /// Split the stream into read and write parts, consuming it. + pub fn into_split(self) -> (MuxStreamIoStream, MuxStreamIoSink) { + (self.rx, self.tx) + } +} + +impl Stream for MuxStreamIo { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().rx.poll_next(cx) + } +} + +impl Sink for MuxStreamIo { + type Error = std::io::Error; + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_ready(cx) + } + fn start_send(self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> { + self.project().tx.start_send(item) + } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_flush(cx) + } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_close(cx) + } +} + +pin_project! { + /// Read side of a multiplexor stream that implements futures `Stream`. + pub struct MuxStreamIoStream { + #[pin] + pub(crate) rx: Pin + Send>>, + pub(crate) is_closed: Arc, + pub(crate) close_reason: Arc, + } +} + +impl MuxStreamIoStream { + /// Turn the stream into one that implements futures `AsyncRead + AsyncBufRead`. + pub fn into_asyncread(self) -> MuxStreamAsyncRead { + MuxStreamAsyncRead::new(self) + } + + /// Get the stream's close reason, if it was closed. + pub fn get_close_reason(&self) -> Option { + if self.is_closed.load(Ordering::Acquire) { + Some(self.close_reason.load(Ordering::Acquire)) + } else { + None + } + } +} + +impl Stream for MuxStreamIoStream { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().rx.poll_next(cx).map(|x| x.map(Ok)) + } +} + +pin_project! { + /// Write side of a multiplexor stream that implements futures `Sink`. + pub struct MuxStreamIoSink { + #[pin] + pub(crate) tx: Pin, Error = WispError> + Send>>, + pub(crate) is_closed: Arc, + pub(crate) close_reason: Arc, + } +} + +impl MuxStreamIoSink { + /// Turn the sink into one that implements futures `AsyncWrite`. + pub fn into_asyncwrite(self) -> MuxStreamAsyncWrite { + MuxStreamAsyncWrite::new(self) + } + + /// Get the stream's close reason, if it was closed. + pub fn get_close_reason(&self) -> Option { + if self.is_closed.load(Ordering::Acquire) { + Some(self.close_reason.load(Ordering::Acquire)) + } else { + None + } + } +} + +impl Sink for MuxStreamIoSink { + type Error = std::io::Error; + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .tx + .poll_ready(cx) + .map_err(std::io::Error::other) + } + fn start_send(self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> { + self.project() + .tx + .start_send(Payload::Bytes(item)) + .map_err(std::io::Error::other) + } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .tx + .poll_flush(cx) + .map_err(std::io::Error::other) + } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .tx + .poll_close(cx) + .map_err(std::io::Error::other) + } +} + +pin_project! { + /// Multiplexor stream that implements futures `AsyncRead + AsyncBufRead + AsyncWrite`. + pub struct MuxStreamAsyncRW { + #[pin] + rx: MuxStreamAsyncRead, + #[pin] + tx: MuxStreamAsyncWrite, + } +} + +impl MuxStreamAsyncRW { + /// Get the stream's close reason, if it was closed. + pub fn get_close_reason(&self) -> Option { + self.rx.get_close_reason() + } + + /// Split the stream into read and write parts, consuming it. + pub fn into_split(self) -> (MuxStreamAsyncRead, MuxStreamAsyncWrite) { + (self.rx, self.tx) + } +} + +impl AsyncRead for MuxStreamAsyncRW { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.project().rx.poll_read(cx, buf) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [std::io::IoSliceMut<'_>], + ) -> Poll> { + self.project().rx.poll_read_vectored(cx, bufs) + } +} + +impl AsyncBufRead for MuxStreamAsyncRW { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().rx.poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().rx.consume(amt) + } +} + +impl AsyncWrite for MuxStreamAsyncRW { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().tx.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_close(cx) + } +} + +pin_project! { + /// Read side of a multiplexor stream that implements futures `AsyncRead + AsyncBufRead`. + pub struct MuxStreamAsyncRead { + #[pin] + rx: IntoAsyncRead, + is_closed: Arc, + close_reason: Arc, + } +} + +impl MuxStreamAsyncRead { + pub(crate) fn new(stream: MuxStreamIoStream) -> Self { + Self { + is_closed: stream.is_closed.clone(), + close_reason: stream.close_reason.clone(), + rx: stream.into_async_read(), + } + } + + /// Get the stream's close reason, if it was closed. + pub fn get_close_reason(&self) -> Option { + if self.is_closed.load(Ordering::Acquire) { + Some(self.close_reason.load(Ordering::Acquire)) + } else { + None + } + } +} + +impl AsyncRead for MuxStreamAsyncRead { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.project().rx.poll_read(cx, buf) + } +} +impl AsyncBufRead for MuxStreamAsyncRead { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().rx.poll_fill_buf(cx) + } + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().rx.consume(amt) + } +} + +pin_project! { + /// Write side of a multiplexor stream that implements futures `AsyncWrite`. + pub struct MuxStreamAsyncWrite { + #[pin] + tx: MuxStreamIoSink, + error: Option + } +} + +impl MuxStreamAsyncWrite { + pub(crate) fn new(sink: MuxStreamIoSink) -> Self { + Self { + tx: sink, + error: None, + } + } + + /// Get the stream's close reason, if it was closed. + pub fn get_close_reason(&self) -> Option { + self.tx.get_close_reason() + } +} + +impl AsyncWrite for MuxStreamAsyncWrite { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if let Some(err) = self.error.take() { + return Poll::Ready(Err(err)); + } + + let mut this = self.as_mut().project(); + + ready!(this.tx.as_mut().poll_ready(cx))?; + match this.tx.as_mut().start_send(buf.into()) { + Ok(()) => { + let mut cx = Context::from_waker(noop_waker_ref()); + let cx = &mut cx; + + match this.tx.poll_flush(cx) { + Poll::Ready(Err(err)) => { + self.error = Some(err); + } + Poll::Ready(Ok(_)) | Poll::Pending => {} + } + + Poll::Ready(Ok(buf.len())) + } + Err(e) => Poll::Ready(Err(e)), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().tx.poll_close(cx) + } +} diff --git a/wisp/src/stream.rs b/wisp/src/stream/mod.rs similarity index 57% rename from wisp/src/stream.rs rename to wisp/src/stream/mod.rs index c6286a7..ff0828a 100644 --- a/wisp/src/stream.rs +++ b/wisp/src/stream/mod.rs @@ -1,6 +1,9 @@ +mod compat; +mod sink_unfold; +pub use compat::*; + use crate::{ inner::WsEvent, - sink_unfold, ws::{Frame, LockedWebSocketWrite, Payload}, AtomicCloseReason, CloseReason, Packet, Role, StreamType, WispError, }; @@ -8,14 +11,7 @@ use crate::{ use bytes::{BufMut, Bytes, BytesMut}; use event_listener::Event; use flume as mpsc; -use futures::{ - channel::oneshot, - ready, select, - stream::{self, IntoAsyncRead}, - task::{noop_waker_ref, Context, Poll}, - AsyncBufRead, AsyncRead, AsyncWrite, FutureExt, Sink, Stream, TryStreamExt, -}; -use pin_project_lite::pin_project; +use futures::{channel::oneshot, select, stream, FutureExt, Sink, Stream}; use std::{ pin::Pin, sync::{ @@ -439,321 +435,3 @@ impl MuxProtocolExtensionStream { .await } } - -pin_project! { - /// Multiplexor stream that implements futures `Stream + Sink`. - pub struct MuxStreamIo { - #[pin] - rx: MuxStreamIoStream, - #[pin] - tx: MuxStreamIoSink, - } -} - -impl MuxStreamIo { - /// Turn the stream into one that implements futures `AsyncRead + AsyncBufRead + AsyncWrite`. - pub fn into_asyncrw(self) -> MuxStreamAsyncRW { - MuxStreamAsyncRW { - rx: self.rx.into_asyncread(), - tx: self.tx.into_asyncwrite(), - } - } - - /// Get the stream's close reason, if it was closed. - pub fn get_close_reason(&self) -> Option { - self.rx.get_close_reason() - } - - /// Split the stream into read and write parts, consuming it. - pub fn into_split(self) -> (MuxStreamIoStream, MuxStreamIoSink) { - (self.rx, self.tx) - } -} - -impl Stream for MuxStreamIo { - type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().rx.poll_next(cx) - } -} - -impl Sink for MuxStreamIo { - type Error = std::io::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().tx.poll_ready(cx) - } - fn start_send(self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> { - self.project().tx.start_send(item) - } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().tx.poll_flush(cx) - } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().tx.poll_close(cx) - } -} - -pin_project! { - /// Read side of a multiplexor stream that implements futures `Stream`. - pub struct MuxStreamIoStream { - #[pin] - rx: Pin + Send>>, - is_closed: Arc, - close_reason: Arc, - } -} - -impl MuxStreamIoStream { - /// Turn the stream into one that implements futures `AsyncRead + AsyncBufRead`. - pub fn into_asyncread(self) -> MuxStreamAsyncRead { - MuxStreamAsyncRead::new(self) - } - - /// Get the stream's close reason, if it was closed. - pub fn get_close_reason(&self) -> Option { - if self.is_closed.load(Ordering::Acquire) { - Some(self.close_reason.load(Ordering::Acquire)) - } else { - None - } - } -} - -impl Stream for MuxStreamIoStream { - type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().rx.poll_next(cx).map(|x| x.map(Ok)) - } -} - -pin_project! { - /// Write side of a multiplexor stream that implements futures `Sink`. - pub struct MuxStreamIoSink { - #[pin] - tx: Pin, Error = WispError> + Send>>, - is_closed: Arc, - close_reason: Arc, - } -} - -impl MuxStreamIoSink { - /// Turn the sink into one that implements futures `AsyncWrite`. - pub fn into_asyncwrite(self) -> MuxStreamAsyncWrite { - MuxStreamAsyncWrite::new(self) - } - - /// Get the stream's close reason, if it was closed. - pub fn get_close_reason(&self) -> Option { - if self.is_closed.load(Ordering::Acquire) { - Some(self.close_reason.load(Ordering::Acquire)) - } else { - None - } - } -} - -impl Sink for MuxStreamIoSink { - type Error = std::io::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project() - .tx - .poll_ready(cx) - .map_err(std::io::Error::other) - } - fn start_send(self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> { - self.project() - .tx - .start_send(Payload::Bytes(item)) - .map_err(std::io::Error::other) - } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project() - .tx - .poll_flush(cx) - .map_err(std::io::Error::other) - } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project() - .tx - .poll_close(cx) - .map_err(std::io::Error::other) - } -} - -pin_project! { - /// Multiplexor stream that implements futures `AsyncRead + AsyncBufRead + AsyncWrite`. - pub struct MuxStreamAsyncRW { - #[pin] - rx: MuxStreamAsyncRead, - #[pin] - tx: MuxStreamAsyncWrite, - } -} - -impl MuxStreamAsyncRW { - /// Get the stream's close reason, if it was closed. - pub fn get_close_reason(&self) -> Option { - self.rx.get_close_reason() - } - - /// Split the stream into read and write parts, consuming it. - pub fn into_split(self) -> (MuxStreamAsyncRead, MuxStreamAsyncWrite) { - (self.rx, self.tx) - } -} - -impl AsyncRead for MuxStreamAsyncRW { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.project().rx.poll_read(cx, buf) - } - - fn poll_read_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &mut [std::io::IoSliceMut<'_>], - ) -> Poll> { - self.project().rx.poll_read_vectored(cx, bufs) - } -} - -impl AsyncBufRead for MuxStreamAsyncRW { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().rx.poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().rx.consume(amt) - } -} - -impl AsyncWrite for MuxStreamAsyncRW { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().tx.poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().tx.poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().tx.poll_close(cx) - } -} - -pin_project! { - /// Read side of a multiplexor stream that implements futures `AsyncRead + AsyncBufRead`. - pub struct MuxStreamAsyncRead { - #[pin] - rx: IntoAsyncRead, - is_closed: Arc, - close_reason: Arc, - } -} - -impl MuxStreamAsyncRead { - pub(crate) fn new(stream: MuxStreamIoStream) -> Self { - Self { - is_closed: stream.is_closed.clone(), - close_reason: stream.close_reason.clone(), - rx: stream.into_async_read(), - } - } - - /// Get the stream's close reason, if it was closed. - pub fn get_close_reason(&self) -> Option { - if self.is_closed.load(Ordering::Acquire) { - Some(self.close_reason.load(Ordering::Acquire)) - } else { - None - } - } -} - -impl AsyncRead for MuxStreamAsyncRead { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.project().rx.poll_read(cx, buf) - } -} -impl AsyncBufRead for MuxStreamAsyncRead { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().rx.poll_fill_buf(cx) - } - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().rx.consume(amt) - } -} - -pin_project! { - /// Write side of a multiplexor stream that implements futures `AsyncWrite`. - pub struct MuxStreamAsyncWrite { - #[pin] - tx: MuxStreamIoSink, - error: Option - } -} - -impl MuxStreamAsyncWrite { - pub(crate) fn new(sink: MuxStreamIoSink) -> Self { - Self { - tx: sink, - error: None, - } - } - - /// Get the stream's close reason, if it was closed. - pub fn get_close_reason(&self) -> Option { - self.tx.get_close_reason() - } -} - -impl AsyncWrite for MuxStreamAsyncWrite { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if let Some(err) = self.error.take() { - return Poll::Ready(Err(err)); - } - - let mut this = self.as_mut().project(); - - ready!(this.tx.as_mut().poll_ready(cx))?; - match this.tx.as_mut().start_send(buf.into()) { - Ok(()) => { - let mut cx = Context::from_waker(noop_waker_ref()); - let cx = &mut cx; - - match this.tx.poll_flush(cx) { - Poll::Ready(Err(err)) => { - self.error = Some(err); - } - Poll::Ready(Ok(_)) | Poll::Pending => {} - } - - Poll::Ready(Ok(buf.len())) - } - Err(e) => Poll::Ready(Err(e)), - } - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().tx.poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().tx.poll_close(cx) - } -} diff --git a/wisp/src/sink_unfold.rs b/wisp/src/stream/sink_unfold.rs similarity index 100% rename from wisp/src/sink_unfold.rs rename to wisp/src/stream/sink_unfold.rs