diff --git a/server/src/main.rs b/server/src/main.rs index 7205d18..d3c5d94 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -17,7 +17,7 @@ use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio_native_tls::{native_tls, TlsAcceptor}; use tokio_util::codec::{BytesCodec, Framed}; -use wisp_mux::{ws, ConnectPacket, MuxStream, ServerMux, StreamType, WispError, WsEvent}; +use wisp_mux::{ws, ConnectPacket, MuxStream, ServerMux, StreamType, WispError, MuxEvent}; type HttpBody = http_body_util::Full; @@ -97,7 +97,7 @@ async fn accept_http( .collect::>(), ) }) && protocols.contains(&"wisp-v1") - && (uri == "" || uri == "/") + && (uri.is_empty() || uri == "/") { tokio::spawn(async move { accept_ws(fut, addr.clone()).await }); res.headers_mut().insert( @@ -105,7 +105,7 @@ async fn accept_http( HeaderValue::from_str("wisp-v1").unwrap(), ); } else { - let uri = uri.strip_prefix("/").unwrap_or(uri).to_string(); + let uri = uri.strip_prefix('/').unwrap_or(uri).to_string(); tokio::spawn(async move { accept_wsproxy(fut, uri, addr.clone()).await }); } @@ -154,10 +154,10 @@ async fn handle_mux( event = stream.read() => { match event { Some(event) => match event { - WsEvent::Send(data) => { + MuxEvent::Send(data) => { udp_socket.send(&data).await.map_err(|x| WispError::Other(Box::new(x)))?; } - WsEvent::Close(_) => return Ok(false), + MuxEvent::Close(_) => return Ok(false), }, None => break, } diff --git a/wisp/README.md b/wisp/README.md new file mode 100644 index 0000000..ee6a62b --- /dev/null +++ b/wisp/README.md @@ -0,0 +1,2 @@ +# wisp-mux +A library for easily creating [Wisp](https://github.com/MercuryWorkshop/wisp-protocol) servers and clients. diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index 7ad9931..f3e153e 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -1,4 +1,9 @@ +#![deny(missing_docs)] #![feature(impl_trait_in_assoc_type)] +//! A library for easily creating [Wisp] clients and servers. +//! +//! [Wisp]: https://github.com/MercuryWorkshop/wisp-protocol + #[cfg(feature = "fastwebsockets")] mod fastwebsockets; mod packet; @@ -24,29 +29,49 @@ use std::{ }, }; +/// The role of the multiplexor. #[derive(Debug, PartialEq, Copy, Clone)] pub enum Role { + /// Client side, can create new channels to proxy. Client, + /// Server side, can listen for channels to proxy. Server, } +/// Errors the Wisp implementation can return. #[derive(Debug)] pub enum WispError { + /// The packet recieved did not have enough data. PacketTooSmall, + /// The packet recieved had an invalid type. InvalidPacketType, + /// The stream had an invalid type. InvalidStreamType, + /// The stream had an invalid ID. InvalidStreamId, + /// The URI recieved was invalid. InvalidUri, + /// The URI recieved had no host. UriHasNoHost, + /// The URI recieved had no port. UriHasNoPort, + /// The max stream count was reached. MaxStreamCountReached, + /// The stream had already been closed. StreamAlreadyClosed, + /// The websocket frame recieved had an invalid type. WsFrameInvalidType, + /// The websocket frame recieved was not finished. WsFrameNotFinished, + /// Error specific to the websocket implementation. WsImplError(Box), + /// The websocket implementation socket closed. WsImplSocketClosed, + /// The websocket implementation did not support the action. WsImplNotSupported, + /// The string was invalid UTF-8. Utf8Error(std::str::Utf8Error), + /// Other error. Other(Box), } @@ -87,17 +112,17 @@ where W: ws::WebSocketWrite + Send + 'static, { tx: ws::LockedWebSocketWrite, - stream_map: Arc>>>, - close_tx: mpsc::UnboundedSender, + stream_map: Arc>>>, + close_tx: mpsc::UnboundedSender, } impl ServerMuxInner { pub async fn into_future( self, rx: R, - close_rx: mpsc::UnboundedReceiver, + close_rx: mpsc::UnboundedReceiver, muxstream_sender: mpsc::UnboundedSender<(ConnectPacket, MuxStream)>, - buffer_size: u32 + buffer_size: u32, ) -> Result<(), WispError> where R: ws::WebSocketRead, @@ -107,20 +132,20 @@ impl ServerMuxInner { x = self.server_msg_loop(rx, muxstream_sender, buffer_size).fuse() => x }; self.stream_map.lock().await.iter().for_each(|x| { - let _ = x.1.unbounded_send(WsEvent::Close(ClosePacket::new(0x01))); + let _ = x.1.unbounded_send(MuxEvent::Close(ClosePacket::new(0x01))); }); ret } async fn server_close_loop( &self, - mut close_rx: mpsc::UnboundedReceiver, - stream_map: Arc>>>, + mut close_rx: mpsc::UnboundedReceiver, + stream_map: Arc>>>, tx: ws::LockedWebSocketWrite, ) -> Result<(), WispError> { while let Some(msg) = close_rx.next().await { match msg { - MuxEvent::Close(stream_id, reason, channel) => { + WsEvent::Close(stream_id, reason, channel) => { if stream_map.lock().await.remove(&stream_id).is_some() { let _ = channel.send( tx.write_frame(Packet::new_close(stream_id, reason).into()) @@ -154,6 +179,7 @@ impl ServerMuxInner { match packet.packet { Connect(inner_packet) => { let (ch_tx, ch_rx) = mpsc::unbounded(); + let stream_type = inner_packet.stream_type; self.stream_map.lock().await.insert(packet.stream_id, ch_tx); muxstream_sender .unbounded_send(( @@ -161,6 +187,7 @@ impl ServerMuxInner { MuxStream::new( packet.stream_id, Role::Server, + stream_type, ch_rx, self.tx.clone(), self.close_tx.clone(), @@ -173,13 +200,13 @@ impl ServerMuxInner { } Data(data) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.unbounded_send(WsEvent::Send(data)); + let _ = stream.unbounded_send(MuxEvent::Send(data)); } } Continue(_) => unreachable!(), Close(inner_packet) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.unbounded_send(WsEvent::Close(inner_packet)); + let _ = stream.unbounded_send(MuxEvent::Close(inner_packet)); } self.stream_map.lock().await.remove(&packet.stream_id); } @@ -193,6 +220,25 @@ impl ServerMuxInner { } } +/// Server-side multiplexor. +/// +/// # Example +/// ``` +/// use wisp_mux::ServerMux; +/// +/// let (mux, fut) = ServerMux::new(rx, tx, 128); +/// tokio::spawn(async move { +/// if let Err(e) = fut.await { +/// println!("error in multiplexor: {:?}", e); +/// } +/// }); +/// while let Some((packet, stream)) = mux.server_new_stream().await { +/// tokio::spawn(async move { +/// let url = format!("{}:{}", packet.destination_hostname, packet.destination_port); +/// // do something with `url` and `packet.stream_type` +/// }); +/// } +/// ``` pub struct ServerMux where W: ws::WebSocketWrite + Send + 'static, @@ -201,11 +247,16 @@ where } impl ServerMux { - pub fn new(read: R, write: W, buffer_size: u32) -> (Self, impl Future>) + /// Create a new server-side multiplexor. + pub fn new( + read: R, + write: W, + buffer_size: u32, + ) -> (Self, impl Future>) where R: ws::WebSocketRead, { - let (close_tx, close_rx) = mpsc::unbounded::(); + let (close_tx, close_rx) = mpsc::unbounded::(); let (tx, rx) = mpsc::unbounded::<(ConnectPacket, MuxStream)>(); let write = ws::LockedWebSocketWrite::new(write); let map = Arc::new(Mutex::new(HashMap::new())); @@ -220,25 +271,31 @@ impl ServerMux { ) } + /// Wait for a stream to be created. pub async fn server_new_stream(&mut self) -> Option<(ConnectPacket, MuxStream)> { self.muxstream_recv.next().await } } -pub struct ClientMuxInner +struct ClientMuxMapValue { + stream: mpsc::UnboundedSender, + flow_control: Arc, + flow_control_event: Arc, +} + +struct ClientMuxInner where W: ws::WebSocketWrite, { tx: ws::LockedWebSocketWrite, - stream_map: - Arc, Arc, Arc)>>>, + stream_map: Arc>>, } impl ClientMuxInner { - pub async fn into_future( + pub(crate) async fn into_future( self, rx: R, - close_rx: mpsc::UnboundedReceiver, + close_rx: mpsc::UnboundedReceiver, ) -> Result<(), WispError> where R: ws::WebSocketRead, @@ -251,11 +308,11 @@ impl ClientMuxInner { async fn client_bg_loop( &self, - mut close_rx: mpsc::UnboundedReceiver, + mut close_rx: mpsc::UnboundedReceiver, ) -> Result<(), WispError> { while let Some(msg) = close_rx.next().await { match msg { - MuxEvent::Close(stream_id, reason, channel) => { + WsEvent::Close(stream_id, reason, channel) => { if self.stream_map.lock().await.remove(&stream_id).is_some() { let _ = channel.send( self.tx @@ -282,20 +339,20 @@ impl ClientMuxInner { Connect(_) => unreachable!(), Data(data) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.0.unbounded_send(WsEvent::Send(data)); + let _ = stream.stream.unbounded_send(MuxEvent::Send(data)); } } Continue(inner_packet) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { stream - .1 + .flow_control .store(inner_packet.buffer_remaining, Ordering::Release); - let _ = stream.2.notify(u32::MAX); + let _ = stream.flow_control_event.notify(u32::MAX); } } Close(inner_packet) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.0.unbounded_send(WsEvent::Close(inner_packet)); + let _ = stream.stream.unbounded_send(MuxEvent::Close(inner_packet)); } self.stream_map.lock().await.remove(&packet.stream_id); } @@ -306,19 +363,33 @@ impl ClientMuxInner { } } +/// Client side multiplexor. +/// +/// # Example +/// ``` +/// use wisp_mux::{ClientMux, StreamType}; +/// +/// let (mux, fut) = ClientMux::new(rx, tx).await?; +/// tokio::spawn(async move { +/// if let Err(e) = fut.await { +/// println!("error in multiplexor: {:?}", e); +/// } +/// }); +/// let stream = mux.client_new_stream(StreamType::Tcp, "google.com", 80); +/// ``` pub struct ClientMux where W: ws::WebSocketWrite, { tx: ws::LockedWebSocketWrite, - stream_map: - Arc, Arc, Arc)>>>, + stream_map: Arc>>, next_free_stream_id: AtomicU32, - close_tx: mpsc::UnboundedSender, + close_tx: mpsc::UnboundedSender, buf_size: u32, } impl ClientMux { + /// Create a new client side multiplexor. pub async fn new( mut read: R, write: W, @@ -332,7 +403,7 @@ impl ClientMux { return Err(WispError::InvalidStreamId); } if let PacketType::Continue(packet) = first_packet.packet { - let (tx, rx) = mpsc::unbounded::(); + let (tx, rx) = mpsc::unbounded::(); let map = Arc::new(Mutex::new(HashMap::new())); Ok(( Self { @@ -353,6 +424,7 @@ impl ClientMux { } } + /// Create a new stream, multiplexed through Wisp. pub async fn client_new_stream( &self, stream_type: StreamType, @@ -372,13 +444,18 @@ impl ClientMux { .ok_or(WispError::MaxStreamCountReached)?, Ordering::Release, ); - self.stream_map - .lock() - .await - .insert(stream_id, (ch_tx, flow_control.clone(), evt.clone())); + self.stream_map.lock().await.insert( + stream_id, + ClientMuxMapValue { + stream: ch_tx, + flow_control: flow_control.clone(), + flow_control_event: evt.clone(), + }, + ); Ok(MuxStream::new( stream_id, Role::Client, + stream_type, ch_rx, self.tx.clone(), self.close_tx.clone(), diff --git a/wisp/src/packet.rs b/wisp/src/packet.rs index 505bbe6..e52a8be 100644 --- a/wisp/src/packet.rs +++ b/wisp/src/packet.rs @@ -2,9 +2,12 @@ use crate::ws; use crate::WispError; use bytes::{Buf, BufMut, Bytes}; -#[derive(Debug)] +/// Wisp stream type. +#[derive(Debug, PartialEq, Copy, Clone)] pub enum StreamType { + /// TCP Wisp stream. Tcp = 0x01, + /// UDP Wisp stream. Udp = 0x02, } @@ -20,15 +23,26 @@ impl TryFrom for StreamType { } } -#[derive(Debug)] +/// Packet used to create a new stream. +/// +/// See [the docs](https://github.com/MercuryWorkshop/wisp-protocol/blob/main/protocol.md#0x01---connect). +#[derive(Debug, Clone)] pub struct ConnectPacket { + /// Whether the new stream should use a TCP or UDP socket. pub stream_type: StreamType, + /// Destination TCP/UDP port for the new stream. pub destination_port: u16, + /// Destination hostname, in a UTF-8 string. pub destination_hostname: String, } impl ConnectPacket { - pub fn new(stream_type: StreamType, destination_port: u16, destination_hostname: String) -> Self { + /// Create a new connect packet. + pub fn new( + stream_type: StreamType, + destination_port: u16, + destination_hostname: String, + ) -> Self { Self { stream_type, destination_port, @@ -61,12 +75,17 @@ impl From for Vec { } } -#[derive(Debug)] +/// Packet used for Wisp TCP stream flow control. +/// +/// See [the docs](https://github.com/MercuryWorkshop/wisp-protocol/blob/main/protocol.md#0x03---continue). +#[derive(Debug, Copy, Clone)] pub struct ContinuePacket { + /// Number of packets that the server can buffer for the current stream. pub buffer_remaining: u32, } impl ContinuePacket { + /// Create a new continue packet. pub fn new(buffer_remaining: u32) -> Self { Self { buffer_remaining } } @@ -92,12 +111,21 @@ impl From for Vec { } } -#[derive(Debug)] +/// Packet used to close a stream. +/// +/// See [the +/// docs](https://github.com/MercuryWorkshop/wisp-protocol/blob/main/protocol.md#0x04---close). +#[derive(Debug, Copy, Clone)] pub struct ClosePacket { + /// The close reason. + /// + /// See [the + /// docs](https://github.com/MercuryWorkshop/wisp-protocol/blob/main/protocol.md#clientserver-close-reasons). pub reason: u8, } impl ClosePacket { + /// Create a new close packet. pub fn new(reason: u8) -> Self { Self { reason } } @@ -123,15 +151,21 @@ impl From for Vec { } } -#[derive(Debug)] +#[derive(Debug, Clone)] +/// Type of packet recieved. pub enum PacketType { + /// Connect packet. Connect(ConnectPacket), + /// Data packet. Data(Bytes), + /// Continue packet. Continue(ContinuePacket), + /// Close packet. Close(ClosePacket), } impl PacketType { + /// Get the packet type used in the protocol. pub fn as_u8(&self) -> u8 { use PacketType::*; match self { @@ -155,17 +189,24 @@ impl From for Vec { } } -#[derive(Debug)] +/// Wisp protocol packet. +#[derive(Debug, Clone)] pub struct Packet { + /// Stream this packet is associated with. pub stream_id: u32, + /// Packet recieved. pub packet: PacketType, } impl Packet { + /// Create a new packet. + /// + /// The helper functions should be used for most use cases. pub fn new(stream_id: u32, packet: PacketType) -> Self { Self { stream_id, packet } } + /// Create a new connect packet. pub fn new_connect( stream_id: u32, stream_type: StreamType, @@ -182,6 +223,7 @@ impl Packet { } } + /// Create a new data packet. pub fn new_data(stream_id: u32, data: Bytes) -> Self { Self { stream_id, @@ -189,6 +231,7 @@ impl Packet { } } + /// Create a new continue packet. pub fn new_continue(stream_id: u32, buffer_remaining: u32) -> Self { Self { stream_id, @@ -196,6 +239,7 @@ impl Packet { } } + /// Create a new close packet. pub fn new_close(stream_id: u32, reason: u8) -> Self { Self { stream_id, diff --git a/wisp/src/stream.rs b/wisp/src/stream.rs index f561edb..1c249a2 100644 --- a/wisp/src/stream.rs +++ b/wisp/src/stream.rs @@ -16,35 +16,43 @@ use std::{ }, }; -pub enum WsEvent { +/// Multiplexor event recieved from a Wisp stream. +pub enum MuxEvent { + /// The other side has sent data. Send(Bytes), + /// The other side has closed. Close(crate::ClosePacket), } -pub enum MuxEvent { +pub(crate) enum WsEvent { Close(u32, u8, oneshot::Sender>), } +/// Read side of a multiplexor stream. pub struct MuxStreamRead where W: crate::ws::WebSocketWrite, { + /// ID of the stream. pub stream_id: u32, + /// Type of the stream. + pub stream_type: crate::StreamType, role: crate::Role, tx: crate::ws::LockedWebSocketWrite, - rx: mpsc::UnboundedReceiver, + rx: mpsc::UnboundedReceiver, is_closed: Arc, flow_control: Arc, } impl MuxStreamRead { - pub async fn read(&mut self) -> Option { + /// Read an event from the stream. + pub async fn read(&mut self) -> Option { if self.is_closed.load(Ordering::Acquire) { return None; } match self.rx.next().await? { - WsEvent::Send(bytes) => { - if self.role == crate::Role::Server { + MuxEvent::Send(bytes) => { + if self.role == crate::Role::Server && self.stream_type == crate::StreamType::Tcp { let old_val = self.flow_control.fetch_add(1, Ordering::SeqCst); self.tx .write_frame( @@ -53,11 +61,11 @@ impl MuxStreamRead { .await .ok()?; } - Some(WsEvent::Send(bytes)) + Some(MuxEvent::Send(bytes)) } - WsEvent::Close(packet) => { + MuxEvent::Close(packet) => { self.is_closed.store(true, Ordering::Release); - Some(WsEvent::Close(packet)) + Some(MuxEvent::Close(packet)) } } } @@ -67,8 +75,8 @@ impl MuxStreamRead { let evt = rx.read().await?; Some(( match evt { - WsEvent::Send(bytes) => bytes, - WsEvent::Close(_) => return None, + MuxEvent::Send(bytes) => bytes, + MuxEvent::Close(_) => return None, }, rx, )) @@ -76,25 +84,28 @@ impl MuxStreamRead { } } +/// Write side of a multiplexor stream. pub struct MuxStreamWrite where W: crate::ws::WebSocketWrite, { + /// ID of the stream. pub stream_id: u32, role: crate::Role, tx: crate::ws::LockedWebSocketWrite, - close_channel: mpsc::UnboundedSender, + close_channel: mpsc::UnboundedSender, is_closed: Arc, continue_recieved: Arc, flow_control: Arc, } impl MuxStreamWrite { + /// Write data to the stream. pub async fn write(&self, data: Bytes) -> Result<(), crate::WispError> { if self.is_closed.load(Ordering::Acquire) { return Err(crate::WispError::StreamAlreadyClosed); } - if self.role == crate::Role::Client && self.flow_control.load(Ordering::Acquire) <= 0 { + if self.role == crate::Role::Client && self.flow_control.load(Ordering::Acquire) == 0 { self.continue_recieved.listen().await; } self.tx @@ -112,6 +123,17 @@ impl MuxStreamWrite { Ok(()) } + /// Get a handle to close the connection. + /// + /// Useful to close the connection without having access to the stream. + /// + /// # Example + /// ``` + /// let handle = stream.get_close_handle(); + /// if let Err(error) = handle_stream(stream) { + /// handle.close(0x01); + /// } + /// ``` pub fn get_close_handle(&self) -> MuxStreamCloser { MuxStreamCloser { stream_id: self.stream_id, @@ -120,13 +142,14 @@ impl MuxStreamWrite { } } + /// Close the stream. You will no longer be able to write or read after this has been called. pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> { if self.is_closed.load(Ordering::Acquire) { return Err(crate::WispError::StreamAlreadyClosed); } let (tx, rx) = oneshot::channel::>(); self.close_channel - .unbounded_send(MuxEvent::Close(self.stream_id, reason, tx)) + .unbounded_send(WsEvent::Close(self.stream_id, reason, tx)) .map_err(|x| crate::WispError::Other(Box::new(x)))?; rx.await .map_err(|x| crate::WispError::Other(Box::new(x)))??; @@ -148,26 +171,30 @@ impl Drop for MuxStreamWrite { let (tx, _) = oneshot::channel::>(); let _ = self .close_channel - .unbounded_send(MuxEvent::Close(self.stream_id, 0x01, tx)); + .unbounded_send(WsEvent::Close(self.stream_id, 0x01, tx)); } } +/// Multiplexor stream. pub struct MuxStream where W: crate::ws::WebSocketWrite, { + /// ID of the stream. pub stream_id: u32, rx: MuxStreamRead, tx: MuxStreamWrite, } impl MuxStream { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( stream_id: u32, role: crate::Role, - rx: mpsc::UnboundedReceiver, + stream_type: crate::StreamType, + rx: mpsc::UnboundedReceiver, tx: crate::ws::LockedWebSocketWrite, - close_channel: mpsc::UnboundedSender, + close_channel: mpsc::UnboundedSender, is_closed: Arc, flow_control: Arc, continue_recieved: Arc @@ -176,6 +203,7 @@ impl MuxStream { stream_id, rx: MuxStreamRead { stream_id, + stream_type, role, tx: tx.clone(), rx, @@ -194,26 +222,42 @@ impl MuxStream { } } - pub async fn read(&mut self) -> Option { + /// Read an event from the stream. + pub async fn read(&mut self) -> Option { self.rx.read().await } + /// Write data to the stream. pub async fn write(&self, data: Bytes) -> Result<(), crate::WispError> { self.tx.write(data).await } + /// Get a handle to close the connection. + /// + /// Useful to close the connection without having access to the stream. + /// + /// # Example + /// ``` + /// let handle = stream.get_close_handle(); + /// if let Err(error) = handle_stream(stream) { + /// handle.close(0x01); + /// } + /// ``` pub fn get_close_handle(&self) -> MuxStreamCloser { self.tx.get_close_handle() } + /// Close the stream. You will no longer be able to write or read after this has been called. pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> { self.tx.close(reason).await } + /// Split the stream into read and write parts, consuming it. pub fn into_split(self) -> (MuxStreamRead, MuxStreamWrite) { (self.rx, self.tx) } + /// Turn the stream into one that implements futures `Stream + Sink`, consuming it. pub fn into_io(self) -> MuxStreamIo { MuxStreamIo { rx: self.rx.into_stream(), @@ -222,20 +266,23 @@ impl MuxStream { } } +/// Close handle for a multiplexor stream. pub struct MuxStreamCloser { - stream_id: u32, - close_channel: mpsc::UnboundedSender, + /// ID of the stream. + pub stream_id: u32, + close_channel: mpsc::UnboundedSender, is_closed: Arc, } impl MuxStreamCloser { + /// Close the stream. You will no longer be able to write or read after this has been called. pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> { if self.is_closed.load(Ordering::Acquire) { return Err(crate::WispError::StreamAlreadyClosed); } let (tx, rx) = oneshot::channel::>(); self.close_channel - .unbounded_send(MuxEvent::Close(self.stream_id, reason, tx)) + .unbounded_send(WsEvent::Close(self.stream_id, reason, tx)) .map_err(|x| crate::WispError::Other(Box::new(x)))?; rx.await .map_err(|x| crate::WispError::Other(Box::new(x)))??; @@ -245,6 +292,7 @@ impl MuxStreamCloser { } pin_project! { + /// Multiplexor stream that implements futures `Stream + Sink`. pub struct MuxStreamIo { #[pin] rx: Pin + Send>>, @@ -254,6 +302,10 @@ pin_project! { } impl MuxStreamIo { + /// Turn the stream into one that implements futures `AsyncRead + AsyncWrite`. + /// + /// Enable the `tokio_io` feature to implement the tokio version of `AsyncRead` and + /// `AsyncWrite`. pub fn into_asyncrw(self) -> IoStream> { IoStream::new(self) } diff --git a/wisp/src/tokioio.rs b/wisp/src/tokioio.rs index a3ca7be..62061b9 100644 --- a/wisp/src/tokioio.rs +++ b/wisp/src/tokioio.rs @@ -1,6 +1,5 @@ #![allow(dead_code)] -// Taken from https://github.com/hyperium/hyper-util/blob/master/src/rt/tokio.rs -// hyper-util fails to compile on WASM as it has a dependency on socket2 +//! hyper_util::rt::tokio::TokioIo use std::{ pin::Pin, diff --git a/wisp/src/tower.rs b/wisp/src/tower.rs index 06f3ebc..368de8f 100644 --- a/wisp/src/tower.rs +++ b/wisp/src/tower.rs @@ -1,3 +1,4 @@ +//! Helper that implements a Tower Service for a client multiplexor. use crate::{tokioio::TokioIo, ws::WebSocketWrite, ClientMux, MuxStreamIo, StreamType, WispError}; use async_io_stream::IoStream; use futures::{ @@ -6,6 +7,7 @@ use futures::{ }; use std::sync::Arc; +/// Wrapper struct that implements a Tower Service sfor a client multiplexor. pub struct ServiceWrapper(pub Arc>); impl tower_service::Service for ServiceWrapper { diff --git a/wisp/src/ws.rs b/wisp/src/ws.rs index f75c526..610351c 100644 --- a/wisp/src/ws.rs +++ b/wisp/src/ws.rs @@ -1,23 +1,41 @@ +//! Abstraction over WebSocket implementations. +//! +//! Use the [`fastwebsockets`] and [`ws_stream_wasm`] implementations of these traits as an example +//! for implementing them for other WebSocket implementations. +//! +//! [`fastwebsockets`]: https://github.com/MercuryWorkshop/epoxy-tls/blob/multiplexed/wisp/src/fastwebsockets.rs +//! [`ws_stream_wasm`]: https://github.com/MercuryWorkshop/epoxy-tls/blob/multiplexed/wisp/src/ws_stream_wasm.rs use bytes::Bytes; use futures::lock::Mutex; use std::sync::Arc; +/// Opcode of the WebSocket frame. #[derive(Debug, PartialEq, Clone, Copy)] pub enum OpCode { + /// Text frame. Text, + /// Binary frame. Binary, + /// Close frame. Close, + /// Ping frame. Ping, + /// Pong frame. Pong, } +/// WebSocket frame. pub struct Frame { + /// Whether the frame is finished or not. pub finished: bool, + /// Opcode of the WebSocket frame. pub opcode: OpCode, + /// Payload of the WebSocket frame. pub payload: Bytes, } impl Frame { + /// Create a new text frame. pub fn text(payload: Bytes) -> Self { Self { finished: true, @@ -26,6 +44,7 @@ impl Frame { } } + /// Create a new binary frame. pub fn binary(payload: Bytes) -> Self { Self { finished: true, @@ -34,6 +53,7 @@ impl Frame { } } + /// Create a new close frame. pub fn close(payload: Bytes) -> Self { Self { finished: true, @@ -43,27 +63,34 @@ impl Frame { } } +/// Generic WebSocket read trait. pub trait WebSocketRead { + /// Read a frame from the socket. fn wisp_read_frame( &mut self, tx: &crate::ws::LockedWebSocketWrite, ) -> impl std::future::Future> + Send; } +/// Generic WebSocket write trait. pub trait WebSocketWrite { + /// Write a frame to the socket. fn wisp_write_frame( &mut self, frame: Frame, ) -> impl std::future::Future> + Send; } +/// Locked WebSocket that can be shared between threads. pub struct LockedWebSocketWrite(Arc>); impl LockedWebSocketWrite { + /// Create a new locked websocket. pub fn new(ws: S) -> Self { Self(Arc::new(Mutex::new(ws))) } + /// Write a frame to the websocket. pub async fn write_frame(&self, frame: Frame) -> Result<(), crate::WispError> { self.0.lock().await.wisp_write_frame(frame).await }