mirror of
https://github.com/MercuryWorkshop/epoxy-tls.git
synced 2025-05-13 06:20:02 -04:00
fix error handling
This commit is contained in:
parent
81e78c89bc
commit
3b4cd96614
3 changed files with 85 additions and 48 deletions
|
@ -4,6 +4,7 @@ mod tokioio;
|
||||||
mod wsstreamwrapper;
|
mod wsstreamwrapper;
|
||||||
|
|
||||||
use tokioio::TokioIo;
|
use tokioio::TokioIo;
|
||||||
|
use utils::ReplaceErr;
|
||||||
use wsstreamwrapper::WsStreamWrapper;
|
use wsstreamwrapper::WsStreamWrapper;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -20,13 +21,13 @@ use web_sys::TextEncoder;
|
||||||
|
|
||||||
type HttpBody = http_body_util::Full<Bytes>;
|
type HttpBody = http_body_util::Full<Bytes>;
|
||||||
|
|
||||||
async fn send_req<T>(req: http::Request<HttpBody>, io: T) -> Response<Incoming>
|
async fn send_req<T>(req: http::Request<HttpBody>, io: T) -> Result<Response<Incoming>, JsError>
|
||||||
where
|
where
|
||||||
T: hyper::rt::Read + hyper::rt::Write + std::marker::Unpin + 'static,
|
T: hyper::rt::Read + hyper::rt::Write + std::marker::Unpin + 'static,
|
||||||
{
|
{
|
||||||
let (mut req_sender, conn) = hyper_conn::http1::handshake::<T, HttpBody>(io)
|
let (mut req_sender, conn) = hyper_conn::http1::handshake::<T, HttpBody>(io)
|
||||||
.await
|
.await
|
||||||
.expect_throw("Failed to connect to host");
|
.replace_err("Failed to connect to host")?;
|
||||||
|
|
||||||
wasm_bindgen_futures::spawn_local(async move {
|
wasm_bindgen_futures::spawn_local(async move {
|
||||||
if let Err(e) = conn.await {
|
if let Err(e) = conn.await {
|
||||||
|
@ -38,7 +39,7 @@ where
|
||||||
req_sender
|
req_sender
|
||||||
.send_request(req)
|
.send_request(req)
|
||||||
.await
|
.await
|
||||||
.expect_throw("Failed to send request")
|
.replace_err("Failed to send request")
|
||||||
}
|
}
|
||||||
|
|
||||||
#[wasm_bindgen(start)]
|
#[wasm_bindgen(start)]
|
||||||
|
@ -56,21 +57,22 @@ pub struct WsTcpWorker {
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
impl WsTcpWorker {
|
impl WsTcpWorker {
|
||||||
#[wasm_bindgen(constructor)]
|
#[wasm_bindgen(constructor)]
|
||||||
pub async fn new(ws_url: String, useragent: String) -> Result<WsTcpWorker, JsValue> {
|
pub async fn new(ws_url: String, useragent: String) -> Result<WsTcpWorker, JsError> {
|
||||||
let ws_uri = ws_url
|
let ws_uri = ws_url
|
||||||
.parse::<uri::Uri>()
|
.parse::<uri::Uri>()
|
||||||
.expect_throw("Failed to parse websocket URL");
|
.replace_err("Failed to parse websocket url")?;
|
||||||
|
|
||||||
let ws_uri_scheme = ws_uri
|
let ws_uri_scheme = ws_uri
|
||||||
.scheme_str()
|
.scheme_str()
|
||||||
.expect_throw("Websocket URL must have a scheme");
|
.replace_err("Websocket URL must have a scheme")?;
|
||||||
if ws_uri_scheme != "ws" && ws_uri_scheme != "wss" {
|
if ws_uri_scheme != "ws" && ws_uri_scheme != "wss" {
|
||||||
return Err("Scheme must be either `ws` or `wss`".into());
|
return Err(JsError::new("Scheme must be either `ws` or `wss`"));
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("connecting to ws {:?}", ws_url);
|
debug!("connecting to ws {:?}", ws_url);
|
||||||
let (ws, wsmeta) = WsStreamWrapper::connect(ws_url, None)
|
let (ws, wsmeta) = WsStreamWrapper::connect(ws_url, None)
|
||||||
.await
|
.await
|
||||||
.expect_throw("Failed to connect to websocket");
|
.replace_err("Failed to connect to websocket")?;
|
||||||
debug!("connected!");
|
debug!("connected!");
|
||||||
let mux = Multiplexor::new(ws, Role::Client, None, None);
|
let mux = Multiplexor::new(ws, Role::Client, None, None);
|
||||||
|
|
||||||
|
@ -92,13 +94,13 @@ impl WsTcpWorker {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch(&self, url: String, options: Object) -> Result<(), JsValue> {
|
pub async fn fetch(&self, url: String, options: Object) -> Result<(), JsError> {
|
||||||
let uri = url.parse::<uri::Uri>().expect_throw("Failed to parse URL");
|
let uri = url.parse::<uri::Uri>().replace_err("Failed to parse URL")?;
|
||||||
let uri_scheme = uri.scheme().expect_throw("URL must have a scheme");
|
let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?;
|
||||||
if *uri_scheme != uri::Scheme::HTTP && *uri_scheme != uri::Scheme::HTTPS {
|
if *uri_scheme != uri::Scheme::HTTP && *uri_scheme != uri::Scheme::HTTPS {
|
||||||
return Err("Scheme must be either `http` or `https`".into());
|
return Err(nerr!("Scheme must be either `http` or `https`"));
|
||||||
}
|
}
|
||||||
let uri_host = uri.host().expect_throw("URL must have a host");
|
let uri_host = uri.host().replace_err("URL must have a host")?;
|
||||||
let uri_port = if let Some(port) = uri.port() {
|
let uri_port = if let Some(port) = uri.port() {
|
||||||
port.as_u16()
|
port.as_u16()
|
||||||
} else {
|
} else {
|
||||||
|
@ -109,7 +111,7 @@ impl WsTcpWorker {
|
||||||
} else if *uri_scheme == uri::Scheme::HTTPS {
|
} else if *uri_scheme == uri::Scheme::HTTPS {
|
||||||
443
|
443
|
||||||
} else {
|
} else {
|
||||||
return Err("Failed to coerce port from scheme".into());
|
return Err(nerr!("Failed to coerce port from scheme"));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -120,23 +122,24 @@ impl WsTcpWorker {
|
||||||
debug!("method {:?}", req_method_string);
|
debug!("method {:?}", req_method_string);
|
||||||
let req_method: http::Method =
|
let req_method: http::Method =
|
||||||
http::Method::try_from(<String as AsRef<str>>::as_ref(&req_method_string))
|
http::Method::try_from(<String as AsRef<str>>::as_ref(&req_method_string))
|
||||||
.expect_throw("Invalid http method");
|
.replace_err("Invalid http method")?;
|
||||||
|
|
||||||
let body: Option<Vec<u8>> = Reflect::get(&options, &JsValue::from_str("body"))
|
let body_jsvalue: Option<JsValue> = Reflect::get(&options, &JsValue::from_str("body")).ok();
|
||||||
.map(|val| {
|
let body = if let Some(val) = body_jsvalue {
|
||||||
if val.is_string() {
|
if val.is_string() {
|
||||||
let str = val
|
let str = val
|
||||||
.as_string()
|
.as_string()
|
||||||
.expect_throw("Failed to encode body into uint8array");
|
.replace_err("Failed to get string from body")?;
|
||||||
let encoder =
|
let encoder =
|
||||||
TextEncoder::new().expect_throw("Failed to encode body into uint8array");
|
TextEncoder::new().replace_err("Failed to create TextEncoder for body")?;
|
||||||
let encoded = encoder.encode_with_input(str.as_ref());
|
let encoded = encoder.encode_with_input(str.as_ref());
|
||||||
Some(encoded)
|
Some(encoded)
|
||||||
} else {
|
} else {
|
||||||
Some(Uint8Array::new(&val).to_vec())
|
Some(Uint8Array::new(&val).to_vec())
|
||||||
}
|
}
|
||||||
})
|
} else {
|
||||||
.unwrap_or(None);
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let body_bytes: Bytes = match body {
|
let body_bytes: Bytes = match body {
|
||||||
Some(vec) => Bytes::from(vec),
|
Some(vec) => Bytes::from(vec),
|
||||||
|
@ -157,13 +160,13 @@ impl WsTcpWorker {
|
||||||
let mut builder = Request::builder().uri(uri.clone()).method(req_method);
|
let mut builder = Request::builder().uri(uri.clone()).method(req_method);
|
||||||
|
|
||||||
if let Some(headers) = headers {
|
if let Some(headers) = headers {
|
||||||
let headers_map = builder.headers_mut().expect_throw("failed to get headers");
|
let headers_map = builder.headers_mut().replace_err("Failed to get headers")?;
|
||||||
for hdr in headers {
|
for hdr in headers {
|
||||||
headers_map.insert(
|
headers_map.insert(
|
||||||
HeaderName::from_bytes(hdr[0].as_bytes())
|
HeaderName::from_bytes(hdr[0].as_bytes())
|
||||||
.expect_throw("failed to get hdr name"),
|
.replace_err("Failed to get hdr name")?,
|
||||||
HeaderValue::from_str(hdr[1].clone().as_ref())
|
HeaderValue::from_str(hdr[1].clone().as_ref())
|
||||||
.expect_throw("failed to get hdr value"),
|
.replace_err("Failed to get hdr value")?,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -175,13 +178,13 @@ impl WsTcpWorker {
|
||||||
|
|
||||||
let request = builder
|
let request = builder
|
||||||
.body(HttpBody::new(body_bytes))
|
.body(HttpBody::new(body_bytes))
|
||||||
.expect_throw("Failed to make request");
|
.replace_err("Failed to make request")?;
|
||||||
|
|
||||||
let channel = self
|
let channel = self
|
||||||
.mux
|
.mux
|
||||||
.client_new_stream_channel(uri_host.as_bytes(), uri_port)
|
.client_new_stream_channel(uri_host.as_bytes(), uri_port)
|
||||||
.await
|
.await
|
||||||
.expect_throw("Failed to create multiplexor channel");
|
.replace_err("Failed to create multiplexor channel")?;
|
||||||
|
|
||||||
let mut resp: hyper::Response<Incoming>;
|
let mut resp: hyper::Response<Incoming>;
|
||||||
|
|
||||||
|
@ -192,20 +195,20 @@ impl WsTcpWorker {
|
||||||
.connect(
|
.connect(
|
||||||
cloned_uri
|
cloned_uri
|
||||||
.try_into()
|
.try_into()
|
||||||
.expect_throw("Failed to parse URL (rustls)"),
|
.replace_err("Failed to parse URL (rustls)")?,
|
||||||
channel,
|
channel,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.expect_throw("Failed to perform TLS handshake");
|
.replace_err("Failed to perform TLS handshake")?;
|
||||||
resp = send_req(request, TokioIo::new(io)).await;
|
resp = send_req(request, TokioIo::new(io)).await?;
|
||||||
} else {
|
} else {
|
||||||
resp = send_req(request, TokioIo::new(channel)).await;
|
resp = send_req(request, TokioIo::new(channel)).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
log!("{:?}", resp);
|
log!("{:?}", resp);
|
||||||
let body = resp.body_mut().collect();
|
let body = resp.body_mut().collect();
|
||||||
let body_bytes = body.await.expect_throw("Failed to get body").to_bytes();
|
let body_bytes = body.await.replace_err("Failed to get body")?.to_bytes();
|
||||||
log!("{}", std::str::from_utf8(&body_bytes).expect_throw("e"));
|
log!("{}", std::str::from_utf8(&body_bytes).replace_err("e")?);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,10 @@ macro_rules! error {
|
||||||
($($t:tt)*) => (utils::console_error(&format_args!($($t)*).to_string()))
|
($($t:tt)*) => (utils::console_error(&format_args!($($t)*).to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
macro_rules! nerr {
|
||||||
|
($expr:expr) => (JsError::new($expr))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn entries_of_object(obj: &Object) -> Vec<Vec<String>> {
|
pub fn entries_of_object(obj: &Object) -> Vec<Vec<String>> {
|
||||||
js_sys::Object::entries(obj)
|
js_sys::Object::entries(obj)
|
||||||
.to_vec()
|
.to_vec()
|
||||||
|
@ -45,3 +49,25 @@ pub fn entries_of_object(obj: &Object) -> Vec<Vec<String>> {
|
||||||
})
|
})
|
||||||
.collect::<Vec<Vec<_>>>()
|
.collect::<Vec<Vec<_>>>()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub trait ReplaceErr {
|
||||||
|
type Ok;
|
||||||
|
|
||||||
|
fn replace_err(self, err: &str) -> Result<Self::Ok, JsError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, E> ReplaceErr for Result<T, E> {
|
||||||
|
type Ok = T;
|
||||||
|
|
||||||
|
fn replace_err(self, err: &str) -> Result<<Self as ReplaceErr>::Ok, JsError> {
|
||||||
|
self.map_err(|_| JsError::new(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ReplaceErr for Option<T> {
|
||||||
|
type Ok = T;
|
||||||
|
|
||||||
|
fn replace_err(self, err: &str) -> Result<<Self as ReplaceErr>::Ok, JsError> {
|
||||||
|
self.ok_or_else(|| JsError::new(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,19 +1,27 @@
|
||||||
(async () => {
|
(async () => {
|
||||||
|
console.log(
|
||||||
|
"%cWASM is significantly slower with DevTools open!",
|
||||||
|
"color:red;font-size:2rem;font-weight:bold"
|
||||||
|
);
|
||||||
await wasm_bindgen("./wstcp_client_bg.wasm");
|
await wasm_bindgen("./wstcp_client_bg.wasm");
|
||||||
|
const tconn0 = performance.now();
|
||||||
let wstcp = await new wasm_bindgen.WsTcpWorker("wss://localhost:4000", navigator.userAgent);
|
let wstcp = await new wasm_bindgen.WsTcpWorker("wss://localhost:4000", navigator.userAgent);
|
||||||
|
const tconn1 = performance.now();
|
||||||
|
console.warn(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`);
|
||||||
const t0 = performance.now();
|
const t0 = performance.now();
|
||||||
await wstcp.fetch("https://httpbin.org/post", {method: "POST", body: "test", headers: {"X-Header-One":"one","x-header-one":"One","X-Header-Two":"two"}});
|
await wstcp.fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } });
|
||||||
const t1 = performance.now();
|
const t1 = performance.now();
|
||||||
console.warn(`mux 1 took ${t1 - t0} ms or ${(t1 - t0)/1000} s`);
|
console.warn(`mux 1 took ${t1 - t0} ms or ${(t1 - t0) / 1000} s`);
|
||||||
|
|
||||||
const t2 = performance.now();
|
const t2 = performance.now();
|
||||||
await wstcp.fetch("https://httpbin.org/post", {method: "POST", body: "test", headers: {"X-Header-One":"one","x-header-one":"One","X-Header-Two":"two"}});
|
await wstcp.fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } });
|
||||||
const t3 = performance.now();
|
const t3 = performance.now();
|
||||||
console.warn(`mux 2 took ${t3 - t2} ms or ${(t3 - t2)/1000} s`);
|
console.warn(`mux 2 took ${t3 - t2} ms or ${(t3 - t2) / 1000} s`);
|
||||||
|
|
||||||
const t4 = performance.now();
|
const t4 = performance.now();
|
||||||
await fetch("https://httpbin.org/post", {method: "POST", body: "test", headers: {"X-Header-One":"one","x-header-one":"One","X-Header-Two":"two"}});
|
await fetch("https://httpbin.org/post", { method: "POST", body: "test", headers: { "X-Header-One": "one", "x-header-one": "One", "X-Header-Two": "two" } });
|
||||||
const t5 = performance.now();
|
const t5 = performance.now();
|
||||||
console.warn(`native took ${t5 - t4} ms or ${(t5 - t4)/1000} s`);
|
console.warn(`native took ${t5 - t4} ms or ${(t5 - t4) / 1000} s`);
|
||||||
|
|
||||||
alert()
|
alert(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s\nmux 1 took ${t1 - t0} ms or ${(t1 - t0) / 1000} s\nmux 2 took ${t3 - t2} ms or ${(t3 - t2) / 1000} s\nnative took ${t5 - t4} ms or ${(t5 - t4) / 1000} s`)
|
||||||
})();
|
})();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue