diff --git a/Cargo.lock b/Cargo.lock index 4f61515..3db0195 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -214,12 +214,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "data-encoding" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" - [[package]] name = "digest" version = "0.10.7" @@ -271,13 +265,16 @@ dependencies = [ name = "epoxy-server" version = "1.0.0" dependencies = [ + "bytes", + "fastwebsockets", + "futures-util", "http-body-util", "hyper", "hyper-util", - "rusty-penguin", "tokio", "tokio-native-tls", - "tokio-tungstenite", + "tokio-util", + "wisp-mux", ] [[package]] @@ -302,7 +299,13 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63dd7b57f9b33b1741fa631c9522eb35d43e96dcca4a6a91d5e4ca7c93acdc1" dependencies = [ + "base64", + "http-body-util", + "hyper", + "hyper-util", + "pin-project", "rand", + "sha1", "simdutf8", "thiserror", "tokio", @@ -340,15 +343,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" -[[package]] -name = "form_urlencoded" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" -dependencies = [ - "percent-encoding", -] - [[package]] name = "futures" version = "0.3.30" @@ -567,16 +561,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "idna" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" -dependencies = [ - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "itoa" version = "1.0.10" @@ -779,12 +763,6 @@ dependencies = [ "wasm-bindgen-futures", ] -[[package]] -name = "percent-encoding" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" - [[package]] name = "pharos" version = "0.5.3" @@ -795,6 +773,26 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -951,23 +949,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "rusty-penguin" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aefd4b85c815cf35675640924e0e73d9847bbdec8aa2e7daa8703fc5161f11d9" -dependencies = [ - "bytes", - "futures-util", - "http 0.2.11", - "parking_lot", - "rand", - "thiserror", - "tokio", - "tokio-tungstenite", - "tracing", -] - [[package]] name = "schannel" version = "0.1.23" @@ -1116,21 +1097,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tinyvec" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" -dependencies = [ - "tinyvec_macros", -] - -[[package]] -name = "tinyvec_macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" - [[package]] name = "tokio" version = "1.35.1" @@ -1204,6 +1170,7 @@ dependencies = [ "futures-sink", "pin-project-lite", "tokio", + "tracing", ] [[package]] @@ -1251,14 +1218,9 @@ checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" dependencies = [ "byteorder", "bytes", - "data-encoding", - "http 1.0.0", - "httparse", "log", "rand", - "sha1", "thiserror", - "url", "utf-8", ] @@ -1268,44 +1230,18 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" -[[package]] -name = "unicode-bidi" -version = "0.3.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" - [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" -[[package]] -name = "unicode-normalization" -version = "0.1.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" -dependencies = [ - "tinyvec", -] - [[package]] name = "untrusted" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" -[[package]] -name = "url" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" -dependencies = [ - "form_urlencoded", - "idna", - "percent-encoding", -] - [[package]] name = "utf-8" version = "0.7.6" @@ -1569,6 +1505,15 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "wisp-mux" +version = "0.1.0" +dependencies = [ + "bytes", + "futures", + "futures-util", +] + [[package]] name = "ws_stream_wasm" version = "0.7.4" diff --git a/Cargo.toml b/Cargo.toml index 0d2e374..1927a61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["server", "client"] +members = ["server", "client", "wisp"] [patch.crates-io] rustls-pki-types = { git = "https://github.com/r58Playz/rustls-pki-types" } diff --git a/server/Cargo.toml b/server/Cargo.toml index 6abb7ca..c0a4a10 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -4,10 +4,13 @@ version = "1.0.0" edition = "2021" [dependencies] +bytes = "1.5.0" +fastwebsockets = { version = "0.6.0", features = ["upgrade", "simdutf8"] } +futures-util = { version = "0.3.30", features = ["sink"] } http-body-util = "0.1.0" hyper = { version = "1.1.0", features = ["server", "http1"] } hyper-util = { version = "0.1.2", features = ["tokio"] } -rusty-penguin = { version = "0.5.3", default-features = false } -tokio = { version = "1.35.1", features = ["rt-multi-thread", "net", "macros"] } +tokio = { version = "1.5.1", features = ["rt-multi-thread", "macros"] } tokio-native-tls = "0.3.1" -tokio-tungstenite = "0.21.0" +tokio-util = { version = "0.7.10", features = ["codec"] } +wisp-mux = { path = "../wisp" } diff --git a/server/src/main.rs b/server/src/main.rs index fc56579..6318929 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,176 +1,196 @@ -use std::{convert::Infallible, env, net::SocketAddr, sync::Arc}; +use std::io::Error; +use bytes::Bytes; +use fastwebsockets::{ + upgrade, CloseCode, FragmentCollector, Frame, OpCode, Payload, WebSocketError, +}; +use futures_util::{SinkExt, StreamExt}; use hyper::{ - body::Incoming, - header::{ - HeaderValue, CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_PROTOCOL, - SEC_WEBSOCKET_VERSION, UPGRADE, - }, - server::conn::http1, - service::service_fn, - upgrade::Upgraded, - Method, Request, Response, StatusCode, Version, + body::Incoming, header::HeaderValue, server::conn::http1, service::service_fn, Request, + Response, StatusCode, }; use hyper_util::rt::TokioIo; -use penguin_mux::{Multiplexor, MuxStream}; -use tokio::{ - net::{TcpListener, TcpStream}, - task::{JoinError, JoinSet}, -}; +use tokio::net::{TcpListener, TcpStream}; use tokio_native_tls::{native_tls, TlsAcceptor}; -use tokio_tungstenite::{ - tungstenite::{handshake::derive_accept_key, protocol::Role}, - WebSocketStream, -}; +use tokio_util::codec::{BytesCodec, Framed}; -type Body = http_body_util::Empty; +type HttpBody = http_body_util::Empty; -type MultiplexorStream = MuxStream>>; - -async fn forward(mut stream: MultiplexorStream) -> Result<(), JoinError> { - println!("forwarding"); - let host = std::str::from_utf8(&stream.dest_host).unwrap(); - let mut tcp_stream = TcpStream::connect((host, stream.dest_port)).await.unwrap(); - println!("connected to {:?}", tcp_stream.peer_addr().unwrap()); - tokio::io::copy_bidirectional(&mut stream, &mut tcp_stream) - .await - .unwrap(); - println!("finished"); - Ok(()) -} - -async fn handle_connection(ws_stream: WebSocketStream>, addr: SocketAddr) { - println!("WebSocket connection established: {}", addr); - let mux = Multiplexor::new(ws_stream, penguin_mux::Role::Server, None, None); - let mut jobs = JoinSet::new(); - println!("muxing"); - loop { - tokio::select! { - Some(result) = jobs.join_next() => { - match result { - Ok(Ok(())) => {} - Ok(Err(err)) | Err(err) => eprintln!("failed to forward: {:?}", err), - } - } - Ok(result) = mux.server_new_stream_channel() => { - jobs.spawn(forward(result)); - } - else => { - break; - } - } - } - println!("{} disconnected", &addr); -} - -async fn handle_request( - mut req: Request, - addr: SocketAddr, -) -> Result, Infallible> { - let headers = req.headers(); - let derived = headers - .get(SEC_WEBSOCKET_KEY) - .map(|k| derive_accept_key(k.as_bytes())); - - let mut negotiated_protocol: Option = None; - if let Some(protocols) = headers - .get(SEC_WEBSOCKET_PROTOCOL) - .and_then(|h| h.to_str().ok()) - { - negotiated_protocol = protocols.split(',').next().map(|h| h.trim().to_string()); - } - - if req.method() != Method::GET - || req.version() < Version::HTTP_11 - || !headers - .get(CONNECTION) - .and_then(|h| h.to_str().ok()) - .map(|h| { - h.split(|c| c == ' ' || c == ',') - .any(|p| p.eq_ignore_ascii_case("upgrade")) - }) - .unwrap_or(false) - || !headers - .get(UPGRADE) - .and_then(|h| h.to_str().ok()) - .map(|h| h.eq_ignore_ascii_case("websocket")) - .unwrap_or(false) - || !headers - .get(SEC_WEBSOCKET_VERSION) - .map(|h| h == "13") - .unwrap_or(false) - || derived.is_none() - { - return Ok(Response::new(Body::default())); - } - - let ver = req.version(); - tokio::task::spawn(async move { - match hyper::upgrade::on(&mut req).await { - Ok(upgraded) => { - let upgraded = TokioIo::new(upgraded); - handle_connection( - WebSocketStream::from_raw_socket(upgraded, Role::Server, None).await, - addr, - ) - .await; - } - Err(e) => eprintln!("upgrade error: {}", e), - } - }); - - let mut res = Response::new(Body::default()); - *res.status_mut() = StatusCode::SWITCHING_PROTOCOLS; - *res.version_mut() = ver; - res.headers_mut() - .append(CONNECTION, HeaderValue::from_static("Upgrade")); - res.headers_mut() - .append(UPGRADE, HeaderValue::from_static("websocket")); - res.headers_mut() - .append(SEC_WEBSOCKET_ACCEPT, derived.unwrap().parse().unwrap()); - if let Some(protocol) = negotiated_protocol { - res.headers_mut() - .append(SEC_WEBSOCKET_PROTOCOL, protocol.parse().unwrap()); - } - - Ok(res) -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - let addr = env::args() - .nth(1) - .unwrap_or_else(|| "0.0.0.0:4000".to_string()) - .parse::()?; +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<(), Error> { let pem = include_bytes!("./pem.pem"); let key = include_bytes!("./key.pem"); + let identity = native_tls::Identity::from_pkcs8(pem, key).expect("failed to make identity"); + let prefix = if let Some(prefix) = std::env::args().nth(1) { + prefix + } else { + "/".to_string() + }; + let port = if let Some(prefix) = std::env::args().nth(1) { + prefix + } else { + "4000".to_string() + }; - let identity = native_tls::Identity::from_pkcs8(pem, key).expect("invalid pem/key"); - - let acceptor = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity).unwrap()); - let acceptor = Arc::new(acceptor); - - let listener = TcpListener::bind(addr).await?; - - println!("listening on {}", addr); - - loop { - let (stream, remote_addr) = listener.accept().await?; - let acceptor = acceptor.clone(); + let socket = TcpListener::bind(format!("0.0.0.0:{}", port)) + .await + .expect("failed to bind"); + let acceptor = TlsAcceptor::from( + native_tls::TlsAcceptor::new(identity).expect("failed to make tls acceptor"), + ); + let acceptor = std::sync::Arc::new(acceptor); + println!("listening on 0.0.0.0:4000"); + while let Ok((stream, addr)) = socket.accept().await { + let acceptor_cloned = acceptor.clone(); + let prefix_cloned = prefix.clone(); tokio::spawn(async move { - let stream = acceptor.accept(stream).await.expect("not tls"); + let stream = acceptor_cloned.accept(stream).await.expect("not tls"); let io = TokioIo::new(stream); - - let service = service_fn(move |req| handle_request(req, remote_addr)); - + let service = service_fn(move |res| accept_http(res, addr.to_string(), prefix_cloned.clone())); let conn = http1::Builder::new() .serve_connection(io, service) .with_upgrades(); - if let Err(err) = conn.await { - eprintln!("failed to serve connection: {:?}", err); + println!("{:?}: failed to serve conn: {:?}", addr, err); } }); } + + Ok(()) } + +async fn accept_http( + mut req: Request, + addr: String, + prefix: String, +) -> Result, WebSocketError> { + if upgrade::is_upgrade_request(&req) && req.uri().path().to_string().starts_with(&prefix) { + let uri = req.uri().clone(); + let (mut res, fut) = upgrade::upgrade(&mut req)?; + + tokio::spawn(async move { + if *uri.path() != prefix { + if let Err(e) = + accept_wsproxy(fut, uri.path().to_string(), addr.clone(), prefix).await + { + println!("{:?}: error in ws handling: {:?}", addr, e); + } + } + }); + + if let Some(protocol) = req.headers().get("Sec-Websocket-Protocol") { + let first_protocol = protocol + .to_str() + .expect("failed to get protocol") + .split(',') + .next() + .expect("failed to get first protocol") + .trim(); + res.headers_mut().insert( + "Sec-Websocket-Protocol", + HeaderValue::from_str(first_protocol).unwrap(), + ); + } + + Ok(res) + } else { + Ok(Response::builder() + .status(StatusCode::OK) + .body(HttpBody::new()) + .unwrap()) + } +} + +async fn accept_wsproxy( + fut: upgrade::UpgradeFut, + incoming_uri: String, + addr: String, + prefix: String, +) -> Result<(), Box> { + let mut ws_stream = FragmentCollector::new(fut.await?); + + // should always have prefix + let incoming_uri = incoming_uri.strip_prefix(&prefix).unwrap(); + + println!("{:?}: connected", addr); + + let tcp_stream = match TcpStream::connect(incoming_uri).await { + Ok(stream) => stream, + Err(err) => { + ws_stream + .write_frame(Frame::close(CloseCode::Away.into(), b"failed to connect")) + .await + .unwrap(); + return Err(Box::new(err)); + } + }; + let mut tcp_stream_framed = Framed::new(tcp_stream, BytesCodec::new()); + + loop { + tokio::select! { + event = ws_stream.read_frame() => { + match event { + Ok(frame) => { + print!("{:?}: event ws - ", addr); + match frame.opcode { + OpCode::Text | OpCode::Binary => { + if tcp_stream_framed.send(Bytes::from(frame.payload.to_vec())).await.is_ok() { + println!("sent success"); + } else { + println!("sent FAILED"); + } + } + OpCode::Close => { + if as SinkExt>::close(&mut tcp_stream_framed).await.is_ok() { + println!("closed success"); + } else { + println!("closed FAILED"); + } + break; + } + _ => { + println!("ignored"); + } + } + }, + Err(err) => { + print!("{:?}: err in ws: {:?} - ", addr, err); + if as SinkExt>::close(&mut tcp_stream_framed).await.is_ok() { + println!("closed tcp success"); + } else { + println!("closed tcp FAILED"); + } + break; + } + } + }, + event = tcp_stream_framed.next() => { + if let Some(res) = event { + print!("{:?}: event tcp - ", addr); + match res { + Ok(buf) => { + if ws_stream.write_frame(Frame::binary(Payload::Owned(buf.to_vec()))).await.is_ok() { + println!("sent success"); + } else { + println!("sent FAILED"); + } + } + Err(_) => { + if ws_stream.write_frame(Frame::close(CloseCode::Away.into(), b"tcp side is going away")).await.is_ok() { + println!("closed success"); + } else { + println!("closed FAILED"); + } + } + } + } + } + } + } + + println!("\"{}\": connection closed", addr); + + Ok(()) +} + diff --git a/wisp/.gitignore b/wisp/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/wisp/.gitignore @@ -0,0 +1 @@ +/target diff --git a/wisp/Cargo.lock b/wisp/Cargo.lock new file mode 100644 index 0000000..19bc2ba --- /dev/null +++ b/wisp/Cargo.lock @@ -0,0 +1,320 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bytes" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "hashbrown" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" + +[[package]] +name = "libc" +version = "0.2.152" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" + +[[package]] +name = "lock_api" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "memchr" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "parking_lot_core" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" + +[[package]] +name = "syn" +version = "2.0.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "wisp-mux" +version = "0.1.0" +dependencies = [ + "bytes", + "dashmap", + "futures", + "futures-util", +] diff --git a/wisp/Cargo.toml b/wisp/Cargo.toml new file mode 100644 index 0000000..a660280 --- /dev/null +++ b/wisp/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "wisp-mux" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bytes = "1.5.0" +futures = "0.3.30" +futures-util = "0.3.30" diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs new file mode 100644 index 0000000..c8147ae --- /dev/null +++ b/wisp/src/lib.rs @@ -0,0 +1,25 @@ +mod packet; +mod ws; + +pub use crate::packet::*; + +#[derive(Debug, PartialEq)] +pub enum Role { + Client, + Server, +} + +pub enum WispError { + PacketTooSmall, + InvalidPacketType, + WsFrameInvalidType, + WsFrameNotFinished, + WsImplError(Box), + Utf8Error(std::str::Utf8Error), +} + +impl From for WispError { + fn from(err: std::str::Utf8Error) -> WispError { + WispError::Utf8Error(err) + } +} diff --git a/wisp/src/packet.rs b/wisp/src/packet.rs new file mode 100644 index 0000000..b1091b8 --- /dev/null +++ b/wisp/src/packet.rs @@ -0,0 +1,237 @@ +use crate::ws; +use crate::WispError; +use bytes::{Buf, BufMut, Bytes}; + +#[derive(Debug)] +pub struct ConnectPacket { + stream_type: u8, + destination_port: u16, + destination_hostname: String, +} + +impl ConnectPacket { + pub fn new(stream_type: u8, destination_port: u16, destination_hostname: String) -> Self { + Self { + stream_type, + destination_port, + destination_hostname, + } + } +} + +impl TryFrom for ConnectPacket { + type Error = WispError; + fn try_from(mut bytes: Bytes) -> Result { + if bytes.remaining() < (1 + 2) { + return Err(Self::Error::PacketTooSmall); + } + Ok(Self { + stream_type: bytes.get_u8(), + destination_port: bytes.get_u16_le(), + destination_hostname: std::str::from_utf8(&bytes)?.to_string(), + }) + } +} + +impl From for Vec { + fn from(packet: ConnectPacket) -> Self { + let mut encoded = Self::with_capacity(1 + 2 + packet.destination_hostname.len()); + encoded.put_u8(packet.stream_type); + encoded.put_u16_le(packet.destination_port); + encoded.extend(packet.destination_hostname.bytes()); + encoded + } +} + +#[derive(Debug)] +pub struct ContinuePacket { + buffer_remaining: u32, +} + +impl ContinuePacket { + pub fn new(buffer_remaining: u32) -> Self { + Self { buffer_remaining } + } +} + +impl TryFrom for ContinuePacket { + type Error = WispError; + fn try_from(mut bytes: Bytes) -> Result { + if bytes.remaining() < 4 { + return Err(Self::Error::PacketTooSmall); + } + Ok(Self { + buffer_remaining: bytes.get_u32_le(), + }) + } +} + +impl From for Vec { + fn from(packet: ContinuePacket) -> Self { + let mut encoded = Self::with_capacity(4); + encoded.put_u32_le(packet.buffer_remaining); + encoded + } +} + +#[derive(Debug)] +pub struct ClosePacket { + reason: u8, +} + +impl ClosePacket { + pub fn new(reason: u8) -> Self { + Self { reason } + } +} + +impl TryFrom for ClosePacket { + type Error = WispError; + fn try_from(mut bytes: Bytes) -> Result { + if bytes.remaining() < 1 { + return Err(Self::Error::PacketTooSmall); + } + Ok(Self { + reason: bytes.get_u8(), + }) + } +} + +impl From for Vec { + fn from(packet: ClosePacket) -> Self { + let mut encoded = Self::with_capacity(1); + encoded.put_u8(packet.reason); + encoded + } +} + +#[derive(Debug)] +pub enum PacketType { + Connect(ConnectPacket), + Data(Vec), + Continue(ContinuePacket), + Close(ClosePacket), +} + +impl PacketType { + pub fn as_u8(&self) -> u8 { + use PacketType::*; + match self { + Connect(_) => 0x01, + Data(_) => 0x02, + Continue(_) => 0x03, + Close(_) => 0x04, + } + } +} + +impl From for Vec { + fn from(packet: PacketType) -> Self { + use PacketType::*; + match packet { + Connect(x) => x.into(), + Data(x) => x, + Continue(x) => x.into(), + Close(x) => x.into(), + } + } +} + +#[derive(Debug)] +pub struct Packet { + stream_id: u32, + packet: PacketType, +} + +impl Packet { + pub fn new(stream_id: u32, packet: PacketType) -> Self { + Self { stream_id, packet } + } + + pub fn new_connect( + stream_id: u32, + stream_type: u8, + destination_port: u16, + destination_hostname: String, + ) -> Self { + Self { + stream_id, + packet: PacketType::Connect(ConnectPacket::new( + stream_type, + destination_port, + destination_hostname, + )), + } + } + + pub fn new_data(stream_id: u32, data: Vec) -> Self { + Self { + stream_id, + packet: PacketType::Data(data), + } + } + + pub fn new_continue(stream_id: u32, buffer_remaining: u32) -> Self { + Self { + stream_id, + packet: PacketType::Continue(ContinuePacket::new(buffer_remaining)), + } + } + + pub fn new_close(stream_id: u32, reason: u8) -> Self { + Self { + stream_id, + packet: PacketType::Close(ClosePacket::new(reason)), + } + } +} + +impl TryFrom for Packet { + type Error = WispError; + fn try_from(mut bytes: Bytes) -> Result { + if bytes.remaining() < 5 { + return Err(Self::Error::PacketTooSmall); + } + let packet_type = bytes.get_u8(); + use PacketType::*; + Ok(Self { + stream_id: bytes.get_u32_le(), + packet: match packet_type { + 0x01 => Connect(ConnectPacket::try_from(bytes)?), + 0x02 => Data(bytes.to_vec()), + 0x03 => Continue(ContinuePacket::try_from(bytes)?), + 0x04 => Close(ClosePacket::try_from(bytes)?), + _ => return Err(Self::Error::InvalidPacketType), + }, + }) + } +} + +impl From for Vec { + fn from(packet: Packet) -> Self { + let mut encoded = Self::with_capacity(1 + 4); + encoded.push(packet.packet.as_u8()); + encoded.put_u32_le(packet.stream_id); + encoded.extend(Vec::::from(packet.packet)); + encoded + } +} + +impl TryFrom for Packet { + type Error = WispError; + fn try_from(frame: ws::Frame) -> Result { + if !frame.finished { + return Err(Self::Error::WsFrameNotFinished); + } + if frame.opcode != ws::OpCode::Binary { + return Err(Self::Error::WsFrameInvalidType); + } + frame.payload.try_into() + } +} + +impl From for ws::Frame { + fn from(packet: Packet) -> Self { + Self::binary(Vec::::from(packet).into()) + } +} diff --git a/wisp/src/ws.rs b/wisp/src/ws.rs new file mode 100644 index 0000000..fbb1e56 --- /dev/null +++ b/wisp/src/ws.rs @@ -0,0 +1,40 @@ +use bytes::Bytes; + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum OpCode { + Text, + Binary, + Close, +} + +pub struct Frame { + pub finished: bool, + pub opcode: OpCode, + pub payload: Bytes, +} + +impl Frame { + pub fn text(payload: Bytes) -> Self { + Self { + finished: true, + opcode: OpCode::Text, + payload, + } + } + + pub fn binary(payload: Bytes) -> Self { + Self { + finished: true, + opcode: OpCode::Binary, + payload, + } + } + + pub fn close(payload: Bytes) -> Self { + Self { + finished: true, + opcode: OpCode::Close, + payload, + } + } +}