remove most of io_stream and replace with web streams

This commit is contained in:
Toshit Chawda 2024-10-09 23:03:55 -07:00
parent 7c9605474d
commit dcf638efca
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
3 changed files with 107 additions and 241 deletions

View file

@ -29,6 +29,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
await initEpoxy();
let epoxy_client_options = new EpoxyClientOptions();
epoxy_client_options.user_agent = navigator.userAgent;
epoxy_client_options.wisp_v2 = true;
let epoxy_client;
@ -36,8 +37,8 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
log("using wisptransport with websocketstream backend");
epoxy_client = new EpoxyClient(async () => {
let wss = new WebSocketStream("ws://localhost:4000/");
let {readable, writable} = await wss.opened;
return {read: readable, write: writable};
let { readable, writable } = await wss.opened;
return { read: readable, write: writable };
}, epoxy_client_options);
} else {
epoxy_client = new EpoxyClient("ws://localhost:4000/", epoxy_client_options);
@ -210,35 +211,52 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
}
} else if (should_tls_test) {
let decoder = new TextDecoder();
let handlers = new EpoxyHandlers(
() => log("opened"),
() => log("closed"),
err => console.error(err),
msg => { console.log(msg); console.log(decoder.decode(msg).split("\r\n\r\n")[1].length); log(decoder.decode(msg)) },
);
let ws = await epoxy_client.connect_tls(
handlers,
const { read, write } = await epoxy_client.connect_tls(
"google.com:443",
);
await ws.send("GET / HTTP 1.1\r\nHost: google.com\r\nConnection: close\r\n\r\n");
const reader = read.getReader();
const writer = write.getWriter();
log("opened");
(async () => {
while (true) {
const { value: msg, done } = await reader.read();
if (done || !msg) break;
console.log(msg);
log(decoder.decode(msg))
}
log("closed");
})();
await writer.write(new TextEncoder('utf-8').encode("GET / HTTP 1.1\r\nHost: google.com\r\n\r\n"));
await (new Promise((res, _) => setTimeout(res, 500)));
await ws.close();
await writer.close();
} else if (should_udp_test) {
let decoder = new TextDecoder();
let handlers = new EpoxyHandlers(
() => log("opened"),
() => log("closed"),
err => console.error(err),
msg => { console.log(msg); log(decoder.decode(msg)) },
);
// tokio example: `cargo r --example echo-udp -- 127.0.0.1:5000`
let ws = await epoxy_client.connect_udp(
handlers,
const { read, write } = await epoxy_client.connect_udp(
"127.0.0.1:5000",
);
const reader = read.getReader();
const writer = write.getWriter();
log("opened");
(async () => {
while (true) {
const { value: msg, done } = await reader.read();
if (done || !msg) break;
console.log(msg);
log(decoder.decode(msg))
}
log("closed");
})();
while (true) {
log("sending `data`");
await ws.send("data");
await writer.write(new TextEncoder('utf-8').encode("data"));
await (new Promise((res, _) => setTimeout(res, 100)));
}
} else if (should_reconnect_test) {

View file

@ -1,186 +1,70 @@
use bytes::{buf::UninitSlice, BufMut, BytesMut};
use futures_util::{io::WriteHalf, lock::Mutex, AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
use js_sys::{Function, Uint8Array};
use std::pin::Pin;
use bytes::{Bytes, BytesMut};
use futures_util::{AsyncReadExt, AsyncWriteExt, Sink, SinkExt, Stream, TryStreamExt};
use js_sys::{Object, Uint8Array};
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;
use wisp_mux::MuxStreamIoSink;
use wasm_streams::{ReadableStream, WritableStream};
use crate::{
stream_provider::{ProviderAsyncRW, ProviderUnencryptedStream},
utils::convert_body,
EpoxyError, EpoxyHandlers,
utils::{convert_body, object_set, ReaderStream},
EpoxyError,
};
#[wasm_bindgen(typescript_custom_section)]
const IO_STREAM_RET: &'static str = r#"
type EpoxyIoStream = {
read: ReadableStream<Uint8Array>,
write: WritableStream<Uint8Array>,
}
"#;
#[wasm_bindgen]
pub struct EpoxyIoStream {
tx: Mutex<WriteHalf<ProviderAsyncRW>>,
onerror: Function,
extern "C" {
#[wasm_bindgen(typescript_type = "EpoxyIoStream")]
pub type EpoxyIoStream;
}
#[wasm_bindgen]
impl EpoxyIoStream {
pub(crate) fn connect(stream: ProviderAsyncRW, handlers: EpoxyHandlers) -> Self {
let (mut rx, tx) = stream.split();
let tx = Mutex::new(tx);
let EpoxyHandlers {
onopen,
onclose,
onerror,
onmessage,
} = handlers;
let onerror_cloned = onerror.clone();
// similar to tokio_util::io::ReaderStream
spawn_local(async move {
let mut buf = BytesMut::with_capacity(4096);
loop {
match rx
.read(unsafe {
std::mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut())
})
.await
{
Ok(cnt) => {
if cnt > 0 {
unsafe { buf.advance_mut(cnt) };
let _ = onmessage
.call1(&JsValue::null(), &Uint8Array::from(buf.split().as_ref()));
}
}
Err(err) => {
let _ = onerror.call1(&JsValue::null(), &JsError::from(err).into());
break;
}
}
}
let _ = onclose.call0(&JsValue::null());
});
let _ = onopen.call0(&JsValue::null());
Self {
tx,
onerror: onerror_cloned,
}
}
pub async fn send(&self, payload: JsValue) -> Result<(), EpoxyError> {
let ret: Result<(), EpoxyError> = async move {
let payload = convert_body(payload)
fn create_iostream(
stream: Pin<Box<dyn Stream<Item = Result<Bytes, EpoxyError>>>>,
sink: Pin<Box<dyn Sink<BytesMut, Error = EpoxyError>>>,
) -> EpoxyIoStream {
let read = ReadableStream::from_stream(
stream
.map_ok(|x| Uint8Array::from(x.as_ref()).into())
.map_err(Into::into),
)
.into_raw();
let write = WritableStream::from_sink(
sink.with(|x| async {
convert_body(x)
.await
.map_err(|_| EpoxyError::InvalidPayload)?
.0
.to_vec();
Ok(self.tx.lock().await.write_all(&payload).await?)
}
.await;
.map_err(|_| EpoxyError::InvalidPayload)
.map(|x| BytesMut::from(x.0.to_vec().as_slice()))
})
.sink_map_err(Into::into),
)
.into_raw();
match ret {
Ok(ok) => Ok(ok),
Err(err) => {
let _ = self
.onerror
.call1(&JsValue::null(), &err.to_string().into());
Err(err)
}
}
}
pub async fn close(&self) -> Result<(), EpoxyError> {
match self.tx.lock().await.close().await {
Ok(ok) => Ok(ok),
Err(err) => {
let _ = self
.onerror
.call1(&JsValue::null(), &err.to_string().into());
Err(err.into())
}
}
}
let out = Object::new();
object_set(&out, "read", read.into());
object_set(&out, "write", write.into());
JsValue::from(out).into()
}
#[wasm_bindgen]
pub struct EpoxyUdpStream {
tx: Mutex<MuxStreamIoSink>,
onerror: Function,
pub fn iostream_from_asyncrw(asyncrw: ProviderAsyncRW) -> EpoxyIoStream {
let (rx, tx) = asyncrw.split();
create_iostream(
Box::pin(ReaderStream::new(Box::pin(rx)).map_err(EpoxyError::Io)),
Box::pin(tx.into_sink().sink_map_err(EpoxyError::Io)),
)
}
#[wasm_bindgen]
impl EpoxyUdpStream {
pub(crate) fn connect(stream: ProviderUnencryptedStream, handlers: EpoxyHandlers) -> Self {
let (mut rx, tx) = stream.into_split();
let EpoxyHandlers {
onopen,
onclose,
onerror,
onmessage,
} = handlers;
let onerror_cloned = onerror.clone();
spawn_local(async move {
while let Some(packet) = rx.next().await {
match packet {
Ok(buf) => {
let _ = onmessage.call1(&JsValue::null(), &Uint8Array::from(buf.as_ref()));
}
Err(err) => {
let _ = onerror.call1(&JsValue::null(), &JsError::from(err).into());
break;
}
}
}
let _ = onclose.call0(&JsValue::null());
});
let _ = onopen.call0(&JsValue::null());
Self {
tx: tx.into(),
onerror: onerror_cloned,
}
}
pub async fn send(&self, payload: JsValue) -> Result<(), EpoxyError> {
let ret: Result<(), EpoxyError> = async move {
let payload = convert_body(payload)
.await
.map_err(|_| EpoxyError::InvalidPayload)?
.0
.to_vec();
Ok(self
.tx
.lock()
.await
.send(BytesMut::from(payload.as_slice()))
.await?)
}
.await;
match ret {
Ok(ok) => Ok(ok),
Err(err) => {
let _ = self
.onerror
.call1(&JsValue::null(), &err.to_string().into());
Err(err)
}
}
}
pub async fn close(&self) -> Result<(), EpoxyError> {
match self.tx.lock().await.close().await {
Ok(ok) => Ok(ok),
Err(err) => {
let _ = self
.onerror
.call1(&JsValue::null(), &err.to_string().into());
Err(err.into())
}
}
}
pub fn iostream_from_stream(stream: ProviderUnencryptedStream) -> EpoxyIoStream {
let (rx, tx) = stream.into_split();
create_iostream(
Box::pin(rx.map_err(EpoxyError::Io)),
Box::pin(tx.sink_map_err(EpoxyError::Io)),
)
}

View file

@ -20,7 +20,7 @@ use http::{
use hyper::{body::Incoming, Uri};
use hyper_util_wasm::client::legacy::Client;
#[cfg(feature = "full")]
use io_stream::{EpoxyIoStream, EpoxyUdpStream};
use io_stream::{iostream_from_asyncrw, iostream_from_stream, EpoxyIoStream};
use js_sys::{Array, Function, Object, Promise};
use send_wrapper::SendWrapper;
use stream_provider::{StreamProvider, StreamProviderService};
@ -312,7 +312,7 @@ fn get_stream_provider(
))
}))
}),
&options,
options,
)
}
@ -383,75 +383,39 @@ impl EpoxyClient {
}
#[cfg(feature = "full")]
pub async fn connect_tcp(
&self,
handlers: EpoxyHandlers,
url: String,
) -> Result<EpoxyIoStream, EpoxyError> {
pub async fn connect_tcp(&self, url: String) -> Result<EpoxyIoStream, EpoxyError> {
let url: Uri = url.try_into()?;
let host = url.host().ok_or(EpoxyError::NoUrlHost)?;
let port = url.port_u16().ok_or(EpoxyError::NoUrlPort)?;
match self
let stream = self
.stream_provider
.get_asyncread(StreamType::Tcp, host.to_string(), port)
.await
{
Ok(stream) => Ok(EpoxyIoStream::connect(Either::Right(stream), handlers)),
Err(err) => {
let _ = handlers
.onerror
.call1(&JsValue::null(), &err.to_string().into());
Err(err)
}
}
.await?;
Ok(iostream_from_asyncrw(Either::Right(stream)))
}
#[cfg(feature = "full")]
pub async fn connect_tls(
&self,
handlers: EpoxyHandlers,
url: String,
) -> Result<EpoxyIoStream, EpoxyError> {
pub async fn connect_tls(&self, url: String) -> Result<EpoxyIoStream, EpoxyError> {
let url: Uri = url.try_into()?;
let host = url.host().ok_or(EpoxyError::NoUrlHost)?;
let port = url.port_u16().ok_or(EpoxyError::NoUrlPort)?;
match self
let stream = self
.stream_provider
.get_tls_stream(host.to_string(), port)
.await
{
Ok(stream) => Ok(EpoxyIoStream::connect(Either::Left(stream), handlers)),
Err(err) => {
let _ = handlers
.onerror
.call1(&JsValue::null(), &err.to_string().into());
Err(err)
}
}
.await?;
Ok(iostream_from_asyncrw(Either::Left(stream)))
}
#[cfg(feature = "full")]
pub async fn connect_udp(
&self,
handlers: EpoxyHandlers,
url: String,
) -> Result<EpoxyUdpStream, EpoxyError> {
pub async fn connect_udp(&self, url: String) -> Result<EpoxyIoStream, EpoxyError> {
let url: Uri = url.try_into()?;
let host = url.host().ok_or(EpoxyError::NoUrlHost)?;
let port = url.port_u16().ok_or(EpoxyError::NoUrlPort)?;
match self
let stream = self
.stream_provider
.get_stream(StreamType::Udp, host.to_string(), port)
.await
{
Ok(stream) => Ok(EpoxyUdpStream::connect(stream, handlers)),
Err(err) => {
let _ = handlers
.onerror
.call1(&JsValue::null(), &err.to_string().into());
Err(err)
}
}
.await?;
Ok(iostream_from_stream(stream))
}
async fn send_req_inner(