websockets impl

This commit is contained in:
CoolElectronics 2024-01-14 12:01:41 -05:00
parent 05d55eada3
commit 4de4f06f30
No known key found for this signature in database
GPG key ID: F63593D168636C50
5 changed files with 186 additions and 38 deletions

View file

@ -4,13 +4,14 @@ mod utils;
mod tokioio;
mod wrappers;
use base64::{engine::general_purpose::STANDARD, Engine};
use fastwebsockets::{Frame, OpCode, Payload, Role, WebSocket};
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokioio::TokioIo;
use utils::{ReplaceErr, UriExt};
use wrappers::{IncomingBody, WsStreamWrapper};
use std::sync::Arc;
use std::{io::Read, ptr::null_mut, str::from_utf8, sync::Arc};
use async_compression::tokio::bufread as async_comp;
use bytes::Bytes;
@ -114,6 +115,154 @@ async fn start() {
utils::set_panic_hook();
}
#[wasm_bindgen]
pub struct WsWebSocket {
onopen: Function,
onclose: Function,
onerror: Function,
onmessage: Function,
ws: Option<WebSocket<WsTcpStream>>,
}
async fn wtf(iop: *mut WsTcpStream) {
let mut t = false;
unsafe {
let io = &mut *iop;
loop {
let r = io.read_u8().await;
if let Ok(u) = r {
// log!("{}", u as char);
if t && u as char == '\r' {
let r = io.read_u8().await;
break;
}
if u as char == '\n' {
t = true;
} else {
t = false;
}
} else {
break;
}
}
}
}
#[wasm_bindgen]
impl WsWebSocket {
#[wasm_bindgen(constructor)]
pub fn new(
onopen: Function,
onclose: Function,
onmessage: Function,
onerror: Function,
) -> Result<WsWebSocket, JsError> {
Ok(Self {
onopen,
onclose,
onerror,
onmessage,
ws: None,
})
}
#[wasm_bindgen]
pub async fn connect(
&mut self,
tcp: &mut WsTcp,
url: String,
protocols: Vec<String>,
host: String,
) -> Result<(), JsError> {
self.onopen.call0(&Object::default());
let uri = url.parse::<uri::Uri>().replace_err("Failed to parse URL")?;
let mut io = tcp.get_http_io(&uri).await?;
let r: [u8; 16] = rand::random();
let key = STANDARD.encode(&r);
io.write(b"GET / HTTP/1.1\r\n").await;
io.write(b"Sec-WebSocket-Version: 13\r\n").await;
io.write(format!("Sec-WebSocket-Key: {}\r\n", key).as_bytes())
.await;
io.write(b"Connection: Upgrade\r\n").await;
io.write(b"Upgrade: websocket\r\n").await;
io.write(format!("Host: {}\r\n", host).as_bytes()).await;
io.write(b"\r\n").await;
let iop: *mut WsTcpStream = &mut io;
wtf(iop).await;
let mut ws = WebSocket::after_handshake(io, fastwebsockets::Role::Client);
ws.set_writev(false);
ws.set_auto_close(true);
ws.set_auto_pong(true);
self.ws = Some(ws);
Ok(())
}
#[wasm_bindgen]
pub fn ptr(&mut self) -> *mut WsWebSocket {
self
}
#[wasm_bindgen]
pub async fn send(&mut self, payload: String) -> Result<(), JsError> {
let Some(ws) = self.ws.as_mut() else {
return Err(JsError::new("Tried to send() before handshake!"));
};
ws.write_frame(Frame::new(
true,
OpCode::Text,
None,
Payload::Owned(payload.as_bytes().to_vec()),
))
.await
.replace_err("Failed to send WsWebSocket payload")?;
Ok(())
}
#[wasm_bindgen]
pub async fn recv(&mut self) -> Result<(), JsError> {
let Some(ws) = self.ws.as_mut() else {
return Err(JsError::new("Tried to recv() before handshake!"));
};
loop {
let Ok(frame) = ws.read_frame().await else {
break;
};
if frame.opcode == OpCode::Text {
if let Ok(str) = from_utf8(&frame.payload) {
self.onmessage
.call1(&JsValue::null(), &jval!(str))
.replace_err("missing onmessage handler")?;
}
} else if frame.opcode == OpCode::Binary {
self.onmessage
.call1(
&JsValue::null(),
&jval!(Uint8Array::from(frame.payload.to_vec().as_slice())),
)
.replace_err("missing onmessage handler")?;
}
}
self.onclose
.call0(&JsValue::null())
.replace_err("missing onclose handler")?;
Ok(())
}
}
#[wasm_bindgen]
pub async fn send(pointer: *mut WsWebSocket, payload: String) -> Result<(), JsError> {
let tcp = unsafe { &mut *pointer };
tcp.send(payload).await
}
#[wasm_bindgen]
pub struct WsTcp {
rustls_config: Arc<rustls::ClientConfig>,
@ -164,6 +313,10 @@ impl WsTcp {
redirect_limit,
})
}
#[wasm_bindgen]
pub fn ptr(&mut self) -> *mut WsTcp {
self as *mut Self
}
async fn get_http_io(&self, url: &Uri) -> Result<WsTcpStream, JsError> {
let url_host = url.host().replace_err("URL must have a host")?;
@ -192,41 +345,6 @@ impl WsTcp {
}
}
#[wasm_bindgen]
pub async fn connect_ws(
&self,
url: String,
protocols: Vec<String>,
onopen: Function,
onclose: Function,
onmessage: Function,
onerror: Function,
host: String,
) -> Result<JsValue, JsError> {
onopen.call0(&Object::default());
let uri = url.parse::<uri::Uri>().replace_err("Failed to parse URL")?;
let mut io = self.get_http_io(&uri).await?;
let mut a = WebSocket::after_handshake(io, fastwebsockets::Role::Client);
a.set_writev(false);
a.set_auto_close(true);
a.set_auto_pong(true);
a.write_frame(Frame::new(
true,
OpCode::Text,
None,
Payload::Owned(b"aasdfdfhsdfhkadfhsdfkhjasfkajdfhaksjhfkadhfkashdfkhsd".to_vec()),
))
.await;
// .await
// .replace_err("Failed to connect to host")?;
let closure = Closure::<dyn FnMut(JsValue)>::new(move |data: JsValue| {
log!("WaeASDASd");
});
Ok(closure.into_js_value())
}
async fn send_req(
&self,
req: http::Request<HttpBody>,