mirror of
https://github.com/MercuryWorkshop/epoxy-tls.git
synced 2025-05-12 22:10:01 -04:00
streaming body test
This commit is contained in:
parent
f226dd9939
commit
353f4c9929
5 changed files with 171 additions and 34 deletions
17
Cargo.lock
generated
17
Cargo.lock
generated
|
@ -696,6 +696,7 @@ dependencies = [
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"getrandom",
|
"getrandom",
|
||||||
"http",
|
"http",
|
||||||
|
"http-body",
|
||||||
"http-body-util",
|
"http-body-util",
|
||||||
"hyper",
|
"hyper",
|
||||||
"hyper-util-wasm",
|
"hyper-util-wasm",
|
||||||
|
@ -1197,9 +1198,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hyper-util"
|
name = "hyper-util"
|
||||||
version = "0.1.9"
|
version = "0.1.10"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b"
|
checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-channel",
|
"futures-channel",
|
||||||
|
@ -1876,9 +1877,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustls"
|
name = "rustls"
|
||||||
version = "0.23.15"
|
version = "0.23.16"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5fbb44d7acc4e873d613422379f69f237a1b141928c02f6bc6ccfddddc2d7993"
|
checksum = "eee87ff5d9b36712a58574e12e9f0ea80f915a5b0ac518d322b24a465617925e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"ring",
|
"ring",
|
||||||
|
@ -1952,18 +1953,18 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde"
|
name = "serde"
|
||||||
version = "1.0.213"
|
version = "1.0.214"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3ea7893ff5e2466df8d720bb615088341b295f849602c6956047f8f80f0e9bc1"
|
checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_derive"
|
name = "serde_derive"
|
||||||
version = "1.0.213"
|
version = "1.0.214"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5"
|
checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
|
|
@ -17,6 +17,7 @@ flume = "0.11.0"
|
||||||
futures-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"] }
|
futures-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"] }
|
||||||
futures-util = { version = "0.3.30", features = ["sink"] }
|
futures-util = { version = "0.3.30", features = ["sink"] }
|
||||||
http = "1.1.0"
|
http = "1.1.0"
|
||||||
|
http-body = "1.0.1"
|
||||||
http-body-util = "0.1.2"
|
http-body-util = "0.1.2"
|
||||||
hyper = "1.4.1"
|
hyper = "1.4.1"
|
||||||
hyper-util-wasm = { git = "https://github.com/r58Playz/hyper-util-wasm", branch = "opinionated", version = "0.1.7", features = ["client-legacy", "http1"] }
|
hyper-util-wasm = { git = "https://github.com/r58Playz/hyper-util-wasm", branch = "opinionated", version = "0.1.7", features = ["client-legacy", "http1"] }
|
||||||
|
|
|
@ -295,11 +295,39 @@ import initEpoxy, { EpoxyClient, EpoxyClientOptions, EpoxyHandlers, info as epox
|
||||||
total_mux_multi = total_mux_multi / num_outer_tests;
|
total_mux_multi = total_mux_multi / num_outer_tests;
|
||||||
log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`);
|
log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`);
|
||||||
} else {
|
} else {
|
||||||
console.time();
|
const intervalStream = new ReadableStream({
|
||||||
let resp = await epoxy_client.fetch(test_url);
|
start(c) {
|
||||||
console.log(resp, resp.rawHeaders);
|
let count = 0;
|
||||||
log(await resp.arrayBuffer());
|
const timer = setInterval(() => {
|
||||||
console.timeEnd();
|
console.log("sent!");
|
||||||
|
c.enqueue("Hello\n");
|
||||||
|
if (count === 5) {
|
||||||
|
clearInterval(timer);
|
||||||
|
c.close();
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
}, 1000);
|
||||||
|
},
|
||||||
|
}).pipeThrough(new TextEncoderStream());
|
||||||
|
|
||||||
|
const resp = await epoxy_client.fetch("https://full-duplex-server.deno.dev/", {
|
||||||
|
method: "POST",
|
||||||
|
duplex: "half",
|
||||||
|
redirect: "manual",
|
||||||
|
body: intervalStream,
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log("foo");
|
||||||
|
|
||||||
|
const reader = resp.body.pipeThrough(new TextDecoderStream()).getReader();
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const { value, done } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
console.log(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log("done!");
|
||||||
}
|
}
|
||||||
log("done");
|
log("done");
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
#![feature(let_chains, impl_trait_in_assoc_type)]
|
#![feature(let_chains, impl_trait_in_assoc_type)]
|
||||||
use std::{str::FromStr, sync::Arc};
|
use std::{error::Error, str::FromStr, sync::Arc};
|
||||||
|
|
||||||
#[cfg(feature = "full")]
|
#[cfg(feature = "full")]
|
||||||
use async_compression::futures::bufread as async_comp;
|
use async_compression::futures::bufread as async_comp;
|
||||||
|
@ -11,13 +11,14 @@ use futures_util::TryStreamExt;
|
||||||
use http::{
|
use http::{
|
||||||
header::{
|
header::{
|
||||||
InvalidHeaderName, InvalidHeaderValue, ACCEPT_ENCODING, CONNECTION, CONTENT_LENGTH,
|
InvalidHeaderName, InvalidHeaderValue, ACCEPT_ENCODING, CONNECTION, CONTENT_LENGTH,
|
||||||
CONTENT_TYPE, USER_AGENT,
|
CONTENT_TYPE, LOCATION, USER_AGENT,
|
||||||
},
|
},
|
||||||
method::InvalidMethod,
|
method::InvalidMethod,
|
||||||
uri::{InvalidUri, InvalidUriParts},
|
uri::{InvalidUri, InvalidUriParts},
|
||||||
HeaderName, HeaderValue, Method, Request, Response,
|
HeaderName, HeaderValue, Method, Request, Response,
|
||||||
};
|
};
|
||||||
use http_body_util::BodyDataStream;
|
use http_body::Body;
|
||||||
|
use http_body_util::{BodyDataStream, BodyExt, Full};
|
||||||
use hyper::{body::Incoming, Uri};
|
use hyper::{body::Incoming, Uri};
|
||||||
use hyper_util_wasm::client::legacy::Client;
|
use hyper_util_wasm::client::legacy::Client;
|
||||||
#[cfg(feature = "full")]
|
#[cfg(feature = "full")]
|
||||||
|
@ -27,9 +28,9 @@ use send_wrapper::SendWrapper;
|
||||||
use stream_provider::{StreamProvider, StreamProviderService};
|
use stream_provider::{StreamProvider, StreamProviderService};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use utils::{
|
use utils::{
|
||||||
asyncread_to_readablestream, convert_body, entries_of_object, from_entries, is_null_body,
|
asyncread_to_readablestream, convert_streaming_body, entries_of_object, from_entries,
|
||||||
is_redirect, object_get, object_set, object_truthy, ws_protocol, UriExt, WasmExecutor,
|
is_null_body, is_redirect, object_get, object_set, object_truthy, ws_protocol,
|
||||||
WispTransportRead, WispTransportWrite,
|
MaybeStreamingBody, StreamingBody, UriExt, WasmExecutor, WispTransportRead, WispTransportWrite,
|
||||||
};
|
};
|
||||||
use wasm_bindgen::prelude::*;
|
use wasm_bindgen::prelude::*;
|
||||||
use wasm_bindgen_futures::JsFuture;
|
use wasm_bindgen_futures::JsFuture;
|
||||||
|
@ -92,8 +93,6 @@ impl TryFrom<EpoxyUrlInput> for Uri {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type HttpBody = http_body_util::Full<Bytes>;
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum EpoxyError {
|
pub enum EpoxyError {
|
||||||
#[error("Invalid DNS name: {0:?} ({0})")]
|
#[error("Invalid DNS name: {0:?} ({0})")]
|
||||||
|
@ -165,6 +164,10 @@ pub enum EpoxyError {
|
||||||
ResponseHeadersFromEntriesFailed,
|
ResponseHeadersFromEntriesFailed,
|
||||||
#[error("Failed to construct response object")]
|
#[error("Failed to construct response object")]
|
||||||
ResponseNewFailed,
|
ResponseNewFailed,
|
||||||
|
#[error("Failed to convert streaming body: {0}")]
|
||||||
|
StreamingBodyConvertFailed(String),
|
||||||
|
#[error("Failed to collect streaming body: {0:?} ({0})")]
|
||||||
|
StreamingBodyCollectFailed(Box<dyn Error + Sync + Send>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EpoxyError {
|
impl EpoxyError {
|
||||||
|
@ -219,10 +222,9 @@ impl From<CloseReason> for EpoxyError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum EpoxyResponse {
|
enum EpoxyResponse {
|
||||||
Success(Response<Incoming>),
|
Success(Response<Incoming>),
|
||||||
Redirect((Response<Incoming>, http::Request<HttpBody>)),
|
Redirect((Response<Incoming>, http::Request<StreamingBody>)),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "full")]
|
#[cfg(feature = "full")]
|
||||||
|
@ -327,7 +329,7 @@ impl EpoxyHandlers {
|
||||||
#[wasm_bindgen(inspectable)]
|
#[wasm_bindgen(inspectable)]
|
||||||
pub struct EpoxyClient {
|
pub struct EpoxyClient {
|
||||||
stream_provider: Arc<StreamProvider>,
|
stream_provider: Arc<StreamProvider>,
|
||||||
client: Client<StreamProviderService, HttpBody>,
|
client: Client<StreamProviderService, StreamingBody>,
|
||||||
|
|
||||||
certs_tampered: bool,
|
certs_tampered: bool,
|
||||||
|
|
||||||
|
@ -520,7 +522,7 @@ impl EpoxyClient {
|
||||||
|
|
||||||
async fn send_req_inner(
|
async fn send_req_inner(
|
||||||
&self,
|
&self,
|
||||||
req: http::Request<HttpBody>,
|
req: http::Request<StreamingBody>,
|
||||||
should_redirect: bool,
|
should_redirect: bool,
|
||||||
) -> Result<EpoxyResponse, EpoxyError> {
|
) -> Result<EpoxyResponse, EpoxyError> {
|
||||||
let new_req = if should_redirect {
|
let new_req = if should_redirect {
|
||||||
|
@ -534,7 +536,7 @@ impl EpoxyClient {
|
||||||
Ok(res) => {
|
Ok(res) => {
|
||||||
if is_redirect(res.status().as_u16())
|
if is_redirect(res.status().as_u16())
|
||||||
&& let Some(mut new_req) = new_req
|
&& let Some(mut new_req) = new_req
|
||||||
&& let Some(location) = res.headers().get("Location")
|
&& let Some(location) = res.headers().get(LOCATION)
|
||||||
&& let Ok(redirect_url) = new_req.uri().get_redirect(location)
|
&& let Ok(redirect_url) = new_req.uri().get_redirect(location)
|
||||||
{
|
{
|
||||||
*new_req.uri_mut() = redirect_url;
|
*new_req.uri_mut() = redirect_url;
|
||||||
|
@ -549,7 +551,7 @@ impl EpoxyClient {
|
||||||
|
|
||||||
async fn send_req(
|
async fn send_req(
|
||||||
&self,
|
&self,
|
||||||
req: http::Request<HttpBody>,
|
req: http::Request<StreamingBody>,
|
||||||
should_redirect: bool,
|
should_redirect: bool,
|
||||||
) -> Result<(hyper::Response<Incoming>, Uri, bool), EpoxyError> {
|
) -> Result<(hyper::Response<Incoming>, Uri, bool), EpoxyError> {
|
||||||
let mut redirected = false;
|
let mut redirected = false;
|
||||||
|
@ -599,13 +601,21 @@ impl EpoxyClient {
|
||||||
let mut body_content_type: Option<String> = None;
|
let mut body_content_type: Option<String> = None;
|
||||||
let body = match object_truthy(object_get(&options, "body")) {
|
let body = match object_truthy(object_get(&options, "body")) {
|
||||||
Some(buf) => {
|
Some(buf) => {
|
||||||
let (body, content_type) = convert_body(buf)
|
let (body, content_type) = convert_streaming_body(buf)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| EpoxyError::InvalidRequestBody)?;
|
.map_err(|_| EpoxyError::InvalidRequestBody)?;
|
||||||
body_content_type = content_type;
|
body_content_type = content_type;
|
||||||
Bytes::from(body.to_vec())
|
if matches!(body, MaybeStreamingBody::Streaming(_)) && request_redirect {
|
||||||
|
console_warn!("epoxy: streaming request body + redirect unsupported");
|
||||||
|
// collect body instead
|
||||||
|
http_body_util::Either::Right(Full::new(
|
||||||
|
body.into_httpbody()?.collect().await.map_err(EpoxyError::StreamingBodyCollectFailed)?.to_bytes(),
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
body.into_httpbody()?
|
||||||
}
|
}
|
||||||
None => Bytes::new(),
|
}
|
||||||
|
None => http_body_util::Either::Right(Full::new(Bytes::new())),
|
||||||
};
|
};
|
||||||
|
|
||||||
let headers = object_truthy(object_get(&options, "headers")).and_then(|val| {
|
let headers = object_truthy(object_get(&options, "headers")).and_then(|val| {
|
||||||
|
@ -654,12 +664,19 @@ impl EpoxyClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if matches!(request_method, Method::POST | Method::PUT | Method::PATCH) && body.is_empty() {
|
let body_empty = if let http_body_util::Either::Right(ref x) = body {
|
||||||
|
x.size_hint().exact().unwrap_or(1) == 0
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
|
||||||
|
if matches!(request_method, Method::POST | Method::PUT | Method::PATCH) && body_empty {
|
||||||
headers_map.insert(CONTENT_LENGTH, 0.into());
|
headers_map.insert(CONTENT_LENGTH, 0.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
let (mut response, response_uri, redirected) = self
|
let (mut response, response_uri, redirected) = self
|
||||||
.send_req(request_builder.body(HttpBody::new(body))?, request_redirect)
|
.send_req(request_builder.body(body)?, request_redirect)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if self.certs_tampered {
|
if self.certs_tampered {
|
||||||
|
|
|
@ -17,6 +17,7 @@ use futures_rustls::{
|
||||||
};
|
};
|
||||||
use futures_util::{ready, AsyncRead, AsyncWrite, Future, Stream, StreamExt, TryStreamExt};
|
use futures_util::{ready, AsyncRead, AsyncWrite, Future, Stream, StreamExt, TryStreamExt};
|
||||||
use http::{HeaderValue, Uri};
|
use http::{HeaderValue, Uri};
|
||||||
|
use http_body_util::{Either, Full, StreamBody};
|
||||||
use hyper::rt::Executor;
|
use hyper::rt::Executor;
|
||||||
use js_sys::{Array, ArrayBuffer, JsString, Object, Uint8Array};
|
use js_sys::{Array, ArrayBuffer, JsString, Object, Uint8Array};
|
||||||
use pin_project_lite::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
|
@ -38,6 +39,9 @@ extern "C" {
|
||||||
#[wasm_bindgen(js_namespace = console, js_name = log)]
|
#[wasm_bindgen(js_namespace = console, js_name = log)]
|
||||||
pub fn js_console_log(s: &str);
|
pub fn js_console_log(s: &str);
|
||||||
|
|
||||||
|
#[wasm_bindgen(js_namespace = console, js_name = warn)]
|
||||||
|
pub fn js_console_warn(s: &str);
|
||||||
|
|
||||||
#[wasm_bindgen(js_namespace = console, js_name = error)]
|
#[wasm_bindgen(js_namespace = console, js_name = error)]
|
||||||
pub fn js_console_error(s: &str);
|
pub fn js_console_error(s: &str);
|
||||||
}
|
}
|
||||||
|
@ -48,6 +52,12 @@ macro_rules! console_log {
|
||||||
$crate::utils::js_console_log(&format!($($expr),*));
|
$crate::utils::js_console_log(&format!($($expr),*));
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! console_warn {
|
||||||
|
($($expr:expr),*) => {
|
||||||
|
$crate::utils::js_console_warn(&format!($($expr),*));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! console_error {
|
macro_rules! console_error {
|
||||||
|
@ -375,6 +385,18 @@ export async function convert_body_inner(body) {
|
||||||
return [new Uint8Array(await req.arrayBuffer()), type];
|
return [new Uint8Array(await req.arrayBuffer()), type];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function convert_streaming_body_inner(body) {
|
||||||
|
try {
|
||||||
|
let req = new Request("", { method: "POST", body });
|
||||||
|
let type = req.headers.get("content-type");
|
||||||
|
return [false, new Uint8Array(await req.arrayBuffer()), type];
|
||||||
|
} catch(x) {
|
||||||
|
let req = new Request("", { method: "POST", duplex: "half", body });
|
||||||
|
let type = req.headers.get("content-type");
|
||||||
|
return [true, req.body, type];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export function entries_of_object_inner(obj) {
|
export function entries_of_object_inner(obj) {
|
||||||
return Object.entries(obj).map(x => x.map(String));
|
return Object.entries(obj).map(x => x.map(String));
|
||||||
}
|
}
|
||||||
|
@ -408,6 +430,8 @@ extern "C" {
|
||||||
|
|
||||||
#[wasm_bindgen(catch)]
|
#[wasm_bindgen(catch)]
|
||||||
async fn convert_body_inner(val: JsValue) -> Result<JsValue, JsValue>;
|
async fn convert_body_inner(val: JsValue) -> Result<JsValue, JsValue>;
|
||||||
|
#[wasm_bindgen(catch)]
|
||||||
|
async fn convert_streaming_body_inner(val: JsValue) -> Result<JsValue, JsValue>;
|
||||||
|
|
||||||
fn entries_of_object_inner(obj: &Object) -> Vec<Array>;
|
fn entries_of_object_inner(obj: &Object) -> Vec<Array>;
|
||||||
pub fn define_property(obj: &Object, key: &str, val: JsValue);
|
pub fn define_property(obj: &Object, key: &str, val: JsValue);
|
||||||
|
@ -420,8 +444,74 @@ extern "C" {
|
||||||
|
|
||||||
pub async fn convert_body(val: JsValue) -> Result<(Uint8Array, Option<String>), JsValue> {
|
pub async fn convert_body(val: JsValue) -> Result<(Uint8Array, Option<String>), JsValue> {
|
||||||
let req: Array = convert_body_inner(val).await?.unchecked_into();
|
let req: Array = convert_body_inner(val).await?.unchecked_into();
|
||||||
let str: Option<JsString> = object_truthy(req.at(1)).map(|x| x.unchecked_into());
|
let content_type: Option<JsString> = object_truthy(req.at(1)).map(|x| x.unchecked_into());
|
||||||
Ok((req.at(0).unchecked_into(), str.map(Into::into)))
|
Ok((req.at(0).unchecked_into(), content_type.map(Into::into)))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum MaybeStreamingBody {
|
||||||
|
Streaming(web_sys::ReadableStream),
|
||||||
|
Static(Uint8Array),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct StreamingInnerBody(
|
||||||
|
Pin<Box<dyn Stream<Item = Result<http_body::Frame<Bytes>, std::io::Error>> + Send>>,
|
||||||
|
);
|
||||||
|
impl Stream for StreamingInnerBody {
|
||||||
|
type Item = Result<http_body::Frame<Bytes>, std::io::Error>;
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
self.0.poll_next_unpin(cx)
|
||||||
|
}
|
||||||
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||||
|
self.0.size_hint()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl Clone for StreamingInnerBody {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
console_error!("epoxy internal error: cloning streaming request body is not allowed");
|
||||||
|
unreachable!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type StreamingBody = Either<StreamBody<StreamingInnerBody>, Full<Bytes>>;
|
||||||
|
|
||||||
|
impl MaybeStreamingBody {
|
||||||
|
pub fn into_httpbody(self) -> Result<StreamingBody, EpoxyError> {
|
||||||
|
match self {
|
||||||
|
Self::Streaming(x) => {
|
||||||
|
let reader = ReadableStream::from_raw(x);
|
||||||
|
let reader = reader
|
||||||
|
.try_into_stream()
|
||||||
|
.map_err(|x| EpoxyError::StreamingBodyConvertFailed(format!("{:?}", x)))?;
|
||||||
|
|
||||||
|
let reader = reader
|
||||||
|
.then(|x| async {
|
||||||
|
Ok::<Bytes, JsValue>(Bytes::from(convert_body(x?).await?.0.to_vec()))
|
||||||
|
})
|
||||||
|
.map_ok(http_body::Frame::data);
|
||||||
|
let reader = reader.map_err(|x| std::io::Error::other(format!("{:?}", x)));
|
||||||
|
|
||||||
|
Ok(Either::Left(StreamBody::new(StreamingInnerBody(Box::pin(
|
||||||
|
SendWrapper::new(reader),
|
||||||
|
)))))
|
||||||
|
}
|
||||||
|
Self::Static(x) => Ok(Either::Right(Full::new(Bytes::from(x.to_vec())))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn convert_streaming_body(
|
||||||
|
val: JsValue,
|
||||||
|
) -> Result<(MaybeStreamingBody, Option<String>), JsValue> {
|
||||||
|
let req: Array = convert_streaming_body_inner(val).await?.unchecked_into();
|
||||||
|
let content_type: Option<JsString> = object_truthy(req.at(2)).map(|x| x.unchecked_into());
|
||||||
|
|
||||||
|
let body = if req.at(0).is_truthy() {
|
||||||
|
MaybeStreamingBody::Streaming(req.at(1).unchecked_into())
|
||||||
|
} else {
|
||||||
|
MaybeStreamingBody::Static(req.at(1).unchecked_into())
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((body, content_type.map(Into::into)))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn entries_of_object(obj: &Object) -> Vec<Vec<String>> {
|
pub fn entries_of_object(obj: &Object) -> Vec<Vec<String>> {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue