From 5b3fc56b38d006315f43a85292117c73044613d7 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Wed, 27 Nov 2024 20:20:31 -0800 Subject: [PATCH] send payload everywhere --- client/src/io_stream.rs | 2 +- wisp/src/mux/inner.rs | 6 +++--- wisp/src/stream/compat.rs | 8 ++++---- wisp/src/stream/mod.rs | 10 +++++----- wisp/src/ws.rs | 17 ++++++++++++++++- 5 files changed, 29 insertions(+), 14 deletions(-) diff --git a/client/src/io_stream.rs b/client/src/io_stream.rs index 7ee474c..ddb6657 100644 --- a/client/src/io_stream.rs +++ b/client/src/io_stream.rs @@ -50,7 +50,7 @@ pub fn iostream_from_asyncrw(asyncrw: ProviderAsyncRW, buffer_size: usize) -> Ep pub fn iostream_from_stream(stream: ProviderUnencryptedStream) -> EpoxyIoStream { let (rx, tx) = stream.into_split(); create_iostream( - Box::pin(rx.map_err(EpoxyError::Io)), + Box::pin(rx.map_ok(Bytes::from).map_err(EpoxyError::Io)), Box::pin(tx.sink_map_err(EpoxyError::Io)), ) } diff --git a/wisp/src/mux/inner.rs b/wisp/src/mux/inner.rs index 042436d..4917f27 100644 --- a/wisp/src/mux/inner.rs +++ b/wisp/src/mux/inner.rs @@ -9,7 +9,7 @@ use crate::{ AtomicCloseReason, ClosePacket, CloseReason, ConnectPacket, MuxStream, Packet, PacketType, Role, StreamType, WispError, }; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use event_listener::Event; use flume as mpsc; use futures::{channel::oneshot, select, stream::unfold, FutureExt, StreamExt}; @@ -31,7 +31,7 @@ pub(crate) enum WsEvent { } struct MuxMapValue { - stream: mpsc::Sender, + stream: mpsc::Sender>, stream_type: StreamType, should_flow_control: bool, @@ -414,7 +414,7 @@ impl MuxInner { data.extend_from_slice(&extra_frame.payload); } } - let _ = stream.stream.try_send(data.freeze()); + let _ = stream.stream.try_send(Payload::Bytes(data)); if self.role == Role::Server && stream.should_flow_control { stream.flow_control.store( stream diff --git a/wisp/src/stream/compat.rs b/wisp/src/stream/compat.rs index b309685..21f54b3 100644 --- a/wisp/src/stream/compat.rs +++ b/wisp/src/stream/compat.rs @@ -7,7 +7,7 @@ use std::{ task::{Context, Poll}, }; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use futures::{ ready, stream::IntoAsyncRead, task::noop_waker_ref, AsyncBufRead, AsyncRead, AsyncWrite, Sink, Stream, TryStreamExt, @@ -47,7 +47,7 @@ impl MuxStreamIo { } impl Stream for MuxStreamIo { - type Item = Result; + type Item = Result, std::io::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().rx.poll_next(cx) } @@ -73,7 +73,7 @@ pin_project! { /// Read side of a multiplexor stream that implements futures `Stream`. pub struct MuxStreamIoStream { #[pin] - pub(crate) rx: Pin> + Send>>, + pub(crate) rx: Pin, WispError>> + Send>>, pub(crate) is_closed: Arc, pub(crate) close_reason: Arc, } @@ -96,7 +96,7 @@ impl MuxStreamIoStream { } impl Stream for MuxStreamIoStream { - type Item = Result; + type Item = Result, std::io::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project() .rx diff --git a/wisp/src/stream/mod.rs b/wisp/src/stream/mod.rs index 6c5e23a..8f5c704 100644 --- a/wisp/src/stream/mod.rs +++ b/wisp/src/stream/mod.rs @@ -30,7 +30,7 @@ pub struct MuxStreamRead { role: Role, tx: LockedWebSocketWrite, - rx: mpsc::Receiver, + rx: mpsc::Receiver>, is_closed: Arc, is_closed_event: Arc, @@ -44,7 +44,7 @@ pub struct MuxStreamRead { impl MuxStreamRead { /// Read an event from the stream. - pub async fn read(&self) -> Result, WispError> { + pub async fn read(&self) -> Result>, WispError> { if self.rx.is_empty() && self.is_closed.load(Ordering::Acquire) { return Ok(None); } @@ -72,7 +72,7 @@ impl MuxStreamRead { pub(crate) fn into_inner_stream( self, - ) -> Pin> + Send>> { + ) -> Pin, WispError>> + Send>> { Box::pin(stream::unfold(self, |rx| async move { Some((rx.read().await.transpose()?, rx)) })) @@ -271,7 +271,7 @@ impl MuxStream { stream_id: u32, role: Role, stream_type: StreamType, - rx: mpsc::Receiver, + rx: mpsc::Receiver>, mux_tx: mpsc::Sender>, tx: LockedWebSocketWrite, is_closed: Arc, @@ -320,7 +320,7 @@ impl MuxStream { } /// Read an event from the stream. - pub async fn read(&self) -> Result, WispError> { + pub async fn read(&self) -> Result>, WispError> { self.rx.read().await } diff --git a/wisp/src/ws.rs b/wisp/src/ws.rs index 62e9910..002567e 100644 --- a/wisp/src/ws.rs +++ b/wisp/src/ws.rs @@ -7,7 +7,7 @@ use std::{future::Future, ops::Deref, pin::Pin, sync::Arc}; use crate::WispError; -use bytes::{Buf, BytesMut}; +use bytes::{Buf, Bytes, BytesMut}; use futures::{lock::Mutex, TryFutureExt}; /// Payload of the websocket frame. @@ -51,6 +51,15 @@ impl From> for BytesMut { } } +impl From> for Bytes { + fn from(value: Payload<'static>) -> Self { + match value { + Payload::Bytes(x) => x.freeze(), + Payload::Borrowed(x) => x.into(), + } + } +} + impl Deref for Payload<'_> { type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -61,6 +70,12 @@ impl Deref for Payload<'_> { } } +impl AsRef<[u8]> for Payload<'_> { + fn as_ref(&self) -> &[u8] { + self + } +} + impl Clone for Payload<'_> { fn clone(&self) -> Self { match self {