diff --git a/Cargo.lock b/Cargo.lock index 472c372..87e882f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,35 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + +[[package]] +name = "async-compression" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5" +dependencies = [ + "brotli", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async_io_stream" version = "0.3.3" @@ -71,6 +100,27 @@ dependencies = [ "generic-array", ] +[[package]] +name = "brotli" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "516074a47ef4bce09577a3b379392300159ce5b1ba2e501ff1c819950066100f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -139,6 +189,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -187,6 +246,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "flate2" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1454,6 +1523,7 @@ dependencies = [ name = "wstcp-client" version = "1.0.0" dependencies = [ + "async-compression", "bytes", "console_error_panic_hook", "either", diff --git a/client/Cargo.toml b/client/Cargo.toml index 7455715..87cef46 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -28,7 +28,8 @@ tokio-rustls = "0.25.0" web-sys = { version = "0.3.66", features = ["TextEncoder", "Navigator", "Response", "ResponseInit"] } wasm-streams = "0.4.0" either = "1.9.0" -tokio-util = "0.7.10" +tokio-util = { version = "0.7.10", features = ["io"] } +async-compression = { version = "0.4.5", features = ["tokio", "gzip", "brotli"] } [dependencies.getrandom] features = ["js"] diff --git a/client/src/lib.rs b/client/src/lib.rs index a64935a..b67ac4f 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -10,17 +10,18 @@ use wrappers::{IncomingBody, WsStreamWrapper}; use std::sync::Arc; +use async_compression::tokio::bufread as async_comp; use bytes::Bytes; +use futures_util::StreamExt; use http::{uri, HeaderName, HeaderValue, Request, Response}; -use hyper::{ - body::Incoming, - client::conn::http1::Builder, - Uri, -}; +use hyper::{body::Incoming, client::conn::http1::Builder, Uri}; use js_sys::{Array, Object, Reflect, Uint8Array}; use penguin_mux_wasm::{Multiplexor, MuxStream, Role}; use tokio_rustls::{client::TlsStream, rustls, rustls::RootCertStore, TlsConnector}; -use tokio_util::either::Either; +use tokio_util::{ + either::Either, + io::{ReaderStream, StreamReader}, +}; use wasm_bindgen::prelude::*; use web_sys::TextEncoder; @@ -32,6 +33,11 @@ enum WsTcpResponse { Redirect((Response, http::Request, Uri)), } +enum WsTcpCompression { + Brotli, + Gzip, +} + type WsTcpTlsStream = TlsStream>; type WsTcpUnencryptedStream = MuxStream; type WsTcpStream = Either; @@ -246,6 +252,7 @@ impl WsTcp { let mut builder = Request::builder().uri(uri.clone()).method(req_method); let headers_map = builder.headers_mut().replace_err("Failed to get headers")?; + headers_map.insert("Accept-Encoding", HeaderValue::from_str("gzip, br")?); headers_map.insert("Connection", HeaderValue::from_str("close")?); headers_map.insert("User-Agent", HeaderValue::from_str(&self.useragent)?); headers_map.insert("Host", HeaderValue::from_str(uri_host)?); @@ -290,8 +297,36 @@ impl WsTcp { .status(resp.status().as_u16()) .status_text(resp.status().canonical_reason().unwrap_or_default()); - let body = IncomingBody::new(resp.into_body()); - let stream = wasm_streams::ReadableStream::from_stream(body); + let compression = match resp + .headers() + .get("Content-Encoding") + .and_then(|val| val.to_str().ok()) + .unwrap_or_default() + { + "gzip" => Some(WsTcpCompression::Gzip), + "br" => Some(WsTcpCompression::Brotli), + _ => None, + }; + + let incoming_body = IncomingBody::new(resp.into_body()); + let decompressed_body = match compression { + Some(alg) => match alg { + WsTcpCompression::Gzip => Either::Left(Either::Left(ReaderStream::new( + async_comp::GzipDecoder::new(StreamReader::new(incoming_body)), + ))), + WsTcpCompression::Brotli => Either::Left(Either::Right(ReaderStream::new( + async_comp::BrotliDecoder::new(StreamReader::new(incoming_body)), + ))), + }, + None => Either::Right(incoming_body), + }; + let stream = wasm_streams::ReadableStream::from_stream(decompressed_body.map(|x| { + Ok(Uint8Array::from( + x.replace_err_jv("Failed to get frame from response")? + .as_ref(), + ) + .into()) + })); let resp = web_sys::Response::new_with_opt_readable_stream_and_init( Some(&stream.into_raw()), diff --git a/client/src/web/index.js b/client/src/web/index.js index 7a0fb80..0d383b5 100644 --- a/client/src/web/index.js +++ b/client/src/web/index.js @@ -14,10 +14,12 @@ const tconn1 = performance.now(); console.warn(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`); - let resp = await wstcp.fetch("http://httpbin.org/post", {method: "POST", body: "wstcp", headers: {"User-Agent": "wstcp"}}); - console.warn(resp); - console.warn(Object.fromEntries(resp.headers)); - console.warn(await resp.text()); + for (const url of ["https://httpbin.org/get", "https://httpbin.org/gzip", "https://httpbin.org/brotli"]) { + let resp = await wstcp.fetch(url); + console.warn(resp); + console.warn(Object.fromEntries(resp.headers)); + console.warn(await resp.text()); + } if (should_test) { const test_mux = async (url) => { @@ -38,13 +40,13 @@ let total_mux = 0; for (const _ of Array(num_tests).keys()) { - total_mux += await test_mux("https://httpbin.org/get"); + total_mux += await test_mux("https://httpbin.org/gzip"); } total_mux = total_mux / num_tests; let total_native = 0; for (const _ of Array(num_tests).keys()) { - total_native += await test_native("https://httpbin.org/get"); + total_native += await test_native("https://httpbin.org/gzip"); } total_native = total_native / num_tests; diff --git a/client/src/wrappers.rs b/client/src/wrappers.rs index 01fa61e..1ecc702 100644 --- a/client/src/wrappers.rs +++ b/client/src/wrappers.rs @@ -130,7 +130,7 @@ impl IncomingBody { } impl Stream for IncomingBody { - type Item = Result; + type Item = std::io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); let ret = this.incoming.poll_frame(cx); @@ -138,9 +138,7 @@ impl Stream for IncomingBody { Poll::Ready(item) => Poll::>::Ready(match item { Some(frame) => frame .map(|x| { - x.into_data() - .map(|x| Uint8Array::from(x.as_ref()).into()) - .replace_err_jv("Error creating uint8array from http frame") + x.into_data().map_err(|_| std::io::Error::other("not data frame")) }) .ok(), None => None,