add duplex streaming support and tests (https://github.com/ading2210/libcurl.js/issues/4)

This commit is contained in:
Toshit Chawda 2024-10-30 15:24:06 -07:00
parent 353f4c9929
commit 6f14338ed6
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
3 changed files with 53 additions and 33 deletions

View file

@ -12,6 +12,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
const should_udp_test = params.has("udp_test"); const should_udp_test = params.has("udp_test");
const should_reconnect_test = params.has("reconnect_test"); const should_reconnect_test = params.has("reconnect_test");
const should_perf2_test = params.has("perf2_test"); const should_perf2_test = params.has("perf2_test");
const should_duplex_test = params.has("duplex_test");
const should_wisptransport = params.has("wisptransport"); const should_wisptransport = params.has("wisptransport");
const test_url = params.get("url") || "https://httpbin.org/get"; const test_url = params.get("url") || "https://httpbin.org/get";
const wisp_url = params.get("wisp") || "ws://localhost:4000/"; const wisp_url = params.get("wisp") || "ws://localhost:4000/";
@ -294,7 +295,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
} }
total_mux_multi = total_mux_multi / num_outer_tests; total_mux_multi = total_mux_multi / num_outer_tests;
log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`); log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`);
} else { } else if (should_duplex_test) {
const intervalStream = new ReadableStream({ const intervalStream = new ReadableStream({
start(c) { start(c) {
let count = 0; let count = 0;
@ -310,14 +311,13 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
}, },
}).pipeThrough(new TextEncoderStream()); }).pipeThrough(new TextEncoderStream());
const resp = await epoxy_client.fetch("https://full-duplex-server.deno.dev/", { const resp = await epoxy_client.fetch("https://httpbin.org/redirect-to?url=https://full-duplex-server.deno.dev", {
method: "POST", method: "POST",
duplex: "half", duplex: "half",
redirect: "manual",
body: intervalStream, body: intervalStream,
}); });
console.log("foo"); console.log("foo", resp);
const reader = resp.body.pipeThrough(new TextDecoderStream()).getReader(); const reader = resp.body.pipeThrough(new TextDecoderStream()).getReader();
@ -328,6 +328,12 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
} }
console.log("done!"); console.log("done!");
} else {
console.time();
let resp = await epoxy_client.fetch(test_url);
console.log(resp, resp.rawHeaders);
log(await resp.arrayBuffer());
console.timeEnd();
} }
log("done"); log("done");
} catch (err) { } catch (err) {

View file

@ -18,7 +18,7 @@ use http::{
HeaderName, HeaderValue, Method, Request, Response, HeaderName, HeaderValue, Method, Request, Response,
}; };
use http_body::Body; use http_body::Body;
use http_body_util::{BodyDataStream, BodyExt, Full}; use http_body_util::{BodyDataStream, Full};
use hyper::{body::Incoming, Uri}; use hyper::{body::Incoming, Uri};
use hyper_util_wasm::client::legacy::Client; use hyper_util_wasm::client::legacy::Client;
#[cfg(feature = "full")] #[cfg(feature = "full")]
@ -29,8 +29,8 @@ use stream_provider::{StreamProvider, StreamProviderService};
use thiserror::Error; use thiserror::Error;
use utils::{ use utils::{
asyncread_to_readablestream, convert_streaming_body, entries_of_object, from_entries, asyncread_to_readablestream, convert_streaming_body, entries_of_object, from_entries,
is_null_body, is_redirect, object_get, object_set, object_truthy, ws_protocol, is_null_body, is_redirect, object_get, object_set, object_truthy, ws_protocol, StreamingBody,
MaybeStreamingBody, StreamingBody, UriExt, WasmExecutor, WispTransportRead, WispTransportWrite, UriExt, WasmExecutor, WispTransportRead, WispTransportWrite,
}; };
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture; use wasm_bindgen_futures::JsFuture;
@ -166,6 +166,8 @@ pub enum EpoxyError {
ResponseNewFailed, ResponseNewFailed,
#[error("Failed to convert streaming body: {0}")] #[error("Failed to convert streaming body: {0}")]
StreamingBodyConvertFailed(String), StreamingBodyConvertFailed(String),
#[error("Failed to tee streaming body: {0}")]
StreamingBodyTeeFailed(String),
#[error("Failed to collect streaming body: {0:?} ({0})")] #[error("Failed to collect streaming body: {0:?} ({0})")]
StreamingBodyCollectFailed(Box<dyn Error + Sync + Send>), StreamingBodyCollectFailed(Box<dyn Error + Sync + Send>),
} }
@ -605,15 +607,7 @@ impl EpoxyClient {
.await .await
.map_err(|_| EpoxyError::InvalidRequestBody)?; .map_err(|_| EpoxyError::InvalidRequestBody)?;
body_content_type = content_type; body_content_type = content_type;
if matches!(body, MaybeStreamingBody::Streaming(_)) && request_redirect { body.into_httpbody()?
console_warn!("epoxy: streaming request body + redirect unsupported");
// collect body instead
http_body_util::Either::Right(Full::new(
body.into_httpbody()?.collect().await.map_err(EpoxyError::StreamingBodyCollectFailed)?.to_bytes(),
))
} else {
body.into_httpbody()?
}
} }
None => http_body_util::Either::Right(Full::new(Bytes::new())), None => http_body_util::Either::Right(Full::new(Bytes::new())),
}; };
@ -674,7 +668,6 @@ impl EpoxyClient {
headers_map.insert(CONTENT_LENGTH, 0.into()); headers_map.insert(CONTENT_LENGTH, 0.into());
} }
let (mut response, response_uri, redirected) = self let (mut response, response_uri, redirected) = self
.send_req(request_builder.body(body)?, request_redirect) .send_req(request_builder.body(body)?, request_redirect)
.await?; .await?;

