diff --git a/client/demo.js b/client/demo.js index 9c0e467..b469edc 100644 --- a/client/demo.js +++ b/client/demo.js @@ -12,6 +12,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox const should_udp_test = params.has("udp_test"); const should_reconnect_test = params.has("reconnect_test"); const should_perf2_test = params.has("perf2_test"); + const should_wisptransport = params.has("wisptransport"); console.log( "%cWASM is significantly slower with DevTools open!", "color:red;font-size:3rem;font-weight:bold" @@ -29,7 +30,18 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox let epoxy_client_options = new EpoxyClientOptions(); epoxy_client_options.user_agent = navigator.userAgent; - let epoxy_client = new EpoxyClient("ws://localhost:4000", epoxy_client_options); + let epoxy_client; + + if (should_wisptransport) { + log("using wisptransport with websocketstream backend"); + epoxy_client = new EpoxyClient(async () => { + let wss = new WebSocketStream("ws://localhost:4000/"); + let {readable, writable} = await wss.opened; + return {read: readable, write: writable}; + }, epoxy_client_options); + } else { + epoxy_client = new EpoxyClient("ws://localhost:4000/", epoxy_client_options); + } const tconn0 = performance.now(); await epoxy_client.replace_stream_provider(); diff --git a/client/src/lib.rs b/client/src/lib.rs index 0196ebd..da00140 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -30,7 +30,7 @@ use utils::{ use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; use wasm_streams::ReadableStream; -use web_sys::ResponseInit; +use web_sys::{ResponseInit, WritableStream}; #[cfg(feature = "full")] use websocket::EpoxyWebSocket; #[cfg(feature = "full")] @@ -117,7 +117,11 @@ pub enum EpoxyError { impl EpoxyError { pub fn wisp_transport(value: JsValue) -> Self { - Self::WispTransport(format!("{:?}", value)) + if let Some(err) = value.dyn_ref::() { + Self::WispTransport(err.to_string().into()) + } else { + Self::WispTransport(format!("{:?}", value)) + } } } @@ -299,13 +303,11 @@ impl EpoxyClient { .into_stream(), ), }; + let write: WritableStream = object_get(&transport, "write").into(); let write = WispTransportWrite { - inner: Some(SendWrapper::new( - wasm_streams::WritableStream::from_raw( - object_get(&transport, "write").into(), - ) - .into_sink(), - )), + inner: SendWrapper::new( + write.get_writer().map_err(EpoxyError::wisp_transport)?, + ), }; Ok(( diff --git a/client/src/utils.rs b/client/src/utils.rs index d7d8935..fa507ff 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -5,14 +5,16 @@ use std::{ use async_trait::async_trait; use bytes::{buf::UninitSlice, BufMut, Bytes, BytesMut}; -use futures_util::{ready, AsyncRead, Future, SinkExt, Stream, StreamExt, TryStreamExt}; +use futures_util::{ready, AsyncRead, Future, Stream, StreamExt, TryStreamExt}; use http::{HeaderValue, Uri}; use hyper::{body::Body, rt::Executor}; use js_sys::{Array, ArrayBuffer, JsString, Object, Uint8Array}; use pin_project_lite::pin_project; use send_wrapper::SendWrapper; use wasm_bindgen::{prelude::*, JsCast, JsValue}; -use wasm_streams::{readable::IntoStream, writable::IntoSink}; +use wasm_bindgen_futures::JsFuture; +use wasm_streams::readable::IntoStream; +use web_sys::WritableStreamDefaultWriter; use wisp_mux::{ ws::{Frame, LockedWebSocketWrite, Payload, WebSocketRead, WebSocketWrite}, WispError, @@ -204,32 +206,27 @@ impl WebSocketRead for WispTransportRead { } pub struct WispTransportWrite { - pub inner: Option>>, + pub inner: SendWrapper, } #[async_trait] impl WebSocketWrite for WispTransportWrite { async fn wisp_write_frame(&mut self, frame: Frame<'_>) -> Result<(), WispError> { - SendWrapper::new( - self.inner - .as_mut() - .ok_or_else(|| WispError::WsImplError(Box::new(EpoxyError::WispTransportClosed)))? - .send(Uint8Array::from(frame.payload.as_ref()).into()), - ) + SendWrapper::new(async { + let chunk = Uint8Array::from(frame.payload.as_ref()).into(); + JsFuture::from(self.inner.write_with_chunk(&chunk)) + .await + .map(|_| ()) + .map_err(|x| WispError::WsImplError(Box::new(EpoxyError::wisp_transport(x)))) + }) .await - .map_err(|x| WispError::WsImplError(Box::new(EpoxyError::wisp_transport(x)))) } async fn wisp_close(&mut self) -> Result<(), WispError> { - SendWrapper::new( - self.inner - .take() - .ok_or_else(|| WispError::WsImplError(Box::new(EpoxyError::WispTransportClosed)))? - .take() - .abort(), - ) - .await - .map_err(|x| WispError::WsImplError(Box::new(EpoxyError::wisp_transport(x)))) + SendWrapper::new(JsFuture::from(self.inner.abort())) + .await + .map(|_| ()) + .map_err(|x| WispError::WsImplError(Box::new(EpoxyError::wisp_transport(x)))) } }