fix writing to custom wisp transport

This commit is contained in:
Toshit Chawda 2024-08-19 21:25:30 -07:00
parent ef6ae49085
commit 8b8fc62baf
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
3 changed files with 39 additions and 28 deletions

View file

@ -12,6 +12,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
const should_udp_test = params.has("udp_test");
const should_reconnect_test = params.has("reconnect_test");
const should_perf2_test = params.has("perf2_test");
const should_wisptransport = params.has("wisptransport");
console.log(
"%cWASM is significantly slower with DevTools open!",
"color:red;font-size:3rem;font-weight:bold"
@ -29,7 +30,18 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
let epoxy_client_options = new EpoxyClientOptions();
epoxy_client_options.user_agent = navigator.userAgent;
let epoxy_client = new EpoxyClient("ws://localhost:4000", epoxy_client_options);
let epoxy_client;
if (should_wisptransport) {
log("using wisptransport with websocketstream backend");
epoxy_client = new EpoxyClient(async () => {
let wss = new WebSocketStream("ws://localhost:4000/");
let {readable, writable} = await wss.opened;
return {read: readable, write: writable};
}, epoxy_client_options);
} else {
epoxy_client = new EpoxyClient("ws://localhost:4000/", epoxy_client_options);
}
const tconn0 = performance.now();
await epoxy_client.replace_stream_provider();

View file

@ -30,7 +30,7 @@ use utils::{
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use wasm_streams::ReadableStream;
use web_sys::ResponseInit;
use web_sys::{ResponseInit, WritableStream};
#[cfg(feature = "full")]
use websocket::EpoxyWebSocket;
#[cfg(feature = "full")]
@ -117,7 +117,11 @@ pub enum EpoxyError {
impl EpoxyError {
pub fn wisp_transport(value: JsValue) -> Self {
Self::WispTransport(format!("{:?}", value))
if let Some(err) = value.dyn_ref::<js_sys::Error>() {
Self::WispTransport(err.to_string().into())
} else {
Self::WispTransport(format!("{:?}", value))
}
}
}
@ -299,13 +303,11 @@ impl EpoxyClient {
.into_stream(),
),
};
let write: WritableStream = object_get(&transport, "write").into();
let write = WispTransportWrite {
inner: Some(SendWrapper::new(
wasm_streams::WritableStream::from_raw(
object_get(&transport, "write").into(),
)
.into_sink(),
)),
inner: SendWrapper::new(
write.get_writer().map_err(EpoxyError::wisp_transport)?,
),
};
Ok((

View file

@ -5,14 +5,16 @@ use std::{
use async_trait::async_trait;
use bytes::{buf::UninitSlice, BufMut, Bytes, BytesMut};
use futures_util::{ready, AsyncRead, Future, SinkExt, Stream, StreamExt, TryStreamExt};
use futures_util::{ready, AsyncRead, Future, Stream, StreamExt, TryStreamExt};
use http::{HeaderValue, Uri};
use hyper::{body::Body, rt::Executor};
use js_sys::{Array, ArrayBuffer, JsString, Object, Uint8Array};
use pin_project_lite::pin_project;
use send_wrapper::SendWrapper;
use wasm_bindgen::{prelude::*, JsCast, JsValue};
use wasm_streams::{readable::IntoStream, writable::IntoSink};
use wasm_bindgen_futures::JsFuture;
use wasm_streams::readable::IntoStream;
use web_sys::WritableStreamDefaultWriter;
use wisp_mux::{
ws::{Frame, LockedWebSocketWrite, Payload, WebSocketRead, WebSocketWrite},
WispError,
@ -204,32 +206,27 @@ impl WebSocketRead for WispTransportRead {
}
pub struct WispTransportWrite {
pub inner: Option<SendWrapper<IntoSink<'static>>>,
pub inner: SendWrapper<WritableStreamDefaultWriter>,
}
#[async_trait]
impl WebSocketWrite for WispTransportWrite {
async fn wisp_write_frame(&mut self, frame: Frame<'_>) -> Result<(), WispError> {
SendWrapper::new(
self.inner
.as_mut()
.ok_or_else(|| WispError::WsImplError(Box::new(EpoxyError::WispTransportClosed)))?
.send(Uint8Array::from(frame.payload.as_ref()).into()),
)
SendWrapper::new(async {
let chunk = Uint8Array::from(frame.payload.as_ref()).into();
JsFuture::from(self.inner.write_with_chunk(&chunk))
.await
.map(|_| ())
.map_err(|x| WispError::WsImplError(Box::new(EpoxyError::wisp_transport(x))))
})
.await
.map_err(|x| WispError::WsImplError(Box::new(EpoxyError::wisp_transport(x))))
}
async fn wisp_close(&mut self) -> Result<(), WispError> {
SendWrapper::new(
self.inner
.take()
.ok_or_else(|| WispError::WsImplError(Box::new(EpoxyError::WispTransportClosed)))?
.take()
.abort(),
)
.await
.map_err(|x| WispError::WsImplError(Box::new(EpoxyError::wisp_transport(x))))
SendWrapper::new(JsFuture::from(self.inner.abort()))
.await
.map(|_| ())
.map_err(|x| WispError::WsImplError(Box::new(EpoxyError::wisp_transport(x))))
}
}