View file

@ -455,7 +455,24 @@ pub enum MaybeStreamingBody {
pub struct StreamingInnerBody( pub struct StreamingInnerBody(
Pin<Box<dyn Stream<Item = Result<http_body::Frame<Bytes>, std::io::Error>> + Send>>, Pin<Box<dyn Stream<Item = Result<http_body::Frame<Bytes>, std::io::Error>> + Send>>,
SendWrapper<ReadableStream>,
); );
impl StreamingInnerBody {
pub fn from_teed(a: ReadableStream, b: ReadableStream) -> Result<Self, EpoxyError> {
let reader = a
.try_into_stream()
.map_err(|x| EpoxyError::StreamingBodyConvertFailed(format!("{:?}", x)))?;
let reader = reader
.then(|x| async {
Ok::<Bytes, JsValue>(Bytes::from(convert_body(x?).await?.0.to_vec()))
})
.map_ok(http_body::Frame::data);
let reader = reader.map_err(|x| std::io::Error::other(format!("{:?}", x)));
let reader = Box::pin(SendWrapper::new(reader));
Ok(Self(reader, SendWrapper::new(b)))
}
}
impl Stream for StreamingInnerBody { impl Stream for StreamingInnerBody {
type Item = Result<http_body::Frame<Bytes>, std::io::Error>; type Item = Result<http_body::Frame<Bytes>, std::io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -467,8 +484,20 @@ impl Stream for StreamingInnerBody {
} }
impl Clone for StreamingInnerBody { impl Clone for StreamingInnerBody {
fn clone(&self) -> Self { fn clone(&self) -> Self {
console_error!("epoxy internal error: cloning streaming request body is not allowed"); match ReadableStream::from_raw(self.1.as_raw().clone())
unreachable!(); .try_tee()
.map_err(|x| EpoxyError::StreamingBodyTeeFailed(format!("{:?}", x)))
.and_then(|(a, b)| StreamingInnerBody::from_teed(a, b))
{
Ok(x) => x,
Err(x) => {
console_error!(
"epoxy internal error: failed to clone streaming body: {:?}",
x
);
unreachable!("failed to clone streaming body");
}
}
} }
} }
@ -478,21 +507,13 @@ impl MaybeStreamingBody {
pub fn into_httpbody(self) -> Result<StreamingBody, EpoxyError> { pub fn into_httpbody(self) -> Result<StreamingBody, EpoxyError> {
match self { match self {
Self::Streaming(x) => { Self::Streaming(x) => {
let reader = ReadableStream::from_raw(x); let (a, b) = ReadableStream::from_raw(x)
let reader = reader .try_tee()
.try_into_stream() .map_err(|x| EpoxyError::StreamingBodyTeeFailed(format!("{:?}", x)))?;
.map_err(|x| EpoxyError::StreamingBodyConvertFailed(format!("{:?}", x)))?;
let reader = reader Ok(Either::Left(StreamBody::new(
.then(|x| async { StreamingInnerBody::from_teed(a, b)?,
Ok::<Bytes, JsValue>(Bytes::from(convert_body(x?).await?.0.to_vec())) )))
})
.map_ok(http_body::Frame::data);
let reader = reader.map_err(|x| std::io::Error::other(format!("{:?}", x)));
Ok(Either::Left(StreamBody::new(StreamingInnerBody(Box::pin(
SendWrapper::new(reader),
)))))
} }
Self::Static(x) => Ok(Either::Right(Full::new(Bytes::from(x.to_vec())))), Self::Static(x) => Ok(Either::Right(Full::new(Bytes::from(x.to_vec())))),
} }