From 785cc286c9849e454457098280b20f7bfd89966d Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Tue, 5 Nov 2024 21:48:41 -0800 Subject: [PATCH] remove websocket support try 2 --- client/demo.js | 4 +- client/src/lib.rs | 122 ++++++++------------- client/src/utils.rs | 84 +++++++++++++-- client/src/ws_wrapper.rs | 225 --------------------------------------- 4 files changed, 120 insertions(+), 315 deletions(-) delete mode 100644 client/src/ws_wrapper.rs diff --git a/client/demo.js b/client/demo.js index 9c678d9..551600d 100644 --- a/client/demo.js +++ b/client/demo.js @@ -332,12 +332,12 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox console.time(); let resp = await epoxy_client.fetch(test_url); console.log(resp, resp.rawHeaders); - log(await resp.arrayBuffer()); + log(await resp.text()); console.timeEnd(); } log("done"); } catch (err) { console.error(err); - log(err.stack); + log(err); } })(); diff --git a/client/src/lib.rs b/client/src/lib.rs index 83cad8a..e57c7e3 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -25,12 +25,12 @@ use hyper_util_wasm::client::legacy::Client; use io_stream::{iostream_from_asyncrw, iostream_from_stream}; use js_sys::{Array, Function, Object, Promise}; use send_wrapper::SendWrapper; -use stream_provider::{StreamProvider, StreamProviderService}; +use stream_provider::{ProviderWispTransportGenerator, StreamProvider, StreamProviderService}; use thiserror::Error; use utils::{ asyncread_to_readablestream, convert_streaming_body, entries_of_object, from_entries, - is_null_body, is_redirect, object_get, object_set, object_truthy, ws_protocol, StreamingBody, - UriExt, WasmExecutor, WispTransportRead, WispTransportWrite, + is_null_body, is_redirect, object_get, object_set, object_truthy, websocket_transport, + StreamingBody, UriExt, WasmExecutor, WispTransportRead, WispTransportWrite, }; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; @@ -43,7 +43,6 @@ use wisp_mux::{ ws::{WebSocketRead, WebSocketWrite}, CloseReason, }; -use ws_wrapper::WebSocketWrapper; #[cfg(feature = "full")] mod io_stream; @@ -52,7 +51,6 @@ mod tokioio; mod utils; #[cfg(feature = "full")] mod websocket; -mod ws_wrapper; #[wasm_bindgen(typescript_custom_section)] const EPOXYCLIENT_TYPES: &'static str = r#" @@ -60,7 +58,7 @@ type EpoxyIoStream = { read: ReadableStream, write: WritableStream, }; -type EpoxyWispTransport = string | (() => { read: ReadableStream, write: WritableStream }); +type EpoxyWispTransport = string | ((wisp_v2: boolean) => { read: ReadableStream, write: WritableStream }); type EpoxyWebSocketInput = string | ArrayBuffer; type EpoxyWebSocketHeadersInput = Headers | { [key: string]: string }; type EpoxyUrlInput = string | URL; @@ -120,10 +118,7 @@ pub enum EpoxyError { #[error("Webpki: {0:?} ({0})")] Webpki(#[from] webpki::Error), - #[error("Wisp WebSocket failed to connect: {0}")] - WebSocketConnectFailed(String), - - #[error("Custom Wisp transport: {0}")] + #[error("Wisp transport: {0}")] WispTransport(String), #[error("Invalid Wisp transport: {0}")] InvalidWispTransport(String), @@ -324,6 +319,43 @@ impl EpoxyHandlers { } } +fn create_wisp_transport(function: Function) -> ProviderWispTransportGenerator { + let wisp_transport = SendWrapper::new(function); + Box::new(move |wisp_v2| { + let wisp_transport = wisp_transport.clone(); + Box::pin(SendWrapper::new(async move { + let transport = wisp_transport + .call1(&JsValue::NULL, &wisp_v2.into()) + .map_err(EpoxyError::wisp_transport)?; + + let transport = match transport.dyn_into::() { + Ok(transport) => { + let fut = JsFuture::from(transport); + fut.await.map_err(EpoxyError::wisp_transport)? + } + Err(transport) => transport, + } + .into(); + + let read = WispTransportRead { + inner: SendWrapper::new( + wasm_streams::ReadableStream::from_raw(object_get(&transport, "read").into()) + .into_stream(), + ), + }; + let write: WritableStream = object_get(&transport, "write").into(); + let write = WispTransportWrite { + inner: SendWrapper::new(write.get_writer().map_err(EpoxyError::wisp_transport)?), + }; + + Ok(( + Box::new(read) as Box, + Box::new(write) as Box, + )) + })) + }) +} + #[wasm_bindgen(inspectable)] pub struct EpoxyClient { stream_provider: Arc, @@ -349,80 +381,14 @@ impl EpoxyClient { options: EpoxyClientOptions, ) -> Result { let stream_provider = if let Some(wisp_url) = transport.as_string() { - let wisp_uri: Uri = wisp_url.clone().try_into()?; - if wisp_uri.scheme_str() != Some("wss") && wisp_uri.scheme_str() != Some("ws") { - return Err(EpoxyError::InvalidUrlScheme( - wisp_uri.scheme_str().map(ToString::to_string), - )); - } - let ws_protocols = options.websocket_protocols.clone(); Arc::new(StreamProvider::new( - Box::new(move |wisp_v2| { - let wisp_url = wisp_url.clone(); - let mut ws_protocols = ws_protocols.clone(); - if wisp_v2 { - // send some random data to ask the server for v2 - ws_protocols.push(ws_protocol()); - } - - Box::pin(async move { - let (write, read) = WebSocketWrapper::connect(&wisp_url, &ws_protocols)?; - while write.inner.ready_state() == 0 { - if !write.wait_for_open().await { - return Err(EpoxyError::WebSocketConnectFailed( - "websocket did not open".to_string(), - )); - } - } - Ok(( - Box::new(read) as Box, - Box::new(write) as Box, - )) - }) - }), + create_wisp_transport(websocket_transport(wisp_url, ws_protocols)), &options, )?) } else if let Some(wisp_transport) = transport.dyn_ref::() { - let wisp_transport = SendWrapper::new(wisp_transport.clone()); Arc::new(StreamProvider::new( - Box::new(move |_| { - let wisp_transport = wisp_transport.clone(); - Box::pin(SendWrapper::new(async move { - let transport = wisp_transport - .call0(&JsValue::NULL) - .map_err(EpoxyError::wisp_transport)?; - - let transport = match transport.dyn_into::() { - Ok(transport) => { - let fut = JsFuture::from(transport); - fut.await.map_err(EpoxyError::wisp_transport)? - } - Err(transport) => transport, - } - .into(); - - let read = WispTransportRead { - inner: SendWrapper::new( - wasm_streams::ReadableStream::from_raw( - object_get(&transport, "read").into(), - ) - .into_stream(), - ), - }; - let write: WritableStream = object_get(&transport, "write").into(); - let write = WispTransportWrite { - inner: SendWrapper::new( - write.get_writer().map_err(EpoxyError::wisp_transport)?, - ), - }; - - Ok(( - Box::new(read) as Box, - Box::new(write) as Box, - )) - })) - }), + create_wisp_transport(wisp_transport.clone()), &options, )?) } else { diff --git a/client/src/utils.rs b/client/src/utils.rs index 0d63aa4..4dc7fa1 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -1,5 +1,8 @@ use std::{ - io::ErrorKind, pin::Pin, sync::Arc, task::{Context, Poll} + io::ErrorKind, + pin::Pin, + sync::Arc, + task::{Context, Poll}, }; use async_trait::async_trait; @@ -17,7 +20,7 @@ use futures_util::{ready, AsyncRead, AsyncWrite, Future, Stream, StreamExt, TryS use http::{HeaderValue, Uri}; use http_body_util::{Either, Full, StreamBody}; use hyper::rt::Executor; -use js_sys::{Array, ArrayBuffer, JsString, Object, Uint8Array}; +use js_sys::{Array, ArrayBuffer, Function, JsString, Object, Uint8Array}; use pin_project_lite::pin_project; use rustls_pki_types::{CertificateDer, ServerName, UnixTime}; use send_wrapper::SendWrapper; @@ -360,6 +363,73 @@ pub fn is_null_body(code: u16) -> bool { } #[wasm_bindgen(inline_js = r#" +class WebSocketStreamPonyfill { + url; + opened; + closed; + close; + constructor(url, options = {}) { + if (options.signal?.aborted) { + throw new DOMException('This operation was aborted', 'AbortError'); + } + this.url = url; + const ws = new WebSocket(url, options.protocols ?? []); + ws.binaryType = "arraybuffer"; + const closeWithInfo = ({ closeCode: code, reason } = {}) => ws.close(code, reason); + this.opened = new Promise((resolve, reject) => { + const errorHandler = ()=>reject(new Error("WebSocket closed before handshake complete.")); + ws.onopen = () => { + resolve({ + readable: new ReadableStream({ + start(controller) { + ws.onmessage = ({ data }) => controller.enqueue(data); + ws.onerror = e => controller.error(e); + }, + cancel: closeWithInfo, + }), + writable: new WritableStream({ + write(chunk) { ws.send(chunk); }, + abort() { ws.close(); }, + close: closeWithInfo, + }), + protocol: ws.protocol, + extensions: ws.extensions, + }); + ws.removeEventListener('error', errorHandler); + }; + ws.addEventListener('error', errorHandler); + }); + this.closed = new Promise((resolve, reject) => { + ws.onclose = ({ code, reason }) => { + resolve({ closeCode: code, reason }); + ws.removeEventListener('error', reject); + }; + ws.addEventListener('error', reject); + }); + if (options.signal) { + options.signal.onabort = () => ws.close(); + } + this.close = closeWithInfo; + } +} + +function ws_protocol() { + return ( + [1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, + c => (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16) + ); +} + +export function websocket_transport(url, protocols) { + const ws_impl = typeof WebSocketStream === "undefined" ? WebSocketStreamPonyfill : WebSocketStream; + return async (wisp_v2)=>{ + if (wisp_v2) protocols.push(ws_protocol()); + const ws = new ws_impl(url, { protocols }); + const { readable, writable } = await ws.opened; + return { read: readable, write: writable }; + } +} + export function object_get(obj, k) { try { return obj[k] @@ -403,13 +473,6 @@ export function ws_key() { return btoa(String.fromCharCode.apply(null, key)); } -export function ws_protocol() { - return ( - [1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, - c => (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16) - ); -} - export function from_entries(entries){ var ret = {}; for(var i = 0; i < entries.length; i++) ret[entries[i][0]] = entries[i][1]; @@ -417,6 +480,8 @@ export function from_entries(entries){ } "#)] extern "C" { + pub fn websocket_transport(url: String, protocols: Vec) -> Function; + pub fn object_get(obj: &Object, key: &str) -> JsValue; pub fn object_set(obj: &Object, key: &str, val: JsValue); @@ -428,7 +493,6 @@ extern "C" { fn entries_of_object_inner(obj: &Object) -> Vec; pub fn define_property(obj: &Object, key: &str, val: JsValue); pub fn ws_key() -> String; - pub fn ws_protocol() -> String; #[wasm_bindgen(catch)] pub fn from_entries(iterable: &JsValue) -> Result; diff --git a/client/src/ws_wrapper.rs b/client/src/ws_wrapper.rs deleted file mode 100644 index 85a0668..0000000 --- a/client/src/ws_wrapper.rs +++ /dev/null @@ -1,225 +0,0 @@ -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; - -use async_trait::async_trait; -use bytes::BytesMut; -use event_listener::Event; -use flume::Receiver; -use futures_util::FutureExt; -use js_sys::{Array, ArrayBuffer, Uint8Array}; -use send_wrapper::SendWrapper; -use thiserror::Error; -use wasm_bindgen::{closure::Closure, JsCast, JsValue}; -use web_sys::{BinaryType, MessageEvent, WebSocket}; -use wisp_mux::{ - ws::{Frame, LockedWebSocketWrite, Payload, WebSocketRead, WebSocketWrite}, - WispError, -}; - -use crate::EpoxyError; - -#[derive(Error, Debug)] -pub enum WebSocketError { - #[error("Unknown JS WebSocket wrapper error: {0:?}")] - Unknown(String), - #[error("Failed to call WebSocket.send: {0:?}")] - SendFailed(String), - #[error("Failed to call WebSocket.close: {0:?}")] - CloseFailed(String), -} - -impl From for WispError { - fn from(err: WebSocketError) -> Self { - Self::WsImplError(Box::new(err)) - } -} - -pub enum WebSocketMessage { - Closed, - Error(WebSocketError), - Message(Vec), -} - -pub struct WebSocketWrapper { - pub inner: SendWrapper, - open_event: Arc, - error_event: Arc, - close_event: Arc, - closed: Arc, - - // used to retain the closures - #[allow(dead_code)] - onopen: SendWrapper>, - #[allow(dead_code)] - onclose: SendWrapper>, - #[allow(dead_code)] - onerror: SendWrapper>, - #[allow(dead_code)] - onmessage: SendWrapper>, -} - -pub struct WebSocketReader { - read_rx: Receiver, - closed: Arc, - close_event: Arc, -} - -#[async_trait] -impl WebSocketRead for WebSocketReader { - async fn wisp_read_frame( - &mut self, - _: &LockedWebSocketWrite, - ) -> Result, WispError> { - use WebSocketMessage as M; - if self.closed.load(Ordering::Acquire) { - return Err(WispError::WsImplSocketClosed); - } - let res = futures_util::select! { - data = self.read_rx.recv_async() => data.ok(), - _ = self.close_event.listen().fuse() => Some(M::Closed), - }; - match res.ok_or(WispError::WsImplSocketClosed)? { - M::Message(bin) => Ok(Frame::binary(Payload::Bytes(BytesMut::from( - bin.as_slice(), - )))), - M::Error(x) => Err(x.into()), - M::Closed => Err(WispError::WsImplSocketClosed), - } - } -} - -impl WebSocketWrapper { - pub fn connect(url: &str, protocols: &[String]) -> Result<(Self, WebSocketReader), EpoxyError> { - let (read_tx, read_rx) = flume::unbounded(); - let closed = Arc::new(AtomicBool::new(false)); - - let open_event = Arc::new(Event::new()); - let close_event = Arc::new(Event::new()); - let error_event = Arc::new(Event::new()); - - let onopen_event = open_event.clone(); - let onopen = Closure::wrap( - Box::new(move || while onopen_event.notify(usize::MAX) == 0 {}) as Box, - ); - - let onmessage_tx = read_tx.clone(); - let onmessage = Closure::wrap(Box::new(move |evt: MessageEvent| { - if let Ok(arr) = evt.data().dyn_into::() { - let _ = - onmessage_tx.send(WebSocketMessage::Message(Uint8Array::new(&arr).to_vec())); - } - }) as Box); - - let onclose_closed = closed.clone(); - let onclose_event = close_event.clone(); - let onclose = Closure::wrap(Box::new(move || { - onclose_closed.store(true, Ordering::Release); - onclose_event.notify(usize::MAX); - }) as Box); - - let onerror_tx = read_tx.clone(); - let onerror_closed = closed.clone(); - let onerror_close = close_event.clone(); - let onerror_event = error_event.clone(); - let onerror = Closure::wrap(Box::new(move |e| { - let _ = onerror_tx.send(WebSocketMessage::Error(WebSocketError::Unknown(format!( - "{:?}", - e - )))); - onerror_closed.store(true, Ordering::Release); - onerror_close.notify(usize::MAX); - onerror_event.notify(usize::MAX); - }) as Box); - - let ws = if protocols.is_empty() { - WebSocket::new(url) - } else { - WebSocket::new_with_str_sequence( - url, - &protocols - .iter() - .fold(Array::new(), |acc, x| { - acc.push(&x.into()); - acc - }) - .into(), - ) - } - .map_err(|x| EpoxyError::WebSocketConnectFailed(format!("{:?}", x)))?; - ws.set_binary_type(BinaryType::Arraybuffer); - ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); - ws.set_onopen(Some(onopen.as_ref().unchecked_ref())); - ws.set_onclose(Some(onclose.as_ref().unchecked_ref())); - ws.set_onerror(Some(onerror.as_ref().unchecked_ref())); - - Ok(( - Self { - inner: SendWrapper::new(ws), - open_event, - error_event, - close_event: close_event.clone(), - closed: closed.clone(), - onopen: SendWrapper::new(onopen), - onclose: SendWrapper::new(onclose), - onerror: SendWrapper::new(onerror), - onmessage: SendWrapper::new(onmessage), - }, - WebSocketReader { - read_rx, - closed, - close_event, - }, - )) - } - - pub async fn wait_for_open(&self) -> bool { - if self.closed.load(Ordering::Acquire) { - return false; - } - futures_util::select! { - _ = self.open_event.listen().fuse() => true, - _ = self.error_event.listen().fuse() => false, - } - } -} - -#[async_trait] -impl WebSocketWrite for WebSocketWrapper { - async fn wisp_write_frame(&mut self, frame: Frame<'_>) -> Result<(), WispError> { - use wisp_mux::ws::OpCode::*; - if self.closed.load(Ordering::Acquire) { - return Err(WispError::WsImplSocketClosed); - } - match frame.opcode { - Binary | Text => self - .inner - .send_with_u8_array(&frame.payload) - .map_err(|x| WebSocketError::SendFailed(format!("{:?}", x)).into()), - Close => { - let _ = self.inner.close(); - Ok(()) - } - _ => Err(WispError::WsImplNotSupported), - } - } - - async fn wisp_close(&mut self) -> Result<(), WispError> { - self.inner - .close() - .map_err(|x| WebSocketError::CloseFailed(format!("{:?}", x)).into()) - } -} - -impl Drop for WebSocketWrapper { - fn drop(&mut self) { - self.inner.set_onopen(None); - self.inner.set_onclose(None); - self.inner.set_onerror(None); - self.inner.set_onmessage(None); - self.closed.store(true, Ordering::Release); - self.close_event.notify(usize::MAX); - let _ = self.inner.close(); - } -}