rewrite server

This commit is contained in:
Toshit Chawda 2024-07-20 22:21:51 -07:00
parent 3bf19be9f0
commit 24bfcae975
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
10 changed files with 1301 additions and 178 deletions

View file

@ -9,7 +9,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use crate::{ws::LockedWebSocketWrite, WispError};
fn match_payload<'a>(payload: Payload<'a>) -> crate::ws::Payload<'a> {
fn match_payload(payload: Payload<'_>) -> crate::ws::Payload<'_> {
match payload {
Payload::Bytes(x) => crate::ws::Payload::Bytes(x),
Payload::Owned(x) => crate::ws::Payload::Bytes(BytesMut::from(x.deref())),
@ -18,7 +18,7 @@ fn match_payload<'a>(payload: Payload<'a>) -> crate::ws::Payload<'a> {
}
}
fn match_payload_reverse<'a>(payload: crate::ws::Payload<'a>) -> Payload<'a> {
fn match_payload_reverse(payload: crate::ws::Payload<'_>) -> Payload<'_> {
match payload {
crate::ws::Payload::Bytes(x) => Payload::Bytes(x),
crate::ws::Payload::Borrowed(x) => Payload::Borrowed(x),
@ -94,6 +94,18 @@ impl<S: AsyncWrite + Unpin + Send> crate::ws::WebSocketWrite for WebSocketWrite<
self.write_frame(frame.into()).await.map_err(|e| e.into())
}
async fn wisp_write_split(&mut self, header: crate::ws::Frame<'_>, body: crate::ws::Frame<'_>) -> Result<(), WispError> {
let mut header = Frame::from(header);
header.fin = false;
self.write_frame(header).await?;
let mut body = Frame::from(body);
body.opcode = OpCode::Continuation;
self.write_frame(body).await?;
Ok(())
}
async fn wisp_close(&mut self) -> Result<(), WispError> {
self.write_frame(Frame::close(CloseCode::Normal.into(), b""))
.await

View file

@ -12,7 +12,7 @@ use futures::{
ready, select,
stream::{self, IntoAsyncRead},
task::{noop_waker_ref, Context, Poll},
AsyncBufRead, AsyncRead, AsyncWrite, Future, FutureExt, Sink, Stream, TryStreamExt,
AsyncBufRead, AsyncRead, AsyncWrite, FutureExt, Sink, Stream, TryStreamExt,
};
use pin_project_lite::pin_project;
use std::{
@ -79,11 +79,18 @@ impl MuxStreamRead {
Some(bytes)
}
pub(crate) fn into_stream(self) -> Pin<Box<dyn Stream<Item = Bytes> + Send>> {
pub(crate) fn into_inner_stream(self) -> Pin<Box<dyn Stream<Item = Bytes> + Send>> {
Box::pin(stream::unfold(self, |rx| async move {
Some((rx.read().await?, rx))
}))
}
/// Turn the read half into one that implements futures `Stream`, consuming it.
pub fn into_stream(self) -> MuxStreamIoStream {
MuxStreamIoStream {
rx: self.into_inner_stream(),
}
}
}
/// Write side of a multiplexor stream.
@ -101,9 +108,10 @@ pub struct MuxStreamWrite {
}
impl MuxStreamWrite {
pub(crate) async fn write_payload_internal(
pub(crate) async fn write_payload_internal<'a>(
&self,
frame: Frame<'static>,
header: Frame<'static>,
body: Frame<'a>,
) -> Result<(), WispError> {
if self.role == Role::Client
&& self.stream_type == StreamType::Tcp
@ -115,7 +123,7 @@ impl MuxStreamWrite {
return Err(WispError::StreamAlreadyClosed);
}
self.tx.write_frame(frame).await?;
self.tx.write_split(header, body).await?;
if self.role == Role::Client && self.stream_type == StreamType::Tcp {
self.flow_control.store(
@ -127,12 +135,13 @@ impl MuxStreamWrite {
}
/// Write a payload to the stream.
pub fn write_payload<'a>(
&'a self,
data: Payload<'_>,
) -> impl Future<Output = Result<(), WispError>> + 'a {
let frame: Frame<'static> = Frame::from(Packet::new_data(self.stream_id, data));
self.write_payload_internal(frame)
pub async fn write_payload(&self, data: Payload<'_>) -> Result<(), WispError> {
let frame: Frame<'static> = Frame::from(Packet::new_data(
self.stream_id,
Payload::Bytes(BytesMut::new()),
));
self.write_payload_internal(frame, Frame::binary(data))
.await
}
/// Write data to the stream.
@ -188,12 +197,14 @@ impl MuxStreamWrite {
Ok(())
}
pub(crate) fn into_sink(self) -> Pin<Box<dyn Sink<Frame<'static>, Error = WispError> + Send>> {
pub(crate) fn into_inner_sink(
self,
) -> Pin<Box<dyn Sink<Payload<'static>, Error = WispError> + Send>> {
let handle = self.get_close_handle();
Box::pin(sink_unfold::unfold(
self,
|tx, data| async move {
tx.write_payload_internal(data).await?;
tx.write_payload(data).await?;
Ok(tx)
},
handle,
@ -203,6 +214,13 @@ impl MuxStreamWrite {
},
))
}
/// Turn the write half into one that implements futures `Sink`, consuming it.
pub fn into_sink(self) -> MuxStreamIoSink {
MuxStreamIoSink {
tx: self.into_inner_sink(),
}
}
}
impl Drop for MuxStreamWrite {
@ -316,13 +334,8 @@ impl MuxStream {
/// Turn the stream into one that implements futures `Stream + Sink`, consuming it.
pub fn into_io(self) -> MuxStreamIo {
MuxStreamIo {
rx: MuxStreamIoStream {
rx: self.rx.into_stream(),
},
tx: MuxStreamIoSink {
tx: self.tx.into_sink(),
stream_id: self.stream_id,
},
rx: self.rx.into_stream(),
tx: self.tx.into_sink(),
}
}
}
@ -456,8 +469,7 @@ pin_project! {
/// Write side of a multiplexor stream that implements futures `Sink`.
pub struct MuxStreamIoSink {
#[pin]
tx: Pin<Box<dyn Sink<Frame<'static>, Error = WispError> + Send>>,
stream_id: u32,
tx: Pin<Box<dyn Sink<Payload<'static>, Error = WispError> + Send>>,
}
}
@ -477,13 +489,9 @@ impl Sink<&[u8]> for MuxStreamIoSink {
.map_err(std::io::Error::other)
}
fn start_send(self: Pin<&mut Self>, item: &[u8]) -> Result<(), Self::Error> {
let stream_id = self.stream_id;
self.project()
.tx
.start_send(Frame::from(Packet::new_data(
stream_id,
Payload::Borrowed(item),
)))
.start_send(Payload::Bytes(BytesMut::from(item)))
.map_err(std::io::Error::other)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {

View file

@ -166,6 +166,18 @@ pub trait WebSocketWrite {
/// Close the socket.
async fn wisp_close(&mut self) -> Result<(), WispError>;
/// Write a split frame to the socket.
async fn wisp_write_split(
&mut self,
header: Frame<'_>,
body: Frame<'_>,
) -> Result<(), WispError> {
let mut payload = BytesMut::from(header.payload);
payload.extend_from_slice(&body.payload);
self.wisp_write_frame(Frame::binary(Payload::Bytes(payload)))
.await
}
}
/// Locked WebSocket.
@ -183,6 +195,14 @@ impl LockedWebSocketWrite {
self.0.lock().await.wisp_write_frame(frame).await
}
pub(crate) async fn write_split(
&self,
header: Frame<'_>,
body: Frame<'_>,
) -> Result<(), WispError> {
self.0.lock().await.wisp_write_split(header, body).await
}
/// Close the websocket.
pub async fn close(&self) -> Result<(), WispError> {
self.0.lock().await.wisp_close().await