diff --git a/Cargo.lock b/Cargo.lock index ac0baec..02b3cd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,7 +101,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" dependencies = [ "futures", - "pharos", "rustc_version", "tokio", ] @@ -365,6 +364,7 @@ dependencies = [ "base64", "bytes", "console_error_panic_hook", + "event-listener", "fastwebsockets", "futures-util", "getrandom", @@ -375,6 +375,7 @@ dependencies = [ "js-sys", "pin-project-lite", "ring", + "send_wrapper", "tokio", "tokio-rustls", "tokio-util", @@ -385,7 +386,6 @@ dependencies = [ "web-sys", "webpki-roots", "wisp-mux", - "ws_stream_wasm", ] [[package]] @@ -425,9 +425,9 @@ dependencies = [ [[package]] name = "event-listener" -version = "5.0.0" +version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b72557800024fabbaa2449dd4bf24e37b93702d457a4d4f2b0dd1f0f039f20c1" +checksum = "2b5fb89194fa3cad959b833185b3063ba881dbfc7030680b314250779fb4cc91" dependencies = [ "concurrent-queue", "parking", @@ -959,16 +959,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "pharos" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" -dependencies = [ - "futures", - "rustc_version", -] - [[package]] name = "pin-project" version = "1.1.4" @@ -1860,7 +1850,7 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "wisp-mux" -version = "1.1.1" +version = "1.1.2" dependencies = [ "async_io_stream", "bytes", @@ -1873,26 +1863,6 @@ dependencies = [ "pin-project-lite", "tokio", "tower-service", - "ws_stream_wasm", -] - -[[package]] -name = "ws_stream_wasm" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7999f5f4217fe3818726b66257a4475f71e74ffd190776ad053fa159e50737f5" -dependencies = [ - "async_io_stream", - "futures", - "js-sys", - "log", - "pharos", - "rustc_version", - "send_wrapper", - "thiserror", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", ] [[package]] diff --git a/client/Cargo.toml b/client/Cargo.toml index 1031bf6..a31ed47 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -14,24 +14,25 @@ hyper = { version = "1.1.0", features = ["client", "http1", "http2"] } pin-project-lite = "0.2.13" wasm-bindgen = { version = "0.2.91", features = ["enable-interning"] } wasm-bindgen-futures = "0.4.39" -ws_stream_wasm = { version = "0.7.4", features = ["tokio_io"] } futures-util = "0.3.30" js-sys = "0.3.66" webpki-roots = "0.26.0" tokio-rustls = "0.25.0" -web-sys = { version = "0.3.66", features = ["TextEncoder", "Response", "ResponseInit"] } +web-sys = { version = "0.3.66", features = ["TextEncoder", "Response", "ResponseInit", "WebSocket", "BinaryType", "MessageEvent"] } wasm-streams = "0.4.0" tokio-util = { version = "0.7.10", features = ["io"] } async-compression = { version = "0.4.5", features = ["tokio", "gzip", "brotli"] } fastwebsockets = { version = "0.6.0", features = ["unstable-split"] } base64 = "0.21.7" -wisp-mux = { path = "../wisp", features = ["ws_stream_wasm", "tokio_io", "hyper_tower"] } +wisp-mux = { path = "../wisp", features = ["tokio_io", "hyper_tower"] } async_io_stream = { version = "0.3.3", features = ["tokio_io"] } getrandom = { version = "0.2.12", features = ["js"] } hyper-util-wasm = { version = "0.1.3", features = ["client", "client-legacy", "http1", "http2"] } tokio = { version = "1.36.0", default-features = false } tower-service = "0.3.2" console_error_panic_hook = "0.1.7" +send_wrapper = "0.6.0" +event-listener = "5.2.0" [dependencies.ring] features = ["wasm32_unknown_unknown_js"] diff --git a/client/src/lib.rs b/client/src/lib.rs index 1693e67..862fbf5 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -8,14 +8,14 @@ mod wrappers; use tls_stream::EpxTlsStream; use utils::{Boolinator, ReplaceErr, UriExt}; use websocket::EpxWebSocket; -use wrappers::{IncomingBody, TlsWispService}; +use wrappers::{IncomingBody, TlsWispService, WebSocketWrapper}; use std::sync::Arc; use async_compression::tokio::bufread as async_comp; use async_io_stream::IoStream; use bytes::Bytes; -use futures_util::{stream::SplitSink, StreamExt}; +use futures_util::StreamExt; use http::{uri, HeaderName, HeaderValue, Request, Response}; use hyper::{body::Incoming, Uri}; use hyper_util_wasm::client::legacy::Client; @@ -28,7 +28,6 @@ use tokio_util::{ use wasm_bindgen::{intern, prelude::*}; use web_sys::TextEncoder; use wisp_mux::{tokioio::TokioIo, tower::ServiceWrapper, ClientMux, MuxStreamIo, StreamType}; -use ws_stream_wasm::{WsMessage, WsMeta, WsStream}; type HttpBody = http_body_util::Full; @@ -64,12 +63,11 @@ fn init() { intern("rawHeaders"); } - #[wasm_bindgen(inspectable)] pub struct EpoxyClient { rustls_config: Arc, - mux: Arc>>, - hyper_client: Client>, HttpBody>, + mux: Arc>, + hyper_client: Client, HttpBody>, #[wasm_bindgen(getter_with_clone)] pub useragent: String, #[wasm_bindgen(js_name = "redirectLimit")] @@ -96,12 +94,11 @@ impl EpoxyClient { } debug!("connecting to ws {:?}", ws_url); - let (_, ws) = WsMeta::connect(ws_url, vec![]) + let (wtx, wrx) = WebSocketWrapper::connect(ws_url, vec![]) .await .replace_err("Failed to connect to websocket")?; debug!("connected!"); - let (wtx, wrx) = ws.split(); let (mux, fut) = ClientMux::new(wrx, wtx).await?; let mux = Arc::new(mux); diff --git a/client/src/wrappers.rs b/client/src/wrappers.rs index 5df0814..8513108 100644 --- a/client/src/wrappers.rs +++ b/client/src/wrappers.rs @@ -4,11 +4,21 @@ use std::{ task::{Context, Poll}, }; +use event_listener::Event; use futures_util::Stream; use hyper::body::Body; +use js_sys::ArrayBuffer; use pin_project_lite::pin_project; +use send_wrapper::SendWrapper; use std::future::Future; -use wisp_mux::{tokioio::TokioIo, tower::ServiceWrapper, WispError}; +use tokio::sync::mpsc; +use web_sys::{BinaryType, MessageEvent, WebSocket}; +use wisp_mux::{ + tokioio::TokioIo, + tower::ServiceWrapper, + ws::{Frame, LockedWebSocketWrite, WebSocketRead, WebSocketWrite}, + WispError, +}; pin_project! { pub struct IncomingBody { @@ -51,7 +61,6 @@ where pub rustls_config: Arc, } - impl tower_service::Service for TlsWispService { @@ -107,3 +116,173 @@ impl Clone for TlsWispService< } } } + +#[derive(Debug)] +pub enum WebSocketError { + Closed, + Unknown, + SendFailed, +} + +impl std::fmt::Display for WebSocketError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + use WebSocketError::*; + match self { + Closed => write!(f, "Websocket closed"), + Unknown => write!(f, "Unknown error"), + SendFailed => write!(f, "Send failed"), + } + } +} + +impl std::error::Error for WebSocketError {} + +impl From for WispError { + fn from(err: WebSocketError) -> Self { + Self::WsImplError(Box::new(err)) + } +} + +pub enum WebSocketMessage { + Close, + Error, + Message(Vec), +} + +pub struct WebSocketWrapper { + inner: SendWrapper, + + // used to retain the closures + #[allow(dead_code)] + onopen: SendWrapper>, + #[allow(dead_code)] + onclose: SendWrapper>, + #[allow(dead_code)] + onerror: SendWrapper>, + #[allow(dead_code)] + onmessage: SendWrapper>, +} + +pub struct WebSocketReader { + read_rx: mpsc::UnboundedReceiver, +} + +impl WebSocketRead for WebSocketReader { + async fn wisp_read_frame( + &mut self, + _: &LockedWebSocketWrite, + ) -> Result { + use WebSocketMessage::*; + match self + .read_rx + .recv() + .await + .ok_or(WispError::WsImplError(Box::new(WebSocketError::Closed)))? + { + Message(bin) => Ok(Frame::binary(bin.into())), + Error => Err(WebSocketError::Unknown.into()), + Close => Err(WebSocketError::Closed.into()), + } + } +} + +impl WebSocketWrapper { + pub async fn connect( + url: String, + protocols: Vec, + ) -> Result<(Self, WebSocketReader), JsValue> { + let ws = if protocols.is_empty() { + WebSocket::new(&url) + } else { + WebSocket::new_with_str_sequence( + &url, + &protocols + .iter() + .fold(Array::new(), |acc, x| { + acc.push(&jval!(x)); + acc + }) + .into(), + ) + } + .replace_err("Failed to make websocket")?; + + ws.set_binary_type(BinaryType::Arraybuffer); + + let (read_tx, read_rx) = mpsc::unbounded_channel(); + + let open_event = Arc::new(Event::new()); + + let open_event_tx = open_event.clone(); + let onopen = Closure::wrap( + Box::new(move || while open_event_tx.notify(usize::MAX) == 0 {}) as Box, + ); + + let onmessage_tx = read_tx.clone(); + let onmessage = Closure::wrap(Box::new(move |evt: MessageEvent| { + if let Ok(arr) = evt.data().dyn_into::() { + let _ = + onmessage_tx.send(WebSocketMessage::Message(Uint8Array::new(&arr).to_vec())); + } + }) as Box); + + ws.set_onopen(Some(onopen.as_ref().unchecked_ref())); + ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); + + let onclose_tx = read_tx.clone(); + let onclose = Closure::wrap(Box::new(move || { + let _ = onclose_tx.send(WebSocketMessage::Close); + }) as Box); + + let onerror_tx = read_tx.clone(); + let onerror = Closure::wrap(Box::new(move || { + let _ = onerror_tx.send(WebSocketMessage::Error); + }) as Box); + + ws.set_onclose(Some(onclose.as_ref().unchecked_ref())); + ws.set_onerror(Some(onerror.as_ref().unchecked_ref())); + + open_event.listen().await; + + Ok(( + Self { + inner: SendWrapper::new(ws), + onopen: SendWrapper::new(onopen), + onclose: SendWrapper::new(onclose), + onerror: SendWrapper::new(onerror), + onmessage: SendWrapper::new(onmessage), + }, + WebSocketReader { read_rx }, + )) + } +} + +impl WebSocketWrite for WebSocketWrapper { + async fn wisp_write_frame(&mut self, frame: Frame) -> Result<(), WispError> { + use wisp_mux::ws::OpCode::*; + match frame.opcode { + Binary => self + .inner + .send_with_u8_array(&frame.payload) + .map_err(|_| WebSocketError::SendFailed.into()), + Text => self + .inner + .send_with_u8_array(&frame.payload) + .map_err(|_| WebSocketError::SendFailed.into()), + Close => { + let _ = self.inner.close(); + Ok(()) + } + _ => Err(WispError::WsImplNotSupported), + } + } +} + +impl Drop for WebSocketWrapper { + fn drop(&mut self) { + self.inner.set_onopen(None); + self.inner.set_onclose(None); + self.inner.set_onerror(None); + self.inner.set_onmessage(None); + } +} diff --git a/wisp/Cargo.toml b/wisp/Cargo.toml index 6d426d2..774a871 100644 --- a/wisp/Cargo.toml +++ b/wisp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wisp-mux" -version = "1.1.1" +version = "1.1.2" license = "AGPL-3.0-only" description = "A library for easily creating Wisp servers and clients." homepage = "https://github.com/MercuryWorkshop/epoxy-tls/tree/multiplexed/wisp" @@ -20,13 +20,12 @@ hyper-util-wasm = { version = "0.1.3", features = ["client", "client-legacy"], o pin-project-lite = "0.2.13" tokio = { version = "1.35.1", optional = true, default-features = false } tower-service = { version = "0.3.2", optional = true } -ws_stream_wasm = { version = "0.7.4", optional = true } [features] fastwebsockets = ["dep:fastwebsockets", "dep:tokio"] -ws_stream_wasm = ["dep:ws_stream_wasm"] tokio_io = ["async_io_stream/tokio_io"] hyper_tower = ["dep:tower-service", "dep:hyper", "dep:tokio", "dep:hyper-util-wasm"] [package.metadata.docs.rs] -features = ["hyper_tower"] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index 93cc870..49dc45d 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -1,21 +1,23 @@ #![deny(missing_docs)] #![feature(impl_trait_in_assoc_type)] +#![cfg_attr(docsrs, feature(doc_cfg))] //! A library for easily creating [Wisp] clients and servers. //! //! [Wisp]: https://github.com/MercuryWorkshop/wisp-protocol #[cfg(feature = "fastwebsockets")] +#[cfg_attr(docsrs, doc(cfg(feature = "fastwebsockets")))] mod fastwebsockets; mod packet; mod sink_unfold; mod stream; #[cfg(feature = "hyper_tower")] +#[cfg_attr(docsrs, doc(cfg(feature = "hyper_tower")))] pub mod tokioio; #[cfg(feature = "hyper_tower")] +#[cfg_attr(docsrs, doc(cfg(feature = "hyper_tower")))] pub mod tower; pub mod ws; -#[cfg(feature = "ws_stream_wasm")] -mod ws_stream_wasm; pub use crate::packet::*; pub use crate::stream::*; diff --git a/wisp/src/ws_stream_wasm.rs b/wisp/src/ws_stream_wasm.rs deleted file mode 100644 index 410b537..0000000 --- a/wisp/src/ws_stream_wasm.rs +++ /dev/null @@ -1,60 +0,0 @@ -use futures::{stream::{SplitStream, SplitSink}, SinkExt, StreamExt}; -use ws_stream_wasm::{WsErr, WsMessage, WsStream}; - -impl From for crate::ws::Frame { - fn from(msg: WsMessage) -> Self { - use crate::ws::OpCode; - match msg { - WsMessage::Text(str) => Self { - finished: true, - opcode: OpCode::Text, - payload: str.into(), - }, - WsMessage::Binary(bin) => Self { - finished: true, - opcode: OpCode::Binary, - payload: bin.into(), - }, - } - } -} - -impl TryFrom for WsMessage { - type Error = crate::WispError; - fn try_from(msg: crate::ws::Frame) -> Result { - use crate::ws::OpCode; - match msg.opcode { - OpCode::Text => Ok(Self::Text(std::str::from_utf8(&msg.payload)?.to_string())), - OpCode::Binary => Ok(Self::Binary(msg.payload.to_vec())), - _ => Err(Self::Error::WsImplNotSupported), - } - } -} - -impl From for crate::WispError { - fn from(err: WsErr) -> Self { - Self::WsImplError(Box::new(err)) - } -} - -impl crate::ws::WebSocketRead for SplitStream { - async fn wisp_read_frame( - &mut self, - _: &crate::ws::LockedWebSocketWrite, - ) -> Result { - Ok(self - .next() - .await - .ok_or(crate::WispError::WsImplSocketClosed)? - .into()) - } -} - -impl crate::ws::WebSocketWrite for SplitSink { - async fn wisp_write_frame(&mut self, frame: crate::ws::Frame) -> Result<(), crate::WispError> { - self - .send(frame.try_into()?) - .await - .map_err(|e| e.into()) - } -}