diff --git a/Cargo.lock b/Cargo.lock index cc2678a..a0df337 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -354,7 +354,7 @@ dependencies = [ [[package]] name = "epoxy-client" -version = "1.3.0" +version = "1.4.0" dependencies = [ "async-compression", "async_io_stream", @@ -1849,7 +1849,7 @@ checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" [[package]] name = "wisp-mux" -version = "1.2.0" +version = "1.2.1" dependencies = [ "async_io_stream", "bytes", diff --git a/client/Cargo.toml b/client/Cargo.toml index d392d09..b0da251 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "epoxy-client" -version = "1.3.0" +version = "1.4.0" edition = "2021" license = "LGPL-3.0-only" diff --git a/client/demo.js b/client/demo.js index 772dff6..950305c 100644 --- a/client/demo.js +++ b/client/demo.js @@ -171,7 +171,7 @@ onmessage = async (msg) => { ); while (true) { log("sending `data`"); - await ws.send("data"); + await ws.send_text("data"); await (new Promise((res, _) => setTimeout(res, 50))); } } else if (should_tls_test) { @@ -181,14 +181,14 @@ onmessage = async (msg) => { () => log("closed"), err => console.error(err), msg => { console.log(msg); log(decoder.decode(msg)) }, - "alicesworld.tech:443", + "google.com:443", ); - await ws.send("GET / HTTP 1.1\r\nHost: alicesworld.tech\r\nConnection: close\r\n\r\n"); + await ws.send((new TextEncoder()).encode("GET / HTTP 1.1\r\nHost: google.com\r\nConnection: close\r\n\r\n")); await (new Promise((res, _) => setTimeout(res, 500))); await ws.close(); } else if (should_udp_test) { let decoder = new TextDecoder(); - // nc -ulp 5000 + // tokio example: `cargo r --example echo-udp -- 127.0.0.1:5000` let ws = await epoxy_client.connect_udp( () => log("opened"), () => log("closed"), @@ -196,8 +196,11 @@ onmessage = async (msg) => { msg => { console.log(msg); log(decoder.decode(msg)) }, "127.0.0.1:5000", ); - await (new Promise((res, _) => setTimeout(res, 5000))); - await ws.close(); + while (true) { + log("sending `data`"); + await ws.send((new TextEncoder()).encode("data")); + await (new Promise((res, _) => setTimeout(res, 50))); + } } else if (should_reconnect_test) { while (true) { try { diff --git a/client/package.json b/client/package.json index 99a5c52..f734c23 100644 --- a/client/package.json +++ b/client/package.json @@ -1,6 +1,6 @@ { "name": "@mercuryworkshop/epoxy-tls", - "version": "1.3.0", + "version": "1.4.0", "description": "A wasm library for using raw encrypted tls/ssl/https/websocket streams on the browser", "scripts": { "build": "./build.sh" diff --git a/client/src/websocket.rs b/client/src/websocket.rs index 63d949c..fff1f44 100644 --- a/client/src/websocket.rs +++ b/client/src/websocket.rs @@ -105,7 +105,8 @@ impl EpxWebSocket { let _ = onclose.call0(&JsValue::null()); break; } - _ => panic!("unknown opcode {:?}", frame.opcode), + // ping/pong/continue + _ => {}, } } }); @@ -126,7 +127,7 @@ impl EpxWebSocket { } #[wasm_bindgen] - pub async fn send(&self, payload: String) -> Result<(), JsError> { + pub async fn send_text(&self, payload: String) -> Result<(), JsError> { let onerr = self.onerror.clone(); let ret = self .tx @@ -135,7 +136,24 @@ impl EpxWebSocket { .write_frame(Frame::text(Payload::Owned(payload.as_bytes().to_vec()))) .await; if let Err(ret) = ret { - let _ = onerr.call1(&JsValue::null(), &jval!(format!("{}", ret))); + let _ = onerr.call1(&JsValue::null(), &jval!(ret.to_string())); + Err(ret.into()) + } else { + Ok(ret?) + } + } + + #[wasm_bindgen] + pub async fn send_binary(&self, payload: Uint8Array) -> Result<(), JsError> { + let onerr = self.onerror.clone(); + let ret = self + .tx + .lock() + .await + .write_frame(Frame::binary(Payload::Owned(payload.to_vec()))) + .await; + if let Err(ret) = ret { + let _ = onerr.call1(&JsValue::null(), &jval!(ret.to_string())); Err(ret.into()) } else { Ok(ret?) diff --git a/wisp/Cargo.toml b/wisp/Cargo.toml index db960e3..b41e6ae 100644 --- a/wisp/Cargo.toml +++ b/wisp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wisp-mux" -version = "1.2.0" +version = "1.2.1" license = "LGPL-3.0-only" description = "A library for easily creating Wisp servers and clients." homepage = "https://github.com/MercuryWorkshop/epoxy-tls/tree/multiplexed/wisp" diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index 79971ba..59796d9 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -182,7 +182,12 @@ impl ServerMuxInner { .await?; loop { - let packet: Packet = rx.wisp_read_frame(&self.tx).await?.try_into()?; + let frame = rx.wisp_read_frame(&self.tx).await?; + if frame.opcode == ws::OpCode::Close { + break Ok(()); + } + let packet = Packet::try_from(frame)?; + use PacketType::*; match packet.packet_type { Connect(inner_packet) => { @@ -368,29 +373,32 @@ impl ClientMuxInner { { loop { let frame = rx.wisp_read_frame(&self.tx).await?; - if let Ok(packet) = Packet::try_from(frame) { - use PacketType::*; - match packet.packet_type { - Connect(_) => unreachable!(), - Data(data) => { - if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.stream.unbounded_send(MuxEvent::Send(data)); - } + if frame.opcode == ws::OpCode::Close { + break Ok(()); + } + let packet = Packet::try_from(frame)?; + + use PacketType::*; + match packet.packet_type { + Connect(_) => unreachable!(), + Data(data) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + let _ = stream.stream.unbounded_send(MuxEvent::Send(data)); } - Continue(inner_packet) => { - if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - stream - .flow_control - .store(inner_packet.buffer_remaining, Ordering::Release); - let _ = stream.flow_control_event.notify(u32::MAX); - } + } + Continue(inner_packet) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + stream + .flow_control + .store(inner_packet.buffer_remaining, Ordering::Release); + let _ = stream.flow_control_event.notify(u32::MAX); } - Close(inner_packet) => { - if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.stream.unbounded_send(MuxEvent::Close(inner_packet)); - } - self.stream_map.lock().await.remove(&packet.stream_id); + } + Close(inner_packet) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + let _ = stream.stream.unbounded_send(MuxEvent::Close(inner_packet)); } + self.stream_map.lock().await.remove(&packet.stream_id); } } }