diff --git a/client/demo.js b/client/demo.js index ba8aa03..93abdec 100644 --- a/client/demo.js +++ b/client/demo.js @@ -29,6 +29,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox await initEpoxy(); let epoxy_client_options = new EpoxyClientOptions(); epoxy_client_options.user_agent = navigator.userAgent; + epoxy_client_options.wisp_v2 = true; let epoxy_client; @@ -36,8 +37,8 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox log("using wisptransport with websocketstream backend"); epoxy_client = new EpoxyClient(async () => { let wss = new WebSocketStream("ws://localhost:4000/"); - let {readable, writable} = await wss.opened; - return {read: readable, write: writable}; + let { readable, writable } = await wss.opened; + return { read: readable, write: writable }; }, epoxy_client_options); } else { epoxy_client = new EpoxyClient("ws://localhost:4000/", epoxy_client_options); @@ -210,35 +211,52 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox } } else if (should_tls_test) { let decoder = new TextDecoder(); - let handlers = new EpoxyHandlers( - () => log("opened"), - () => log("closed"), - err => console.error(err), - msg => { console.log(msg); console.log(decoder.decode(msg).split("\r\n\r\n")[1].length); log(decoder.decode(msg)) }, - ); - let ws = await epoxy_client.connect_tls( - handlers, + const { read, write } = await epoxy_client.connect_tls( "google.com:443", ); - await ws.send("GET / HTTP 1.1\r\nHost: google.com\r\nConnection: close\r\n\r\n"); + const reader = read.getReader(); + const writer = write.getWriter(); + + log("opened"); + + (async () => { + while (true) { + const { value: msg, done } = await reader.read(); + if (done || !msg) break; + console.log(msg); + log(decoder.decode(msg)) + } + log("closed"); + })(); + + await writer.write(new TextEncoder('utf-8').encode("GET / HTTP 1.1\r\nHost: google.com\r\n\r\n")); await (new Promise((res, _) => setTimeout(res, 500))); - await ws.close(); + await writer.close(); } else if (should_udp_test) { let decoder = new TextDecoder(); - let handlers = new EpoxyHandlers( - () => log("opened"), - () => log("closed"), - err => console.error(err), - msg => { console.log(msg); log(decoder.decode(msg)) }, - ); // tokio example: `cargo r --example echo-udp -- 127.0.0.1:5000` - let ws = await epoxy_client.connect_udp( - handlers, + const { read, write } = await epoxy_client.connect_udp( "127.0.0.1:5000", ); + + const reader = read.getReader(); + const writer = write.getWriter(); + + log("opened"); + + (async () => { + while (true) { + const { value: msg, done } = await reader.read(); + if (done || !msg) break; + console.log(msg); + log(decoder.decode(msg)) + } + log("closed"); + })(); + while (true) { log("sending `data`"); - await ws.send("data"); + await writer.write(new TextEncoder('utf-8').encode("data")); await (new Promise((res, _) => setTimeout(res, 100))); } } else if (should_reconnect_test) { diff --git a/client/src/io_stream.rs b/client/src/io_stream.rs index 35df5e9..5cc79e5 100644 --- a/client/src/io_stream.rs +++ b/client/src/io_stream.rs @@ -1,186 +1,70 @@ -use bytes::{buf::UninitSlice, BufMut, BytesMut}; -use futures_util::{io::WriteHalf, lock::Mutex, AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt}; -use js_sys::{Function, Uint8Array}; +use std::pin::Pin; + +use bytes::{Bytes, BytesMut}; +use futures_util::{AsyncReadExt, AsyncWriteExt, Sink, SinkExt, Stream, TryStreamExt}; +use js_sys::{Object, Uint8Array}; use wasm_bindgen::prelude::*; -use wasm_bindgen_futures::spawn_local; -use wisp_mux::MuxStreamIoSink; +use wasm_streams::{ReadableStream, WritableStream}; use crate::{ stream_provider::{ProviderAsyncRW, ProviderUnencryptedStream}, - utils::convert_body, - EpoxyError, EpoxyHandlers, + utils::{convert_body, object_set, ReaderStream}, + EpoxyError, }; +#[wasm_bindgen(typescript_custom_section)] +const IO_STREAM_RET: &'static str = r#" +type EpoxyIoStream = { + read: ReadableStream, + write: WritableStream, +} +"#; + #[wasm_bindgen] -pub struct EpoxyIoStream { - tx: Mutex>, - onerror: Function, +extern "C" { + #[wasm_bindgen(typescript_type = "EpoxyIoStream")] + pub type EpoxyIoStream; } -#[wasm_bindgen] -impl EpoxyIoStream { - pub(crate) fn connect(stream: ProviderAsyncRW, handlers: EpoxyHandlers) -> Self { - let (mut rx, tx) = stream.split(); - let tx = Mutex::new(tx); - - let EpoxyHandlers { - onopen, - onclose, - onerror, - onmessage, - } = handlers; - - let onerror_cloned = onerror.clone(); - - // similar to tokio_util::io::ReaderStream - spawn_local(async move { - let mut buf = BytesMut::with_capacity(4096); - loop { - match rx - .read(unsafe { - std::mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut()) - }) - .await - { - Ok(cnt) => { - if cnt > 0 { - unsafe { buf.advance_mut(cnt) }; - - let _ = onmessage - .call1(&JsValue::null(), &Uint8Array::from(buf.split().as_ref())); - } - } - Err(err) => { - let _ = onerror.call1(&JsValue::null(), &JsError::from(err).into()); - break; - } - } - } - let _ = onclose.call0(&JsValue::null()); - }); - - let _ = onopen.call0(&JsValue::null()); - - Self { - tx, - onerror: onerror_cloned, - } - } - - pub async fn send(&self, payload: JsValue) -> Result<(), EpoxyError> { - let ret: Result<(), EpoxyError> = async move { - let payload = convert_body(payload) +fn create_iostream( + stream: Pin>>>, + sink: Pin>>, +) -> EpoxyIoStream { + let read = ReadableStream::from_stream( + stream + .map_ok(|x| Uint8Array::from(x.as_ref()).into()) + .map_err(Into::into), + ) + .into_raw(); + let write = WritableStream::from_sink( + sink.with(|x| async { + convert_body(x) .await - .map_err(|_| EpoxyError::InvalidPayload)? - .0 - .to_vec(); - Ok(self.tx.lock().await.write_all(&payload).await?) - } - .await; + .map_err(|_| EpoxyError::InvalidPayload) + .map(|x| BytesMut::from(x.0.to_vec().as_slice())) + }) + .sink_map_err(Into::into), + ) + .into_raw(); - match ret { - Ok(ok) => Ok(ok), - Err(err) => { - let _ = self - .onerror - .call1(&JsValue::null(), &err.to_string().into()); - Err(err) - } - } - } - - pub async fn close(&self) -> Result<(), EpoxyError> { - match self.tx.lock().await.close().await { - Ok(ok) => Ok(ok), - Err(err) => { - let _ = self - .onerror - .call1(&JsValue::null(), &err.to_string().into()); - Err(err.into()) - } - } - } + let out = Object::new(); + object_set(&out, "read", read.into()); + object_set(&out, "write", write.into()); + JsValue::from(out).into() } -#[wasm_bindgen] -pub struct EpoxyUdpStream { - tx: Mutex, - onerror: Function, +pub fn iostream_from_asyncrw(asyncrw: ProviderAsyncRW) -> EpoxyIoStream { + let (rx, tx) = asyncrw.split(); + create_iostream( + Box::pin(ReaderStream::new(Box::pin(rx)).map_err(EpoxyError::Io)), + Box::pin(tx.into_sink().sink_map_err(EpoxyError::Io)), + ) } -#[wasm_bindgen] -impl EpoxyUdpStream { - pub(crate) fn connect(stream: ProviderUnencryptedStream, handlers: EpoxyHandlers) -> Self { - let (mut rx, tx) = stream.into_split(); - - let EpoxyHandlers { - onopen, - onclose, - onerror, - onmessage, - } = handlers; - - let onerror_cloned = onerror.clone(); - - spawn_local(async move { - while let Some(packet) = rx.next().await { - match packet { - Ok(buf) => { - let _ = onmessage.call1(&JsValue::null(), &Uint8Array::from(buf.as_ref())); - } - Err(err) => { - let _ = onerror.call1(&JsValue::null(), &JsError::from(err).into()); - break; - } - } - } - let _ = onclose.call0(&JsValue::null()); - }); - - let _ = onopen.call0(&JsValue::null()); - - Self { - tx: tx.into(), - onerror: onerror_cloned, - } - } - - pub async fn send(&self, payload: JsValue) -> Result<(), EpoxyError> { - let ret: Result<(), EpoxyError> = async move { - let payload = convert_body(payload) - .await - .map_err(|_| EpoxyError::InvalidPayload)? - .0 - .to_vec(); - Ok(self - .tx - .lock() - .await - .send(BytesMut::from(payload.as_slice())) - .await?) - } - .await; - - match ret { - Ok(ok) => Ok(ok), - Err(err) => { - let _ = self - .onerror - .call1(&JsValue::null(), &err.to_string().into()); - Err(err) - } - } - } - - pub async fn close(&self) -> Result<(), EpoxyError> { - match self.tx.lock().await.close().await { - Ok(ok) => Ok(ok), - Err(err) => { - let _ = self - .onerror - .call1(&JsValue::null(), &err.to_string().into()); - Err(err.into()) - } - } - } +pub fn iostream_from_stream(stream: ProviderUnencryptedStream) -> EpoxyIoStream { + let (rx, tx) = stream.into_split(); + create_iostream( + Box::pin(rx.map_err(EpoxyError::Io)), + Box::pin(tx.sink_map_err(EpoxyError::Io)), + ) } diff --git a/client/src/lib.rs b/client/src/lib.rs index c77debf..eb80325 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -20,7 +20,7 @@ use http::{ use hyper::{body::Incoming, Uri}; use hyper_util_wasm::client::legacy::Client; #[cfg(feature = "full")] -use io_stream::{EpoxyIoStream, EpoxyUdpStream}; +use io_stream::{iostream_from_asyncrw, iostream_from_stream, EpoxyIoStream}; use js_sys::{Array, Function, Object, Promise}; use send_wrapper::SendWrapper; use stream_provider::{StreamProvider, StreamProviderService}; @@ -312,7 +312,7 @@ fn get_stream_provider( )) })) }), - &options, + options, ) } @@ -383,75 +383,39 @@ impl EpoxyClient { } #[cfg(feature = "full")] - pub async fn connect_tcp( - &self, - handlers: EpoxyHandlers, - url: String, - ) -> Result { + pub async fn connect_tcp(&self, url: String) -> Result { let url: Uri = url.try_into()?; let host = url.host().ok_or(EpoxyError::NoUrlHost)?; let port = url.port_u16().ok_or(EpoxyError::NoUrlPort)?; - match self + let stream = self .stream_provider .get_asyncread(StreamType::Tcp, host.to_string(), port) - .await - { - Ok(stream) => Ok(EpoxyIoStream::connect(Either::Right(stream), handlers)), - Err(err) => { - let _ = handlers - .onerror - .call1(&JsValue::null(), &err.to_string().into()); - Err(err) - } - } + .await?; + Ok(iostream_from_asyncrw(Either::Right(stream))) } #[cfg(feature = "full")] - pub async fn connect_tls( - &self, - handlers: EpoxyHandlers, - url: String, - ) -> Result { + pub async fn connect_tls(&self, url: String) -> Result { let url: Uri = url.try_into()?; let host = url.host().ok_or(EpoxyError::NoUrlHost)?; let port = url.port_u16().ok_or(EpoxyError::NoUrlPort)?; - match self + let stream = self .stream_provider .get_tls_stream(host.to_string(), port) - .await - { - Ok(stream) => Ok(EpoxyIoStream::connect(Either::Left(stream), handlers)), - Err(err) => { - let _ = handlers - .onerror - .call1(&JsValue::null(), &err.to_string().into()); - Err(err) - } - } + .await?; + Ok(iostream_from_asyncrw(Either::Left(stream))) } #[cfg(feature = "full")] - pub async fn connect_udp( - &self, - handlers: EpoxyHandlers, - url: String, - ) -> Result { + pub async fn connect_udp(&self, url: String) -> Result { let url: Uri = url.try_into()?; let host = url.host().ok_or(EpoxyError::NoUrlHost)?; let port = url.port_u16().ok_or(EpoxyError::NoUrlPort)?; - match self + let stream = self .stream_provider .get_stream(StreamType::Udp, host.to_string(), port) - .await - { - Ok(stream) => Ok(EpoxyUdpStream::connect(stream, handlers)), - Err(err) => { - let _ = handlers - .onerror - .call1(&JsValue::null(), &err.to_string().into()); - Err(err) - } - } + .await?; + Ok(iostream_from_stream(stream)) } async fn send_req_inner(