fix muxstreamasyncread

This commit is contained in:
Toshit Chawda 2024-07-11 16:15:37 -07:00
parent 5571a63f40
commit 31b9f1c455
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
8 changed files with 187 additions and 49 deletions

View file

@ -1,4 +1,4 @@
#![deny(missing_docs, warnings)]
#![deny(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
//! A library for easily creating [Wisp] clients and servers.
//!
@ -14,7 +14,6 @@ mod stream;
pub mod ws;
pub use crate::{packet::*, stream::*};
pub use async_io_stream::IoStream;
use bytes::Bytes;
use dashmap::DashMap;

View file

@ -4,15 +4,14 @@ use crate::{
CloseReason, Packet, Role, StreamType, WispError,
};
use async_io_stream::IoStream;
use bytes::{BufMut, Bytes, BytesMut};
use event_listener::Event;
use flume as mpsc;
use futures::{
channel::oneshot,
select, stream,
task::{Context, Poll},
FutureExt, Sink, Stream,
ready, select, stream::{self, IntoAsyncRead},
task::{noop_waker_ref, Context, Poll},
AsyncBufRead, AsyncRead, AsyncWrite, FutureExt, Sink, Stream, TryStreamExt,
};
use pin_project_lite::pin_project;
use std::{
@ -372,8 +371,11 @@ pin_project! {
impl MuxStreamIo {
/// Turn the stream into one that implements futures `AsyncRead + AsyncBufRead + AsyncWrite`.
pub fn into_asyncrw(self) -> IoStream<MuxStreamIo, Bytes> {
IoStream::new(self)
pub fn into_asyncrw(self) -> MuxStreamAsyncRW {
MuxStreamAsyncRW {
rx: self.rx.into_asyncread(),
tx: self.tx.into_asyncwrite(),
}
}
/// Split the stream into read and write parts, consuming it.
@ -413,6 +415,13 @@ pin_project! {
}
}
impl MuxStreamIoStream {
/// Turn the stream into one that implements futures `AsyncRead + AsyncBufRead`.
pub fn into_asyncread(self) -> MuxStreamAsyncRead {
MuxStreamAsyncRead::new(self)
}
}
impl Stream for MuxStreamIoStream {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -428,6 +437,13 @@ pin_project! {
}
}
impl MuxStreamIoSink {
/// Turn the sink into one that implements futures `AsyncWrite`.
pub fn into_asyncwrite(self) -> MuxStreamAsyncWrite {
MuxStreamAsyncWrite::new(self)
}
}
impl Sink<Bytes> for MuxStreamIoSink {
type Error = std::io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -455,3 +471,157 @@ impl Sink<Bytes> for MuxStreamIoSink {
.map_err(std::io::Error::other)
}
}
pin_project! {
/// Multiplexor stream that implements futures `AsyncRead + AsyncBufRead + AsyncWrite`.
pub struct MuxStreamAsyncRW {
#[pin]
rx: MuxStreamAsyncRead,
#[pin]
tx: MuxStreamAsyncWrite,
}
}
impl MuxStreamAsyncRW {
/// Split the stream into read and write parts, consuming it.
pub fn into_split(self) -> (MuxStreamAsyncRead, MuxStreamAsyncWrite) {
(self.rx, self.tx)
}
}
impl AsyncRead for MuxStreamAsyncRW {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
self.project().rx.poll_read(cx, buf)
}
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [std::io::IoSliceMut<'_>],
) -> Poll<std::io::Result<usize>> {
self.project().rx.poll_read_vectored(cx, bufs)
}
}
impl AsyncBufRead for MuxStreamAsyncRW {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
self.project().rx.poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().rx.consume(amt)
}
}
impl AsyncWrite for MuxStreamAsyncRW {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.project().tx.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.project().tx.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.project().tx.poll_close(cx)
}
}
pin_project! {
/// Read side of a multiplexor stream that implements futures `AsyncRead + AsyncBufRead`.
pub struct MuxStreamAsyncRead {
#[pin]
rx: IntoAsyncRead<MuxStreamIoStream>,
// state: Option<MuxStreamAsyncReadState>
}
}
impl MuxStreamAsyncRead {
pub(crate) fn new(stream: MuxStreamIoStream) -> Self {
Self {
rx: stream.into_async_read(),
// state: None,
}
}
}
impl AsyncRead for MuxStreamAsyncRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
self.project().rx.poll_read(cx, buf)
}
}
impl AsyncBufRead for MuxStreamAsyncRead {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
self.project().rx.poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().rx.consume(amt)
}
}
pin_project! {
/// Write side of a multiplexor stream that implements futures `AsyncWrite`.
pub struct MuxStreamAsyncWrite {
#[pin]
tx: MuxStreamIoSink,
error: Option<std::io::Error>
}
}
impl MuxStreamAsyncWrite {
pub(crate) fn new(sink: MuxStreamIoSink) -> Self {
Self { tx: sink, error: None }
}
}
impl AsyncWrite for MuxStreamAsyncWrite {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
if let Some(err) = self.error.take() {
return Poll::Ready(Err(err));
}
let mut this = self.as_mut().project();
ready!(this.tx.as_mut().poll_ready(cx))?;
match this.tx.as_mut().start_send(Bytes::copy_from_slice(buf)) {
Ok(()) => {
let mut cx = Context::from_waker(noop_waker_ref());
let cx = &mut cx;
match this.tx.poll_flush(cx) {
Poll::Ready(Err(err)) => {
self.error = Some(err);
}
Poll::Ready(Ok(_)) | Poll::Pending => {}
}
Poll::Ready(Ok(buf.len()))
}
Err(e) => Poll::Ready(Err(e)),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.project().tx.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.project().tx.poll_close(cx)
}
}