use http_body_util streaming body

This commit is contained in:
Toshit Chawda 2024-10-29 22:09:15 -07:00
parent 7f3d122108
commit f226dd9939
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
3 changed files with 233 additions and 253 deletions

View file

@ -17,6 +17,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
const wisp_url = params.get("wisp") || "ws://localhost:4000/"; const wisp_url = params.get("wisp") || "ws://localhost:4000/";
const wisp_v1 = params.has("v1"); const wisp_v1 = params.has("v1");
const wisp_udp = params.has("udp_extension"); const wisp_udp = params.has("udp_extension");
const disable_certverif = params.has("disable_certverif");
console.log( console.log(
"%cWASM is significantly slower with DevTools open!", "%cWASM is significantly slower with DevTools open!",
"color:red;font-size:3rem;font-weight:bold" "color:red;font-size:3rem;font-weight:bold"
@ -30,86 +31,112 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
window.scrollTo(0, document.body.scrollHeight); window.scrollTo(0, document.body.scrollHeight);
} }
await initEpoxy(); try {
let epoxy_client_options = new EpoxyClientOptions(); await initEpoxy();
epoxy_client_options.user_agent = navigator.userAgent; let epoxy_client_options = new EpoxyClientOptions();
epoxy_client_options.wisp_v2 = !wisp_v1; epoxy_client_options.user_agent = navigator.userAgent;
epoxy_client_options.udp_extension_required = wisp_udp; epoxy_client_options.wisp_v2 = !wisp_v1;
epoxy_client_options.udp_extension_required = wisp_udp;
epoxy_client_options.disable_certificate_validation = disable_certverif;
let epoxy_client; let epoxy_client;
log(`connecting to "${wisp_url}"${wisp_v1 ? " with wisp v1" : ""}`); log(`connecting to "${wisp_url}"${wisp_v1 ? " with wisp v1" : ""}`);
if (should_wisptransport) { if (should_wisptransport) {
log("using wisptransport with websocketstream backend"); log("using wisptransport with websocketstream backend");
epoxy_client = new EpoxyClient(async () => { epoxy_client = new EpoxyClient(async () => {
let wss = new WebSocketStream(wisp_url); let wss = new WebSocketStream(wisp_url);
let { readable, writable } = await wss.opened; let { readable, writable } = await wss.opened;
return { read: readable, write: writable }; return { read: readable, write: writable };
}, epoxy_client_options); }, epoxy_client_options);
} else { } else {
epoxy_client = new EpoxyClient(wisp_url, epoxy_client_options); epoxy_client = new EpoxyClient(wisp_url, epoxy_client_options);
}
const tconn0 = performance.now();
await epoxy_client.replace_stream_provider();
const tconn1 = performance.now();
log(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`);
// epoxy classes are inspectable
console.log(epoxy_client);
// you can change the user agent and redirect limit in JS
epoxy_client.redirect_limit = 15;
console.log(epoxyInfo);
const test_mux = async (url) => {
const t0 = performance.now();
await epoxy_client.fetch(url).then(r=>r.text());
const t1 = performance.now();
return t1 - t0;
};
const test_native = async (url) => {
const t0 = performance.now();
await fetch(url, { cache: "no-store" }).then(r=>r.text());
const t1 = performance.now();
return t1 - t0;
};
const readableStream = (buffer) => {
return new ReadableStream({
start(controller) {
controller.enqueue(buffer);
controller.close();
}
});
};
if (should_feature_test) {
let formdata = new FormData();
formdata.append("a", "b");
for (const url of [
["https://httpbin.org/post", { method: "POST", body: readableStream((new TextEncoder()).encode("abc")) }],
["https://httpbin.org/get", {}],
[new URL("https://httpbin.org/get"), {}],
["https://httpbin.org/gzip", {}],
["https://httpbin.org/brotli", {}],
["https://httpbin.org/redirect/11", {}],
["https://httpbin.org/redirect/1", { redirect: "manual" }],
["https://httpbin.org/post", { method: "POST", body: new URLSearchParams("a=b") }],
["https://httpbin.org/post", { method: "POST", body: formdata }],
["https://httpbin.org/post", { method: "POST", body: "a" }],
["https://httpbin.org/post", { method: "POST", body: (new TextEncoder()).encode("abc") }],
["https://httpbin.org/get", { headers: { "a": "b", "b": "c" } }],
["https://httpbin.org/get", { headers: new Headers({ "a": "b", "b": "c" }) }]
]) {
let resp = await epoxy_client.fetch(url[0], url[1]);
console.warn(url, resp, Object.fromEntries(resp.headers));
log(await resp.text());
} }
} else if (should_multiparallel_test) {
const num_tests = 10; const tconn0 = performance.now();
let total_mux_minus_native = 0; await epoxy_client.replace_stream_provider();
for (const _ of Array(num_tests).keys()) { const tconn1 = performance.now();
log(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`);
// epoxy classes are inspectable
console.log(epoxy_client);
// you can change the user agent and redirect limit in JS
epoxy_client.redirect_limit = 15;
console.log(epoxyInfo);
const test_mux = async (url) => {
const t0 = performance.now();
await epoxy_client.fetch(url).then(r => r.text());
const t1 = performance.now();
return t1 - t0;
};
const test_native = async (url) => {
const t0 = performance.now();
await fetch(url, { cache: "no-store" }).then(r => r.text());
const t1 = performance.now();
return t1 - t0;
};
const readableStream = (buffer) => {
return new ReadableStream({
start(controller) {
controller.enqueue(buffer);
controller.close();
}
});
};
if (should_feature_test) {
let formdata = new FormData();
formdata.append("a", "b");
for (const url of [
["https://httpbin.org/post", { method: "POST", body: readableStream((new TextEncoder()).encode("abc")) }],
["https://httpbin.org/get", {}],
[new URL("https://httpbin.org/get"), {}],
["https://httpbin.org/gzip", {}],
["https://httpbin.org/brotli", {}],
["https://httpbin.org/redirect/11", {}],
["https://httpbin.org/redirect/1", { redirect: "manual" }],
["https://httpbin.org/post", { method: "POST", body: new URLSearchParams("a=b") }],
["https://httpbin.org/post", { method: "POST", body: formdata }],
["https://httpbin.org/post", { method: "POST", body: "a" }],
["https://httpbin.org/post", { method: "POST", body: (new TextEncoder()).encode("abc") }],
["https://httpbin.org/get", { headers: { "a": "b", "b": "c" } }],
["https://httpbin.org/get", { headers: new Headers({ "a": "b", "b": "c" }) }]
]) {
let resp = await epoxy_client.fetch(url[0], url[1]);
console.warn(url, resp, Object.fromEntries(resp.headers));
log(await resp.text());
}
} else if (should_multiparallel_test) {
const num_tests = 10;
let total_mux_minus_native = 0;
for (const _ of Array(num_tests).keys()) {
let total_mux = 0;
await Promise.all([...Array(num_tests).keys()].map(async i => {
log(`running mux test ${i}`);
return await test_mux(test_url);
})).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) });
total_mux = total_mux / num_tests;
let total_native = 0;
await Promise.all([...Array(num_tests).keys()].map(async i => {
log(`running native test ${i}`);
return await test_native(test_url);
})).then((vals) => { total_native = vals.reduce((acc, x) => acc + x, 0) });
total_native = total_native / num_tests;
log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`);
log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
total_mux_minus_native += total_mux - total_native;
}
total_mux_minus_native = total_mux_minus_native / num_tests;
log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`);
} else if (should_parallel_test) {
const num_tests = 10;
let total_mux = 0; let total_mux = 0;
await Promise.all([...Array(num_tests).keys()].map(async i => { await Promise.all([...Array(num_tests).keys()].map(async i => {
log(`running mux test ${i}`); log(`running mux test ${i}`);
@ -127,34 +154,34 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`);
log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
total_mux_minus_native += total_mux - total_native; } else if (should_multiperf_test) {
} const num_tests = 10;
total_mux_minus_native = total_mux_minus_native / num_tests; let total_mux_minus_native = 0;
log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`); for (const _ of Array(num_tests).keys()) {
} else if (should_parallel_test) { let total_mux = 0;
const num_tests = 10; for (const i of Array(num_tests).keys()) {
log(`running mux test ${i}`);
total_mux += await test_mux(test_url);
}
total_mux = total_mux / num_tests;
let total_mux = 0; let total_native = 0;
await Promise.all([...Array(num_tests).keys()].map(async i => { for (const i of Array(num_tests).keys()) {
log(`running mux test ${i}`); log(`running native test ${i}`);
return await test_mux(test_url); total_native += await test_native(test_url);
})).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) }); }
total_mux = total_mux / num_tests; total_native = total_native / num_tests;
let total_native = 0; log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
await Promise.all([...Array(num_tests).keys()].map(async i => { log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`);
log(`running native test ${i}`); log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
return await test_native(test_url); total_mux_minus_native += total_mux - total_native;
})).then((vals) => { total_native = vals.reduce((acc, x) => acc + x, 0) }); }
total_native = total_native / num_tests; total_mux_minus_native = total_mux_minus_native / num_tests;
log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`);
} else if (should_perf_test) {
const num_tests = 10;
log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`);
log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
} else if (should_multiperf_test) {
const num_tests = 10;
let total_mux_minus_native = 0;
for (const _ of Array(num_tests).keys()) {
let total_mux = 0; let total_mux = 0;
for (const i of Array(num_tests).keys()) { for (const i of Array(num_tests).keys()) {
log(`running mux test ${i}`); log(`running mux test ${i}`);
@ -172,131 +199,111 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`);
log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
total_mux_minus_native += total_mux - total_native; } else if (should_ws_test) {
} let handlers = new EpoxyHandlers(
total_mux_minus_native = total_mux_minus_native / num_tests; () => log("opened"),
log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`); () => log("closed"),
} else if (should_perf_test) { err => console.error(err),
const num_tests = 10; msg => log(`got "${msg}"`)
);
let total_mux = 0; let ws = await epoxy_client.connect_websocket(
for (const i of Array(num_tests).keys()) { handlers,
log(`running mux test ${i}`); "wss://echo.websocket.events",
total_mux += await test_mux(test_url); [],
} { "x-header": "abc" },
total_mux = total_mux / num_tests; );
let i = 0;
let total_native = 0;
for (const i of Array(num_tests).keys()) {
log(`running native test ${i}`);
total_native += await test_native(test_url);
}
total_native = total_native / num_tests;
log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`);
log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`);
} else if (should_ws_test) {
let handlers = new EpoxyHandlers(
() => log("opened"),
() => log("closed"),
err => console.error(err),
msg => log(`got "${msg}"`)
);
let ws = await epoxy_client.connect_websocket(
handlers,
"wss://echo.websocket.events",
[],
{ "x-header": "abc" },
);
let i = 0;
while (true) {
log(`sending \`data${i}\``);
await ws.send("data" + i);
i++;
await (new Promise((res, _) => setTimeout(res, 10)));
}
} else if (should_tls_test) {
let decoder = new TextDecoder();
const { read, write } = await epoxy_client.connect_tls(
"google.com:443",
);
const reader = read.getReader();
const writer = write.getWriter();
log("opened");
(async () => {
while (true) { while (true) {
const { value: msg, done } = await reader.read(); log(`sending \`data${i}\``);
if (done || !msg) break; await ws.send("data" + i);
console.log(msg); i++;
log(decoder.decode(msg)) await (new Promise((res, _) => setTimeout(res, 10)));
} }
log("closed"); } else if (should_tls_test) {
})(); let decoder = new TextDecoder();
const { read, write } = await epoxy_client.connect_tls(
"google.com:443",
);
const reader = read.getReader();
const writer = write.getWriter();
await writer.write(new TextEncoder('utf-8').encode("GET / HTTP 1.1\r\nHost: google.com\r\n\r\n")); log("opened");
await (new Promise((res, _) => setTimeout(res, 500)));
await writer.close();
} else if (should_udp_test) {
let decoder = new TextDecoder();
// tokio example: `cargo r --example echo-udp -- 127.0.0.1:5000`
const { read, write } = await epoxy_client.connect_udp(
"127.0.0.1:5000",
);
const reader = read.getReader(); (async () => {
const writer = write.getWriter(); while (true) {
const { value: msg, done } = await reader.read();
if (done || !msg) break;
console.log(msg);
log(decoder.decode(msg))
}
log("closed");
})();
log("opened"); await writer.write(new TextEncoder('utf-8').encode("GET / HTTP 1.1\r\nHost: google.com\r\n\r\n"));
(async () => {
while (true) {
const { value: msg, done } = await reader.read();
if (done || !msg) break;
console.log(msg);
log(decoder.decode(msg))
}
log("closed");
})();
while (true) {
log("sending `data`");
await writer.write(new TextEncoder('utf-8').encode("data"));
await (new Promise((res, _) => setTimeout(res, 100)));
}
} else if (should_reconnect_test) {
while (true) {
try {
await epoxy_client.fetch(test_url);
} catch (e) { console.error(e) }
log("sent req");
await (new Promise((res, _) => setTimeout(res, 500))); await (new Promise((res, _) => setTimeout(res, 500)));
} await writer.close();
} else if (should_perf2_test) { } else if (should_udp_test) {
const num_outer_tests = 10; let decoder = new TextDecoder();
const num_inner_tests = 50; // tokio example: `cargo r --example echo-udp -- 127.0.0.1:5000`
let total_mux_multi = 0; const { read, write } = await epoxy_client.connect_udp(
for (const _ of Array(num_outer_tests).keys()) { "127.0.0.1:5000",
let total_mux = 0; );
await Promise.all([...Array(num_inner_tests).keys()].map(async i => {
log(`running mux test ${i}`);
return await test_mux(test_url);
})).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) });
total_mux = total_mux / num_inner_tests;
log(`avg mux (${num_inner_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); const reader = read.getReader();
total_mux_multi += total_mux; const writer = write.getWriter();
log("opened");
(async () => {
while (true) {
const { value: msg, done } = await reader.read();
if (done || !msg) break;
console.log(msg);
log(decoder.decode(msg))
}
log("closed");
})();
while (true) {
log("sending `data`");
await writer.write(new TextEncoder('utf-8').encode("data"));
await (new Promise((res, _) => setTimeout(res, 100)));
}
} else if (should_reconnect_test) {
while (true) {
try {
await epoxy_client.fetch(test_url);
} catch (e) { console.error(e) }
log("sent req");
await (new Promise((res, _) => setTimeout(res, 500)));
}
} else if (should_perf2_test) {
const num_outer_tests = 10;
const num_inner_tests = 50;
let total_mux_multi = 0;
for (const _ of Array(num_outer_tests).keys()) {
let total_mux = 0;
await Promise.all([...Array(num_inner_tests).keys()].map(async i => {
log(`running mux test ${i}`);
return await test_mux(test_url);
})).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) });
total_mux = total_mux / num_inner_tests;
log(`avg mux (${num_inner_tests}) took ${total_mux} ms or ${total_mux / 1000} s`);
total_mux_multi += total_mux;
}
total_mux_multi = total_mux_multi / num_outer_tests;
log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`);
} else {
console.time();
let resp = await epoxy_client.fetch(test_url);
console.log(resp, resp.rawHeaders);
log(await resp.arrayBuffer());
console.timeEnd();
} }
total_mux_multi = total_mux_multi / num_outer_tests; log("done");
log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`); } catch (err) {
} else { console.error(err);
console.time(); log(err.stack);
let resp = await epoxy_client.fetch(test_url);
console.log(resp, resp.rawHeaders);
log(await resp.text());
console.timeEnd();
} }
log("done");
})(); })();

View file

@ -17,6 +17,7 @@ use http::{
uri::{InvalidUri, InvalidUriParts}, uri::{InvalidUri, InvalidUriParts},
HeaderName, HeaderValue, Method, Request, Response, HeaderName, HeaderValue, Method, Request, Response,
}; };
use http_body_util::BodyDataStream;
use hyper::{body::Incoming, Uri}; use hyper::{body::Incoming, Uri};
use hyper_util_wasm::client::legacy::Client; use hyper_util_wasm::client::legacy::Client;
#[cfg(feature = "full")] #[cfg(feature = "full")]
@ -27,8 +28,8 @@ use stream_provider::{StreamProvider, StreamProviderService};
use thiserror::Error; use thiserror::Error;
use utils::{ use utils::{
asyncread_to_readablestream, convert_body, entries_of_object, from_entries, is_null_body, asyncread_to_readablestream, convert_body, entries_of_object, from_entries, is_null_body,
is_redirect, object_get, object_set, object_truthy, ws_protocol, IncomingBody, UriExt, is_redirect, object_get, object_set, object_truthy, ws_protocol, UriExt, WasmExecutor,
WasmExecutor, WispTransportRead, WispTransportWrite, WispTransportRead, WispTransportWrite,
}; };
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture; use wasm_bindgen_futures::JsFuture;
@ -701,7 +702,7 @@ impl EpoxyClient {
_ => None, _ => None,
}; };
let response_body = IncomingBody::new(response.into_body()).into_async_read(); let response_body = BodyDataStream::new(response.into_body()).map_err(std::io::Error::other).into_async_read();
let decompressed_body = match compression { let decompressed_body = match compression {
Some(alg) => match alg { Some(alg) => match alg {
EpoxyCompression::Gzip => { EpoxyCompression::Gzip => {
@ -719,7 +720,7 @@ impl EpoxyClient {
}; };
} else { } else {
let response_stream = if !is_null_body(response.status().as_u16()) { let response_stream = if !is_null_body(response.status().as_u16()) {
let response_body = IncomingBody::new(response.into_body()).into_async_read(); let response_body = BodyDataStream::new(response.into_body()).map_err(std::io::Error::other).into_async_read();
Some(asyncread_to_readablestream(Box::pin(response_body), self.buffer_size)) Some(asyncread_to_readablestream(Box::pin(response_body), self.buffer_size))
} else { } else {
None None

View file

@ -17,7 +17,7 @@ use futures_rustls::{
}; };
use futures_util::{ready, AsyncRead, AsyncWrite, Future, Stream, StreamExt, TryStreamExt}; use futures_util::{ready, AsyncRead, AsyncWrite, Future, Stream, StreamExt, TryStreamExt};
use http::{HeaderValue, Uri}; use http::{HeaderValue, Uri};
use hyper::{body::Body, rt::Executor}; use hyper::rt::Executor;
use js_sys::{Array, ArrayBuffer, JsString, Object, Uint8Array}; use js_sys::{Array, ArrayBuffer, JsString, Object, Uint8Array};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use rustls_pki_types::{CertificateDer, ServerName, UnixTime}; use rustls_pki_types::{CertificateDer, ServerName, UnixTime};
@ -90,34 +90,6 @@ where
} }
} }
pin_project! {
pub struct IncomingBody {
#[pin]
incoming: hyper::body::Incoming,
}
}
impl IncomingBody {
pub fn new(incoming: hyper::body::Incoming) -> IncomingBody {
IncomingBody { incoming }
}
}
impl Stream for IncomingBody {
type Item = std::io::Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().incoming.poll_frame(cx).map(|x| {
x.map(|x| {
x.map_err(std::io::Error::other).and_then(|x| {
x.into_data().map_err(|_| {
std::io::Error::other("trailer frame recieved; not implemented")
})
})
})
})
}
}
pin_project! { pin_project! {
#[derive(Debug)] #[derive(Debug)]
pub struct ReaderStream<R> { pub struct ReaderStream<R> {