diff --git a/Cargo.lock b/Cargo.lock index 8503e63..ad9e295 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -696,6 +696,7 @@ dependencies = [ "futures-util", "getrandom", "http", + "http-body", "http-body-util", "hyper", "hyper-util-wasm", @@ -1197,9 +1198,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", @@ -1876,9 +1877,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.15" +version = "0.23.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fbb44d7acc4e873d613422379f69f237a1b141928c02f6bc6ccfddddc2d7993" +checksum = "eee87ff5d9b36712a58574e12e9f0ea80f915a5b0ac518d322b24a465617925e" dependencies = [ "once_cell", "ring", @@ -1952,18 +1953,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.213" +version = "1.0.214" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ea7893ff5e2466df8d720bb615088341b295f849602c6956047f8f80f0e9bc1" +checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.213" +version = "1.0.214" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5" +checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" dependencies = [ "proc-macro2", "quote", diff --git a/client/Cargo.toml b/client/Cargo.toml index e7e144b..e108a47 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -17,6 +17,7 @@ flume = "0.11.0" futures-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"] } futures-util = { version = "0.3.30", features = ["sink"] } http = "1.1.0" +http-body = "1.0.1" http-body-util = "0.1.2" hyper = "1.4.1" hyper-util-wasm = { git = "https://github.com/r58Playz/hyper-util-wasm", branch = "opinionated", version = "0.1.7", features = ["client-legacy", "http1"] } diff --git a/client/demo.js b/client/demo.js index 2d6e485..17a5d0f 100644 --- a/client/demo.js +++ b/client/demo.js @@ -295,11 +295,39 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox total_mux_multi = total_mux_multi / num_outer_tests; log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`); } else { - console.time(); - let resp = await epoxy_client.fetch(test_url); - console.log(resp, resp.rawHeaders); - log(await resp.arrayBuffer()); - console.timeEnd(); + const intervalStream = new ReadableStream({ + start(c) { + let count = 0; + const timer = setInterval(() => { + console.log("sent!"); + c.enqueue("Hello\n"); + if (count === 5) { + clearInterval(timer); + c.close(); + } + count++; + }, 1000); + }, + }).pipeThrough(new TextEncoderStream()); + + const resp = await epoxy_client.fetch("https://full-duplex-server.deno.dev/", { + method: "POST", + duplex: "half", + redirect: "manual", + body: intervalStream, + }); + + console.log("foo"); + + const reader = resp.body.pipeThrough(new TextDecoderStream()).getReader(); + + while (true) { + const { value, done } = await reader.read(); + if (done) break; + console.log(value); + } + + console.log("done!"); } log("done"); } catch (err) { diff --git a/client/src/lib.rs b/client/src/lib.rs index 19a74b3..52efb59 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,5 +1,5 @@ #![feature(let_chains, impl_trait_in_assoc_type)] -use std::{str::FromStr, sync::Arc}; +use std::{error::Error, str::FromStr, sync::Arc}; #[cfg(feature = "full")] use async_compression::futures::bufread as async_comp; @@ -11,13 +11,14 @@ use futures_util::TryStreamExt; use http::{ header::{ InvalidHeaderName, InvalidHeaderValue, ACCEPT_ENCODING, CONNECTION, CONTENT_LENGTH, - CONTENT_TYPE, USER_AGENT, + CONTENT_TYPE, LOCATION, USER_AGENT, }, method::InvalidMethod, uri::{InvalidUri, InvalidUriParts}, HeaderName, HeaderValue, Method, Request, Response, }; -use http_body_util::BodyDataStream; +use http_body::Body; +use http_body_util::{BodyDataStream, BodyExt, Full}; use hyper::{body::Incoming, Uri}; use hyper_util_wasm::client::legacy::Client; #[cfg(feature = "full")] @@ -27,9 +28,9 @@ use send_wrapper::SendWrapper; use stream_provider::{StreamProvider, StreamProviderService}; use thiserror::Error; use utils::{ - asyncread_to_readablestream, convert_body, entries_of_object, from_entries, is_null_body, - is_redirect, object_get, object_set, object_truthy, ws_protocol, UriExt, WasmExecutor, - WispTransportRead, WispTransportWrite, + asyncread_to_readablestream, convert_streaming_body, entries_of_object, from_entries, + is_null_body, is_redirect, object_get, object_set, object_truthy, ws_protocol, + MaybeStreamingBody, StreamingBody, UriExt, WasmExecutor, WispTransportRead, WispTransportWrite, }; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; @@ -92,8 +93,6 @@ impl TryFrom for Uri { } } -type HttpBody = http_body_util::Full; - #[derive(Debug, Error)] pub enum EpoxyError { #[error("Invalid DNS name: {0:?} ({0})")] @@ -165,6 +164,10 @@ pub enum EpoxyError { ResponseHeadersFromEntriesFailed, #[error("Failed to construct response object")] ResponseNewFailed, + #[error("Failed to convert streaming body: {0}")] + StreamingBodyConvertFailed(String), + #[error("Failed to collect streaming body: {0:?} ({0})")] + StreamingBodyCollectFailed(Box), } impl EpoxyError { @@ -219,10 +222,9 @@ impl From for EpoxyError { } } -#[derive(Debug)] enum EpoxyResponse { Success(Response), - Redirect((Response, http::Request)), + Redirect((Response, http::Request)), } #[cfg(feature = "full")] @@ -327,7 +329,7 @@ impl EpoxyHandlers { #[wasm_bindgen(inspectable)] pub struct EpoxyClient { stream_provider: Arc, - client: Client, + client: Client, certs_tampered: bool, @@ -520,7 +522,7 @@ impl EpoxyClient { async fn send_req_inner( &self, - req: http::Request, + req: http::Request, should_redirect: bool, ) -> Result { let new_req = if should_redirect { @@ -534,7 +536,7 @@ impl EpoxyClient { Ok(res) => { if is_redirect(res.status().as_u16()) && let Some(mut new_req) = new_req - && let Some(location) = res.headers().get("Location") + && let Some(location) = res.headers().get(LOCATION) && let Ok(redirect_url) = new_req.uri().get_redirect(location) { *new_req.uri_mut() = redirect_url; @@ -549,7 +551,7 @@ impl EpoxyClient { async fn send_req( &self, - req: http::Request, + req: http::Request, should_redirect: bool, ) -> Result<(hyper::Response, Uri, bool), EpoxyError> { let mut redirected = false; @@ -599,13 +601,21 @@ impl EpoxyClient { let mut body_content_type: Option = None; let body = match object_truthy(object_get(&options, "body")) { Some(buf) => { - let (body, content_type) = convert_body(buf) + let (body, content_type) = convert_streaming_body(buf) .await .map_err(|_| EpoxyError::InvalidRequestBody)?; body_content_type = content_type; - Bytes::from(body.to_vec()) + if matches!(body, MaybeStreamingBody::Streaming(_)) && request_redirect { + console_warn!("epoxy: streaming request body + redirect unsupported"); + // collect body instead + http_body_util::Either::Right(Full::new( + body.into_httpbody()?.collect().await.map_err(EpoxyError::StreamingBodyCollectFailed)?.to_bytes(), + )) + } else { + body.into_httpbody()? + } } - None => Bytes::new(), + None => http_body_util::Either::Right(Full::new(Bytes::new())), }; let headers = object_truthy(object_get(&options, "headers")).and_then(|val| { @@ -654,12 +664,19 @@ impl EpoxyClient { } } - if matches!(request_method, Method::POST | Method::PUT | Method::PATCH) && body.is_empty() { + let body_empty = if let http_body_util::Either::Right(ref x) = body { + x.size_hint().exact().unwrap_or(1) == 0 + } else { + false + }; + + if matches!(request_method, Method::POST | Method::PUT | Method::PATCH) && body_empty { headers_map.insert(CONTENT_LENGTH, 0.into()); } + let (mut response, response_uri, redirected) = self - .send_req(request_builder.body(HttpBody::new(body))?, request_redirect) + .send_req(request_builder.body(body)?, request_redirect) .await?; if self.certs_tampered { diff --git a/client/src/utils.rs b/client/src/utils.rs index 73f08c5..c9f2b1f 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -17,6 +17,7 @@ use futures_rustls::{ }; use futures_util::{ready, AsyncRead, AsyncWrite, Future, Stream, StreamExt, TryStreamExt}; use http::{HeaderValue, Uri}; +use http_body_util::{Either, Full, StreamBody}; use hyper::rt::Executor; use js_sys::{Array, ArrayBuffer, JsString, Object, Uint8Array}; use pin_project_lite::pin_project; @@ -38,6 +39,9 @@ extern "C" { #[wasm_bindgen(js_namespace = console, js_name = log)] pub fn js_console_log(s: &str); + #[wasm_bindgen(js_namespace = console, js_name = warn)] + pub fn js_console_warn(s: &str); + #[wasm_bindgen(js_namespace = console, js_name = error)] pub fn js_console_error(s: &str); } @@ -48,6 +52,12 @@ macro_rules! console_log { $crate::utils::js_console_log(&format!($($expr),*)); }; } +#[macro_export] +macro_rules! console_warn { + ($($expr:expr),*) => { + $crate::utils::js_console_warn(&format!($($expr),*)); + }; +} #[macro_export] macro_rules! console_error { @@ -375,6 +385,18 @@ export async function convert_body_inner(body) { return [new Uint8Array(await req.arrayBuffer()), type]; } +export async function convert_streaming_body_inner(body) { + try { + let req = new Request("", { method: "POST", body }); + let type = req.headers.get("content-type"); + return [false, new Uint8Array(await req.arrayBuffer()), type]; + } catch(x) { + let req = new Request("", { method: "POST", duplex: "half", body }); + let type = req.headers.get("content-type"); + return [true, req.body, type]; + } +} + export function entries_of_object_inner(obj) { return Object.entries(obj).map(x => x.map(String)); } @@ -408,6 +430,8 @@ extern "C" { #[wasm_bindgen(catch)] async fn convert_body_inner(val: JsValue) -> Result; + #[wasm_bindgen(catch)] + async fn convert_streaming_body_inner(val: JsValue) -> Result; fn entries_of_object_inner(obj: &Object) -> Vec; pub fn define_property(obj: &Object, key: &str, val: JsValue); @@ -420,8 +444,74 @@ extern "C" { pub async fn convert_body(val: JsValue) -> Result<(Uint8Array, Option), JsValue> { let req: Array = convert_body_inner(val).await?.unchecked_into(); - let str: Option = object_truthy(req.at(1)).map(|x| x.unchecked_into()); - Ok((req.at(0).unchecked_into(), str.map(Into::into))) + let content_type: Option = object_truthy(req.at(1)).map(|x| x.unchecked_into()); + Ok((req.at(0).unchecked_into(), content_type.map(Into::into))) +} + +pub enum MaybeStreamingBody { + Streaming(web_sys::ReadableStream), + Static(Uint8Array), +} + +pub struct StreamingInnerBody( + Pin, std::io::Error>> + Send>>, +); +impl Stream for StreamingInnerBody { + type Item = Result, std::io::Error>; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.0.poll_next_unpin(cx) + } + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} +impl Clone for StreamingInnerBody { + fn clone(&self) -> Self { + console_error!("epoxy internal error: cloning streaming request body is not allowed"); + unreachable!(); + } +} + +pub type StreamingBody = Either, Full>; + +impl MaybeStreamingBody { + pub fn into_httpbody(self) -> Result { + match self { + Self::Streaming(x) => { + let reader = ReadableStream::from_raw(x); + let reader = reader + .try_into_stream() + .map_err(|x| EpoxyError::StreamingBodyConvertFailed(format!("{:?}", x)))?; + + let reader = reader + .then(|x| async { + Ok::(Bytes::from(convert_body(x?).await?.0.to_vec())) + }) + .map_ok(http_body::Frame::data); + let reader = reader.map_err(|x| std::io::Error::other(format!("{:?}", x))); + + Ok(Either::Left(StreamBody::new(StreamingInnerBody(Box::pin( + SendWrapper::new(reader), + ))))) + } + Self::Static(x) => Ok(Either::Right(Full::new(Bytes::from(x.to_vec())))), + } + } +} + +pub async fn convert_streaming_body( + val: JsValue, +) -> Result<(MaybeStreamingBody, Option), JsValue> { + let req: Array = convert_streaming_body_inner(val).await?.unchecked_into(); + let content_type: Option = object_truthy(req.at(2)).map(|x| x.unchecked_into()); + + let body = if req.at(0).is_truthy() { + MaybeStreamingBody::Streaming(req.at(1).unchecked_into()) + } else { + MaybeStreamingBody::Static(req.at(1).unchecked_into()) + }; + + Ok((body, content_type.map(Into::into))) } pub fn entries_of_object(obj: &Object) -> Vec> {