mirror of
https://github.com/MercuryWorkshop/epoxy-tls.git
synced 2025-05-13 06:20:02 -04:00
return response object
This commit is contained in:
parent
3b4cd96614
commit
eb511a317d
6 changed files with 167 additions and 72 deletions
14
Cargo.lock
generated
14
Cargo.lock
generated
|
@ -1248,6 +1248,19 @@ version = "0.2.89"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f"
|
checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasm-streams"
|
||||||
|
version = "0.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"js-sys",
|
||||||
|
"wasm-bindgen",
|
||||||
|
"wasm-bindgen-futures",
|
||||||
|
"web-sys",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "web-sys"
|
name = "web-sys"
|
||||||
version = "0.3.66"
|
version = "0.3.66"
|
||||||
|
@ -1437,6 +1450,7 @@ dependencies = [
|
||||||
"tokio-rustls",
|
"tokio-rustls",
|
||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
"wasm-bindgen-futures",
|
"wasm-bindgen-futures",
|
||||||
|
"wasm-streams",
|
||||||
"web-sys",
|
"web-sys",
|
||||||
"webpki-roots",
|
"webpki-roots",
|
||||||
"ws_stream_wasm",
|
"ws_stream_wasm",
|
||||||
|
|
|
@ -25,7 +25,8 @@ futures-util = "0.3.30"
|
||||||
js-sys = "0.3.66"
|
js-sys = "0.3.66"
|
||||||
webpki-roots = "0.26.0"
|
webpki-roots = "0.26.0"
|
||||||
tokio-rustls = "0.25.0"
|
tokio-rustls = "0.25.0"
|
||||||
web-sys = { version = "0.3.66", features = ["TextEncoder", "Navigator"] }
|
web-sys = { version = "0.3.66", features = ["TextEncoder", "Navigator", "Response", "ResponseInit"] }
|
||||||
|
wasm-streams = "0.4.0"
|
||||||
|
|
||||||
[dependencies.getrandom]
|
[dependencies.getrandom]
|
||||||
features = ["js"]
|
features = ["js"]
|
||||||
|
|
|
@ -1,19 +1,18 @@
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
mod utils;
|
mod utils;
|
||||||
mod tokioio;
|
mod tokioio;
|
||||||
mod wsstreamwrapper;
|
mod wrappers;
|
||||||
|
|
||||||
use tokioio::TokioIo;
|
use tokioio::TokioIo;
|
||||||
use utils::ReplaceErr;
|
use utils::ReplaceErr;
|
||||||
use wsstreamwrapper::WsStreamWrapper;
|
use wrappers::{IncomingBody, WsStreamWrapper};
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use http::{uri, HeaderName, HeaderValue, Request, Response};
|
use http::{uri, HeaderName, HeaderValue, Request, Response};
|
||||||
use http_body_util::BodyExt;
|
|
||||||
use hyper::{body::Incoming, client::conn as hyper_conn};
|
use hyper::{body::Incoming, client::conn as hyper_conn};
|
||||||
use js_sys::{Object, Reflect, Uint8Array};
|
use js_sys::{Array, Object, Reflect, Uint8Array};
|
||||||
use penguin_mux_wasm::{Multiplexor, Role};
|
use penguin_mux_wasm::{Multiplexor, Role};
|
||||||
use tokio_rustls::{rustls, rustls::RootCertStore, TlsConnector};
|
use tokio_rustls::{rustls, rustls::RootCertStore, TlsConnector};
|
||||||
use wasm_bindgen::prelude::*;
|
use wasm_bindgen::prelude::*;
|
||||||
|
@ -35,7 +34,6 @@ where
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
debug!("sending req");
|
|
||||||
req_sender
|
req_sender
|
||||||
.send_request(req)
|
.send_request(req)
|
||||||
.await
|
.await
|
||||||
|
@ -48,16 +46,16 @@ async fn start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
pub struct WsTcpWorker {
|
pub struct WsTcp {
|
||||||
rustls_config: Arc<rustls::ClientConfig>,
|
rustls_config: Arc<rustls::ClientConfig>,
|
||||||
mux: Multiplexor<WsStreamWrapper>,
|
mux: Multiplexor<WsStreamWrapper>,
|
||||||
useragent: String,
|
useragent: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
impl WsTcpWorker {
|
impl WsTcp {
|
||||||
#[wasm_bindgen(constructor)]
|
#[wasm_bindgen(constructor)]
|
||||||
pub async fn new(ws_url: String, useragent: String) -> Result<WsTcpWorker, JsError> {
|
pub async fn new(ws_url: String, useragent: String) -> Result<WsTcp, JsError> {
|
||||||
let ws_uri = ws_url
|
let ws_uri = ws_url
|
||||||
.parse::<uri::Uri>()
|
.parse::<uri::Uri>()
|
||||||
.replace_err("Failed to parse websocket url")?;
|
.replace_err("Failed to parse websocket url")?;
|
||||||
|
@ -70,14 +68,12 @@ impl WsTcpWorker {
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("connecting to ws {:?}", ws_url);
|
debug!("connecting to ws {:?}", ws_url);
|
||||||
let (ws, wsmeta) = WsStreamWrapper::connect(ws_url, None)
|
let ws = WsStreamWrapper::connect(ws_url, None)
|
||||||
.await
|
.await
|
||||||
.replace_err("Failed to connect to websocket")?;
|
.replace_err("Failed to connect to websocket")?;
|
||||||
debug!("connected!");
|
debug!("connected!");
|
||||||
let mux = Multiplexor::new(ws, Role::Client, None, None);
|
let mux = Multiplexor::new(ws, Role::Client, None, None);
|
||||||
|
|
||||||
debug!("wsmeta ready state: {:?}", wsmeta.ready_state());
|
|
||||||
|
|
||||||
let mut certstore = RootCertStore::empty();
|
let mut certstore = RootCertStore::empty();
|
||||||
certstore.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
|
certstore.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
|
||||||
|
|
||||||
|
@ -87,18 +83,18 @@ impl WsTcpWorker {
|
||||||
.with_no_client_auth(),
|
.with_no_client_auth(),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(WsTcpWorker {
|
Ok(WsTcp {
|
||||||
mux,
|
mux,
|
||||||
rustls_config,
|
rustls_config,
|
||||||
useragent,
|
useragent,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch(&self, url: String, options: Object) -> Result<(), JsError> {
|
pub async fn fetch(&self, url: String, options: Object) -> Result<web_sys::Response, JsError> {
|
||||||
let uri = url.parse::<uri::Uri>().replace_err("Failed to parse URL")?;
|
let uri = url.parse::<uri::Uri>().replace_err("Failed to parse URL")?;
|
||||||
let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?;
|
let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?;
|
||||||
if *uri_scheme != uri::Scheme::HTTP && *uri_scheme != uri::Scheme::HTTPS {
|
if *uri_scheme != uri::Scheme::HTTP && *uri_scheme != uri::Scheme::HTTPS {
|
||||||
return Err(nerr!("Scheme must be either `http` or `https`"));
|
return Err(jerr!("Scheme must be either `http` or `https`"));
|
||||||
}
|
}
|
||||||
let uri_host = uri.host().replace_err("URL must have a host")?;
|
let uri_host = uri.host().replace_err("URL must have a host")?;
|
||||||
let uri_port = if let Some(port) = uri.port() {
|
let uri_port = if let Some(port) = uri.port() {
|
||||||
|
@ -111,20 +107,19 @@ impl WsTcpWorker {
|
||||||
} else if *uri_scheme == uri::Scheme::HTTPS {
|
} else if *uri_scheme == uri::Scheme::HTTPS {
|
||||||
443
|
443
|
||||||
} else {
|
} else {
|
||||||
return Err(nerr!("Failed to coerce port from scheme"));
|
return Err(jerr!("Failed to coerce port from scheme"));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let req_method_string: String = match Reflect::get(&options, &JsValue::from_str("method")) {
|
let req_method_string: String = match Reflect::get(&options, &jval!("method")) {
|
||||||
Ok(val) => val.as_string().unwrap_or("GET".to_string()),
|
Ok(val) => val.as_string().unwrap_or("GET".to_string()),
|
||||||
Err(_) => "GET".to_string(),
|
Err(_) => "GET".to_string(),
|
||||||
};
|
};
|
||||||
debug!("method {:?}", req_method_string);
|
|
||||||
let req_method: http::Method =
|
let req_method: http::Method =
|
||||||
http::Method::try_from(<String as AsRef<str>>::as_ref(&req_method_string))
|
http::Method::try_from(<String as AsRef<str>>::as_ref(&req_method_string))
|
||||||
.replace_err("Invalid http method")?;
|
.replace_err("Invalid http method")?;
|
||||||
|
|
||||||
let body_jsvalue: Option<JsValue> = Reflect::get(&options, &JsValue::from_str("body")).ok();
|
let body_jsvalue: Option<JsValue> = Reflect::get(&options, &jval!("body")).ok();
|
||||||
let body = if let Some(val) = body_jsvalue {
|
let body = if let Some(val) = body_jsvalue {
|
||||||
if val.is_string() {
|
if val.is_string() {
|
||||||
let str = val
|
let str = val
|
||||||
|
@ -146,16 +141,15 @@ impl WsTcpWorker {
|
||||||
None => Bytes::new(),
|
None => Bytes::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let headers: Option<Vec<Vec<String>>> =
|
let headers: Option<Vec<Vec<String>>> = Reflect::get(&options, &jval!("headers"))
|
||||||
Reflect::get(&options, &JsValue::from_str("headers"))
|
.map(|val| {
|
||||||
.map(|val| {
|
if val.is_truthy() {
|
||||||
if val.is_truthy() {
|
Some(utils::entries_of_object(&Object::from(val)))
|
||||||
Some(utils::entries_of_object(&Object::from(val)))
|
} else {
|
||||||
} else {
|
None
|
||||||
None
|
}
|
||||||
}
|
})
|
||||||
})
|
.unwrap_or(None);
|
||||||
.unwrap_or(None);
|
|
||||||
|
|
||||||
let mut builder = Request::builder().uri(uri.clone()).method(req_method);
|
let mut builder = Request::builder().uri(uri.clone()).method(req_method);
|
||||||
|
|
||||||
|
@ -186,7 +180,7 @@ impl WsTcpWorker {
|
||||||
.await
|
.await
|
||||||
.replace_err("Failed to create multiplexor channel")?;
|
.replace_err("Failed to create multiplexor channel")?;
|
||||||
|
|
||||||
let mut resp: hyper::Response<Incoming>;
|
let resp: hyper::Response<Incoming>;
|
||||||
|
|
||||||
if *uri_scheme == uri::Scheme::HTTPS {
|
if *uri_scheme == uri::Scheme::HTTPS {
|
||||||
let cloned_uri = uri_host.to_string().clone();
|
let cloned_uri = uri_host.to_string().clone();
|
||||||
|
@ -205,11 +199,42 @@ impl WsTcpWorker {
|
||||||
resp = send_req(request, TokioIo::new(channel)).await?;
|
resp = send_req(request, TokioIo::new(channel)).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
log!("{:?}", resp);
|
let resp_headers_jsarray = resp
|
||||||
let body = resp.body_mut().collect();
|
.headers()
|
||||||
let body_bytes = body.await.replace_err("Failed to get body")?.to_bytes();
|
.iter()
|
||||||
log!("{}", std::str::from_utf8(&body_bytes).replace_err("e")?);
|
.filter_map(|val| {
|
||||||
|
Some(Array::of2(
|
||||||
|
&jval!(val.0.as_str()),
|
||||||
|
&jval!(val.1.to_str().ok()?),
|
||||||
|
))
|
||||||
|
})
|
||||||
|
.collect::<Array>();
|
||||||
|
|
||||||
Ok(())
|
let resp_headers = Object::from_entries(&resp_headers_jsarray)
|
||||||
|
.replace_err("Failed to create response headers object")?;
|
||||||
|
|
||||||
|
let mut respinit = web_sys::ResponseInit::new();
|
||||||
|
respinit
|
||||||
|
.headers(&resp_headers)
|
||||||
|
.status(resp.status().as_u16())
|
||||||
|
.status_text(resp.status().canonical_reason().unwrap_or_default());
|
||||||
|
|
||||||
|
let body = IncomingBody::new(resp.into_body());
|
||||||
|
let stream = wasm_streams::ReadableStream::from_stream(body);
|
||||||
|
|
||||||
|
let resp = web_sys::Response::new_with_opt_readable_stream_and_init(
|
||||||
|
Some(&stream.into_raw()),
|
||||||
|
&respinit,
|
||||||
|
)
|
||||||
|
.replace_err("Failed to make response")?;
|
||||||
|
|
||||||
|
Object::define_property(
|
||||||
|
&resp,
|
||||||
|
&jval!("url"),
|
||||||
|
&utils::define_property_obj(jval!(url), false)
|
||||||
|
.replace_err("Failed to make define_property object for url")?,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,50 +17,51 @@ extern "C" {
|
||||||
pub fn console_error(s: &str);
|
pub fn console_error(s: &str);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(unused_macros)]
|
||||||
macro_rules! debug {
|
macro_rules! debug {
|
||||||
($($t:tt)*) => (utils::console_debug(&format_args!($($t)*).to_string()))
|
($($t:tt)*) => (utils::console_debug(&format_args!($($t)*).to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(unused_macros)]
|
||||||
macro_rules! log {
|
macro_rules! log {
|
||||||
($($t:tt)*) => (utils::console_log(&format_args!($($t)*).to_string()))
|
($($t:tt)*) => (utils::console_log(&format_args!($($t)*).to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(unused_macros)]
|
||||||
macro_rules! error {
|
macro_rules! error {
|
||||||
($($t:tt)*) => (utils::console_error(&format_args!($($t)*).to_string()))
|
($($t:tt)*) => (utils::console_error(&format_args!($($t)*).to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! nerr {
|
#[allow(unused_macros)]
|
||||||
($expr:expr) => (JsError::new($expr))
|
macro_rules! jerr {
|
||||||
|
($expr:expr) => {
|
||||||
|
JsError::new($expr)
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn entries_of_object(obj: &Object) -> Vec<Vec<String>> {
|
#[allow(unused_macros)]
|
||||||
js_sys::Object::entries(obj)
|
macro_rules! jval {
|
||||||
.to_vec()
|
($expr:expr) => {
|
||||||
.iter()
|
JsValue::from($expr)
|
||||||
.map(|val| {
|
};
|
||||||
Array::from(val)
|
|
||||||
.to_vec()
|
|
||||||
.iter()
|
|
||||||
.map(|val| {
|
|
||||||
val.as_string()
|
|
||||||
.expect_throw("failed to get string from object entry")
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
})
|
|
||||||
.collect::<Vec<Vec<_>>>()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ReplaceErr {
|
pub trait ReplaceErr {
|
||||||
type Ok;
|
type Ok;
|
||||||
|
|
||||||
fn replace_err(self, err: &str) -> Result<Self::Ok, JsError>;
|
fn replace_err(self, err: &str) -> Result<Self::Ok, JsError>;
|
||||||
|
fn replace_err_jv(self, err: &str) -> Result<Self::Ok, JsValue>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, E> ReplaceErr for Result<T, E> {
|
impl<T, E> ReplaceErr for Result<T, E> {
|
||||||
type Ok = T;
|
type Ok = T;
|
||||||
|
|
||||||
fn replace_err(self, err: &str) -> Result<<Self as ReplaceErr>::Ok, JsError> {
|
fn replace_err(self, err: &str) -> Result<<Self as ReplaceErr>::Ok, JsError> {
|
||||||
self.map_err(|_| JsError::new(err))
|
self.map_err(|_| jerr!(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn replace_err_jv(self, err: &str) -> Result<<Self as ReplaceErr>::Ok, JsValue> {
|
||||||
|
self.map_err(|_| jval!(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,6 +69,33 @@ impl<T> ReplaceErr for Option<T> {
|
||||||
type Ok = T;
|
type Ok = T;
|
||||||
|
|
||||||
fn replace_err(self, err: &str) -> Result<<Self as ReplaceErr>::Ok, JsError> {
|
fn replace_err(self, err: &str) -> Result<<Self as ReplaceErr>::Ok, JsError> {
|
||||||
self.ok_or_else(|| JsError::new(err))
|
self.ok_or_else(|| jerr!(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn replace_err_jv(self, err: &str) -> Result<<Self as ReplaceErr>::Ok, JsValue> {
|
||||||
|
self.ok_or_else(|| jval!(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn entries_of_object(obj: &Object) -> Vec<Vec<String>> {
|
||||||
|
js_sys::Object::entries(obj)
|
||||||
|
.to_vec()
|
||||||
|
.iter()
|
||||||
|
.filter_map(|val| {
|
||||||
|
Array::from(val)
|
||||||
|
.to_vec()
|
||||||
|
.iter()
|
||||||
|
.map(|val| val.as_string())
|
||||||
|
.collect::<Option<Vec<_>>>()
|
||||||
|
})
|
||||||
|
.collect::<Vec<Vec<_>>>()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn define_property_obj(value: JsValue, writable: bool) -> Result<Object, JsValue> {
|
||||||
|
let entries: Array = vec![
|
||||||
|
Array::of2(&jval!("value"), &jval!(value)),
|
||||||
|
Array::of2(&jval!("writable"), &jval!(writable)),
|
||||||
|
]
|
||||||
|
.iter().collect::<Array>();
|
||||||
|
Object::from_entries(&entries)
|
||||||
|
}
|
||||||
|
|
|
@ -5,23 +5,15 @@
|
||||||
);
|
);
|
||||||
await wasm_bindgen("./wstcp_client_bg.wasm");
|
await wasm_bindgen("./wstcp_client_bg.wasm");
|
||||||
const tconn0 = performance.now();
|
const tconn0 = performance.now();
|
||||||
let wstcp = await new wasm_bindgen.WsTcpWorker("wss://localhost:4000", navigator.userAgent);
|
let wstcp = await new wasm_bindgen.WsTcp("wss://localhost:4000", navigator.userAgent);
|
||||||
const tconn1 = performance.now();
|
const tconn1 = performance.now();
|
||||||
console.warn(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`);
|
console.warn(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`);
|
||||||
const t0 = performance.now();
|
const t0 = performance.now();
|
||||||
await wstcp.fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } });
|
let resp = await wstcp.fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } });
|
||||||
const t1 = performance.now();
|
const t1 = performance.now();
|
||||||
|
|
||||||
|
console.warn(resp);
|
||||||
|
console.warn(Object.fromEntries(resp.headers));
|
||||||
|
console.warn(await resp.text());
|
||||||
console.warn(`mux 1 took ${t1 - t0} ms or ${(t1 - t0) / 1000} s`);
|
console.warn(`mux 1 took ${t1 - t0} ms or ${(t1 - t0) / 1000} s`);
|
||||||
|
|
||||||
const t2 = performance.now();
|
|
||||||
await wstcp.fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } });
|
|
||||||
const t3 = performance.now();
|
|
||||||
console.warn(`mux 2 took ${t3 - t2} ms or ${(t3 - t2) / 1000} s`);
|
|
||||||
|
|
||||||
const t4 = performance.now();
|
|
||||||
await fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } });
|
|
||||||
const t5 = performance.now();
|
|
||||||
console.warn(`native took ${t5 - t4} ms or ${(t5 - t4) / 1000} s`);
|
|
||||||
|
|
||||||
alert(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s\nmux 1 took ${t1 - t0} ms or ${(t1 - t0) / 1000} s\nmux 2 took ${t3 - t2} ms or ${(t3 - t2) / 1000} s\nnative took ${t5 - t4} ms or ${(t5 - t4) / 1000} s`)
|
|
||||||
})();
|
})();
|
||||||
|
|
|
@ -5,6 +5,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures_util::{Sink, Stream};
|
use futures_util::{Sink, Stream};
|
||||||
|
use hyper::body::Body;
|
||||||
use penguin_mux_wasm::ws;
|
use penguin_mux_wasm::ws;
|
||||||
use pin_project_lite::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use ws_stream_wasm::{WsErr, WsMessage, WsMeta, WsStream};
|
use ws_stream_wasm::{WsErr, WsMessage, WsMeta, WsStream};
|
||||||
|
@ -20,9 +21,9 @@ impl WsStreamWrapper {
|
||||||
pub async fn connect(
|
pub async fn connect(
|
||||||
url: impl AsRef<str>,
|
url: impl AsRef<str>,
|
||||||
protocols: impl Into<Option<Vec<&str>>>,
|
protocols: impl Into<Option<Vec<&str>>>,
|
||||||
) -> Result<(Self, WsMeta), WsErr> {
|
) -> Result<Self, WsErr> {
|
||||||
let (wsmeta, wsstream) = WsMeta::connect(url, protocols).await?;
|
let (_, wsstream) = WsMeta::connect(url, protocols).await?;
|
||||||
Ok((WsStreamWrapper { ws: wsstream }, wsmeta))
|
Ok(WsStreamWrapper { ws: wsstream })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +80,7 @@ impl Sink<ws::Message> for WsStreamWrapper {
|
||||||
"ws close err: {:?}",
|
"ws close err: {:?}",
|
||||||
err
|
err
|
||||||
)))),
|
)))),
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
Ping(_) | Pong(_) | Frame(_) => return Ok(()),
|
Ping(_) | Pong(_) | Frame(_) => return Ok(()),
|
||||||
};
|
};
|
||||||
|
@ -114,3 +115,37 @@ impl ws::WebSocketStream for WsStreamWrapper {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pin_project! {
|
||||||
|
pub struct IncomingBody {
|
||||||
|
#[pin]
|
||||||
|
incoming: hyper::body::Incoming,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IncomingBody {
|
||||||
|
pub fn new(incoming: hyper::body::Incoming) -> IncomingBody {
|
||||||
|
IncomingBody { incoming }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for IncomingBody {
|
||||||
|
type Item = Result<JsValue, JsValue>;
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
let this = self.project();
|
||||||
|
let ret = this.incoming.poll_frame(cx);
|
||||||
|
match ret {
|
||||||
|
Poll::Ready(item) => Poll::<Option<Self::Item>>::Ready(match item {
|
||||||
|
Some(frame) => frame
|
||||||
|
.map(|x| {
|
||||||
|
x.into_data()
|
||||||
|
.map(|x| Uint8Array::from(x.as_ref()).into())
|
||||||
|
.replace_err_jv("Error creating uint8array from http frame")
|
||||||
|
})
|
||||||
|
.ok(),
|
||||||
|
None => None,
|
||||||
|
}),
|
||||||
|
Poll::Pending => Poll::<Option<Self::Item>>::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue