Revert "remove websocket support try 2"

This reverts commit 785cc286c9.
This commit is contained in:
Toshit Chawda 2024-11-09 14:33:43 -08:00
parent 388b17d923
commit 7362d512b9
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
4 changed files with 272 additions and 72 deletions

View file

@ -332,12 +332,12 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
console.time();
let resp = await epoxy_client.fetch(test_url);
console.log(resp, resp.rawHeaders);
log(await resp.text());
log(await resp.arrayBuffer());
console.timeEnd();
}
log("done");
} catch (err) {
console.error(err);
log(err);
log(err.stack);
}
})();

View file

@ -29,8 +29,8 @@ use stream_provider::{ProviderWispTransportGenerator, StreamProvider, StreamProv
use thiserror::Error;
use utils::{
asyncread_to_readablestream, convert_streaming_body, entries_of_object, from_entries,
is_null_body, is_redirect, object_get, object_set, object_truthy, websocket_transport,
StreamingBody, UriExt, WasmExecutor, WispTransportWrite,
is_null_body, is_redirect, object_get, object_set, object_truthy, ws_protocol, StreamingBody,
UriExt, WasmExecutor, WispTransportWrite,
};
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
@ -44,6 +44,7 @@ use wisp_mux::{
ws::{WebSocketRead, WebSocketWrite},
CloseReason,
};
use ws_wrapper::WebSocketWrapper;
#[cfg(feature = "full")]
mod io_stream;
@ -52,6 +53,7 @@ mod tokioio;
mod utils;
#[cfg(feature = "full")]
mod websocket;
mod ws_wrapper;
#[wasm_bindgen(typescript_custom_section)]
const EPOXYCLIENT_TYPES: &'static str = r#"
@ -59,7 +61,7 @@ type EpoxyIoStream = {
read: ReadableStream<Uint8Array>,
write: WritableStream<Uint8Array>,
};
type EpoxyWispTransport = string | ((wisp_v2: boolean) => { read: ReadableStream<ArrayBuffer>, write: WritableStream<Uint8Array> });
type EpoxyWispTransport = string | (() => { read: ReadableStream<ArrayBuffer>, write: WritableStream<Uint8Array> });
type EpoxyWebSocketInput = string | ArrayBuffer;
type EpoxyWebSocketHeadersInput = Headers | { [key: string]: string };
type EpoxyUrlInput = string | URL;
@ -119,7 +121,10 @@ pub enum EpoxyError {
#[error("Webpki: {0:?} ({0})")]
Webpki(#[from] webpki::Error),
#[error("Wisp transport: {0}")]
#[error("Wisp WebSocket failed to connect: {0}")]
WebSocketConnectFailed(String),
#[error("Custom Wisp transport: {0}")]
WispTransport(String),
#[error("Invalid Wisp transport: {0}")]
InvalidWispTransport(String),
@ -346,7 +351,9 @@ fn create_wisp_transport(function: Function) -> ProviderWispTransportGenerator {
let arr: ArrayBuffer = pkt.dyn_into().map_err(|x| {
EpoxyError::InvalidWispTransportPacket(format!("{:?}", x))
})?;
Ok::<BytesMut, EpoxyError>(BytesMut::from(Uint8Array::new(&arr).to_vec().as_slice()))
Ok::<BytesMut, EpoxyError>(BytesMut::from(
Uint8Array::new(&arr).to_vec().as_slice(),
))
}),
));
let write: WritableStream = object_get(&transport, "write").into();
@ -387,9 +394,38 @@ impl EpoxyClient {
options: EpoxyClientOptions,
) -> Result<EpoxyClient, EpoxyError> {
let stream_provider = if let Some(wisp_url) = transport.as_string() {
let wisp_uri: Uri = wisp_url.clone().try_into()?;
if wisp_uri.scheme_str() != Some("wss") && wisp_uri.scheme_str() != Some("ws") {
return Err(EpoxyError::InvalidUrlScheme(
wisp_uri.scheme_str().map(ToString::to_string),
));
}
let ws_protocols = options.websocket_protocols.clone();
Arc::new(StreamProvider::new(
create_wisp_transport(websocket_transport(wisp_url, ws_protocols)),
Box::new(move |wisp_v2| {
let wisp_url = wisp_url.clone();
let mut ws_protocols = ws_protocols.clone();
if wisp_v2 {
// send some random data to ask the server for v2
ws_protocols.push(ws_protocol());
}
Box::pin(async move {
let (write, read) = WebSocketWrapper::connect(&wisp_url, &ws_protocols)?;
while write.inner.ready_state() == 0 {
if !write.wait_for_open().await {
return Err(EpoxyError::WebSocketConnectFailed(
"websocket did not open".to_string(),
));
}
}
Ok((
Box::new(read) as Box<dyn WebSocketRead + Send>,
Box::new(write) as Box<dyn WebSocketWrite + Send>,
))
})
}),
&options,
)?)
} else if let Some(wisp_transport) = transport.dyn_ref::<Function>() {

View file

@ -3,7 +3,7 @@ use std::{pin::Pin, task::{Context, Poll}};
use bytes::Bytes;
use futures_util::{AsyncRead, Stream, StreamExt, TryStreamExt};
use http_body_util::{Either, Full, StreamBody};
use js_sys::{Array, Function, JsString, Object, Uint8Array};
use js_sys::{Array, JsString, Object, Uint8Array};
use send_wrapper::SendWrapper;
use wasm_bindgen::{prelude::*, JsCast, JsValue};
use wasm_streams::ReadableStream;
@ -14,73 +14,13 @@ use super::ReaderStream;
#[wasm_bindgen(inline_js = r#"
class WebSocketStreamPonyfill {
url;
opened;
closed;
close;
constructor(url, options = {}) {
if (options.signal?.aborted) {
throw new DOMException('This operation was aborted', 'AbortError');
}
this.url = url;
const ws = new WebSocket(url, options.protocols ?? []);
ws.binaryType = "arraybuffer";
const closeWithInfo = ({ closeCode: code, reason } = {}) => ws.close(code, reason);
this.opened = new Promise((resolve, reject) => {
const errorHandler = ()=>reject(new Error("WebSocket closed before handshake complete."));
ws.onopen = () => {
resolve({
readable: new ReadableStream({
start(controller) {
ws.onmessage = ({ data }) => controller.enqueue(data);
ws.onerror = e => controller.error(e);
},
cancel: closeWithInfo,
}),
writable: new WritableStream({
write(chunk) { ws.send(chunk); },
abort() { ws.close(); },
close: closeWithInfo,
}),
protocol: ws.protocol,
extensions: ws.extensions,
});
ws.removeEventListener('error', errorHandler);
};
ws.addEventListener('error', errorHandler);
});
this.closed = new Promise((resolve, reject) => {
ws.onclose = ({ code, reason }) => {
resolve({ closeCode: code, reason });
ws.removeEventListener('error', reject);
};
ws.addEventListener('error', reject);
});
if (options.signal) {
options.signal.onabort = () => ws.close();
}
this.close = closeWithInfo;
}
}
function ws_protocol() {
export function ws_protocol() {
return (
[1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g,
c => (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16)
);
}
export function websocket_transport(url, protocols) {
const ws_impl = typeof WebSocketStream === "undefined" ? WebSocketStreamPonyfill : WebSocketStream;
return async (wisp_v2)=>{
if (wisp_v2) protocols.push(ws_protocol());
const ws = new ws_impl(url, { protocols });
const { readable, writable } = await ws.opened;
return { read: readable, write: writable };
}
}
export function object_get(obj, k) {
try {
return obj[k]
@ -131,8 +71,6 @@ export function from_entries(entries){
}
"#)]
extern "C" {
pub fn websocket_transport(url: String, protocols: Vec<String>) -> Function;
pub fn object_get(obj: &Object, key: &str) -> JsValue;
pub fn object_set(obj: &Object, key: &str, val: JsValue);
@ -144,6 +82,7 @@ extern "C" {
fn entries_of_object_inner(obj: &Object) -> Vec<Array>;
pub fn define_property(obj: &Object, key: &str, val: JsValue);
pub fn ws_key() -> String;
pub fn ws_protocol() -> String;
#[wasm_bindgen(catch)]
pub fn from_entries(iterable: &JsValue) -> Result<Object, JsValue>;

225
client/src/ws_wrapper.rs Normal file
View file

@ -0,0 +1,225 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use async_trait::async_trait;
use bytes::BytesMut;
use event_listener::Event;
use flume::Receiver;
use futures_util::FutureExt;
use js_sys::{Array, ArrayBuffer, Uint8Array};
use send_wrapper::SendWrapper;
use thiserror::Error;
use wasm_bindgen::{closure::Closure, JsCast, JsValue};
use web_sys::{BinaryType, MessageEvent, WebSocket};
use wisp_mux::{
ws::{Frame, LockedWebSocketWrite, Payload, WebSocketRead, WebSocketWrite},
WispError,
};
use crate::EpoxyError;
#[derive(Error, Debug)]
pub enum WebSocketError {
#[error("Unknown JS WebSocket wrapper error: {0:?}")]
Unknown(String),
#[error("Failed to call WebSocket.send: {0:?}")]
SendFailed(String),
#[error("Failed to call WebSocket.close: {0:?}")]
CloseFailed(String),
}
impl From<WebSocketError> for WispError {
fn from(err: WebSocketError) -> Self {
Self::WsImplError(Box::new(err))
}
}
pub enum WebSocketMessage {
Closed,
Error(WebSocketError),
Message(Vec<u8>),
}
pub struct WebSocketWrapper {
pub inner: SendWrapper<WebSocket>,
open_event: Arc<Event>,
error_event: Arc<Event>,
close_event: Arc<Event>,
closed: Arc<AtomicBool>,
// used to retain the closures
#[allow(dead_code)]
onopen: SendWrapper<Closure<dyn Fn()>>,
#[allow(dead_code)]
onclose: SendWrapper<Closure<dyn Fn()>>,
#[allow(dead_code)]
onerror: SendWrapper<Closure<dyn Fn(JsValue)>>,
#[allow(dead_code)]
onmessage: SendWrapper<Closure<dyn Fn(MessageEvent)>>,
}
pub struct WebSocketReader {
read_rx: Receiver<WebSocketMessage>,
closed: Arc<AtomicBool>,
close_event: Arc<Event>,
}
#[async_trait]
impl WebSocketRead for WebSocketReader {
async fn wisp_read_frame(
&mut self,
_: &LockedWebSocketWrite,
) -> Result<Frame<'static>, WispError> {
use WebSocketMessage as M;
if self.closed.load(Ordering::Acquire) {
return Err(WispError::WsImplSocketClosed);
}
let res = futures_util::select! {
data = self.read_rx.recv_async() => data.ok(),
_ = self.close_event.listen().fuse() => Some(M::Closed),
};
match res.ok_or(WispError::WsImplSocketClosed)? {
M::Message(bin) => Ok(Frame::binary(Payload::Bytes(BytesMut::from(
bin.as_slice(),
)))),
M::Error(x) => Err(x.into()),
M::Closed => Err(WispError::WsImplSocketClosed),
}
}
}
impl WebSocketWrapper {
pub fn connect(url: &str, protocols: &[String]) -> Result<(Self, WebSocketReader), EpoxyError> {
let (read_tx, read_rx) = flume::unbounded();
let closed = Arc::new(AtomicBool::new(false));
let open_event = Arc::new(Event::new());
let close_event = Arc::new(Event::new());
let error_event = Arc::new(Event::new());
let onopen_event = open_event.clone();
let onopen = Closure::wrap(
Box::new(move || while onopen_event.notify(usize::MAX) == 0 {}) as Box<dyn Fn()>,
);
let onmessage_tx = read_tx.clone();
let onmessage = Closure::wrap(Box::new(move |evt: MessageEvent| {
if let Ok(arr) = evt.data().dyn_into::<ArrayBuffer>() {
let _ =
onmessage_tx.send(WebSocketMessage::Message(Uint8Array::new(&arr).to_vec()));
}
}) as Box<dyn Fn(MessageEvent)>);
let onclose_closed = closed.clone();
let onclose_event = close_event.clone();
let onclose = Closure::wrap(Box::new(move || {
onclose_closed.store(true, Ordering::Release);
onclose_event.notify(usize::MAX);
}) as Box<dyn Fn()>);
let onerror_tx = read_tx.clone();
let onerror_closed = closed.clone();
let onerror_close = close_event.clone();
let onerror_event = error_event.clone();
let onerror = Closure::wrap(Box::new(move |e| {
let _ = onerror_tx.send(WebSocketMessage::Error(WebSocketError::Unknown(format!(
"{:?}",
e
))));
onerror_closed.store(true, Ordering::Release);
onerror_close.notify(usize::MAX);
onerror_event.notify(usize::MAX);
}) as Box<dyn Fn(JsValue)>);
let ws = if protocols.is_empty() {
WebSocket::new(url)
} else {
WebSocket::new_with_str_sequence(
url,
&protocols
.iter()
.fold(Array::new(), |acc, x| {
acc.push(&x.into());
acc
})
.into(),
)
}
.map_err(|x| EpoxyError::WebSocketConnectFailed(format!("{:?}", x)))?;
ws.set_binary_type(BinaryType::Arraybuffer);
ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
ws.set_onopen(Some(onopen.as_ref().unchecked_ref()));
ws.set_onclose(Some(onclose.as_ref().unchecked_ref()));
ws.set_onerror(Some(onerror.as_ref().unchecked_ref()));
Ok((
Self {
inner: SendWrapper::new(ws),
open_event,
error_event,
close_event: close_event.clone(),
closed: closed.clone(),
onopen: SendWrapper::new(onopen),
onclose: SendWrapper::new(onclose),
onerror: SendWrapper::new(onerror),
onmessage: SendWrapper::new(onmessage),
},
WebSocketReader {
read_rx,
closed,
close_event,
},
))
}
pub async fn wait_for_open(&self) -> bool {
if self.closed.load(Ordering::Acquire) {
return false;
}
futures_util::select! {
_ = self.open_event.listen().fuse() => true,
_ = self.error_event.listen().fuse() => false,
}
}
}
#[async_trait]
impl WebSocketWrite for WebSocketWrapper {
async fn wisp_write_frame(&mut self, frame: Frame<'_>) -> Result<(), WispError> {
use wisp_mux::ws::OpCode::*;
if self.closed.load(Ordering::Acquire) {
return Err(WispError::WsImplSocketClosed);
}
match frame.opcode {
Binary | Text => self
.inner
.send_with_u8_array(&frame.payload)
.map_err(|x| WebSocketError::SendFailed(format!("{:?}", x)).into()),
Close => {
let _ = self.inner.close();
Ok(())
}
_ => Err(WispError::WsImplNotSupported),
}
}
async fn wisp_close(&mut self) -> Result<(), WispError> {
self.inner
.close()
.map_err(|x| WebSocketError::CloseFailed(format!("{:?}", x)).into())
}
}
impl Drop for WebSocketWrapper {
fn drop(&mut self) {
self.inner.set_onopen(None);
self.inner.set_onclose(None);
self.inner.set_onerror(None);
self.inner.set_onmessage(None);
self.closed.store(true, Ordering::Release);
self.close_event.notify(usize::MAX);
let _ = self.inner.close();
}
}