make wisp-mux no longer eat data, fix wisp-mux stream read api

This commit is contained in:
Toshit Chawda 2024-11-03 12:47:05 -08:00
parent 0c8fe25089
commit f3a78a1715
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
4 changed files with 23 additions and 16 deletions

View file

@ -153,7 +153,7 @@ async fn handle_stream(
muxstream.write(&data[..size]).await?;
}
data = muxstream.read() => {
if let Some(data) = data {
if let Some(data) = data? {
stream.send(&data).await?;
} else {
break Ok(());

View file

@ -177,7 +177,11 @@ impl<R: WebSocketRead + Send> MuxInner<R> {
stream_id: u32,
stream_type: StreamType,
) -> Result<(MuxMapValue, MuxStream), WispError> {
let (ch_tx, ch_rx) = mpsc::bounded(self.buffer_size as usize);
let (ch_tx, ch_rx) = mpsc::bounded(if self.role == Role::Server {
self.buffer_size as usize
} else {
usize::MAX
});
let should_flow_control = self.tcp_extensions.contains(&stream_type.into());
let flow_control_event: Arc<Event> = Event::new().into();

View file

@ -73,7 +73,7 @@ pin_project! {
/// Read side of a multiplexor stream that implements futures `Stream`.
pub struct MuxStreamIoStream {
#[pin]
pub(crate) rx: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
pub(crate) rx: Pin<Box<dyn Stream<Item = Result<Bytes, WispError>> + Send>>,
pub(crate) is_closed: Arc<AtomicBool>,
pub(crate) close_reason: Arc<AtomicCloseReason>,
}
@ -98,7 +98,7 @@ impl MuxStreamIoStream {
impl Stream for MuxStreamIoStream {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().rx.poll_next(cx).map(|x| x.map(Ok))
self.project().rx.poll_next(cx).map_err(std::io::Error::other)
}
}

View file

@ -3,7 +3,9 @@ mod sink_unfold;
pub use compat::*;
use crate::{
inner::WsEvent, ws::{Frame, LockedWebSocketWrite, Payload}, AtomicCloseReason, CloseReason, Packet, Role, StreamType, WispError
inner::WsEvent,
ws::{Frame, LockedWebSocketWrite, Payload},
AtomicCloseReason, CloseReason, Packet, Role, StreamType, WispError,
};
use bytes::{BufMut, Bytes, BytesMut};
@ -42,13 +44,13 @@ pub struct MuxStreamRead {
impl MuxStreamRead {
/// Read an event from the stream.
pub async fn read(&self) -> Option<Bytes> {
if self.is_closed.load(Ordering::Acquire) {
return None;
pub async fn read(&self) -> Result<Option<Bytes>, WispError> {
if self.rx.is_empty() && self.is_closed.load(Ordering::Acquire) {
return Ok(None);
}
let bytes = select! {
x = self.rx.recv_async() => x.ok()?,
_ = self.is_closed_event.listen().fuse() => return None
x = self.rx.recv_async() => x.map_err(|_| WispError::MuxMessageFailedToRecv)?,
_ = self.is_closed_event.listen().fuse() => return Ok(None)
};
if self.role == Role::Server && self.should_flow_control {
let val = self.flow_control_read.fetch_add(1, Ordering::AcqRel) + 1;
@ -61,17 +63,18 @@ impl MuxStreamRead {
)
.into(),
)
.await
.ok()?;
.await?;
self.flow_control_read.store(0, Ordering::Release);
}
}
Some(bytes)
Ok(Some(bytes))
}
pub(crate) fn into_inner_stream(self) -> Pin<Box<dyn Stream<Item = Bytes> + Send>> {
pub(crate) fn into_inner_stream(
self,
) -> Pin<Box<dyn Stream<Item = Result<Bytes, WispError>> + Send>> {
Box::pin(stream::unfold(self, |rx| async move {
Some((rx.read().await?, rx))
Some((rx.read().await.transpose()?, rx))
}))
}
@ -311,7 +314,7 @@ impl MuxStream {
}
/// Read an event from the stream.
pub async fn read(&self) -> Option<Bytes> {
pub async fn read(&self) -> Result<Option<Bytes>, WispError> {
self.rx.read().await
}