diff --git a/client/demo.js b/client/demo.js index 551600d..9c678d9 100644 --- a/client/demo.js +++ b/client/demo.js @@ -332,12 +332,12 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox console.time(); let resp = await epoxy_client.fetch(test_url); console.log(resp, resp.rawHeaders); - log(await resp.text()); + log(await resp.arrayBuffer()); console.timeEnd(); } log("done"); } catch (err) { console.error(err); - log(err); + log(err.stack); } })(); diff --git a/client/src/lib.rs b/client/src/lib.rs index 7baad83..c455faf 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -29,8 +29,8 @@ use stream_provider::{ProviderWispTransportGenerator, StreamProvider, StreamProv use thiserror::Error; use utils::{ asyncread_to_readablestream, convert_streaming_body, entries_of_object, from_entries, - is_null_body, is_redirect, object_get, object_set, object_truthy, websocket_transport, - StreamingBody, UriExt, WasmExecutor, WispTransportWrite, + is_null_body, is_redirect, object_get, object_set, object_truthy, ws_protocol, StreamingBody, + UriExt, WasmExecutor, WispTransportWrite, }; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; @@ -44,6 +44,7 @@ use wisp_mux::{ ws::{WebSocketRead, WebSocketWrite}, CloseReason, }; +use ws_wrapper::WebSocketWrapper; #[cfg(feature = "full")] mod io_stream; @@ -52,6 +53,7 @@ mod tokioio; mod utils; #[cfg(feature = "full")] mod websocket; +mod ws_wrapper; #[wasm_bindgen(typescript_custom_section)] const EPOXYCLIENT_TYPES: &'static str = r#" @@ -59,7 +61,7 @@ type EpoxyIoStream = { read: ReadableStream, write: WritableStream, }; -type EpoxyWispTransport = string | ((wisp_v2: boolean) => { read: ReadableStream, write: WritableStream }); +type EpoxyWispTransport = string | (() => { read: ReadableStream, write: WritableStream }); type EpoxyWebSocketInput = string | ArrayBuffer; type EpoxyWebSocketHeadersInput = Headers | { [key: string]: string }; type EpoxyUrlInput = string | URL; @@ -119,7 +121,10 @@ pub enum EpoxyError { #[error("Webpki: {0:?} ({0})")] Webpki(#[from] webpki::Error), - #[error("Wisp transport: {0}")] + #[error("Wisp WebSocket failed to connect: {0}")] + WebSocketConnectFailed(String), + + #[error("Custom Wisp transport: {0}")] WispTransport(String), #[error("Invalid Wisp transport: {0}")] InvalidWispTransport(String), @@ -346,7 +351,9 @@ fn create_wisp_transport(function: Function) -> ProviderWispTransportGenerator { let arr: ArrayBuffer = pkt.dyn_into().map_err(|x| { EpoxyError::InvalidWispTransportPacket(format!("{:?}", x)) })?; - Ok::(BytesMut::from(Uint8Array::new(&arr).to_vec().as_slice())) + Ok::(BytesMut::from( + Uint8Array::new(&arr).to_vec().as_slice(), + )) }), )); let write: WritableStream = object_get(&transport, "write").into(); @@ -387,9 +394,38 @@ impl EpoxyClient { options: EpoxyClientOptions, ) -> Result { let stream_provider = if let Some(wisp_url) = transport.as_string() { + let wisp_uri: Uri = wisp_url.clone().try_into()?; + if wisp_uri.scheme_str() != Some("wss") && wisp_uri.scheme_str() != Some("ws") { + return Err(EpoxyError::InvalidUrlScheme( + wisp_uri.scheme_str().map(ToString::to_string), + )); + } + let ws_protocols = options.websocket_protocols.clone(); Arc::new(StreamProvider::new( - create_wisp_transport(websocket_transport(wisp_url, ws_protocols)), + Box::new(move |wisp_v2| { + let wisp_url = wisp_url.clone(); + let mut ws_protocols = ws_protocols.clone(); + if wisp_v2 { + // send some random data to ask the server for v2 + ws_protocols.push(ws_protocol()); + } + + Box::pin(async move { + let (write, read) = WebSocketWrapper::connect(&wisp_url, &ws_protocols)?; + while write.inner.ready_state() == 0 { + if !write.wait_for_open().await { + return Err(EpoxyError::WebSocketConnectFailed( + "websocket did not open".to_string(), + )); + } + } + Ok(( + Box::new(read) as Box, + Box::new(write) as Box, + )) + }) + }), &options, )?) } else if let Some(wisp_transport) = transport.dyn_ref::() { diff --git a/client/src/utils/js.rs b/client/src/utils/js.rs index b55b05f..a467624 100644 --- a/client/src/utils/js.rs +++ b/client/src/utils/js.rs @@ -3,7 +3,7 @@ use std::{pin::Pin, task::{Context, Poll}}; use bytes::Bytes; use futures_util::{AsyncRead, Stream, StreamExt, TryStreamExt}; use http_body_util::{Either, Full, StreamBody}; -use js_sys::{Array, Function, JsString, Object, Uint8Array}; +use js_sys::{Array, JsString, Object, Uint8Array}; use send_wrapper::SendWrapper; use wasm_bindgen::{prelude::*, JsCast, JsValue}; use wasm_streams::ReadableStream; @@ -14,73 +14,13 @@ use super::ReaderStream; #[wasm_bindgen(inline_js = r#" -class WebSocketStreamPonyfill { - url; - opened; - closed; - close; - constructor(url, options = {}) { - if (options.signal?.aborted) { - throw new DOMException('This operation was aborted', 'AbortError'); - } - this.url = url; - const ws = new WebSocket(url, options.protocols ?? []); - ws.binaryType = "arraybuffer"; - const closeWithInfo = ({ closeCode: code, reason } = {}) => ws.close(code, reason); - this.opened = new Promise((resolve, reject) => { - const errorHandler = ()=>reject(new Error("WebSocket closed before handshake complete.")); - ws.onopen = () => { - resolve({ - readable: new ReadableStream({ - start(controller) { - ws.onmessage = ({ data }) => controller.enqueue(data); - ws.onerror = e => controller.error(e); - }, - cancel: closeWithInfo, - }), - writable: new WritableStream({ - write(chunk) { ws.send(chunk); }, - abort() { ws.close(); }, - close: closeWithInfo, - }), - protocol: ws.protocol, - extensions: ws.extensions, - }); - ws.removeEventListener('error', errorHandler); - }; - ws.addEventListener('error', errorHandler); - }); - this.closed = new Promise((resolve, reject) => { - ws.onclose = ({ code, reason }) => { - resolve({ closeCode: code, reason }); - ws.removeEventListener('error', reject); - }; - ws.addEventListener('error', reject); - }); - if (options.signal) { - options.signal.onabort = () => ws.close(); - } - this.close = closeWithInfo; - } -} - -function ws_protocol() { +export function ws_protocol() { return ( [1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, c => (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16) ); } -export function websocket_transport(url, protocols) { - const ws_impl = typeof WebSocketStream === "undefined" ? WebSocketStreamPonyfill : WebSocketStream; - return async (wisp_v2)=>{ - if (wisp_v2) protocols.push(ws_protocol()); - const ws = new ws_impl(url, { protocols }); - const { readable, writable } = await ws.opened; - return { read: readable, write: writable }; - } -} - export function object_get(obj, k) { try { return obj[k] @@ -131,8 +71,6 @@ export function from_entries(entries){ } "#)] extern "C" { - pub fn websocket_transport(url: String, protocols: Vec) -> Function; - pub fn object_get(obj: &Object, key: &str) -> JsValue; pub fn object_set(obj: &Object, key: &str, val: JsValue); @@ -144,6 +82,7 @@ extern "C" { fn entries_of_object_inner(obj: &Object) -> Vec; pub fn define_property(obj: &Object, key: &str, val: JsValue); pub fn ws_key() -> String; + pub fn ws_protocol() -> String; #[wasm_bindgen(catch)] pub fn from_entries(iterable: &JsValue) -> Result; diff --git a/client/src/ws_wrapper.rs b/client/src/ws_wrapper.rs new file mode 100644 index 0000000..85a0668 --- /dev/null +++ b/client/src/ws_wrapper.rs @@ -0,0 +1,225 @@ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use async_trait::async_trait; +use bytes::BytesMut; +use event_listener::Event; +use flume::Receiver; +use futures_util::FutureExt; +use js_sys::{Array, ArrayBuffer, Uint8Array}; +use send_wrapper::SendWrapper; +use thiserror::Error; +use wasm_bindgen::{closure::Closure, JsCast, JsValue}; +use web_sys::{BinaryType, MessageEvent, WebSocket}; +use wisp_mux::{ + ws::{Frame, LockedWebSocketWrite, Payload, WebSocketRead, WebSocketWrite}, + WispError, +}; + +use crate::EpoxyError; + +#[derive(Error, Debug)] +pub enum WebSocketError { + #[error("Unknown JS WebSocket wrapper error: {0:?}")] + Unknown(String), + #[error("Failed to call WebSocket.send: {0:?}")] + SendFailed(String), + #[error("Failed to call WebSocket.close: {0:?}")] + CloseFailed(String), +} + +impl From for WispError { + fn from(err: WebSocketError) -> Self { + Self::WsImplError(Box::new(err)) + } +} + +pub enum WebSocketMessage { + Closed, + Error(WebSocketError), + Message(Vec), +} + +pub struct WebSocketWrapper { + pub inner: SendWrapper, + open_event: Arc, + error_event: Arc, + close_event: Arc, + closed: Arc, + + // used to retain the closures + #[allow(dead_code)] + onopen: SendWrapper>, + #[allow(dead_code)] + onclose: SendWrapper>, + #[allow(dead_code)] + onerror: SendWrapper>, + #[allow(dead_code)] + onmessage: SendWrapper>, +} + +pub struct WebSocketReader { + read_rx: Receiver, + closed: Arc, + close_event: Arc, +} + +#[async_trait] +impl WebSocketRead for WebSocketReader { + async fn wisp_read_frame( + &mut self, + _: &LockedWebSocketWrite, + ) -> Result, WispError> { + use WebSocketMessage as M; + if self.closed.load(Ordering::Acquire) { + return Err(WispError::WsImplSocketClosed); + } + let res = futures_util::select! { + data = self.read_rx.recv_async() => data.ok(), + _ = self.close_event.listen().fuse() => Some(M::Closed), + }; + match res.ok_or(WispError::WsImplSocketClosed)? { + M::Message(bin) => Ok(Frame::binary(Payload::Bytes(BytesMut::from( + bin.as_slice(), + )))), + M::Error(x) => Err(x.into()), + M::Closed => Err(WispError::WsImplSocketClosed), + } + } +} + +impl WebSocketWrapper { + pub fn connect(url: &str, protocols: &[String]) -> Result<(Self, WebSocketReader), EpoxyError> { + let (read_tx, read_rx) = flume::unbounded(); + let closed = Arc::new(AtomicBool::new(false)); + + let open_event = Arc::new(Event::new()); + let close_event = Arc::new(Event::new()); + let error_event = Arc::new(Event::new()); + + let onopen_event = open_event.clone(); + let onopen = Closure::wrap( + Box::new(move || while onopen_event.notify(usize::MAX) == 0 {}) as Box, + ); + + let onmessage_tx = read_tx.clone(); + let onmessage = Closure::wrap(Box::new(move |evt: MessageEvent| { + if let Ok(arr) = evt.data().dyn_into::() { + let _ = + onmessage_tx.send(WebSocketMessage::Message(Uint8Array::new(&arr).to_vec())); + } + }) as Box); + + let onclose_closed = closed.clone(); + let onclose_event = close_event.clone(); + let onclose = Closure::wrap(Box::new(move || { + onclose_closed.store(true, Ordering::Release); + onclose_event.notify(usize::MAX); + }) as Box); + + let onerror_tx = read_tx.clone(); + let onerror_closed = closed.clone(); + let onerror_close = close_event.clone(); + let onerror_event = error_event.clone(); + let onerror = Closure::wrap(Box::new(move |e| { + let _ = onerror_tx.send(WebSocketMessage::Error(WebSocketError::Unknown(format!( + "{:?}", + e + )))); + onerror_closed.store(true, Ordering::Release); + onerror_close.notify(usize::MAX); + onerror_event.notify(usize::MAX); + }) as Box); + + let ws = if protocols.is_empty() { + WebSocket::new(url) + } else { + WebSocket::new_with_str_sequence( + url, + &protocols + .iter() + .fold(Array::new(), |acc, x| { + acc.push(&x.into()); + acc + }) + .into(), + ) + } + .map_err(|x| EpoxyError::WebSocketConnectFailed(format!("{:?}", x)))?; + ws.set_binary_type(BinaryType::Arraybuffer); + ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); + ws.set_onopen(Some(onopen.as_ref().unchecked_ref())); + ws.set_onclose(Some(onclose.as_ref().unchecked_ref())); + ws.set_onerror(Some(onerror.as_ref().unchecked_ref())); + + Ok(( + Self { + inner: SendWrapper::new(ws), + open_event, + error_event, + close_event: close_event.clone(), + closed: closed.clone(), + onopen: SendWrapper::new(onopen), + onclose: SendWrapper::new(onclose), + onerror: SendWrapper::new(onerror), + onmessage: SendWrapper::new(onmessage), + }, + WebSocketReader { + read_rx, + closed, + close_event, + }, + )) + } + + pub async fn wait_for_open(&self) -> bool { + if self.closed.load(Ordering::Acquire) { + return false; + } + futures_util::select! { + _ = self.open_event.listen().fuse() => true, + _ = self.error_event.listen().fuse() => false, + } + } +} + +#[async_trait] +impl WebSocketWrite for WebSocketWrapper { + async fn wisp_write_frame(&mut self, frame: Frame<'_>) -> Result<(), WispError> { + use wisp_mux::ws::OpCode::*; + if self.closed.load(Ordering::Acquire) { + return Err(WispError::WsImplSocketClosed); + } + match frame.opcode { + Binary | Text => self + .inner + .send_with_u8_array(&frame.payload) + .map_err(|x| WebSocketError::SendFailed(format!("{:?}", x)).into()), + Close => { + let _ = self.inner.close(); + Ok(()) + } + _ => Err(WispError::WsImplNotSupported), + } + } + + async fn wisp_close(&mut self) -> Result<(), WispError> { + self.inner + .close() + .map_err(|x| WebSocketError::CloseFailed(format!("{:?}", x)).into()) + } +} + +impl Drop for WebSocketWrapper { + fn drop(&mut self) { + self.inner.set_onopen(None); + self.inner.set_onclose(None); + self.inner.set_onerror(None); + self.inner.set_onmessage(None); + self.closed.store(true, Ordering::Release); + self.close_event.notify(usize::MAX); + let _ = self.inner.close(); + } +}