From d508f90a6288128383309978e0737cfd44f42118 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Tue, 9 Jan 2024 08:20:14 -0800 Subject: [PATCH] initial multiplexor --- .gitignore | 4 +- Cargo.lock | 582 ++++++++++++++++++++++++++++++---- Cargo.toml | 21 +- README.md | 9 +- client/Cargo.toml | 34 ++ client/build.sh | 7 + client/src/lib.rs | 151 +++++++++ client/src/tokioio.rs | 171 ++++++++++ client/src/utils.rs | 29 ++ client/src/web/index.html | 9 + client/src/web/index.js | 5 + client/src/wsstreamwrapper.rs | 121 +++++++ server/Cargo.toml | 13 + server/src/main.rs | 176 ++++++++++ src/main.rs | 178 ----------- 15 files changed, 1253 insertions(+), 257 deletions(-) create mode 100644 client/Cargo.toml create mode 100644 client/build.sh create mode 100644 client/src/lib.rs create mode 100644 client/src/tokioio.rs create mode 100644 client/src/utils.rs create mode 100644 client/src/web/index.html create mode 100644 client/src/web/index.js create mode 100644 client/src/wsstreamwrapper.rs create mode 100644 server/Cargo.toml create mode 100644 server/src/main.rs delete mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore index 80ccd66..b5a9cb3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /target -src/*.pem +server/src/*.pem +client/pkg +client/out diff --git a/Cargo.lock b/Cargo.lock index 95ebd76..a7ab1f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version", + "tokio", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -38,12 +50,6 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "base64" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "414dcefbc63d77c526a76b3afcf6fbb9b5e2791c19c3aa2297733208750c6e53" - [[package]] name = "bitflags" version = "1.3.2" @@ -65,6 +71,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.5.0" @@ -86,6 +104,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "console_error_panic_hook" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06aeb73f470f66dcdbf7223caeebb85984942f22f1adb2a088cf9668146bbbc" +dependencies = [ + "cfg-if", + "wasm-bindgen", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -121,6 +149,12 @@ dependencies = [ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" + [[package]] name = "digest" version = "0.10.7" @@ -147,25 +181,6 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" -[[package]] -name = "fastwebsockets" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63dd7b57f9b33b1741fa631c9522eb35d43e96dcca4a6a91d5e4ca7c93acdc1" -dependencies = [ - "base64", - "http-body-util", - "hyper", - "hyper-util", - "pin-project", - "rand", - "sha1", - "simdutf8", - "thiserror", - "tokio", - "utf-8", -] - [[package]] name = "fnv" version = "1.0.7" @@ -187,6 +202,30 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -194,6 +233,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -202,6 +242,23 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + [[package]] name = "futures-macro" version = "0.3.30" @@ -231,10 +288,13 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -257,8 +317,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -273,6 +335,17 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +[[package]] +name = "http" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.0.0" @@ -291,7 +364,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http", + "http 1.0.0", ] [[package]] @@ -302,7 +375,7 @@ checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" dependencies = [ "bytes", "futures-util", - "http", + "http 1.0.0", "http-body", "pin-project-lite", ] @@ -328,7 +401,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http", + "http 1.0.0", "http-body", "httparse", "httpdate", @@ -347,7 +420,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http", + "http 1.0.0", "http-body", "hyper", "pin-project-lite", @@ -356,12 +429,31 @@ dependencies = [ "tracing", ] +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "itoa" version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +[[package]] +name = "js-sys" +version = "0.3.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -370,9 +462,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.151" +version = "0.2.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" [[package]] name = "linux-raw-sys" @@ -380,6 +472,16 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +[[package]] +name = "lock_api" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.20" @@ -500,23 +602,59 @@ dependencies = [ ] [[package]] -name = "pin-project" -version = "1.1.3" +name = "parking_lot" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ - "pin-project-internal", + "lock_api", + "parking_lot_core", ] [[package]] -name = "pin-project-internal" -version = "1.1.3" +name = "parking_lot_core" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ - "proc-macro2", - "quote", - "syn", + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.48.5", +] + +[[package]] +name = "penguin-mux-wasm" +version = "0.1.0" +source = "git+https://github.com/r58Playz/penguin-mux-wasm#69b413aedb6f50f55eac646fda361abe430eb022" +dependencies = [ + "bytes", + "futures-util", + "http 0.2.11", + "parking_lot", + "rand", + "thiserror", + "tokio", + "tokio-tungstenite", + "tracing", + "wasm-bindgen-futures", +] + +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version", ] [[package]] @@ -545,9 +683,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.75" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907a61bd0f64c2f29cd1cf1dc34d05176426a3f504a78010f08416ddb7b13708" +checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" dependencies = [ "unicode-ident", ] @@ -600,12 +738,35 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "ring" +version = "0.17.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.48.0", +] + [[package]] name = "rustc-demangle" version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.28" @@ -619,6 +780,56 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pki-types" +version = "1.1.0" +source = "git+https://github.com/r58Playz/rustls-pki-types#685721bb4b819c7da4724f07cffe06173f8cc883" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "rustls-webpki" +version = "0.102.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef4ca26037c909dedb327b48c3327d0ba91d3dd3c4e05dad328f210ffb68e95b" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + +[[package]] +name = "rusty-penguin" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aefd4b85c815cf35675640924e0e73d9847bbdec8aa2e7daa8703fc5161f11d9" +dependencies = [ + "bytes", + "futures-util", + "http 0.2.11", + "parking_lot", + "rand", + "thiserror", + "tokio", + "tokio-tungstenite", + "tracing", +] + [[package]] name = "schannel" version = "0.1.23" @@ -628,6 +839,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "security-framework" version = "2.9.2" @@ -651,6 +868,18 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" + +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" + [[package]] name = "sha1" version = "0.10.6" @@ -662,12 +891,6 @@ dependencies = [ "digest", ] -[[package]] -name = "simdutf8" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" - [[package]] name = "slab" version = "0.4.9" @@ -677,6 +900,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "smallvec" +version = "1.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" + [[package]] name = "socket2" version = "0.5.5" @@ -687,6 +916,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + +[[package]] +name = "subtle" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" + [[package]] name = "syn" version = "2.0.48" @@ -731,6 +972,21 @@ dependencies = [ "syn", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.35.1" @@ -742,6 +998,7 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", "socket2", "tokio-macros", @@ -770,17 +1027,26 @@ dependencies = [ ] [[package]] -name = "tokio-util" -version = "0.7.10" +name = "tokio-rustls" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "pin-project-lite", + "rustls", + "rustls-pki-types", "tokio", - "tracing", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", ] [[package]] @@ -790,9 +1056,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -808,18 +1086,69 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.0.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "unicode-bidi" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" + [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + +[[package]] +name = "url" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + [[package]] name = "utf-8" version = "0.7.6" @@ -853,6 +1182,91 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" + +[[package]] +name = "web-sys" +version = "0.3.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki-roots" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de2cfda980f21be5a7ed2eadb3e6fe074d56022bea2cdeb1a62eb220fc04188" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -986,16 +1400,62 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] -name = "wsfetch-server" -version = "0.1.0" +name = "ws_stream_wasm" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7999f5f4217fe3818726b66257a4475f71e74ffd190776ad053fa159e50737f5" +dependencies = [ + "async_io_stream", + "futures", + "js-sys", + "log", + "pharos", + "rustc_version", + "send_wrapper", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + +[[package]] +name = "wstcp-client" +version = "1.0.0" dependencies = [ "bytes", - "fastwebsockets", + "console_error_panic_hook", "futures-util", + "getrandom", + "http 1.0.0", + "http-body-util", + "hyper", + "js-sys", + "penguin-mux-wasm", + "pin-project-lite", + "ring", + "tokio", + "tokio-rustls", + "wasm-bindgen", + "wasm-bindgen-futures", + "webpki-roots", + "ws_stream_wasm", +] + +[[package]] +name = "wstcp-server" +version = "1.0.0" +dependencies = [ "http-body-util", "hyper", "hyper-util", + "rusty-penguin", "tokio", "tokio-native-tls", - "tokio-util", + "tokio-tungstenite", ] + +[[package]] +name = "zeroize" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" diff --git a/Cargo.toml b/Cargo.toml index 0c8a670..0d2e374 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,17 +1,6 @@ -[package] -name = "wsfetch-server" -version = "0.1.0" -edition = "2021" +[workspace] +resolver = "2" +members = ["server", "client"] -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -bytes = "1.5.0" -fastwebsockets = { version = "0.6.0", features = ["upgrade", "simdutf8"] } -futures-util = { version = "0.3.30", features = ["sink"] } -http-body-util = "0.1.0" -hyper = { version = "1.1.0", features = ["server", "http1"] } -hyper-util = { version = "0.1.2", features = ["tokio"] } -tokio = { version = "1.5.1", features = ["rt-multi-thread", "macros"] } -tokio-native-tls = "0.3.1" -tokio-util = { version = "0.7.10", features = ["codec"] } +[patch.crates-io] +rustls-pki-types = { git = "https://github.com/r58Playz/rustls-pki-types" } diff --git a/README.md b/README.md index 44ae2c1..8cf774a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,11 @@ # wsproxy-rust ## Building -Place your certs in the source folder, public named `pem.pem` and private named `key.pem` + +### Server +1. Place your certs in the source folder, public named `pem.pem` and private named `key.pem` +2. Run `cargo r --bin wstcp-server`, optionally with `-r` flag for release + +### Client +1. Make sure you have the `wasm32-unknown-unknown` target installed, `wasm-bindgen` executable installed, and `bash`, `python3` packages (`python3` is used for `http.server` module) +2. Run `bash build.sh` to build and start a webserver diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 0000000..2896c8d --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "wstcp-client" +version = "1.0.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[features] +default = ["console_error_panic_hook"] + +[dependencies] +bytes = "1.5.0" +console_error_panic_hook = { version = "0.1.7", optional = true } +http = "1.0.0" +http-body-util = "0.1.0" +hyper = { version = "1.1.0", features = ["client", "http1"] } +pin-project-lite = "0.2.13" +penguin-mux-wasm = { git = "https://github.com/r58Playz/penguin-mux-wasm" } +tokio = { version = "1.35.1", default_features = false } +wasm-bindgen = "0.2" +wasm-bindgen-futures = "0.4.39" +ws_stream_wasm = { version = "0.7.4", features = ["tokio_io"] } +futures-util = "0.3.30" +js-sys = "0.3.66" +webpki-roots = "0.26.0" +tokio-rustls = "0.25.0" + +[dependencies.getrandom] +features = ["js"] + +[dependencies.ring] +features = ["wasm32_unknown_unknown_js"] + diff --git a/client/build.sh b/client/build.sh new file mode 100644 index 0000000..7caf25b --- /dev/null +++ b/client/build.sh @@ -0,0 +1,7 @@ +set -e +rm -rf out/ || true +mkdir out/ +cargo build --target wasm32-unknown-unknown -r +wasm-bindgen --weak-refs --no-typescript --target no-modules --out-dir out/ ../target/wasm32-unknown-unknown/release/wstcp_client.wasm +cp -r src/web/* out/ +(cd out; python3 -m http.server) diff --git a/client/src/lib.rs b/client/src/lib.rs new file mode 100644 index 0000000..922bee6 --- /dev/null +++ b/client/src/lib.rs @@ -0,0 +1,151 @@ +#[macro_use] +mod utils; +mod tokioio; +mod wsstreamwrapper; + +use tokioio::TokioIo; +use wsstreamwrapper::WsStreamWrapper; + +use std::sync::Arc; + +use bytes::Bytes; +use http::{uri, Request}; +use hyper::{body::Incoming, client::conn as hyper_conn}; +use js_sys::Object; +use penguin_mux_wasm::{Multiplexor, MuxStream, Role}; +use tokio_rustls::{client::TlsStream, rustls, rustls::RootCertStore, TlsConnector}; +use wasm_bindgen::prelude::*; + +type MuxIo = TokioIo>; +type MuxRustlsIo = TokioIo>>; +type HttpBody = http_body_util::Full; + +#[wasm_bindgen(start)] +async fn start() { + utils::set_panic_hook(); +} + +#[wasm_bindgen] +pub struct WsTcpWorker { + rustls_config: Arc, + mux: Multiplexor, +} + +#[wasm_bindgen] +impl WsTcpWorker { + #[wasm_bindgen(constructor)] + pub async fn new(ws_url: String) -> Result { + let ws_uri = ws_url + .parse::() + .expect_throw("Failed to parse websocket URL"); + let ws_uri_scheme = ws_uri + .scheme_str() + .expect_throw("Websocket URL must have a scheme"); + if ws_uri_scheme != "ws" && ws_uri_scheme != "wss" { + return Err("Scheme must be either `ws` or `wss`".into()); + } + + debug!("connecting to ws {:?}", ws_url); + let (ws, wsmeta) = WsStreamWrapper::connect(ws_url, None) + .await + .expect_throw("Failed to connect to websocket"); + debug!("connected!"); + let mux = Multiplexor::new(ws, Role::Client, None, None); + + debug!("wsmeta ready state: {:?}", wsmeta.ready_state()); + + let mut certstore = RootCertStore::empty(); + certstore.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + + let rustls_config = Arc::new( + rustls::ClientConfig::builder() + .with_root_certificates(certstore) + .with_no_client_auth(), + ); + + Ok(WsTcpWorker { mux, rustls_config }) + } + + pub async fn fetch(&self, url: String, options: Object) -> Result<(), JsValue> { + let uri = url.parse::().expect_throw("Failed to parse URL"); + let uri_scheme = uri.scheme().expect_throw("URL must have a scheme"); + if *uri_scheme != uri::Scheme::HTTP && *uri_scheme != uri::Scheme::HTTPS { + return Err("Scheme must be either `http` or `https`".into()); + } + let uri_host = uri.host().expect_throw("URL must have a host"); + let uri_port = if let Some(port) = uri.port() { + port.as_u16() + } else { + // can't use match, compiler error + // error: to use a constant of type `Scheme` in a pattern, `Scheme` must be annotated with `#[derive(PartialEq, Eq)]` + if *uri_scheme == uri::Scheme::HTTP { + 80 + } else if *uri_scheme == uri::Scheme::HTTPS { + 443 + } else { + return Err("Failed to coerce port from scheme".into()); + } + }; + + let channel = self + .mux + .client_new_stream_channel(uri_host.as_bytes(), uri_port) + .await + .expect_throw("Failed to create multiplexor channel"); + + let request = Request::builder() + .header("Host", uri_host) + .header("Connection", "close") + .method("GET") + .body(HttpBody::new(Bytes::new())) + .expect_throw("Failed to create request"); + + let resp: hyper::Response; + + if *uri_scheme == uri::Scheme::HTTPS { + let cloned_uri = uri_host.to_string().clone(); + let connector = TlsConnector::from(self.rustls_config.clone()); + let io = connector + .connect( + cloned_uri + .try_into() + .expect_throw("Failed to parse URL (rustls)"), + channel, + ) + .await + .expect_throw("Failed to perform TLS handshake"); + let io = TokioIo::new(io); + let (mut req_sender, conn) = hyper_conn::http1::handshake::(io) + .await + .expect_throw("Failed to connect to host"); + + wasm_bindgen_futures::spawn_local(async move { + if let Err(e) = conn.await { + error!("wstcp: error in muxed hyper connection! {:?}", e); + } + }); + + debug!("sending req tls"); + resp = req_sender.send_request(request).await.expect_throw("Failed to send request"); + debug!("recieved resp"); + } else { + let io = TokioIo::new(channel); + let (mut req_sender, conn) = hyper_conn::http1::handshake::(io) + .await + .expect_throw("Failed to connect to host"); + + wasm_bindgen_futures::spawn_local(async move { + if let Err(e) = conn.await { + error!("err in conn: {:?}", e); + } + }); + debug!("sending req"); + resp = req_sender.send_request(request).await.expect_throw("Failed to send request"); + debug!("recieved resp"); + } + + log!("{:?}", resp); + + Ok(()) + } +} diff --git a/client/src/tokioio.rs b/client/src/tokioio.rs new file mode 100644 index 0000000..7d6acc0 --- /dev/null +++ b/client/src/tokioio.rs @@ -0,0 +1,171 @@ +#![allow(dead_code)] +// Taken from https://github.com/hyperium/hyper-util/blob/master/src/rt/tokio.rs +// hyper-util fails to compile on WASM as it has a dependency on socket2, but I only need +// hyper-util for TokioIo. + +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; + +pin_project! { + /// A wrapping implementing hyper IO traits for a type that + /// implements Tokio's IO traits. + #[derive(Debug)] + pub struct TokioIo { + #[pin] + inner: T, + } +} + +impl TokioIo { + /// Wrap a type implementing Tokio's IO traits. + pub fn new(inner: T) -> Self { + Self { inner } + } + + /// Borrow the inner type. + pub fn inner(&self) -> &T { + &self.inner + } + + /// Mut borrow the inner type. + pub fn inner_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consume this wrapper and get the inner type. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl hyper::rt::Read for TokioIo +where + T: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +impl hyper::rt::Write for TokioIo +where + T: tokio::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +impl tokio::io::AsyncRead for TokioIo +where + T: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + //let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +impl tokio::io::AsyncWrite for TokioIo +where + T: hyper::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +} diff --git a/client/src/utils.rs b/client/src/utils.rs new file mode 100644 index 0000000..6f96576 --- /dev/null +++ b/client/src/utils.rs @@ -0,0 +1,29 @@ +use wasm_bindgen::prelude::*; + +pub fn set_panic_hook() { + #[cfg(feature = "console_error_panic_hook")] + console_error_panic_hook::set_once(); +} + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(js_namespace = console, js_name = debug)] + pub fn console_debug(s: &str); + #[wasm_bindgen(js_namespace = console, js_name = log)] + pub fn console_log(s: &str); + #[wasm_bindgen(js_namespace = console, js_name = error)] + pub fn console_error(s: &str); +} + + +macro_rules! debug { + ($($t:tt)*) => (utils::console_debug(&format_args!($($t)*).to_string())) +} + +macro_rules! log { + ($($t:tt)*) => (utils::console_log(&format_args!($($t)*).to_string())) +} + +macro_rules! error { + ($($t:tt)*) => (utils::console_error(&format_args!($($t)*).to_string())) +} diff --git a/client/src/web/index.html b/client/src/web/index.html new file mode 100644 index 0000000..1db0a54 --- /dev/null +++ b/client/src/web/index.html @@ -0,0 +1,9 @@ + + + wstcp + + + + + + diff --git a/client/src/web/index.js b/client/src/web/index.js new file mode 100644 index 0000000..b79f2eb --- /dev/null +++ b/client/src/web/index.js @@ -0,0 +1,5 @@ +(async () => { + await wasm_bindgen("./wstcp_client_bg.wasm"); + let wstcp = await new wasm_bindgen.WsTcpWorker("wss://localhost:4000"); + await wstcp.fetch("https://alicesworld.tech"); +})(); diff --git a/client/src/wsstreamwrapper.rs b/client/src/wsstreamwrapper.rs new file mode 100644 index 0000000..ed20095 --- /dev/null +++ b/client/src/wsstreamwrapper.rs @@ -0,0 +1,121 @@ +use crate::*; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures_util::{Sink, Stream}; +use penguin_mux_wasm::ws; +use pin_project_lite::pin_project; +use ws_stream_wasm::{WsErr, WsMessage, WsMeta, WsStream}; + +pin_project! { + pub struct WsStreamWrapper { + #[pin] + ws: WsStream, + } +} + +impl WsStreamWrapper { + pub async fn connect( + url: impl AsRef, + protocols: impl Into>>, + ) -> Result<(Self, WsMeta), WsErr> { + let (wsmeta, wsstream) = WsMeta::connect(url, protocols).await?; + debug!("readystate {:?}", wsstream.ready_state()); + Ok((WsStreamWrapper { ws: wsstream }, wsmeta)) + } +} + +impl Stream for WsStreamWrapper { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + debug!("poll_next: {:?}", cx); + let this = self.project(); + let ret = this.ws.poll_next(cx); + match ret { + Poll::Ready(item) => Poll::>::Ready(item.map(|x| { + Ok(match x { + WsMessage::Text(txt) => ws::Message::Text(txt), + WsMessage::Binary(bin) => ws::Message::Binary(bin), + }) + })), + Poll::Pending => Poll::>::Pending, + } + } +} + +fn wserr_to_ws_err(err: WsErr) -> ws::Error { + debug!("err: {:?}", err); + match err { + WsErr::ConnectionNotOpen => ws::Error::AlreadyClosed, + _ => ws::Error::Io(std::io::Error::other(format!("{:?}", err))), + } +} + +impl Sink for WsStreamWrapper { + type Error = ws::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + debug!("poll_ready: {:?}", cx); + let this = self.project(); + let ret = this.ws.poll_ready(cx); + match ret { + Poll::Ready(item) => Poll::>::Ready(match item { + Ok(_) => Ok(()), + Err(err) => Err(wserr_to_ws_err(err)), + }), + Poll::Pending => Poll::>::Pending, + } + } + + fn start_send(self: Pin<&mut Self>, item: ws::Message) -> Result<(), Self::Error> { + debug!("start_send: {:?}", item); + use ws::Message::*; + let item = match item { + Text(txt) => WsMessage::Text(txt), + Binary(bin) => WsMessage::Binary(bin), + Close(_) => { + debug!("closing"); + return match self.ws.wrapped().close() { + Ok(_) => Ok(()), + Err(err) => Err(ws::Error::Io(std::io::Error::other(format!( + "ws close err: {:?}", + err + )))), + } + } + Ping(_) | Pong(_) | Frame(_) => return Ok(()), + }; + let this = self.project(); + let ret = this.ws.start_send(item); + match ret { + Ok(_) => Ok(()), + Err(err) => Err(wserr_to_ws_err(err)), + } + } + + // no point wrapping this as it's not going to do anything + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + debug!("poll closing {:?}", cx); + let this = self.project(); + let ret = this.ws.poll_close(cx); + match ret { + Poll::Ready(item) => Poll::>::Ready(match item { + Ok(_) => Ok(()), + Err(err) => Err(wserr_to_ws_err(err)), + }), + Poll::Pending => Poll::>::Pending, + } + } +} + +impl ws::WebSocketStream for WsStreamWrapper { + fn ping_auto_pong(&self) -> bool { + true + } +} diff --git a/server/Cargo.toml b/server/Cargo.toml new file mode 100644 index 0000000..a1f7d0a --- /dev/null +++ b/server/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "wstcp-server" +version = "1.0.0" +edition = "2021" + +[dependencies] +http-body-util = "0.1.0" +hyper = { version = "1.1.0", features = ["server", "http1"] } +hyper-util = { version = "0.1.2", features = ["tokio"] } +rusty-penguin = { version = "0.5.3", default-features = false } +tokio = { version = "1.35.1", features = ["rt-multi-thread", "net", "macros"] } +tokio-native-tls = "0.3.1" +tokio-tungstenite = "0.21.0" diff --git a/server/src/main.rs b/server/src/main.rs new file mode 100644 index 0000000..fc56579 --- /dev/null +++ b/server/src/main.rs @@ -0,0 +1,176 @@ +use std::{convert::Infallible, env, net::SocketAddr, sync::Arc}; + +use hyper::{ + body::Incoming, + header::{ + HeaderValue, CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_PROTOCOL, + SEC_WEBSOCKET_VERSION, UPGRADE, + }, + server::conn::http1, + service::service_fn, + upgrade::Upgraded, + Method, Request, Response, StatusCode, Version, +}; +use hyper_util::rt::TokioIo; +use penguin_mux::{Multiplexor, MuxStream}; +use tokio::{ + net::{TcpListener, TcpStream}, + task::{JoinError, JoinSet}, +}; +use tokio_native_tls::{native_tls, TlsAcceptor}; +use tokio_tungstenite::{ + tungstenite::{handshake::derive_accept_key, protocol::Role}, + WebSocketStream, +}; + +type Body = http_body_util::Empty; + +type MultiplexorStream = MuxStream>>; + +async fn forward(mut stream: MultiplexorStream) -> Result<(), JoinError> { + println!("forwarding"); + let host = std::str::from_utf8(&stream.dest_host).unwrap(); + let mut tcp_stream = TcpStream::connect((host, stream.dest_port)).await.unwrap(); + println!("connected to {:?}", tcp_stream.peer_addr().unwrap()); + tokio::io::copy_bidirectional(&mut stream, &mut tcp_stream) + .await + .unwrap(); + println!("finished"); + Ok(()) +} + +async fn handle_connection(ws_stream: WebSocketStream>, addr: SocketAddr) { + println!("WebSocket connection established: {}", addr); + let mux = Multiplexor::new(ws_stream, penguin_mux::Role::Server, None, None); + let mut jobs = JoinSet::new(); + println!("muxing"); + loop { + tokio::select! { + Some(result) = jobs.join_next() => { + match result { + Ok(Ok(())) => {} + Ok(Err(err)) | Err(err) => eprintln!("failed to forward: {:?}", err), + } + } + Ok(result) = mux.server_new_stream_channel() => { + jobs.spawn(forward(result)); + } + else => { + break; + } + } + } + println!("{} disconnected", &addr); +} + +async fn handle_request( + mut req: Request, + addr: SocketAddr, +) -> Result, Infallible> { + let headers = req.headers(); + let derived = headers + .get(SEC_WEBSOCKET_KEY) + .map(|k| derive_accept_key(k.as_bytes())); + + let mut negotiated_protocol: Option = None; + if let Some(protocols) = headers + .get(SEC_WEBSOCKET_PROTOCOL) + .and_then(|h| h.to_str().ok()) + { + negotiated_protocol = protocols.split(',').next().map(|h| h.trim().to_string()); + } + + if req.method() != Method::GET + || req.version() < Version::HTTP_11 + || !headers + .get(CONNECTION) + .and_then(|h| h.to_str().ok()) + .map(|h| { + h.split(|c| c == ' ' || c == ',') + .any(|p| p.eq_ignore_ascii_case("upgrade")) + }) + .unwrap_or(false) + || !headers + .get(UPGRADE) + .and_then(|h| h.to_str().ok()) + .map(|h| h.eq_ignore_ascii_case("websocket")) + .unwrap_or(false) + || !headers + .get(SEC_WEBSOCKET_VERSION) + .map(|h| h == "13") + .unwrap_or(false) + || derived.is_none() + { + return Ok(Response::new(Body::default())); + } + + let ver = req.version(); + tokio::task::spawn(async move { + match hyper::upgrade::on(&mut req).await { + Ok(upgraded) => { + let upgraded = TokioIo::new(upgraded); + handle_connection( + WebSocketStream::from_raw_socket(upgraded, Role::Server, None).await, + addr, + ) + .await; + } + Err(e) => eprintln!("upgrade error: {}", e), + } + }); + + let mut res = Response::new(Body::default()); + *res.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + *res.version_mut() = ver; + res.headers_mut() + .append(CONNECTION, HeaderValue::from_static("Upgrade")); + res.headers_mut() + .append(UPGRADE, HeaderValue::from_static("websocket")); + res.headers_mut() + .append(SEC_WEBSOCKET_ACCEPT, derived.unwrap().parse().unwrap()); + if let Some(protocol) = negotiated_protocol { + res.headers_mut() + .append(SEC_WEBSOCKET_PROTOCOL, protocol.parse().unwrap()); + } + + Ok(res) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = env::args() + .nth(1) + .unwrap_or_else(|| "0.0.0.0:4000".to_string()) + .parse::()?; + let pem = include_bytes!("./pem.pem"); + let key = include_bytes!("./key.pem"); + + let identity = native_tls::Identity::from_pkcs8(pem, key).expect("invalid pem/key"); + + let acceptor = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity).unwrap()); + let acceptor = Arc::new(acceptor); + + let listener = TcpListener::bind(addr).await?; + + println!("listening on {}", addr); + + loop { + let (stream, remote_addr) = listener.accept().await?; + let acceptor = acceptor.clone(); + + tokio::spawn(async move { + let stream = acceptor.accept(stream).await.expect("not tls"); + let io = TokioIo::new(stream); + + let service = service_fn(move |req| handle_request(req, remote_addr)); + + let conn = http1::Builder::new() + .serve_connection(io, service) + .with_upgrades(); + + if let Err(err) = conn.await { + eprintln!("failed to serve connection: {:?}", err); + } + }); + } +} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 45796a0..0000000 --- a/src/main.rs +++ /dev/null @@ -1,178 +0,0 @@ -use std::io::Error; - -use bytes::Bytes; -use fastwebsockets::{ - upgrade, CloseCode, FragmentCollector, Frame, OpCode, Payload, WebSocketError, -}; -use futures_util::{SinkExt, StreamExt}; -use hyper::{ - body::Incoming, header::HeaderValue, server::conn::http1, service::service_fn, Request, - Response, StatusCode, -}; -use hyper_util::rt::TokioIo; -use tokio::net::{TcpListener, TcpStream}; -use tokio_native_tls::{native_tls, TlsAcceptor}; -use tokio_util::codec::{BytesCodec, Framed}; - -type HttpBody = http_body_util::Empty; - -#[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<(), Error> { - let pem = include_bytes!("./pem.pem"); - let key = include_bytes!("./key.pem"); - let identity = native_tls::Identity::from_pkcs8(pem, key).expect("failed to make identity"); - - let socket = TcpListener::bind("0.0.0.0:4000") - .await - .expect("failed to bind"); - let acceptor = TlsAcceptor::from( - native_tls::TlsAcceptor::new(identity).expect("failed to make tls acceptor"), - ); - let acceptor = std::sync::Arc::new(acceptor); - - println!("listening on 0.0.0.0:4000"); - while let Ok((stream, addr)) = socket.accept().await { - let acceptor_cloned = acceptor.clone(); - tokio::spawn(async move { - let stream = acceptor_cloned.accept(stream).await.expect("not tls"); - let io = TokioIo::new(stream); - let service = service_fn(move |res| accept_http(res, addr.to_string())); - let conn = http1::Builder::new() - .serve_connection(io, service) - .with_upgrades(); - if let Err(err) = conn.await { - println!("{:?}: failed to serve conn: {:?}", addr, err); - } - }); - } - - Ok(()) -} - -async fn accept_http( - mut req: Request, - addr: String, -) -> Result, WebSocketError> { - if upgrade::is_upgrade_request(&req) { - let uri = req.uri().clone(); - let (mut res, fut) = upgrade::upgrade(&mut req)?; - - tokio::spawn(async move { - if let Err(e) = accept_ws(fut, uri.path().to_string(), addr.clone()).await { - println!("{:?}: error in ws handling: {:?}", addr, e); - } - }); - - if let Some(protocol) = req.headers().get("Sec-Websocket-Protocol") { - let first_protocol = protocol - .to_str() - .expect("failed to get protocol") - .split(',') - .next() - .expect("failed to get first protocol") - .trim(); - res.headers_mut().insert( - "Sec-Websocket-Protocol", - HeaderValue::from_str(first_protocol).unwrap(), - ); - } - - Ok(res) - } else { - Ok(Response::builder() - .status(StatusCode::OK) - .body(HttpBody::new()) - .unwrap()) - } -} - -async fn accept_ws( - fut: upgrade::UpgradeFut, - incoming_uri: String, - addr: String, -) -> Result<(), Box> { - let mut ws_stream = FragmentCollector::new(fut.await?); - - let mut incoming_uri_chars = incoming_uri.chars(); - incoming_uri_chars.next(); - - println!("{:?}: connected", addr); - - let tcp_stream = match TcpStream::connect(incoming_uri_chars.as_str()).await { - Ok(stream) => stream, - Err(err) => { - ws_stream - .write_frame(Frame::close(CloseCode::Away.into(), b"failed to connect")) - .await - .unwrap(); - return Err(Box::new(err)); - } - }; - let mut tcp_stream_framed = Framed::new(tcp_stream, BytesCodec::new()); - - loop { - tokio::select! { - event = ws_stream.read_frame() => { - match event { - Ok(frame) => { - print!("{:?}: event ws - ", addr); - match frame.opcode { - OpCode::Text | OpCode::Binary => { - if tcp_stream_framed.send(Bytes::from(frame.payload.to_vec())).await.is_ok() { - println!("sent success"); - } else { - println!("sent FAILED"); - } - } - OpCode::Close => { - if as SinkExt>::close(&mut tcp_stream_framed).await.is_ok() { - println!("closed success"); - } else { - println!("closed FAILED"); - } - break; - } - _ => { - println!("ignored"); - } - } - }, - Err(err) => { - print!("{:?}: err in ws: {:?} - ", addr, err); - if as SinkExt>::close(&mut tcp_stream_framed).await.is_ok() { - println!("closed tcp success"); - } else { - println!("closed tcp FAILED"); - } - break; - } - } - }, - event = tcp_stream_framed.next() => { - if let Some(res) = event { - print!("{:?}: event tcp - ", addr); - match res { - Ok(buf) => { - if ws_stream.write_frame(Frame::binary(Payload::Owned(buf.to_vec()))).await.is_ok() { - println!("sent success"); - } else { - println!("sent FAILED"); - } - } - Err(_) => { - if ws_stream.write_frame(Frame::close(CloseCode::Away.into(), b"tcp side is going away")).await.is_ok() { - println!("closed success"); - } else { - println!("closed FAILED"); - } - } - } - } - } - } - } - - println!("\"{}\": connection closed", addr); - - Ok(()) -}