diff --git a/client/demo.js b/client/demo.js index 17a5d0f..9c678d9 100644 --- a/client/demo.js +++ b/client/demo.js @@ -12,6 +12,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox const should_udp_test = params.has("udp_test"); const should_reconnect_test = params.has("reconnect_test"); const should_perf2_test = params.has("perf2_test"); + const should_duplex_test = params.has("duplex_test"); const should_wisptransport = params.has("wisptransport"); const test_url = params.get("url") || "https://httpbin.org/get"; const wisp_url = params.get("wisp") || "ws://localhost:4000/"; @@ -294,7 +295,7 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox } total_mux_multi = total_mux_multi / num_outer_tests; log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`); - } else { + } else if (should_duplex_test) { const intervalStream = new ReadableStream({ start(c) { let count = 0; @@ -310,14 +311,13 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox }, }).pipeThrough(new TextEncoderStream()); - const resp = await epoxy_client.fetch("https://full-duplex-server.deno.dev/", { + const resp = await epoxy_client.fetch("https://httpbin.org/redirect-to?url=https://full-duplex-server.deno.dev", { method: "POST", duplex: "half", - redirect: "manual", body: intervalStream, }); - console.log("foo"); + console.log("foo", resp); const reader = resp.body.pipeThrough(new TextDecoderStream()).getReader(); @@ -328,6 +328,12 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox } console.log("done!"); + } else { + console.time(); + let resp = await epoxy_client.fetch(test_url); + console.log(resp, resp.rawHeaders); + log(await resp.arrayBuffer()); + console.timeEnd(); } log("done"); } catch (err) { diff --git a/client/src/lib.rs b/client/src/lib.rs index 52efb59..91c3fc3 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -18,7 +18,7 @@ use http::{ HeaderName, HeaderValue, Method, Request, Response, }; use http_body::Body; -use http_body_util::{BodyDataStream, BodyExt, Full}; +use http_body_util::{BodyDataStream, Full}; use hyper::{body::Incoming, Uri}; use hyper_util_wasm::client::legacy::Client; #[cfg(feature = "full")] @@ -29,8 +29,8 @@ use stream_provider::{StreamProvider, StreamProviderService}; use thiserror::Error; use utils::{ asyncread_to_readablestream, convert_streaming_body, entries_of_object, from_entries, - is_null_body, is_redirect, object_get, object_set, object_truthy, ws_protocol, - MaybeStreamingBody, StreamingBody, UriExt, WasmExecutor, WispTransportRead, WispTransportWrite, + is_null_body, is_redirect, object_get, object_set, object_truthy, ws_protocol, StreamingBody, + UriExt, WasmExecutor, WispTransportRead, WispTransportWrite, }; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; @@ -166,6 +166,8 @@ pub enum EpoxyError { ResponseNewFailed, #[error("Failed to convert streaming body: {0}")] StreamingBodyConvertFailed(String), + #[error("Failed to tee streaming body: {0}")] + StreamingBodyTeeFailed(String), #[error("Failed to collect streaming body: {0:?} ({0})")] StreamingBodyCollectFailed(Box), } @@ -605,15 +607,7 @@ impl EpoxyClient { .await .map_err(|_| EpoxyError::InvalidRequestBody)?; body_content_type = content_type; - if matches!(body, MaybeStreamingBody::Streaming(_)) && request_redirect { - console_warn!("epoxy: streaming request body + redirect unsupported"); - // collect body instead - http_body_util::Either::Right(Full::new( - body.into_httpbody()?.collect().await.map_err(EpoxyError::StreamingBodyCollectFailed)?.to_bytes(), - )) - } else { - body.into_httpbody()? - } + body.into_httpbody()? } None => http_body_util::Either::Right(Full::new(Bytes::new())), }; @@ -674,7 +668,6 @@ impl EpoxyClient { headers_map.insert(CONTENT_LENGTH, 0.into()); } - let (mut response, response_uri, redirected) = self .send_req(request_builder.body(body)?, request_redirect) .await?; diff --git a/client/src/utils.rs b/client/src/utils.rs index c9f2b1f..d343fef 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -455,7 +455,24 @@ pub enum MaybeStreamingBody { pub struct StreamingInnerBody( Pin, std::io::Error>> + Send>>, + SendWrapper, ); +impl StreamingInnerBody { + pub fn from_teed(a: ReadableStream, b: ReadableStream) -> Result { + let reader = a + .try_into_stream() + .map_err(|x| EpoxyError::StreamingBodyConvertFailed(format!("{:?}", x)))?; + let reader = reader + .then(|x| async { + Ok::(Bytes::from(convert_body(x?).await?.0.to_vec())) + }) + .map_ok(http_body::Frame::data); + let reader = reader.map_err(|x| std::io::Error::other(format!("{:?}", x))); + let reader = Box::pin(SendWrapper::new(reader)); + + Ok(Self(reader, SendWrapper::new(b))) + } +} impl Stream for StreamingInnerBody { type Item = Result, std::io::Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -467,8 +484,20 @@ impl Stream for StreamingInnerBody { } impl Clone for StreamingInnerBody { fn clone(&self) -> Self { - console_error!("epoxy internal error: cloning streaming request body is not allowed"); - unreachable!(); + match ReadableStream::from_raw(self.1.as_raw().clone()) + .try_tee() + .map_err(|x| EpoxyError::StreamingBodyTeeFailed(format!("{:?}", x))) + .and_then(|(a, b)| StreamingInnerBody::from_teed(a, b)) + { + Ok(x) => x, + Err(x) => { + console_error!( + "epoxy internal error: failed to clone streaming body: {:?}", + x + ); + unreachable!("failed to clone streaming body"); + } + } } } @@ -478,21 +507,13 @@ impl MaybeStreamingBody { pub fn into_httpbody(self) -> Result { match self { Self::Streaming(x) => { - let reader = ReadableStream::from_raw(x); - let reader = reader - .try_into_stream() - .map_err(|x| EpoxyError::StreamingBodyConvertFailed(format!("{:?}", x)))?; + let (a, b) = ReadableStream::from_raw(x) + .try_tee() + .map_err(|x| EpoxyError::StreamingBodyTeeFailed(format!("{:?}", x)))?; - let reader = reader - .then(|x| async { - Ok::(Bytes::from(convert_body(x?).await?.0.to_vec())) - }) - .map_ok(http_body::Frame::data); - let reader = reader.map_err(|x| std::io::Error::other(format!("{:?}", x))); - - Ok(Either::Left(StreamBody::new(StreamingInnerBody(Box::pin( - SendWrapper::new(reader), - ))))) + Ok(Either::Left(StreamBody::new( + StreamingInnerBody::from_teed(a, b)?, + ))) } Self::Static(x) => Ok(Either::Right(Full::new(Bytes::from(x.to_vec())))), }