From f226dd99392ad5325ab26f86b12306bebcda2c01 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Tue, 29 Oct 2024 22:09:15 -0700 Subject: [PATCH] use http_body_util streaming body --- client/demo.js | 447 ++++++++++++++++++++++---------------------- client/src/lib.rs | 9 +- client/src/utils.rs | 30 +-- 3 files changed, 233 insertions(+), 253 deletions(-) diff --git a/client/demo.js b/client/demo.js index 0d4e595..2d6e485 100644 --- a/client/demo.js +++ b/client/demo.js @@ -17,6 +17,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox const wisp_url = params.get("wisp") || "ws://localhost:4000/"; const wisp_v1 = params.has("v1"); const wisp_udp = params.has("udp_extension"); + const disable_certverif = params.has("disable_certverif"); console.log( "%cWASM is significantly slower with DevTools open!", "color:red;font-size:3rem;font-weight:bold" @@ -30,86 +31,112 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox window.scrollTo(0, document.body.scrollHeight); } - await initEpoxy(); - let epoxy_client_options = new EpoxyClientOptions(); - epoxy_client_options.user_agent = navigator.userAgent; - epoxy_client_options.wisp_v2 = !wisp_v1; - epoxy_client_options.udp_extension_required = wisp_udp; + try { + await initEpoxy(); + let epoxy_client_options = new EpoxyClientOptions(); + epoxy_client_options.user_agent = navigator.userAgent; + epoxy_client_options.wisp_v2 = !wisp_v1; + epoxy_client_options.udp_extension_required = wisp_udp; + epoxy_client_options.disable_certificate_validation = disable_certverif; - let epoxy_client; + let epoxy_client; - log(`connecting to "${wisp_url}"${wisp_v1 ? " with wisp v1" : ""}`); - if (should_wisptransport) { - log("using wisptransport with websocketstream backend"); - epoxy_client = new EpoxyClient(async () => { - let wss = new WebSocketStream(wisp_url); - let { readable, writable } = await wss.opened; - return { read: readable, write: writable }; - }, epoxy_client_options); - } else { - epoxy_client = new EpoxyClient(wisp_url, epoxy_client_options); - } - - const tconn0 = performance.now(); - await epoxy_client.replace_stream_provider(); - const tconn1 = performance.now(); - log(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`); - - // epoxy classes are inspectable - console.log(epoxy_client); - // you can change the user agent and redirect limit in JS - epoxy_client.redirect_limit = 15; - console.log(epoxyInfo); - - const test_mux = async (url) => { - const t0 = performance.now(); - await epoxy_client.fetch(url).then(r=>r.text()); - const t1 = performance.now(); - return t1 - t0; - }; - - const test_native = async (url) => { - const t0 = performance.now(); - await fetch(url, { cache: "no-store" }).then(r=>r.text()); - const t1 = performance.now(); - return t1 - t0; - }; - - const readableStream = (buffer) => { - return new ReadableStream({ - start(controller) { - controller.enqueue(buffer); - controller.close(); - } - }); - }; - - if (should_feature_test) { - let formdata = new FormData(); - formdata.append("a", "b"); - for (const url of [ - ["https://httpbin.org/post", { method: "POST", body: readableStream((new TextEncoder()).encode("abc")) }], - ["https://httpbin.org/get", {}], - [new URL("https://httpbin.org/get"), {}], - ["https://httpbin.org/gzip", {}], - ["https://httpbin.org/brotli", {}], - ["https://httpbin.org/redirect/11", {}], - ["https://httpbin.org/redirect/1", { redirect: "manual" }], - ["https://httpbin.org/post", { method: "POST", body: new URLSearchParams("a=b") }], - ["https://httpbin.org/post", { method: "POST", body: formdata }], - ["https://httpbin.org/post", { method: "POST", body: "a" }], - ["https://httpbin.org/post", { method: "POST", body: (new TextEncoder()).encode("abc") }], - ["https://httpbin.org/get", { headers: { "a": "b", "b": "c" } }], - ["https://httpbin.org/get", { headers: new Headers({ "a": "b", "b": "c" }) }] - ]) { - let resp = await epoxy_client.fetch(url[0], url[1]); - console.warn(url, resp, Object.fromEntries(resp.headers)); - log(await resp.text()); + log(`connecting to "${wisp_url}"${wisp_v1 ? " with wisp v1" : ""}`); + if (should_wisptransport) { + log("using wisptransport with websocketstream backend"); + epoxy_client = new EpoxyClient(async () => { + let wss = new WebSocketStream(wisp_url); + let { readable, writable } = await wss.opened; + return { read: readable, write: writable }; + }, epoxy_client_options); + } else { + epoxy_client = new EpoxyClient(wisp_url, epoxy_client_options); } - } else if (should_multiparallel_test) { - const num_tests = 10; - let total_mux_minus_native = 0; - for (const _ of Array(num_tests).keys()) { + + const tconn0 = performance.now(); + await epoxy_client.replace_stream_provider(); + const tconn1 = performance.now(); + log(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`); + + // epoxy classes are inspectable + console.log(epoxy_client); + // you can change the user agent and redirect limit in JS + epoxy_client.redirect_limit = 15; + console.log(epoxyInfo); + + const test_mux = async (url) => { + const t0 = performance.now(); + await epoxy_client.fetch(url).then(r => r.text()); + const t1 = performance.now(); + return t1 - t0; + }; + + const test_native = async (url) => { + const t0 = performance.now(); + await fetch(url, { cache: "no-store" }).then(r => r.text()); + const t1 = performance.now(); + return t1 - t0; + }; + + const readableStream = (buffer) => { + return new ReadableStream({ + start(controller) { + controller.enqueue(buffer); + controller.close(); + } + }); + }; + + if (should_feature_test) { + let formdata = new FormData(); + formdata.append("a", "b"); + for (const url of [ + ["https://httpbin.org/post", { method: "POST", body: readableStream((new TextEncoder()).encode("abc")) }], + ["https://httpbin.org/get", {}], + [new URL("https://httpbin.org/get"), {}], + ["https://httpbin.org/gzip", {}], + ["https://httpbin.org/brotli", {}], + ["https://httpbin.org/redirect/11", {}], + ["https://httpbin.org/redirect/1", { redirect: "manual" }], + ["https://httpbin.org/post", { method: "POST", body: new URLSearchParams("a=b") }], + ["https://httpbin.org/post", { method: "POST", body: formdata }], + ["https://httpbin.org/post", { method: "POST", body: "a" }], + ["https://httpbin.org/post", { method: "POST", body: (new TextEncoder()).encode("abc") }], + ["https://httpbin.org/get", { headers: { "a": "b", "b": "c" } }], + ["https://httpbin.org/get", { headers: new Headers({ "a": "b", "b": "c" }) }] + ]) { + let resp = await epoxy_client.fetch(url[0], url[1]); + console.warn(url, resp, Object.fromEntries(resp.headers)); + log(await resp.text()); + } + } else if (should_multiparallel_test) { + const num_tests = 10; + let total_mux_minus_native = 0; + for (const _ of Array(num_tests).keys()) { + let total_mux = 0; + await Promise.all([...Array(num_tests).keys()].map(async i => { + log(`running mux test ${i}`); + return await test_mux(test_url); + })).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) }); + total_mux = total_mux / num_tests; + + let total_native = 0; + await Promise.all([...Array(num_tests).keys()].map(async i => { + log(`running native test ${i}`); + return await test_native(test_url); + })).then((vals) => { total_native = vals.reduce((acc, x) => acc + x, 0) }); + total_native = total_native / num_tests; + + log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); + log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + total_mux_minus_native += total_mux - total_native; + } + total_mux_minus_native = total_mux_minus_native / num_tests; + log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`); + } else if (should_parallel_test) { + const num_tests = 10; + let total_mux = 0; await Promise.all([...Array(num_tests).keys()].map(async i => { log(`running mux test ${i}`); @@ -127,34 +154,34 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); - total_mux_minus_native += total_mux - total_native; - } - total_mux_minus_native = total_mux_minus_native / num_tests; - log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`); - } else if (should_parallel_test) { - const num_tests = 10; + } else if (should_multiperf_test) { + const num_tests = 10; + let total_mux_minus_native = 0; + for (const _ of Array(num_tests).keys()) { + let total_mux = 0; + for (const i of Array(num_tests).keys()) { + log(`running mux test ${i}`); + total_mux += await test_mux(test_url); + } + total_mux = total_mux / num_tests; - let total_mux = 0; - await Promise.all([...Array(num_tests).keys()].map(async i => { - log(`running mux test ${i}`); - return await test_mux(test_url); - })).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) }); - total_mux = total_mux / num_tests; + let total_native = 0; + for (const i of Array(num_tests).keys()) { + log(`running native test ${i}`); + total_native += await test_native(test_url); + } + total_native = total_native / num_tests; - let total_native = 0; - await Promise.all([...Array(num_tests).keys()].map(async i => { - log(`running native test ${i}`); - return await test_native(test_url); - })).then((vals) => { total_native = vals.reduce((acc, x) => acc + x, 0) }); - total_native = total_native / num_tests; + log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); + log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + total_mux_minus_native += total_mux - total_native; + } + total_mux_minus_native = total_mux_minus_native / num_tests; + log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`); + } else if (should_perf_test) { + const num_tests = 10; - log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); - log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); - log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); - } else if (should_multiperf_test) { - const num_tests = 10; - let total_mux_minus_native = 0; - for (const _ of Array(num_tests).keys()) { let total_mux = 0; for (const i of Array(num_tests).keys()) { log(`running mux test ${i}`); @@ -172,131 +199,111 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); - total_mux_minus_native += total_mux - total_native; - } - total_mux_minus_native = total_mux_minus_native / num_tests; - log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`); - } else if (should_perf_test) { - const num_tests = 10; - - let total_mux = 0; - for (const i of Array(num_tests).keys()) { - log(`running mux test ${i}`); - total_mux += await test_mux(test_url); - } - total_mux = total_mux / num_tests; - - let total_native = 0; - for (const i of Array(num_tests).keys()) { - log(`running native test ${i}`); - total_native += await test_native(test_url); - } - total_native = total_native / num_tests; - - log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); - log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); - log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); - } else if (should_ws_test) { - let handlers = new EpoxyHandlers( - () => log("opened"), - () => log("closed"), - err => console.error(err), - msg => log(`got "${msg}"`) - ); - let ws = await epoxy_client.connect_websocket( - handlers, - "wss://echo.websocket.events", - [], - { "x-header": "abc" }, - ); - let i = 0; - while (true) { - log(`sending \`data${i}\``); - await ws.send("data" + i); - i++; - await (new Promise((res, _) => setTimeout(res, 10))); - } - } else if (should_tls_test) { - let decoder = new TextDecoder(); - const { read, write } = await epoxy_client.connect_tls( - "google.com:443", - ); - const reader = read.getReader(); - const writer = write.getWriter(); - - log("opened"); - - (async () => { + } else if (should_ws_test) { + let handlers = new EpoxyHandlers( + () => log("opened"), + () => log("closed"), + err => console.error(err), + msg => log(`got "${msg}"`) + ); + let ws = await epoxy_client.connect_websocket( + handlers, + "wss://echo.websocket.events", + [], + { "x-header": "abc" }, + ); + let i = 0; while (true) { - const { value: msg, done } = await reader.read(); - if (done || !msg) break; - console.log(msg); - log(decoder.decode(msg)) + log(`sending \`data${i}\``); + await ws.send("data" + i); + i++; + await (new Promise((res, _) => setTimeout(res, 10))); } - log("closed"); - })(); + } else if (should_tls_test) { + let decoder = new TextDecoder(); + const { read, write } = await epoxy_client.connect_tls( + "google.com:443", + ); + const reader = read.getReader(); + const writer = write.getWriter(); - await writer.write(new TextEncoder('utf-8').encode("GET / HTTP 1.1\r\nHost: google.com\r\n\r\n")); - await (new Promise((res, _) => setTimeout(res, 500))); - await writer.close(); - } else if (should_udp_test) { - let decoder = new TextDecoder(); - // tokio example: `cargo r --example echo-udp -- 127.0.0.1:5000` - const { read, write } = await epoxy_client.connect_udp( - "127.0.0.1:5000", - ); + log("opened"); - const reader = read.getReader(); - const writer = write.getWriter(); + (async () => { + while (true) { + const { value: msg, done } = await reader.read(); + if (done || !msg) break; + console.log(msg); + log(decoder.decode(msg)) + } + log("closed"); + })(); - log("opened"); - - (async () => { - while (true) { - const { value: msg, done } = await reader.read(); - if (done || !msg) break; - console.log(msg); - log(decoder.decode(msg)) - } - log("closed"); - })(); - - while (true) { - log("sending `data`"); - await writer.write(new TextEncoder('utf-8').encode("data")); - await (new Promise((res, _) => setTimeout(res, 100))); - } - } else if (should_reconnect_test) { - while (true) { - try { - await epoxy_client.fetch(test_url); - } catch (e) { console.error(e) } - log("sent req"); + await writer.write(new TextEncoder('utf-8').encode("GET / HTTP 1.1\r\nHost: google.com\r\n\r\n")); await (new Promise((res, _) => setTimeout(res, 500))); - } - } else if (should_perf2_test) { - const num_outer_tests = 10; - const num_inner_tests = 50; - let total_mux_multi = 0; - for (const _ of Array(num_outer_tests).keys()) { - let total_mux = 0; - await Promise.all([...Array(num_inner_tests).keys()].map(async i => { - log(`running mux test ${i}`); - return await test_mux(test_url); - })).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) }); - total_mux = total_mux / num_inner_tests; + await writer.close(); + } else if (should_udp_test) { + let decoder = new TextDecoder(); + // tokio example: `cargo r --example echo-udp -- 127.0.0.1:5000` + const { read, write } = await epoxy_client.connect_udp( + "127.0.0.1:5000", + ); - log(`avg mux (${num_inner_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); - total_mux_multi += total_mux; + const reader = read.getReader(); + const writer = write.getWriter(); + + log("opened"); + + (async () => { + while (true) { + const { value: msg, done } = await reader.read(); + if (done || !msg) break; + console.log(msg); + log(decoder.decode(msg)) + } + log("closed"); + })(); + + while (true) { + log("sending `data`"); + await writer.write(new TextEncoder('utf-8').encode("data")); + await (new Promise((res, _) => setTimeout(res, 100))); + } + } else if (should_reconnect_test) { + while (true) { + try { + await epoxy_client.fetch(test_url); + } catch (e) { console.error(e) } + log("sent req"); + await (new Promise((res, _) => setTimeout(res, 500))); + } + } else if (should_perf2_test) { + const num_outer_tests = 10; + const num_inner_tests = 50; + let total_mux_multi = 0; + for (const _ of Array(num_outer_tests).keys()) { + let total_mux = 0; + await Promise.all([...Array(num_inner_tests).keys()].map(async i => { + log(`running mux test ${i}`); + return await test_mux(test_url); + })).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) }); + total_mux = total_mux / num_inner_tests; + + log(`avg mux (${num_inner_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + total_mux_multi += total_mux; + } + total_mux_multi = total_mux_multi / num_outer_tests; + log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`); + } else { + console.time(); + let resp = await epoxy_client.fetch(test_url); + console.log(resp, resp.rawHeaders); + log(await resp.arrayBuffer()); + console.timeEnd(); } - total_mux_multi = total_mux_multi / num_outer_tests; - log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`); - } else { - console.time(); - let resp = await epoxy_client.fetch(test_url); - console.log(resp, resp.rawHeaders); - log(await resp.text()); - console.timeEnd(); + log("done"); + } catch (err) { + console.error(err); + log(err.stack); } - log("done"); })(); diff --git a/client/src/lib.rs b/client/src/lib.rs index abb4d77..19a74b3 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -17,6 +17,7 @@ use http::{ uri::{InvalidUri, InvalidUriParts}, HeaderName, HeaderValue, Method, Request, Response, }; +use http_body_util::BodyDataStream; use hyper::{body::Incoming, Uri}; use hyper_util_wasm::client::legacy::Client; #[cfg(feature = "full")] @@ -27,8 +28,8 @@ use stream_provider::{StreamProvider, StreamProviderService}; use thiserror::Error; use utils::{ asyncread_to_readablestream, convert_body, entries_of_object, from_entries, is_null_body, - is_redirect, object_get, object_set, object_truthy, ws_protocol, IncomingBody, UriExt, - WasmExecutor, WispTransportRead, WispTransportWrite, + is_redirect, object_get, object_set, object_truthy, ws_protocol, UriExt, WasmExecutor, + WispTransportRead, WispTransportWrite, }; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; @@ -701,7 +702,7 @@ impl EpoxyClient { _ => None, }; - let response_body = IncomingBody::new(response.into_body()).into_async_read(); + let response_body = BodyDataStream::new(response.into_body()).map_err(std::io::Error::other).into_async_read(); let decompressed_body = match compression { Some(alg) => match alg { EpoxyCompression::Gzip => { @@ -719,7 +720,7 @@ impl EpoxyClient { }; } else { let response_stream = if !is_null_body(response.status().as_u16()) { - let response_body = IncomingBody::new(response.into_body()).into_async_read(); + let response_body = BodyDataStream::new(response.into_body()).map_err(std::io::Error::other).into_async_read(); Some(asyncread_to_readablestream(Box::pin(response_body), self.buffer_size)) } else { None diff --git a/client/src/utils.rs b/client/src/utils.rs index bf27ec3..73f08c5 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -17,7 +17,7 @@ use futures_rustls::{ }; use futures_util::{ready, AsyncRead, AsyncWrite, Future, Stream, StreamExt, TryStreamExt}; use http::{HeaderValue, Uri}; -use hyper::{body::Body, rt::Executor}; +use hyper::rt::Executor; use js_sys::{Array, ArrayBuffer, JsString, Object, Uint8Array}; use pin_project_lite::pin_project; use rustls_pki_types::{CertificateDer, ServerName, UnixTime}; @@ -90,34 +90,6 @@ where } } -pin_project! { - pub struct IncomingBody { - #[pin] - incoming: hyper::body::Incoming, - } -} - -impl IncomingBody { - pub fn new(incoming: hyper::body::Incoming) -> IncomingBody { - IncomingBody { incoming } - } -} - -impl Stream for IncomingBody { - type Item = std::io::Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().incoming.poll_frame(cx).map(|x| { - x.map(|x| { - x.map_err(std::io::Error::other).and_then(|x| { - x.into_data().map_err(|_| { - std::io::Error::other("trailer frame recieved; not implemented") - }) - }) - }) - }) - } -} - pin_project! { #[derive(Debug)] pub struct ReaderStream {