diff --git a/.gitignore b/.gitignore index bfd66de..1cd2570 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,12 @@ /target -server/src/*.pem +**/*.pem client/pkg client/out .direnv -client/index.js -client/module.js -client/module.d.ts +client/epoxy-bundled.js +client/epoxy-module-bundled.js +client/epoxy-module-bundled.d.ts +client/epoxy.js +client/epoxy.d.ts +client/epoxy.wasm pnpm-lock.yaml diff --git a/Cargo.lock b/Cargo.lock index 4f61515..cc4a287 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -33,10 +33,58 @@ dependencies = [ ] [[package]] -name = "async-compression" -version = "0.4.5" +name = "anstream" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5" +checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" + +[[package]] +name = "anstyle-parse" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-compression" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a116f46a969224200a0a97f29cfd4c50e7534e4b4826bd23ea2c3c533039c82c" dependencies = [ "brotli", "flate2", @@ -93,9 +141,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.1" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" [[package]] name = "block-buffer" @@ -133,12 +181,6 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" -[[package]] -name = "byteorder" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" - [[package]] name = "bytes" version = "1.5.0" @@ -160,6 +202,77 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "clap" +version = "4.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", + "terminal_size", +] + +[[package]] +name = "clap_derive" +version = "4.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" + +[[package]] +name = "clio" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7fc6734af48458f72f5a3fa7b840903606427d98a710256e808f76a965047d9" +dependencies = [ + "cfg-if", + "clap", + "is-terminal", + "libc", + "tempfile", + "walkdir", + "windows-sys 0.42.0", +] + +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -204,6 +317,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + [[package]] name = "crypto-common" version = "0.1.6" @@ -215,10 +334,17 @@ dependencies = [ ] [[package]] -name = "data-encoding" -version = "2.5.0" +name = "dashmap" +version = "5.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] [[package]] name = "digest" @@ -230,40 +356,35 @@ dependencies = [ "crypto-common", ] -[[package]] -name = "either" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" - [[package]] name = "epoxy-client" version = "1.0.0" dependencies = [ "async-compression", + "async_io_stream", "base64", "bytes", "console_error_panic_hook", - "either", "fastwebsockets", "futures-util", "getrandom", - "http 1.0.0", + "http", "http-body-util", "hyper", + "hyper-util 0.1.3 (git+https://github.com/r58Playz/hyper-util-wasm)", "js-sys", - "penguin-mux-wasm", "pin-project-lite", - "rand", "ring", "tokio", "tokio-rustls", "tokio-util", + "tower-service", "wasm-bindgen", "wasm-bindgen-futures", "wasm-streams", "web-sys", "webpki-roots", + "wisp-mux", "ws_stream_wasm", ] @@ -271,15 +392,27 @@ dependencies = [ name = "epoxy-server" version = "1.0.0" dependencies = [ + "bytes", + "clap", + "clio", + "dashmap", + "fastwebsockets", + "futures-util", "http-body-util", "hyper", - "hyper-util", - "rusty-penguin", + "hyper-util 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio", "tokio-native-tls", - "tokio-tungstenite", + "tokio-util", + "wisp-mux", ] +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "errno" version = "0.3.8" @@ -290,6 +423,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b72557800024fabbaa2449dd4bf24e37b93702d457a4d4f2b0dd1f0f039f20c1" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -302,7 +446,13 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63dd7b57f9b33b1741fa631c9522eb35d43e96dcca4a6a91d5e4ca7c93acdc1" dependencies = [ + "base64", + "http-body-util", + "hyper", + "hyper-util 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project", "rand", + "sha1", "simdutf8", "thiserror", "tokio", @@ -340,15 +490,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" -[[package]] -name = "form_urlencoded" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" -dependencies = [ - "percent-encoding", -] - [[package]] name = "futures" version = "0.3.30" @@ -450,9 +591,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" +checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ "cfg-if", "js-sys", @@ -468,22 +609,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] -name = "hermit-abi" -version = "0.3.3" +name = "h2" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" - -[[package]] -name = "http" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" dependencies = [ "bytes", "fnv", - "itoa", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", ] +[[package]] +name = "hashbrown" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" + +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + +[[package]] +name = "hermit-abi" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0c62115964e08cb8039170eb33c1d0e2388a256930279edca206fff675f82c3" + [[package]] name = "http" version = "1.0.0" @@ -502,7 +663,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http 1.0.0", + "http", ] [[package]] @@ -513,7 +674,7 @@ checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" dependencies = [ "bytes", "futures-util", - "http 1.0.0", + "http", "http-body", "pin-project-lite", ] @@ -539,7 +700,8 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.0.0", + "h2", + "http", "http-body", "httparse", "httpdate", @@ -551,30 +713,58 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdea9aac0dbe5a9240d68cfd9501e2db94222c6dc06843e06640b9e07f0fdc67" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" dependencies = [ "bytes", - "futures-channel", "futures-util", - "http 1.0.0", + "http", "http-body", "hyper", "pin-project-lite", "socket2", "tokio", - "tracing", ] [[package]] -name = "idna" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +name = "hyper-util" +version = "0.1.3" +source = "git+https://github.com/r58Playz/hyper-util-wasm#ea9a5608f3255562d4a647a5c94ff7d3f9c32b53" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "bytes", + "futures-channel", + "futures-util", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tower", + "tower-service", + "tracing", + "wasm-bindgen", + "wasmtimer", +] + +[[package]] +name = "indexmap" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" +dependencies = [ + "equivalent", + "hashbrown", +] + +[[package]] +name = "is-terminal" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" +dependencies = [ + "hermit-abi", + "rustix", + "windows-sys 0.52.0", ] [[package]] @@ -585,9 +775,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" -version = "0.3.66" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca" +checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" dependencies = [ "wasm-bindgen", ] @@ -600,15 +790,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.152" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "linux-raw-sys" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" @@ -634,9 +824,9 @@ checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" [[package]] name = "miniz_oxide" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ "adler", ] @@ -697,11 +887,11 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl" -version = "0.10.62" +version = "0.10.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cde4d2d9200ad5909f8dac647e29482e07c3a35de8a13fce7c9c7747ad9f671" +checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "cfg-if", "foreign-types", "libc", @@ -729,9 +919,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.98" +version = "0.9.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1665caf8ab2dc9aef43d1c0023bd904633a6a05cb30b0ad59bec2ae986e57a7" +checksum = "22e1bf214306098e4832460f797824c05d25aacdf896f64a985fb0fd992454ae" dependencies = [ "cc", "libc", @@ -739,6 +929,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -762,29 +958,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "penguin-mux-wasm" -version = "0.1.0" -source = "git+https://github.com/r58Playz/penguin-mux-wasm#69b413aedb6f50f55eac646fda361abe430eb022" -dependencies = [ - "bytes", - "futures-util", - "http 0.2.11", - "parking_lot", - "rand", - "thiserror", - "tokio", - "tokio-tungstenite", - "tracing", - "wasm-bindgen-futures", -] - -[[package]] -name = "percent-encoding" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" - [[package]] name = "pharos" version = "0.5.3" @@ -795,6 +968,26 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "pin-project" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -809,9 +1002,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" +checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" [[package]] name = "ppv-lite86" @@ -821,9 +1014,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.76" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -907,11 +1100,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.28" +version = "0.38.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" +checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "errno", "libc", "linux-raw-sys", @@ -934,17 +1127,17 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.1.0" -source = "git+https://github.com/r58Playz/rustls-pki-types#685721bb4b819c7da4724f07cffe06173f8cc883" +version = "1.2.0" +source = "git+https://github.com/r58Playz/rustls-pki-types#7bc22404e91ac909ef0e6ac11e6e316aefacde75" dependencies = [ "wasm-bindgen", ] [[package]] name = "rustls-webpki" -version = "0.102.1" +version = "0.102.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef4ca26037c909dedb327b48c3327d0ba91d3dd3c4e05dad328f210ffb68e95b" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" dependencies = [ "ring", "rustls-pki-types", @@ -952,20 +1145,12 @@ dependencies = [ ] [[package]] -name = "rusty-penguin" -version = "0.5.3" +name = "same-file" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aefd4b85c815cf35675640924e0e73d9847bbdec8aa2e7daa8703fc5161f11d9" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" dependencies = [ - "bytes", - "futures-util", - "http 0.2.11", - "parking_lot", - "rand", - "thiserror", - "tokio", - "tokio-tungstenite", - "tracing", + "winapi-util", ] [[package]] @@ -1029,12 +1214,36 @@ dependencies = [ "digest", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "simdutf8" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +[[package]] +name = "simple-wisp-client" +version = "0.1.0" +dependencies = [ + "bytes", + "fastwebsockets", + "futures", + "http-body-util", + "hyper", + "tokio", + "tokio-native-tls", + "tokio-util", + "wisp-mux", +] + [[package]] name = "slab" version = "0.4.9" @@ -1046,9 +1255,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.2" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" [[package]] name = "socket2" @@ -1066,6 +1275,12 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "subtle" version = "2.5.0" @@ -1085,17 +1300,26 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.9.0" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" +checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" dependencies = [ "cfg-if", "fastrand", - "redox_syscall", "rustix", "windows-sys 0.52.0", ] +[[package]] +name = "terminal_size" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7" +dependencies = [ + "rustix", + "windows-sys 0.48.0", +] + [[package]] name = "thiserror" version = "1.0.56" @@ -1116,26 +1340,11 @@ dependencies = [ "syn", ] -[[package]] -name = "tinyvec" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" -dependencies = [ - "tinyvec_macros", -] - -[[package]] -name = "tinyvec_macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" - [[package]] name = "tokio" -version = "1.35.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -1144,6 +1353,7 @@ dependencies = [ "num_cpus", "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", @@ -1181,18 +1391,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-tungstenite" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite", -] - [[package]] name = "tokio-util" version = "0.7.10" @@ -1204,30 +1402,47 @@ dependencies = [ "futures-sink", "pin-project-lite", "tokio", + "tracing", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", - "tracing-attributes", "tracing-core", ] -[[package]] -name = "tracing-attributes" -version = "0.1.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "tracing-core" version = "0.1.32" @@ -1243,75 +1458,36 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "tungstenite" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" -dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http 1.0.0", - "httparse", - "log", - "rand", - "sha1", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "typenum" version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" -[[package]] -name = "unicode-bidi" -version = "0.3.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" - [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" -[[package]] -name = "unicode-normalization" -version = "0.1.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" -dependencies = [ - "tinyvec", -] - [[package]] name = "untrusted" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" -[[package]] -name = "url" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" -dependencies = [ - "form_urlencoded", - "idna", - "percent-encoding", -] - [[package]] name = "utf-8" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1324,6 +1500,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -1341,9 +1527,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.89" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" +checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1351,9 +1537,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.89" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" +checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" dependencies = [ "bumpalo", "log", @@ -1366,9 +1552,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.39" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12" +checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" dependencies = [ "cfg-if", "js-sys", @@ -1378,9 +1564,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.89" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" +checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1388,9 +1574,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.89" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" +checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2", "quote", @@ -1401,9 +1587,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.89" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" +checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" [[package]] name = "wasm-streams" @@ -1419,10 +1605,24 @@ dependencies = [ ] [[package]] -name = "web-sys" -version = "0.3.66" +name = "wasmtimer" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f" +checksum = "5f656cd8858a5164932d8a90f936700860976ec21eb00e0fe2aa8cab13f6b4cf" +dependencies = [ + "futures", + "js-sys", + "parking_lot", + "pin-utils", + "slab", + "wasm-bindgen", +] + +[[package]] +name = "web-sys" +version = "0.3.67" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed" dependencies = [ "js-sys", "wasm-bindgen", @@ -1430,13 +1630,59 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.26.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de2cfda980f21be5a7ed2eadb3e6fe074d56022bea2cdeb1a62eb220fc04188" +checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" dependencies = [ "rustls-pki-types", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -1485,6 +1731,12 @@ dependencies = [ "windows_x86_64_msvc 0.52.0", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -1497,6 +1749,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -1509,6 +1767,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -1521,6 +1785,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -1533,6 +1803,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -1545,6 +1821,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -1557,6 +1839,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -1569,6 +1857,24 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "wisp-mux" +version = "0.1.0" +dependencies = [ + "async_io_stream", + "bytes", + "event-listener", + "fastwebsockets", + "futures", + "futures-util", + "hyper", + "hyper-util 0.1.3 (git+https://github.com/r58Playz/hyper-util-wasm)", + "pin-project-lite", + "tokio", + "tower-service", + "ws_stream_wasm", +] + [[package]] name = "ws_stream_wasm" version = "0.7.4" diff --git a/Cargo.toml b/Cargo.toml index 0d2e374..2fcf5e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,11 @@ [workspace] resolver = "2" -members = ["server", "client"] +members = ["server", "client", "wisp", "simple-wisp-client"] [patch.crates-io] rustls-pki-types = { git = "https://github.com/r58Playz/rustls-pki-types" } + +[profile.release] +lto = true +opt-level = 'z' +codegen-units = 1 diff --git a/README.md b/README.md index 8cfb752..5c0eca9 100644 --- a/README.md +++ b/README.md @@ -1,29 +1,55 @@ # epoxy Epoxy is an encrypted proxy for browser javascript. It allows you to make requests that bypass cors without compromising security, by running SSL/TLS inside webassembly. -Simple usage example for making a secure GET request to httpbin.org: +## Using the client +Epoxy must be run from within a web worker and must be served with the [security headers needed for `SharedArrayBuffer`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer#security_requirements). Here is a simple usage example: ```javascript -import epoxy from "@mercuryworkshop/epoxy-tls"; +importScripts("epoxy-bundled.js"); const { EpoxyClient } = await epoxy(); let client = await new EpoxyClient("wss://localhost:4000", navigator.userAgent, 10); let response = await client.fetch("https://httpbin.org/get"); await response.text(); - ``` -Epoxy also allows you to make arbitrary end to end encrypted TCP connections safely directly from the browser. +## Using the server +``` +$ cargo r -r --bin epoxy-server -- --help +Implementation of the Wisp protocol in Rust, made for epoxy. + +Usage: epoxy-server [OPTIONS] --pubkey --privkey + +Options: + --prefix [default: ] + -l, --port [default: 4000] + -p, --pubkey + -P, --privkey + -h, --help Print help + -V, --version Print version +``` ## Building +Rust nightly is required. ### Server - -1. Generate certs with `mkcert` and place the public certificate in `./server/src/pem.pem` and private certificate in `./server/src/key.pem` -2. Run `cargo r --bin epoxy-server`, optionally with `-r` flag for release +``` +cargo b -r --bin epoxy-server +``` +The executable will be placed at `target/release/epoxy-server`. ### Client -Note: Building the client is only supported on linux +> [!IMPORTANT] +> Building the client is only supported on Linux. -1. Make sure you have the `wasm32-unknown-unknown` target installed, `wasm-bindgen` and `wasm-opt` executables installed, and `bash`, `python3` packages (`python3` is used for `http.server` module) -2. Run `pnpm build` +Make sure you have the `wasm32-unknown-unknown` rust target, the `rust-std` component, and the `wasm-bindgen`, `wasm-opt`, and `base64` binaries installed. + +In the `client` directory: +``` +bash build.sh +``` + +To host a local server with the required headers: +``` +python3 serve.py +``` diff --git a/client/Cargo.toml b/client/Cargo.toml index 9fc6149..cd05441 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -6,18 +6,12 @@ edition = "2021" [lib] crate-type = ["cdylib"] -[features] -default = ["console_error_panic_hook"] - [dependencies] bytes = "1.5.0" -console_error_panic_hook = { version = "0.1.7", optional = true } http = "1.0.0" http-body-util = "0.1.0" -hyper = { version = "1.1.0", features = ["client", "http1"] } +hyper = { version = "1.1.0", features = ["client", "http1", "http2"] } pin-project-lite = "0.2.13" -penguin-mux-wasm = { git = "https://github.com/r58Playz/penguin-mux-wasm" } -tokio = { version = "1.35.1", default_features = false } wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4.39" ws_stream_wasm = { version = "0.7.4", features = ["tokio_io"] } @@ -25,17 +19,19 @@ futures-util = "0.3.30" js-sys = "0.3.66" webpki-roots = "0.26.0" tokio-rustls = "0.25.0" -web-sys = { version = "0.3.66", features = ["TextEncoder", "Navigator", "Response", "ResponseInit"] } +web-sys = { version = "0.3.66", features = ["TextEncoder", "Response", "ResponseInit"] } wasm-streams = "0.4.0" -either = "1.9.0" tokio-util = { version = "0.7.10", features = ["io"] } async-compression = { version = "0.4.5", features = ["tokio", "gzip", "brotli"] } -fastwebsockets = { version = "0.6.0", features = ["simdutf8", "unstable-split"] } -rand = "0.8.5" +fastwebsockets = { version = "0.6.0", features = ["unstable-split"] } base64 = "0.21.7" - -[dependencies.getrandom] -features = ["js"] +wisp-mux = { path = "../wisp", features = ["ws_stream_wasm", "tokio_io", "hyper_tower"] } +async_io_stream = { version = "0.3.3", features = ["tokio_io"] } +getrandom = { version = "0.2.12", features = ["js"] } +hyper-util = { git = "https://github.com/r58Playz/hyper-util-wasm", features = ["client", "client-legacy", "http1", "http2"] } +tokio = { version = "1.36.0", default-features = false } +tower-service = "0.3.2" +console_error_panic_hook = "0.1.7" [dependencies.ring] features = ["wasm32_unknown_unknown_js"] diff --git a/client/build.sh b/client/build.sh index 68a1dbe..7f402f0 100755 --- a/client/build.sh +++ b/client/build.sh @@ -5,27 +5,33 @@ shopt -s inherit_errexit rm -rf out/ || true mkdir out/ -cargo build --target wasm32-unknown-unknown --release -echo "[ws] built rust" +RUSTFLAGS='-C target-feature=+atomics,+bulk-memory' cargo build --target wasm32-unknown-unknown -Z build-std=panic_abort,std --release +echo "[ws] cargo finished" wasm-bindgen --weak-refs --target no-modules --no-modules-global epoxy --out-dir out/ ../target/wasm32-unknown-unknown/release/epoxy_client.wasm -echo "[ws] bindgen finished" +echo "[ws] wasm-bindgen finished" mv out/epoxy_client_bg.wasm out/epoxy_client_unoptimized.wasm -time wasm-opt -O4 out/epoxy_client_unoptimized.wasm -o out/epoxy_client_bg.wasm -echo "[ws] optimized" +time wasm-opt -Oz --vacuum --dce --enable-threads --enable-bulk-memory --enable-simd out/epoxy_client_unoptimized.wasm -o out/epoxy_client_bg.wasm +echo "[ws] wasm-opt finished" AUTOGENERATED_SOURCE=$(<"out/epoxy_client.js") +# patch for websocket sharedarraybuffer error +AUTOGENERATED_SOURCE=${AUTOGENERATED_SOURCE//getObject(arg0).send(getArrayU8FromWasm0(arg1, arg2)/getObject(arg0).send(new Uint8Array(getArrayU8FromWasm0(arg1, arg2))} WASM_BASE64=$(base64 -w0 out/epoxy_client_bg.wasm) -AUTOGENERATED_SOURCE=${AUTOGENERATED_SOURCE//__wbg_init(input) \{/__wbg_init() \{let input=\'data:application/wasm;base64,$WASM_BASE64\'} +AUTOGENERATED_SOURCE=${AUTOGENERATED_SOURCE//__wbg_init(input, maybe_memory) \{/__wbg_init(input, maybe_memory) \{$'\n'if (!input) \{input=\'data:application/wasm;base64,$WASM_BASE64\'\}} AUTOGENERATED_SOURCE=${AUTOGENERATED_SOURCE//return __wbg_finalize_init\(instance\, module\);/__wbg_finalize_init\(instance\, module\); return epoxy} -echo "$AUTOGENERATED_SOURCE" > index.js -cp index.js module.js -echo "module.exports = epoxy" >> module.js +echo "$AUTOGENERATED_SOURCE" > epoxy-bundled.js +cp epoxy-bundled.js epoxy-module-bundled.js +echo "module.exports = epoxy" >> epoxy-module-bundled.js AUTOGENERATED_TYPEDEFS=$(<"out/epoxy_client.d.ts") AUTOGENERATED_TYPEDEFS=${AUTOGENERATED_TYPEDEFS%%export class IntoUnderlyingByteSource*} -echo "$AUTOGENERATED_TYPEDEFS" >"module.d.ts" -echo "} export default function epoxy(): Promise;" >> "module.d.ts" +echo "$AUTOGENERATED_TYPEDEFS" >"epoxy-module-bundled.d.ts" +echo "} export default function epoxy(): Promise;" >> "epoxy-module-bundled.d.ts" + +cp out/epoxy_client.js epoxy.js +cp out/epoxy_client.d.ts epoxy.d.ts +cp out/epoxy_client_bg.wasm epoxy.wasm rm -rf out/ echo "[ws] done!" diff --git a/client/demo.js b/client/demo.js index da2d24d..5b11bd1 100644 --- a/client/demo.js +++ b/client/demo.js @@ -1,21 +1,38 @@ -(async () => { +importScripts("epoxy-bundled.js"); +onmessage = async (msg) => { + console.debug("recieved:", msg); + let [should_feature_test, should_multiparallel_test, should_parallel_test, should_multiperf_test, should_perf_test, should_ws_test, should_tls_test] = msg.data; console.log( "%cWASM is significantly slower with DevTools open!", "color:red;font-size:3rem;font-weight:bold" ); - const should_feature_test = (new URL(window.location.href)).searchParams.has("feature_test"); - const should_perf_test = (new URL(window.location.href)).searchParams.has("perf_test"); - const should_ws_test = (new URL(window.location.href)).searchParams.has("ws_test"); + const log = (str) => { + console.warn(str); + postMessage(str); + } - let { EpoxyClient } = await epoxy(); + const { EpoxyClient } = await epoxy(); const tconn0 = performance.now(); // args: websocket url, user agent, redirect limit let epoxy_client = await new EpoxyClient("wss://localhost:4000", navigator.userAgent, 10); const tconn1 = performance.now(); - console.warn(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`); + log(`conn establish took ${tconn1 - tconn0} ms or ${(tconn1 - tconn0) / 1000} s`); + const test_mux = async (url) => { + const t0 = performance.now(); + await epoxy_client.fetch(url); + const t1 = performance.now(); + return t1 - t0; + }; + + const test_native = async (url) => { + const t0 = performance.now(); + await fetch(url, { cache: "no-store" }); + const t1 = performance.now(); + return t1 - t0; + }; if (should_feature_test) { for (const url of [ @@ -23,44 +40,102 @@ ["https://httpbin.org/gzip", {}], ["https://httpbin.org/brotli", {}], ["https://httpbin.org/redirect/11", {}], - ["https://httpbin.org/redirect/1", { redirect: "manual" }] + ["https://httpbin.org/redirect/1", { redirect: "manual" }], ]) { let resp = await epoxy_client.fetch(url[0], url[1]); console.warn(url, resp, Object.fromEntries(resp.headers)); console.warn(await resp.text()); } - } else if (should_perf_test) { - const test_mux = async (url) => { - const t0 = performance.now(); - await epoxy_client.fetch(url); - const t1 = performance.now(); - return t1 - t0; - }; + } else if (should_multiparallel_test) { + const num_tests = 10; + let total_mux_minus_native = 0; + for (const _ of Array(num_tests).keys()) { + let total_mux = 0; + await Promise.all([...Array(num_tests).keys()].map(async i => { + log(`running mux test ${i}`); + return await test_mux("https://httpbin.org/get"); + })).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) }); + total_mux = total_mux / num_tests; - const test_native = async (url) => { - const t0 = performance.now(); - await fetch(url); - const t1 = performance.now(); - return t1 - t0; - }; + let total_native = 0; + await Promise.all([...Array(num_tests).keys()].map(async i => { + log(`running native test ${i}`); + return await test_native("https://httpbin.org/get"); + })).then((vals) => { total_native = vals.reduce((acc, x) => acc + x, 0) }); + total_native = total_native / num_tests; + log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); + log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + total_mux_minus_native += total_mux - total_native; + } + total_mux_minus_native = total_mux_minus_native / num_tests; + log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`); + } else if (should_parallel_test) { const num_tests = 10; let total_mux = 0; + await Promise.all([...Array(num_tests).keys()].map(async i => { + log(`running mux test ${i}`); + return await test_mux("https://httpbin.org/get"); + })).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) }); + total_mux = total_mux / num_tests; + + let total_native = 0; + await Promise.all([...Array(num_tests).keys()].map(async i => { + log(`running native test ${i}`); + return await test_native("https://httpbin.org/get"); + })).then((vals) => { total_native = vals.reduce((acc, x) => acc + x, 0) }); + total_native = total_native / num_tests; + + log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); + log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + } else if (should_multiperf_test) { + const num_tests = 10; + let total_mux_minus_native = 0; for (const _ of Array(num_tests).keys()) { + let total_mux = 0; + for (const i of Array(num_tests).keys()) { + log(`running mux test ${i}`); + total_mux += await test_mux("https://httpbin.org/get"); + } + total_mux = total_mux / num_tests; + + let total_native = 0; + for (const i of Array(num_tests).keys()) { + log(`running native test ${i}`); + total_native += await test_native("https://httpbin.org/get"); + } + total_native = total_native / num_tests; + + log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); + log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + total_mux_minus_native += total_mux - total_native; + } + total_mux_minus_native = total_mux_minus_native / num_tests; + log(`total mux - native (${num_tests} tests of ${num_tests} reqs): ${total_mux_minus_native} ms or ${total_mux_minus_native / 1000} s`); + } else if (should_perf_test) { + const num_tests = 10; + + let total_mux = 0; + for (const i of Array(num_tests).keys()) { + log(`running mux test ${i}`); total_mux += await test_mux("https://httpbin.org/get"); } total_mux = total_mux / num_tests; let total_native = 0; - for (const _ of Array(num_tests).keys()) { + for (const i of Array(num_tests).keys()) { + log(`running native test ${i}`); total_native += await test_native("https://httpbin.org/get"); } total_native = total_native / num_tests; - console.warn(`avg mux (10) took ${total_mux} ms or ${total_mux / 1000} s`); - console.warn(`avg native (10) took ${total_native} ms or ${total_native / 1000} s`); - console.warn(`mux - native: ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); + log(`avg mux (${num_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + log(`avg native (${num_tests}) took ${total_native} ms or ${total_native / 1000} s`); + log(`avg mux - avg native (${num_tests}): ${total_mux - total_native} ms or ${(total_mux - total_native) / 1000} s`); } else if (should_ws_test) { let ws = await epoxy_client.connect_ws( () => console.log("opened"), @@ -75,10 +150,20 @@ await ws.send("data"); await (new Promise((res, _) => setTimeout(res, 100))); } + } else if (should_tls_test) { + let decoder = new TextDecoder(); + let ws = await epoxy_client.connect_tls( + () => console.log("opened"), + () => console.log("closed"), + err => console.error(err), + msg => { console.log(msg); console.log(decoder.decode(msg)) }, + "alicesworld.tech:443", + ); + await ws.send("GET / HTTP 1.1\r\nHost: alicesworld.tech\r\nConnection: close\r\n\r\n"); } else { let resp = await epoxy_client.fetch("https://httpbin.org/get"); console.warn(resp, Object.fromEntries(resp.headers)); console.warn(await resp.text()); } - if (!should_ws_test) alert("you can open console now"); -})(); + log("done"); +}; diff --git a/client/index.html b/client/index.html index c8ff7ed..718f470 100644 --- a/client/index.html +++ b/client/index.html @@ -2,12 +2,35 @@ epoxy - - + + - running... (wait for the browser alert if not running ws test) +
+ running... (wait for the browser alert if not running ws test) +
+
diff --git a/client/serve.py b/client/serve.py new file mode 100644 index 0000000..e32b7a0 --- /dev/null +++ b/client/serve.py @@ -0,0 +1,10 @@ +from http.server import HTTPServer, SimpleHTTPRequestHandler, test +import sys + +class RequestHandler (SimpleHTTPRequestHandler): + def end_headers (self): + self.send_header('Cross-Origin-Opener-Policy', 'same-origin') + self.send_header('Cross-Origin-Embedder-Policy', 'require-corp') + SimpleHTTPRequestHandler.end_headers(self) + +test(RequestHandler, HTTPServer, port=int(sys.argv[1]) if len(sys.argv) > 1 else 8000) diff --git a/client/src/lib.rs b/client/src/lib.rs index 30aabde..4da9fee 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,24 +1,25 @@ -#![feature(let_chains)] +#![feature(let_chains, impl_trait_in_assoc_type)] #[macro_use] mod utils; -mod tokioio; +mod tls_stream; mod websocket; mod wrappers; -use tokioio::TokioIo; +use tls_stream::EpxTlsStream; use utils::{ReplaceErr, UriExt}; use websocket::EpxWebSocket; -use wrappers::{IncomingBody, WsStreamWrapper}; +use wrappers::{IncomingBody, TlsWispService}; use std::sync::Arc; use async_compression::tokio::bufread as async_comp; +use async_io_stream::IoStream; use bytes::Bytes; -use futures_util::StreamExt; +use futures_util::{stream::SplitSink, StreamExt}; use http::{uri, HeaderName, HeaderValue, Request, Response}; -use hyper::{body::Incoming, client::conn::http1::Builder, Uri}; +use hyper::{body::Incoming, Uri}; +use hyper_util::client::legacy::Client; use js_sys::{Array, Function, Object, Reflect, Uint8Array}; -use penguin_mux_wasm::{Multiplexor, MuxStream}; use tokio_rustls::{client::TlsStream, rustls, rustls::RootCertStore, TlsConnector}; use tokio_util::{ either::Either, @@ -26,13 +27,15 @@ use tokio_util::{ }; use wasm_bindgen::prelude::*; use web_sys::TextEncoder; +use wisp_mux::{tokioio::TokioIo, tower::ServiceWrapper, ClientMux, MuxStreamIo, StreamType}; +use ws_stream_wasm::{WsMessage, WsMeta, WsStream}; type HttpBody = http_body_util::Full; #[derive(Debug)] enum EpxResponse { Success(Response), - Redirect((Response, http::Request, Uri)), + Redirect((Response, http::Request)), } enum EpxCompression { @@ -40,80 +43,20 @@ enum EpxCompression { Gzip, } -type EpxTlsStream = TlsStream>; -type EpxUnencryptedStream = MuxStream; -type EpxStream = Either; - -async fn send_req( - req: http::Request, - should_redirect: bool, - io: EpxStream, -) -> Result { - let (mut req_sender, conn) = Builder::new() - .title_case_headers(true) - .preserve_header_case(true) - .handshake(TokioIo::new(io)) - .await - .replace_err("Failed to connect to host")?; - - wasm_bindgen_futures::spawn_local(async move { - if let Err(e) = conn.await { - error!("epoxy: error in muxed hyper connection! {:?}", e); - } - }); - - let new_req = if should_redirect { - Some(req.clone()) - } else { - None - }; - - let res = req_sender - .send_request(req) - .await - .replace_err("Failed to send request"); - match res { - Ok(res) => { - if utils::is_redirect(res.status().as_u16()) - && let Some(mut new_req) = new_req - && let Some(location) = res.headers().get("Location") - && let Ok(redirect_url) = new_req.uri().get_redirect(location) - && let Some(redirect_url_authority) = redirect_url - .clone() - .authority() - .replace_err("Redirect URL must have an authority") - .ok() - { - let should_strip = new_req.uri().is_same_host(&redirect_url); - if should_strip { - new_req.headers_mut().remove("authorization"); - new_req.headers_mut().remove("cookie"); - new_req.headers_mut().remove("www-authenticate"); - } - let new_url = redirect_url.clone(); - *new_req.uri_mut() = redirect_url; - new_req.headers_mut().insert( - "Host", - HeaderValue::from_str(redirect_url_authority.as_str())?, - ); - Ok(EpxResponse::Redirect((res, new_req, new_url))) - } else { - Ok(EpxResponse::Success(res)) - } - } - Err(err) => Err(err), - } -} +type EpxIoTlsStream = TlsStream>>; +type EpxIoUnencryptedStream = IoStream>; +type EpxIoStream = Either; #[wasm_bindgen(start)] -async fn start() { - utils::set_panic_hook(); +fn init() { + console_error_panic_hook::set_once(); } #[wasm_bindgen] pub struct EpoxyClient { rustls_config: Arc, - mux: Multiplexor, + mux: Arc>>, + hyper_client: Client>, HttpBody>, useragent: String, redirect_limit: usize, } @@ -138,11 +81,19 @@ impl EpoxyClient { } debug!("connecting to ws {:?}", ws_url); - let ws = WsStreamWrapper::connect(ws_url, None) + let (_, ws) = WsMeta::connect(ws_url, vec!["wisp-v1"]) .await .replace_err("Failed to connect to websocket")?; debug!("connected!"); - let mux = Multiplexor::new(ws, penguin_mux_wasm::Role::Client, None, None); + let (wtx, wrx) = ws.split(); + let (mux, fut) = ClientMux::new(wrx, wtx).await?; + let mux = Arc::new(mux); + + wasm_bindgen_futures::spawn_local(async move { + if let Err(err) = fut.await { + error!("epoxy: error in mux future! {:?}", err); + } + }); let mut certstore = RootCertStore::empty(); certstore.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); @@ -154,37 +105,86 @@ impl EpoxyClient { ); Ok(EpoxyClient { - mux, + mux: mux.clone(), + hyper_client: Client::builder(utils::WasmExecutor {}) + .http09_responses(true) + .http1_title_case_headers(true) + .http1_preserve_header_case(true) + .build(TlsWispService { + rustls_config: rustls_config.clone(), + service: ServiceWrapper(mux), + }), rustls_config, useragent, redirect_limit, }) } - async fn get_http_io(&self, url: &Uri) -> Result { - let url_host = url.host().replace_err("URL must have a host")?; - let url_port = utils::get_url_port(url)?; + async fn get_tls_io(&self, url_host: &str, url_port: u16) -> Result { let channel = self .mux - .client_new_stream_channel(url_host.as_bytes(), url_port) + .client_new_stream(StreamType::Tcp, url_host.to_string(), url_port) .await - .replace_err("Failed to create multiplexor channel")?; + .replace_err("Failed to create multiplexor channel")? + .into_io() + .into_asyncrw(); + let cloned_uri = url_host.to_string().clone(); + let connector = TlsConnector::from(self.rustls_config.clone()); + debug!("connecting channel"); + let io = connector + .connect( + cloned_uri + .try_into() + .replace_err("Failed to parse URL (rustls)")?, + channel, + ) + .await + .replace_err("Failed to perform TLS handshake")?; + debug!("connected channel"); + Ok(io) + } - if utils::get_is_secure(url)? { - let cloned_uri = url_host.to_string().clone(); - let connector = TlsConnector::from(self.rustls_config.clone()); - let io = connector - .connect( - cloned_uri - .try_into() - .replace_err("Failed to parse URL (rustls)")?, - channel, - ) - .await - .replace_err("Failed to perform TLS handshake")?; - Ok(EpxStream::Left(io)) + async fn send_req_inner( + &self, + req: http::Request, + should_redirect: bool, + ) -> Result { + let new_req = if should_redirect { + Some(req.clone()) } else { - Ok(EpxStream::Right(channel)) + None + }; + + debug!("sending req"); + let res = self + .hyper_client + .request(req) + .await + .replace_err("Failed to send request"); + debug!("recieved res"); + match res { + Ok(res) => { + if utils::is_redirect(res.status().as_u16()) + && let Some(mut new_req) = new_req + && let Some(location) = res.headers().get("Location") + && let Ok(redirect_url) = new_req.uri().get_redirect(location) + && let Some(redirect_url_authority) = redirect_url + .clone() + .authority() + .replace_err("Redirect URL must have an authority") + .ok() + { + *new_req.uri_mut() = redirect_url; + new_req.headers_mut().insert( + "Host", + HeaderValue::from_str(redirect_url_authority.as_str())?, + ); + Ok(EpxResponse::Redirect((res, new_req))) + } else { + Ok(EpxResponse::Success(res)) + } + } + Err(err) => Err(err), } } @@ -194,23 +194,22 @@ impl EpoxyClient { should_redirect: bool, ) -> Result<(hyper::Response, Uri, bool), JsError> { let mut redirected = false; - let uri = req.uri().clone(); - let mut current_resp: EpxResponse = - send_req(req, should_redirect, self.get_http_io(&uri).await?).await?; + let mut current_url = req.uri().clone(); + let mut current_resp: EpxResponse = self.send_req_inner(req, should_redirect).await?; for _ in 0..self.redirect_limit - 1 { match current_resp { EpxResponse::Success(_) => break, - EpxResponse::Redirect((_, req, new_url)) => { + EpxResponse::Redirect((_, req)) => { redirected = true; - current_resp = - send_req(req, should_redirect, self.get_http_io(&new_url).await?).await? + current_url = req.uri().clone(); + current_resp = self.send_req_inner(req, should_redirect).await? } } } match current_resp { - EpxResponse::Success(resp) => Ok((resp, uri, redirected)), - EpxResponse::Redirect((resp, _, new_url)) => Ok((resp, new_url, redirected)), + EpxResponse::Success(resp) => Ok((resp, current_url, redirected)), + EpxResponse::Redirect((resp, _)) => Ok((resp, current_url, redirected)), } } @@ -232,6 +231,17 @@ impl EpoxyClient { .await } + pub async fn connect_tls( + &self, + onopen: Function, + onclose: Function, + onerror: Function, + onmessage: Function, + url: String, + ) -> Result { + EpxTlsStream::connect(self, onopen, onclose, onerror, onmessage, url).await + } + pub async fn fetch(&self, url: String, options: Object) -> Result { let uri = url.parse::().replace_err("Failed to parse URL")?; let uri_scheme = uri.scheme().replace_err("URL must have a scheme")?; @@ -292,7 +302,7 @@ impl EpoxyClient { let headers_map = builder.headers_mut().replace_err("Failed to get headers")?; headers_map.insert("Accept-Encoding", HeaderValue::from_str("gzip, br")?); - headers_map.insert("Connection", HeaderValue::from_str("close")?); + headers_map.insert("Connection", HeaderValue::from_str("keep-alive")?); headers_map.insert("User-Agent", HeaderValue::from_str(&self.useragent)?); headers_map.insert("Host", HeaderValue::from_str(uri_host)?); if body_bytes.is_empty() { @@ -314,7 +324,7 @@ impl EpoxyClient { .body(HttpBody::new(body_bytes)) .replace_err("Failed to make request")?; - let (resp, last_url, req_redirected) = self.send_req(request, req_should_redirect).await?; + let (resp, resp_uri, req_redirected) = self.send_req(request, req_should_redirect).await?; let resp_headers_raw = resp.headers().clone(); @@ -378,7 +388,7 @@ impl EpoxyClient { Object::define_property( &resp, &jval!("url"), - &utils::define_property_obj(jval!(last_url.to_string()), false) + &utils::define_property_obj(jval!(resp_uri.to_string()), false) .replace_err("Failed to make define_property object for url")?, ); diff --git a/client/src/tls_stream.rs b/client/src/tls_stream.rs new file mode 100644 index 0000000..97e61a7 --- /dev/null +++ b/client/src/tls_stream.rs @@ -0,0 +1,82 @@ +use crate::*; + +use js_sys::Function; +use tokio::io::{split, AsyncWriteExt, WriteHalf}; +use tokio_util::io::ReaderStream; + +#[wasm_bindgen] +pub struct EpxTlsStream { + tx: WriteHalf, + onerror: Function, +} + +#[wasm_bindgen] +impl EpxTlsStream { + #[wasm_bindgen(constructor)] + pub fn new() -> Result { + Err(jerr!("Use EpoxyClient.connect_tls() instead.")) + } + + // shut up + #[allow(clippy::too_many_arguments)] + pub async fn connect( + tcp: &EpoxyClient, + onopen: Function, + onclose: Function, + onerror: Function, + onmessage: Function, + url: String, + ) -> Result { + let onerr = onerror.clone(); + let ret: Result = async move { + let url = Uri::try_from(url).replace_err("Failed to parse URL")?; + let url_host = url.host().replace_err("URL must have a host")?; + let url_port = url.port().replace_err("URL must have a port")?.into(); + + let io = tcp.get_tls_io(url_host, url_port).await?; + let (rx, tx) = split(io); + let mut rx = ReaderStream::new(rx); + + wasm_bindgen_futures::spawn_local(async move { + while let Some(Ok(data)) = rx.next().await { + let _ = onmessage.call1( + &JsValue::null(), + &jval!(Uint8Array::from(data.to_vec().as_slice())), + ); + } + let _ = onclose.call0(&JsValue::null()); + }); + + onopen + .call0(&Object::default()) + .replace_err("Failed to call onopen")?; + + Ok(Self { tx, onerror }) + } + .await; + if let Err(ret) = ret { + let _ = onerr.call1(&JsValue::null(), &jval!(ret.clone())); + Err(ret) + } else { + ret + } + } + + #[wasm_bindgen] + pub async fn send(&mut self, payload: Uint8Array) -> Result<(), JsError> { + let onerr = self.onerror.clone(); + let ret = self.tx.write_all(&payload.to_vec()).await; + if let Err(ret) = ret { + let _ = onerr.call1(&JsValue::null(), &jval!(format!("{}", ret))); + Err(ret.into()) + } else { + Ok(ret?) + } + } + + #[wasm_bindgen] + pub async fn close(&mut self) -> Result<(), JsError> { + self.tx.shutdown().await?; + Ok(()) + } +} diff --git a/client/src/utils.rs b/client/src/utils.rs index 077a120..0c71583 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -1,13 +1,9 @@ use wasm_bindgen::prelude::*; +use hyper::rt::Executor; use hyper::{header::HeaderValue, Uri}; -use http::uri; use js_sys::{Array, Object}; - -pub fn set_panic_hook() { - #[cfg(feature = "console_error_panic_hook")] - console_error_panic_hook::set_once(); -} +use std::future::Future; #[wasm_bindgen] extern "C" { @@ -55,15 +51,15 @@ pub trait ReplaceErr { fn replace_err_jv(self, err: &str) -> Result; } -impl ReplaceErr for Result { +impl ReplaceErr for Result { type Ok = T; fn replace_err(self, err: &str) -> Result<::Ok, JsError> { - self.map_err(|_| jerr!(err)) + self.map_err(|oe| jerr!(&format!("{}, original error: {:?}", err, oe))) } fn replace_err_jv(self, err: &str) -> Result<::Ok, JsValue> { - self.map_err(|_| jval!(err)) + self.map_err(|oe| jval!(&format!("{}, original error: {:?}", err, oe))) } } @@ -102,6 +98,21 @@ impl UriExt for Uri { } } +#[derive(Clone)] +pub struct WasmExecutor; + +impl Executor for WasmExecutor +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + fn execute(&self, future: F) { + wasm_bindgen_futures::spawn_local(async move { + let _ = future.await; + }); + } +} + pub fn entries_of_object(obj: &Object) -> Vec> { js_sys::Object::entries(obj) .to_vec() @@ -131,41 +142,19 @@ pub fn is_redirect(code: u16) -> bool { } pub fn get_is_secure(url: &Uri) -> Result { - let url_scheme = url.scheme().replace_err("URL must have a scheme")?; let url_scheme_str = url.scheme_str().replace_err("URL must have a scheme")?; - // can't use match, compiler error - // error: to use a constant of type `Scheme` in a pattern, `Scheme` must be annotated with `#[derive(PartialEq, Eq)]` - if *url_scheme == uri::Scheme::HTTP { - Ok(false) - } else if *url_scheme == uri::Scheme::HTTPS { - Ok(true) - } else if url_scheme_str == "ws" { - Ok(false) - } else if url_scheme_str == "wss" { - Ok(true) - } else { - return Ok(false); + match url_scheme_str { + "https" | "wss" => Ok(true), + _ => Ok(false), } } pub fn get_url_port(url: &Uri) -> Result { - let url_scheme = url.scheme().replace_err("URL must have a scheme")?; - let url_scheme_str = url.scheme_str().replace_err("URL must have a scheme")?; if let Some(port) = url.port() { Ok(port.as_u16()) + } else if get_is_secure(url)? { + Ok(443) } else { - // can't use match, compiler error - // error: to use a constant of type `Scheme` in a pattern, `Scheme` must be annotated with `#[derive(PartialEq, Eq)]` - if *url_scheme == uri::Scheme::HTTP { - Ok(80) - } else if *url_scheme == uri::Scheme::HTTPS { - Ok(443) - } else if url_scheme_str == "ws" { - Ok(80) - } else if url_scheme_str == "wss" { - Ok(443) - } else { - return Err(jerr!("Failed to coerce port from scheme")); - } + Ok(80) } } diff --git a/client/src/websocket.rs b/client/src/websocket.rs index addae2c..d186b17 100644 --- a/client/src/websocket.rs +++ b/client/src/websocket.rs @@ -4,7 +4,8 @@ use base64::{engine::general_purpose::STANDARD, Engine}; use fastwebsockets::{ CloseCode, FragmentCollectorRead, Frame, OpCode, Payload, Role, WebSocket, WebSocketWrite, }; -use http_body_util::Empty; +use futures_util::lock::Mutex; +use http_body_util::Full; use hyper::{ header::{CONNECTION, UPGRADE}, upgrade::Upgraded, @@ -16,7 +17,7 @@ use tokio::io::WriteHalf; #[wasm_bindgen] pub struct EpxWebSocket { - tx: WebSocketWrite>>, + tx: Arc>>>>, onerror: Function, } @@ -44,7 +45,8 @@ impl EpxWebSocket { let url = Uri::try_from(url).replace_err("Failed to parse URL")?; let host = url.host().replace_err("URL must have a host")?; - let rand: [u8; 16] = rand::random(); + let mut rand: [u8; 16] = [0; 16]; + getrandom::getrandom(&mut rand)?; let key = STANDARD.encode(rand); let mut builder = Request::builder() @@ -61,23 +63,9 @@ impl EpxWebSocket { builder = builder.header("Sec-WebSocket-Protocol", protocols.join(", ")); } - let req = builder.body(Empty::::new())?; + let req = builder.body(Full::::new(Bytes::new()))?; - let stream = tcp.get_http_io(&url).await?; - - let (mut sender, conn) = Builder::new() - .title_case_headers(true) - .preserve_header_case(true) - .handshake::, Empty>(TokioIo::new(stream)) - .await?; - - wasm_bindgen_futures::spawn_local(async move { - if let Err(e) = conn.with_upgrades().await { - error!("epoxy: error in muxed hyper connection (ws)! {:?}", e); - } - }); - - let mut response = sender.send_request(req).await?; + let mut response = tcp.hyper_client.request(req).await?; verify(&response)?; let ws = WebSocket::after_handshake( @@ -88,16 +76,12 @@ impl EpxWebSocket { let (rx, tx) = ws.split(tokio::io::split); let mut rx = FragmentCollectorRead::new(rx); + let tx = Arc::new(Mutex::new(tx)); + let tx_cloned = tx.clone(); wasm_bindgen_futures::spawn_local(async move { while let Ok(frame) = rx - .read_frame(&mut |arg| async move { - error!( - "wtf is an obligated write {:?}, {:?}, {:?}", - arg.fin, arg.opcode, arg.payload - ); - Ok::<(), std::io::Error>(()) - }) + .read_frame(&mut |arg| async { tx_cloned.lock().await.write_frame(arg).await }) .await { match frame.opcode { @@ -137,10 +121,12 @@ impl EpxWebSocket { } #[wasm_bindgen] - pub async fn send(&mut self, payload: String) -> Result<(), JsError> { + pub async fn send(&self, payload: String) -> Result<(), JsError> { let onerr = self.onerror.clone(); let ret = self .tx + .lock() + .await .write_frame(Frame::text(Payload::Owned(payload.as_bytes().to_vec()))) .await; if let Err(ret) = ret { @@ -152,8 +138,10 @@ impl EpxWebSocket { } #[wasm_bindgen] - pub async fn close(&mut self) -> Result<(), JsError> { + pub async fn close(&self) -> Result<(), JsError> { self.tx + .lock() + .await .write_frame(Frame::close(CloseCode::Normal.into(), b"")) .await?; Ok(()) diff --git a/client/src/wrappers.rs b/client/src/wrappers.rs index 1ecc702..5df0814 100644 --- a/client/src/wrappers.rs +++ b/client/src/wrappers.rs @@ -4,117 +4,11 @@ use std::{ task::{Context, Poll}, }; -use futures_util::{Sink, Stream}; +use futures_util::Stream; use hyper::body::Body; -use penguin_mux_wasm::ws; use pin_project_lite::pin_project; -use ws_stream_wasm::{WsErr, WsMessage, WsMeta, WsStream}; - -pin_project! { - pub struct WsStreamWrapper { - #[pin] - ws: WsStream, - } -} - -impl WsStreamWrapper { - pub async fn connect( - url: impl AsRef, - protocols: impl Into>>, - ) -> Result { - let (_, wsstream) = WsMeta::connect(url, protocols).await?; - Ok(WsStreamWrapper { ws: wsstream }) - } -} - -impl Stream for WsStreamWrapper { - type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let ret = this.ws.poll_next(cx); - match ret { - Poll::Ready(item) => Poll::>::Ready(item.map(|x| { - Ok(match x { - WsMessage::Text(txt) => ws::Message::Text(txt), - WsMessage::Binary(bin) => ws::Message::Binary(bin), - }) - })), - Poll::Pending => Poll::>::Pending, - } - } -} - -fn wserr_to_ws_err(err: WsErr) -> ws::Error { - debug!("err: {:?}", err); - match err { - WsErr::ConnectionNotOpen => ws::Error::AlreadyClosed, - _ => ws::Error::Io(std::io::Error::other(format!("{:?}", err))), - } -} - -impl Sink for WsStreamWrapper { - type Error = ws::Error; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let ret = this.ws.poll_ready(cx); - match ret { - Poll::Ready(item) => Poll::>::Ready(match item { - Ok(_) => Ok(()), - Err(err) => Err(wserr_to_ws_err(err)), - }), - Poll::Pending => Poll::>::Pending, - } - } - - fn start_send(self: Pin<&mut Self>, item: ws::Message) -> Result<(), Self::Error> { - use ws::Message::*; - let item = match item { - Text(txt) => WsMessage::Text(txt), - Binary(bin) => WsMessage::Binary(bin), - Close(_) => { - debug!("closing"); - return match self.ws.wrapped().close() { - Ok(_) => Ok(()), - Err(err) => Err(ws::Error::Io(std::io::Error::other(format!( - "ws close err: {:?}", - err - )))), - }; - } - Ping(_) | Pong(_) | Frame(_) => return Ok(()), - }; - let this = self.project(); - let ret = this.ws.start_send(item); - match ret { - Ok(_) => Ok(()), - Err(err) => Err(wserr_to_ws_err(err)), - } - } - - // no point wrapping this as it's not going to do anything - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Ok(()).into() - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let ret = this.ws.poll_close(cx); - match ret { - Poll::Ready(item) => Poll::>::Ready(match item { - Ok(_) => Ok(()), - Err(err) => Err(wserr_to_ws_err(err)), - }), - Poll::Pending => Poll::>::Pending, - } - } -} - -impl ws::WebSocketStream for WsStreamWrapper { - fn ping_auto_pong(&self) -> bool { - true - } -} +use std::future::Future; +use wisp_mux::{tokioio::TokioIo, tower::ServiceWrapper, WispError}; pin_project! { pub struct IncomingBody { @@ -138,7 +32,8 @@ impl Stream for IncomingBody { Poll::Ready(item) => Poll::>::Ready(match item { Some(frame) => frame .map(|x| { - x.into_data().map_err(|_| std::io::Error::other("not data frame")) + x.into_data() + .map_err(|_| std::io::Error::other("not data frame")) }) .ok(), None => None, @@ -147,3 +42,68 @@ impl Stream for IncomingBody { } } } + +pub struct TlsWispService +where + W: wisp_mux::ws::WebSocketWrite + Send + 'static, +{ + pub service: ServiceWrapper, + pub rustls_config: Arc, +} + + +impl tower_service::Service + for TlsWispService +{ + type Response = TokioIo; + type Error = WispError; + type Future = Pin>>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, req: http::Uri) -> Self::Future { + let mut service = self.service.clone(); + let rustls_config = self.rustls_config.clone(); + Box::pin(async move { + let uri_host = req + .host() + .ok_or(WispError::UriHasNoHost)? + .to_string() + .clone(); + let uri_parsed = Uri::builder() + .authority(format!( + "{}:{}", + uri_host, + utils::get_url_port(&req).map_err(|_| WispError::UriHasNoPort)? + )) + .build() + .map_err(|x| WispError::Other(Box::new(x)))?; + let stream = service.call(uri_parsed).await?.into_inner(); + if utils::get_is_secure(&req).map_err(|_| WispError::InvalidUri)? { + let connector = TlsConnector::from(rustls_config); + Ok(TokioIo::new(Either::Left( + connector + .connect( + uri_host.try_into().map_err(|_| WispError::InvalidUri)?, + stream, + ) + .await + .map_err(|x| WispError::Other(Box::new(x)))?, + ))) + } else { + Ok(TokioIo::new(Either::Right(stream))) + } + }) + } +} + +impl Clone for TlsWispService { + fn clone(&self) -> Self { + Self { + rustls_config: self.rustls_config.clone(), + service: self.service.clone(), + } + } +} diff --git a/package.json b/package.json index 3d6c731..15b432c 100644 --- a/package.json +++ b/package.json @@ -16,8 +16,8 @@ "author": "MercuryWorkshop", "repository": "https://github.com/MercuryWorkshop/epoxy-tls", "license": "MIT", - "browser": "./client/module.js", - "module": "./client/module.js", - "main": "./client/module.js", - "types": "./client/module.d.ts" + "browser": "./client/epoxy-module-bundled.js", + "module": "./client/epoxy-module-bundled.js", + "main": "./client/epoxy-module-bundled.js", + "types": "./client/epoxy-module-bundled.d.ts" } diff --git a/server/Cargo.toml b/server/Cargo.toml index 6abb7ca..9e3231d 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -4,10 +4,16 @@ version = "1.0.0" edition = "2021" [dependencies] +bytes = "1.5.0" +clap = { version = "4.4.18", features = ["derive", "help", "usage", "color", "wrap_help", "cargo"] } +clio = { version = "0.3.5", features = ["clap-parse"] } +dashmap = "5.5.3" +fastwebsockets = { version = "0.6.0", features = ["upgrade", "simdutf8", "unstable-split"] } +futures-util = { version = "0.3.30", features = ["sink"] } http-body-util = "0.1.0" hyper = { version = "1.1.0", features = ["server", "http1"] } hyper-util = { version = "0.1.2", features = ["tokio"] } -rusty-penguin = { version = "0.5.3", default-features = false } -tokio = { version = "1.35.1", features = ["rt-multi-thread", "net", "macros"] } +tokio = { version = "1.5.1", features = ["rt-multi-thread", "macros"] } tokio-native-tls = "0.3.1" -tokio-tungstenite = "0.21.0" +tokio-util = { version = "0.7.10", features = ["codec"] } +wisp-mux = { path = "../wisp", features = ["fastwebsockets", "tokio_io"] } diff --git a/server/src/main.rs b/server/src/main.rs index fc56579..7205d18 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,176 +1,283 @@ -use std::{convert::Infallible, env, net::SocketAddr, sync::Arc}; +#![feature(let_chains)] +use std::io::{Error, Read}; +use bytes::Bytes; +use clap::Parser; +use fastwebsockets::{ + upgrade, CloseCode, FragmentCollector, FragmentCollectorRead, Frame, OpCode, Payload, + WebSocketError, +}; +use futures_util::{SinkExt, StreamExt, TryFutureExt}; use hyper::{ - body::Incoming, - header::{ - HeaderValue, CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_PROTOCOL, - SEC_WEBSOCKET_VERSION, UPGRADE, - }, - server::conn::http1, - service::service_fn, - upgrade::Upgraded, - Method, Request, Response, StatusCode, Version, + body::Incoming, header::HeaderValue, server::conn::http1, service::service_fn, Request, + Response, StatusCode, }; use hyper_util::rt::TokioIo; -use penguin_mux::{Multiplexor, MuxStream}; -use tokio::{ - net::{TcpListener, TcpStream}, - task::{JoinError, JoinSet}, -}; +use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio_native_tls::{native_tls, TlsAcceptor}; -use tokio_tungstenite::{ - tungstenite::{handshake::derive_accept_key, protocol::Role}, - WebSocketStream, -}; +use tokio_util::codec::{BytesCodec, Framed}; -type Body = http_body_util::Empty; +use wisp_mux::{ws, ConnectPacket, MuxStream, ServerMux, StreamType, WispError, WsEvent}; -type MultiplexorStream = MuxStream>>; +type HttpBody = http_body_util::Full; -async fn forward(mut stream: MultiplexorStream) -> Result<(), JoinError> { - println!("forwarding"); - let host = std::str::from_utf8(&stream.dest_host).unwrap(); - let mut tcp_stream = TcpStream::connect((host, stream.dest_port)).await.unwrap(); - println!("connected to {:?}", tcp_stream.peer_addr().unwrap()); - tokio::io::copy_bidirectional(&mut stream, &mut tcp_stream) +#[derive(Parser)] +#[command(version = clap::crate_version!(), about = "Implementation of the Wisp protocol in Rust, made for epoxy.")] +struct Cli { + #[arg(long, default_value = "")] + prefix: String, + #[arg( + long = "port", + short = 'l', + value_name = "PORT", + default_value = "4000" + )] + listen_port: String, + #[arg(long, short, value_parser)] + pubkey: clio::Input, + #[arg(long, short = 'P', value_parser)] + privkey: clio::Input, +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<(), Error> { + let mut opt = Cli::parse(); + let mut pem = Vec::new(); + opt.pubkey.read_to_end(&mut pem)?; + let mut key = Vec::new(); + opt.privkey.read_to_end(&mut key)?; + let identity = native_tls::Identity::from_pkcs8(&pem, &key).expect("failed to make identity"); + + let socket = TcpListener::bind(format!("0.0.0.0:{}", opt.listen_port)) .await - .unwrap(); - println!("finished"); - Ok(()) -} - -async fn handle_connection(ws_stream: WebSocketStream>, addr: SocketAddr) { - println!("WebSocket connection established: {}", addr); - let mux = Multiplexor::new(ws_stream, penguin_mux::Role::Server, None, None); - let mut jobs = JoinSet::new(); - println!("muxing"); - loop { - tokio::select! { - Some(result) = jobs.join_next() => { - match result { - Ok(Ok(())) => {} - Ok(Err(err)) | Err(err) => eprintln!("failed to forward: {:?}", err), - } - } - Ok(result) = mux.server_new_stream_channel() => { - jobs.spawn(forward(result)); - } - else => { - break; - } - } - } - println!("{} disconnected", &addr); -} - -async fn handle_request( - mut req: Request, - addr: SocketAddr, -) -> Result, Infallible> { - let headers = req.headers(); - let derived = headers - .get(SEC_WEBSOCKET_KEY) - .map(|k| derive_accept_key(k.as_bytes())); - - let mut negotiated_protocol: Option = None; - if let Some(protocols) = headers - .get(SEC_WEBSOCKET_PROTOCOL) - .and_then(|h| h.to_str().ok()) - { - negotiated_protocol = protocols.split(',').next().map(|h| h.trim().to_string()); - } - - if req.method() != Method::GET - || req.version() < Version::HTTP_11 - || !headers - .get(CONNECTION) - .and_then(|h| h.to_str().ok()) - .map(|h| { - h.split(|c| c == ' ' || c == ',') - .any(|p| p.eq_ignore_ascii_case("upgrade")) - }) - .unwrap_or(false) - || !headers - .get(UPGRADE) - .and_then(|h| h.to_str().ok()) - .map(|h| h.eq_ignore_ascii_case("websocket")) - .unwrap_or(false) - || !headers - .get(SEC_WEBSOCKET_VERSION) - .map(|h| h == "13") - .unwrap_or(false) - || derived.is_none() - { - return Ok(Response::new(Body::default())); - } - - let ver = req.version(); - tokio::task::spawn(async move { - match hyper::upgrade::on(&mut req).await { - Ok(upgraded) => { - let upgraded = TokioIo::new(upgraded); - handle_connection( - WebSocketStream::from_raw_socket(upgraded, Role::Server, None).await, - addr, - ) - .await; - } - Err(e) => eprintln!("upgrade error: {}", e), - } - }); - - let mut res = Response::new(Body::default()); - *res.status_mut() = StatusCode::SWITCHING_PROTOCOLS; - *res.version_mut() = ver; - res.headers_mut() - .append(CONNECTION, HeaderValue::from_static("Upgrade")); - res.headers_mut() - .append(UPGRADE, HeaderValue::from_static("websocket")); - res.headers_mut() - .append(SEC_WEBSOCKET_ACCEPT, derived.unwrap().parse().unwrap()); - if let Some(protocol) = negotiated_protocol { - res.headers_mut() - .append(SEC_WEBSOCKET_PROTOCOL, protocol.parse().unwrap()); - } - - Ok(res) -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - let addr = env::args() - .nth(1) - .unwrap_or_else(|| "0.0.0.0:4000".to_string()) - .parse::()?; - let pem = include_bytes!("./pem.pem"); - let key = include_bytes!("./key.pem"); - - let identity = native_tls::Identity::from_pkcs8(pem, key).expect("invalid pem/key"); - - let acceptor = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity).unwrap()); - let acceptor = Arc::new(acceptor); - - let listener = TcpListener::bind(addr).await?; - - println!("listening on {}", addr); - - loop { - let (stream, remote_addr) = listener.accept().await?; - let acceptor = acceptor.clone(); + .expect("failed to bind"); + let acceptor = TlsAcceptor::from( + native_tls::TlsAcceptor::new(identity).expect("failed to make tls acceptor"), + ); + let acceptor = std::sync::Arc::new(acceptor); + println!("listening on 0.0.0.0:4000"); + while let Ok((stream, addr)) = socket.accept().await { + let acceptor_cloned = acceptor.clone(); + let prefix_cloned = opt.prefix.clone(); tokio::spawn(async move { - let stream = acceptor.accept(stream).await.expect("not tls"); + let stream = acceptor_cloned.accept(stream).await.expect("not tls"); let io = TokioIo::new(stream); - - let service = service_fn(move |req| handle_request(req, remote_addr)); - + let service = + service_fn(move |res| accept_http(res, addr.to_string(), prefix_cloned.clone())); let conn = http1::Builder::new() .serve_connection(io, service) .with_upgrades(); - if let Err(err) = conn.await { - eprintln!("failed to serve connection: {:?}", err); + println!("{:?}: failed to serve conn: {:?}", addr, err); } }); } + + Ok(()) +} + +async fn accept_http( + mut req: Request, + addr: String, + prefix: String, +) -> Result, WebSocketError> { + let uri = req.uri().clone().path().to_string(); + if upgrade::is_upgrade_request(&req) + && let Some(uri) = uri.strip_prefix(&prefix) + { + let (mut res, fut) = upgrade::upgrade(&mut req)?; + + if let Some(protocols) = req.headers().get("Sec-Websocket-Protocol").and_then(|x| { + Some( + x.to_str() + .ok()? + .split(',') + .map(|x| x.trim()) + .collect::>(), + ) + }) && protocols.contains(&"wisp-v1") + && (uri == "" || uri == "/") + { + tokio::spawn(async move { accept_ws(fut, addr.clone()).await }); + res.headers_mut().insert( + "Sec-Websocket-Protocol", + HeaderValue::from_str("wisp-v1").unwrap(), + ); + } else { + let uri = uri.strip_prefix("/").unwrap_or(uri).to_string(); + tokio::spawn(async move { accept_wsproxy(fut, uri, addr.clone()).await }); + } + + Ok(Response::from_parts( + res.into_parts().0, + HttpBody::new(Bytes::new()), + )) + } else { + println!("random request to path {:?}", uri); + Ok(Response::builder() + .status(StatusCode::OK) + .body(HttpBody::new(":3".to_string().into())) + .unwrap()) + } +} + +async fn handle_mux( + packet: ConnectPacket, + mut stream: MuxStream, +) -> Result { + let uri = format!( + "{}:{}", + packet.destination_hostname, packet.destination_port + ); + match packet.stream_type { + StreamType::Tcp => { + let mut tcp_stream = TcpStream::connect(uri) + .await + .map_err(|x| WispError::Other(Box::new(x)))?; + let mut mux_stream = stream.into_io().into_asyncrw(); + tokio::io::copy_bidirectional(&mut tcp_stream, &mut mux_stream) + .await + .map_err(|x| WispError::Other(Box::new(x)))?; + } + StreamType::Udp => { + let udp_socket = UdpSocket::bind(uri) + .await + .map_err(|x| WispError::Other(Box::new(x)))?; + let mut data = vec![0u8; 65507]; // udp standard max datagram size + loop { + tokio::select! { + size = udp_socket.recv(&mut data).map_err(|x| WispError::Other(Box::new(x))) => { + let size = size?; + stream.write(Bytes::copy_from_slice(&data[..size])).await? + }, + event = stream.read() => { + match event { + Some(event) => match event { + WsEvent::Send(data) => { + udp_socket.send(&data).await.map_err(|x| WispError::Other(Box::new(x)))?; + } + WsEvent::Close(_) => return Ok(false), + }, + None => break, + } + } + } + } + } + } + Ok(true) +} + +async fn accept_ws( + fut: upgrade::UpgradeFut, + addr: String, +) -> Result<(), Box> { + let (rx, tx) = fut.await?.split(tokio::io::split); + let rx = FragmentCollectorRead::new(rx); + + println!("{:?}: connected", addr); + + let (mut mux, fut) = ServerMux::new(rx, tx, 128); + + tokio::spawn(async move { + if let Err(e) = fut.await { + println!("err in mux: {:?}", e); + } + }); + + while let Some((packet, stream)) = mux.server_new_stream().await { + tokio::spawn(async move { + let close_err = stream.get_close_handle(); + let close_ok = stream.get_close_handle(); + let _ = handle_mux(packet, stream) + .or_else(|err| async move { + let _ = close_err.close(0x03).await; + Err(err) + }) + .and_then(|should_send| async move { + if should_send { + close_ok.close(0x02).await + } else { + Ok(()) + } + }) + .await; + }); + } + + println!("{:?}: disconnected", addr); + Ok(()) +} + +async fn accept_wsproxy( + fut: upgrade::UpgradeFut, + incoming_uri: String, + addr: String, +) -> Result<(), Box> { + let mut ws_stream = FragmentCollector::new(fut.await?); + + println!("{:?}: connected (wsproxy): {:?}", addr, incoming_uri); + + match hyper::Uri::try_from(incoming_uri.clone()) { + Ok(_) => (), + Err(err) => { + ws_stream.write_frame(Frame::close(CloseCode::Away.into(), b"invalid uri")).await?; + return Err(Box::new(err)); + } + } + + let tcp_stream = match TcpStream::connect(incoming_uri).await { + Ok(stream) => stream, + Err(err) => { + ws_stream + .write_frame(Frame::close(CloseCode::Away.into(), b"failed to connect")) + .await?; + return Err(Box::new(err)); + } + }; + let mut tcp_stream_framed = Framed::new(tcp_stream, BytesCodec::new()); + + loop { + tokio::select! { + event = ws_stream.read_frame() => { + match event { + Ok(frame) => { + match frame.opcode { + OpCode::Text | OpCode::Binary => { + let _ = tcp_stream_framed.send(Bytes::from(frame.payload.to_vec())).await; + } + OpCode::Close => { + // tokio closes the stream for us + drop(tcp_stream_framed); + break; + } + _ => {} + } + }, + Err(_) => { + // tokio closes the stream for us + drop(tcp_stream_framed); + break; + } + } + }, + event = tcp_stream_framed.next() => { + if let Some(res) = event { + match res { + Ok(buf) => { + let _ = ws_stream.write_frame(Frame::binary(Payload::Borrowed(&buf))).await; + } + Err(_) => { + let _ = ws_stream.write_frame(Frame::close(CloseCode::Away.into(), b"tcp side is going away")).await; + } + } + } + } + } + } + + println!("{:?}: disconnected (wsproxy)", addr); + + Ok(()) } diff --git a/simple-wisp-client/Cargo.toml b/simple-wisp-client/Cargo.toml new file mode 100644 index 0000000..9525805 --- /dev/null +++ b/simple-wisp-client/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "simple-wisp-client" +version = "0.1.0" +edition = "2021" + +[dependencies] +bytes = "1.5.0" +fastwebsockets = { version = "0.6.0", features = ["unstable-split", "upgrade"] } +futures = "0.3.30" +http-body-util = "0.1.0" +hyper = { version = "1.1.0", features = ["http1", "client"] } +tokio = { version = "1.36.0", features = ["full"] } +tokio-native-tls = "0.3.1" +tokio-util = "0.7.10" +wisp-mux = { path = "../wisp", features = ["fastwebsockets"]} + diff --git a/simple-wisp-client/src/main.rs b/simple-wisp-client/src/main.rs new file mode 100644 index 0000000..2b3ca0e --- /dev/null +++ b/simple-wisp-client/src/main.rs @@ -0,0 +1,114 @@ +use bytes::Bytes; +use fastwebsockets::{handshake, FragmentCollectorRead}; +use futures::io::AsyncWriteExt; +use http_body_util::Empty; +use hyper::{ + header::{CONNECTION, UPGRADE}, + Request, +}; +use std::{error::Error, future::Future}; +use tokio::net::TcpStream; +use tokio_native_tls::{native_tls, TlsConnector}; +use wisp_mux::{ClientMux, StreamType}; +use tokio_util::either::Either; + +#[derive(Debug)] +struct StrError(String); + +impl StrError { + pub fn new(str: &str) -> Self { + Self(str.to_string()) + } +} + +impl std::fmt::Display for StrError { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + write!(fmt, "{}", self.0) + } +} + +impl Error for StrError {} + +struct SpawnExecutor; + +impl hyper::rt::Executor for SpawnExecutor +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn(fut); + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = std::env::args() + .nth(1) + .ok_or(StrError::new("no src addr"))?; + + let addr_port: u16 = std::env::args() + .nth(2) + .ok_or(StrError::new("no src port"))? + .parse()?; + + let addr_dest = std::env::args() + .nth(3) + .ok_or(StrError::new("no dest addr"))?; + + let addr_dest_port: u16 = std::env::args() + .nth(4) + .ok_or(StrError::new("no dest port"))? + .parse()?; + let should_tls: bool = std::env::args() + .nth(5) + .ok_or(StrError::new("no should tls"))? + .parse()?; + + let socket = TcpStream::connect(format!("{}:{}", &addr, addr_port)).await?; + let socket = if should_tls { + let cx = TlsConnector::from(native_tls::TlsConnector::builder().build()?); + Either::Left(cx.connect(&addr, socket).await?) + } else { + Either::Right(socket) + }; + let req = Request::builder() + .method("GET") + .uri(format!("wss://{}:{}/", &addr, addr_port)) + .header("Host", &addr) + .header(UPGRADE, "websocket") + .header(CONNECTION, "upgrade") + .header( + "Sec-WebSocket-Key", + fastwebsockets::handshake::generate_key(), + ) + .header("Sec-WebSocket-Version", "13") + .header("Sec-WebSocket-Protocol", "wisp-v1") + .body(Empty::::new())?; + + let (ws, _) = handshake::client(&SpawnExecutor, req, socket).await?; + + let (rx, tx) = ws.split(tokio::io::split); + let rx = FragmentCollectorRead::new(rx); + + let (mux, fut) = ClientMux::new(rx, tx).await?; + + tokio::task::spawn(fut); + + let mut hi: u64 = 0; + loop { + let mut channel = mux + .client_new_stream(StreamType::Tcp, addr_dest.clone(), addr_dest_port) + .await? + .into_io() + .into_asyncrw(); + for _ in 0..10 { + channel.write_all(b"hiiiiiiii").await?; + hi += 1; + println!("said hi {}", hi); + } + } + + #[allow(unreachable_code)] + Ok(()) +} diff --git a/wisp/.gitignore b/wisp/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/wisp/.gitignore @@ -0,0 +1 @@ +/target diff --git a/wisp/Cargo.toml b/wisp/Cargo.toml new file mode 100644 index 0000000..da0a601 --- /dev/null +++ b/wisp/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "wisp-mux" +version = "0.1.0" +edition = "2021" + +[dependencies] +async_io_stream = "0.3.3" +bytes = "1.5.0" +event-listener = "5.0.0" +fastwebsockets = { version = "0.6.0", features = ["unstable-split"], optional = true } +futures = "0.3.30" +futures-util = "0.3.30" +hyper = { version = "1.1.0", optional = true } +hyper-util = { git = "https://github.com/r58Playz/hyper-util-wasm", features = ["client", "client-legacy"], optional = true } +pin-project-lite = "0.2.13" +tokio = { version = "1.35.1", optional = true, default-features = false } +tower-service = { version = "0.3.2", optional = true } +ws_stream_wasm = { version = "0.7.4", optional = true } + +[features] +fastwebsockets = ["dep:fastwebsockets", "dep:tokio"] +ws_stream_wasm = ["dep:ws_stream_wasm"] +tokio_io = ["async_io_stream/tokio_io"] +hyper_tower = ["dep:tower-service", "dep:hyper", "dep:tokio", "dep:hyper-util"] + diff --git a/wisp/src/fastwebsockets.rs b/wisp/src/fastwebsockets.rs new file mode 100644 index 0000000..fb31e4a --- /dev/null +++ b/wisp/src/fastwebsockets.rs @@ -0,0 +1,72 @@ +use bytes::Bytes; +use fastwebsockets::{ + FragmentCollectorRead, Frame, OpCode, Payload, WebSocketError, WebSocketWrite, +}; +use tokio::io::{AsyncRead, AsyncWrite}; + +impl From for crate::ws::OpCode { + fn from(opcode: OpCode) -> Self { + use OpCode::*; + match opcode { + Continuation => unreachable!(), + Text => Self::Text, + Binary => Self::Binary, + Close => Self::Close, + Ping => Self::Ping, + Pong => Self::Pong, + } + } +} + +impl From> for crate::ws::Frame { + fn from(mut frame: Frame) -> Self { + Self { + finished: frame.fin, + opcode: frame.opcode.into(), + payload: Bytes::copy_from_slice(frame.payload.to_mut()), + } + } +} + +impl TryFrom for Frame<'_> { + type Error = crate::WispError; + fn try_from(frame: crate::ws::Frame) -> Result { + use crate::ws::OpCode::*; + Ok(match frame.opcode { + Text => Self::text(Payload::Owned(frame.payload.to_vec())), + Binary => Self::binary(Payload::Owned(frame.payload.to_vec())), + Close => Self::close_raw(Payload::Owned(frame.payload.to_vec())), + Ping => Self::new( + true, + OpCode::Ping, + None, + Payload::Owned(frame.payload.to_vec()), + ), + Pong => Self::pong(Payload::Owned(frame.payload.to_vec())), + }) + } +} + +impl From for crate::WispError { + fn from(err: WebSocketError) -> Self { + Self::WsImplError(Box::new(err)) + } +} + +impl crate::ws::WebSocketRead for FragmentCollectorRead { + async fn wisp_read_frame( + &mut self, + tx: &crate::ws::LockedWebSocketWrite, + ) -> Result { + Ok(self + .read_frame(&mut |frame| async { tx.write_frame(frame.into()).await }) + .await? + .into()) + } +} + +impl crate::ws::WebSocketWrite for WebSocketWrite { + async fn wisp_write_frame(&mut self, frame: crate::ws::Frame) -> Result<(), crate::WispError> { + self.write_frame(frame.try_into()?).await.map_err(|e| e.into()) + } +} diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs new file mode 100644 index 0000000..7ad9931 --- /dev/null +++ b/wisp/src/lib.rs @@ -0,0 +1,390 @@ +#![feature(impl_trait_in_assoc_type)] +#[cfg(feature = "fastwebsockets")] +mod fastwebsockets; +mod packet; +mod stream; +#[cfg(feature = "hyper_tower")] +pub mod tokioio; +#[cfg(feature = "hyper_tower")] +pub mod tower; +pub mod ws; +#[cfg(feature = "ws_stream_wasm")] +mod ws_stream_wasm; + +pub use crate::packet::*; +pub use crate::stream::*; + +use event_listener::Event; +use futures::{channel::mpsc, lock::Mutex, Future, FutureExt, StreamExt}; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Arc, + }, +}; + +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum Role { + Client, + Server, +} + +#[derive(Debug)] +pub enum WispError { + PacketTooSmall, + InvalidPacketType, + InvalidStreamType, + InvalidStreamId, + InvalidUri, + UriHasNoHost, + UriHasNoPort, + MaxStreamCountReached, + StreamAlreadyClosed, + WsFrameInvalidType, + WsFrameNotFinished, + WsImplError(Box), + WsImplSocketClosed, + WsImplNotSupported, + Utf8Error(std::str::Utf8Error), + Other(Box), +} + +impl From for WispError { + fn from(err: std::str::Utf8Error) -> WispError { + WispError::Utf8Error(err) + } +} + +impl std::fmt::Display for WispError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + use WispError::*; + match self { + PacketTooSmall => write!(f, "Packet too small"), + InvalidPacketType => write!(f, "Invalid packet type"), + InvalidStreamType => write!(f, "Invalid stream type"), + InvalidStreamId => write!(f, "Invalid stream id"), + InvalidUri => write!(f, "Invalid URI"), + UriHasNoHost => write!(f, "URI has no host"), + UriHasNoPort => write!(f, "URI has no port"), + MaxStreamCountReached => write!(f, "Maximum stream count reached"), + StreamAlreadyClosed => write!(f, "Stream already closed"), + WsFrameInvalidType => write!(f, "Invalid websocket frame type"), + WsFrameNotFinished => write!(f, "Unfinished websocket frame"), + WsImplError(err) => write!(f, "Websocket implementation error: {:?}", err), + WsImplSocketClosed => write!(f, "Websocket implementation error: websocket closed"), + WsImplNotSupported => write!(f, "Websocket implementation error: unsupported feature"), + Utf8Error(err) => write!(f, "UTF-8 error: {:?}", err), + Other(err) => write!(f, "Other error: {:?}", err), + } + } +} + +impl std::error::Error for WispError {} + +struct ServerMuxInner +where + W: ws::WebSocketWrite + Send + 'static, +{ + tx: ws::LockedWebSocketWrite, + stream_map: Arc>>>, + close_tx: mpsc::UnboundedSender, +} + +impl ServerMuxInner { + pub async fn into_future( + self, + rx: R, + close_rx: mpsc::UnboundedReceiver, + muxstream_sender: mpsc::UnboundedSender<(ConnectPacket, MuxStream)>, + buffer_size: u32 + ) -> Result<(), WispError> + where + R: ws::WebSocketRead, + { + let ret = futures::select! { + x = self.server_close_loop(close_rx, self.stream_map.clone(), self.tx.clone()).fuse() => x, + x = self.server_msg_loop(rx, muxstream_sender, buffer_size).fuse() => x + }; + self.stream_map.lock().await.iter().for_each(|x| { + let _ = x.1.unbounded_send(WsEvent::Close(ClosePacket::new(0x01))); + }); + ret + } + + async fn server_close_loop( + &self, + mut close_rx: mpsc::UnboundedReceiver, + stream_map: Arc>>>, + tx: ws::LockedWebSocketWrite, + ) -> Result<(), WispError> { + while let Some(msg) = close_rx.next().await { + match msg { + MuxEvent::Close(stream_id, reason, channel) => { + if stream_map.lock().await.remove(&stream_id).is_some() { + let _ = channel.send( + tx.write_frame(Packet::new_close(stream_id, reason).into()) + .await, + ); + } else { + let _ = channel.send(Err(WispError::InvalidStreamId)); + } + } + } + } + Ok(()) + } + + async fn server_msg_loop( + &self, + mut rx: R, + muxstream_sender: mpsc::UnboundedSender<(ConnectPacket, MuxStream)>, + buffer_size: u32, + ) -> Result<(), WispError> + where + R: ws::WebSocketRead, + { + self.tx + .write_frame(Packet::new_continue(0, buffer_size).into()) + .await?; + + while let Ok(frame) = rx.wisp_read_frame(&self.tx).await { + if let Ok(packet) = Packet::try_from(frame) { + use PacketType::*; + match packet.packet { + Connect(inner_packet) => { + let (ch_tx, ch_rx) = mpsc::unbounded(); + self.stream_map.lock().await.insert(packet.stream_id, ch_tx); + muxstream_sender + .unbounded_send(( + inner_packet, + MuxStream::new( + packet.stream_id, + Role::Server, + ch_rx, + self.tx.clone(), + self.close_tx.clone(), + AtomicBool::new(false).into(), + AtomicU32::new(buffer_size).into(), + Event::new().into(), + ), + )) + .map_err(|x| WispError::Other(Box::new(x)))?; + } + Data(data) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + let _ = stream.unbounded_send(WsEvent::Send(data)); + } + } + Continue(_) => unreachable!(), + Close(inner_packet) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + let _ = stream.unbounded_send(WsEvent::Close(inner_packet)); + } + self.stream_map.lock().await.remove(&packet.stream_id); + } + } + } else { + break; + } + } + drop(muxstream_sender); + Ok(()) + } +} + +pub struct ServerMux +where + W: ws::WebSocketWrite + Send + 'static, +{ + muxstream_recv: mpsc::UnboundedReceiver<(ConnectPacket, MuxStream)>, +} + +impl ServerMux { + pub fn new(read: R, write: W, buffer_size: u32) -> (Self, impl Future>) + where + R: ws::WebSocketRead, + { + let (close_tx, close_rx) = mpsc::unbounded::(); + let (tx, rx) = mpsc::unbounded::<(ConnectPacket, MuxStream)>(); + let write = ws::LockedWebSocketWrite::new(write); + let map = Arc::new(Mutex::new(HashMap::new())); + ( + Self { muxstream_recv: rx }, + ServerMuxInner { + tx: write, + close_tx, + stream_map: map.clone(), + } + .into_future(read, close_rx, tx, buffer_size), + ) + } + + pub async fn server_new_stream(&mut self) -> Option<(ConnectPacket, MuxStream)> { + self.muxstream_recv.next().await + } +} + +pub struct ClientMuxInner +where + W: ws::WebSocketWrite, +{ + tx: ws::LockedWebSocketWrite, + stream_map: + Arc, Arc, Arc)>>>, +} + +impl ClientMuxInner { + pub async fn into_future( + self, + rx: R, + close_rx: mpsc::UnboundedReceiver, + ) -> Result<(), WispError> + where + R: ws::WebSocketRead, + { + futures::select! { + x = self.client_bg_loop(close_rx).fuse() => x, + x = self.client_loop(rx).fuse() => x + } + } + + async fn client_bg_loop( + &self, + mut close_rx: mpsc::UnboundedReceiver, + ) -> Result<(), WispError> { + while let Some(msg) = close_rx.next().await { + match msg { + MuxEvent::Close(stream_id, reason, channel) => { + if self.stream_map.lock().await.remove(&stream_id).is_some() { + let _ = channel.send( + self.tx + .write_frame(Packet::new_close(stream_id, reason).into()) + .await, + ); + } else { + let _ = channel.send(Err(WispError::InvalidStreamId)); + } + } + } + } + Ok(()) + } + + async fn client_loop(&self, mut rx: R) -> Result<(), WispError> + where + R: ws::WebSocketRead, + { + while let Ok(frame) = rx.wisp_read_frame(&self.tx).await { + if let Ok(packet) = Packet::try_from(frame) { + use PacketType::*; + match packet.packet { + Connect(_) => unreachable!(), + Data(data) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + let _ = stream.0.unbounded_send(WsEvent::Send(data)); + } + } + Continue(inner_packet) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + stream + .1 + .store(inner_packet.buffer_remaining, Ordering::Release); + let _ = stream.2.notify(u32::MAX); + } + } + Close(inner_packet) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + let _ = stream.0.unbounded_send(WsEvent::Close(inner_packet)); + } + self.stream_map.lock().await.remove(&packet.stream_id); + } + } + } + } + Ok(()) + } +} + +pub struct ClientMux +where + W: ws::WebSocketWrite, +{ + tx: ws::LockedWebSocketWrite, + stream_map: + Arc, Arc, Arc)>>>, + next_free_stream_id: AtomicU32, + close_tx: mpsc::UnboundedSender, + buf_size: u32, +} + +impl ClientMux { + pub async fn new( + mut read: R, + write: W, + ) -> Result<(Self, impl Future>), WispError> + where + R: ws::WebSocketRead, + { + let write = ws::LockedWebSocketWrite::new(write); + let first_packet = Packet::try_from(read.wisp_read_frame(&write).await?)?; + if first_packet.stream_id != 0 { + return Err(WispError::InvalidStreamId); + } + if let PacketType::Continue(packet) = first_packet.packet { + let (tx, rx) = mpsc::unbounded::(); + let map = Arc::new(Mutex::new(HashMap::new())); + Ok(( + Self { + tx: write.clone(), + stream_map: map.clone(), + next_free_stream_id: AtomicU32::new(1), + close_tx: tx, + buf_size: packet.buffer_remaining, + }, + ClientMuxInner { + tx: write.clone(), + stream_map: map.clone(), + } + .into_future(read, rx), + )) + } else { + Err(WispError::InvalidPacketType) + } + } + + pub async fn client_new_stream( + &self, + stream_type: StreamType, + host: String, + port: u16, + ) -> Result, WispError> { + let (ch_tx, ch_rx) = mpsc::unbounded(); + let evt: Arc = Event::new().into(); + let flow_control: Arc = AtomicU32::new(self.buf_size).into(); + let stream_id = self.next_free_stream_id.load(Ordering::Acquire); + self.tx + .write_frame(Packet::new_connect(stream_id, stream_type, port, host).into()) + .await?; + self.next_free_stream_id.store( + stream_id + .checked_add(1) + .ok_or(WispError::MaxStreamCountReached)?, + Ordering::Release, + ); + self.stream_map + .lock() + .await + .insert(stream_id, (ch_tx, flow_control.clone(), evt.clone())); + Ok(MuxStream::new( + stream_id, + Role::Client, + ch_rx, + self.tx.clone(), + self.close_tx.clone(), + AtomicBool::new(false).into(), + flow_control, + evt, + )) + } +} diff --git a/wisp/src/packet.rs b/wisp/src/packet.rs new file mode 100644 index 0000000..505bbe6 --- /dev/null +++ b/wisp/src/packet.rs @@ -0,0 +1,255 @@ +use crate::ws; +use crate::WispError; +use bytes::{Buf, BufMut, Bytes}; + +#[derive(Debug)] +pub enum StreamType { + Tcp = 0x01, + Udp = 0x02, +} + +impl TryFrom for StreamType { + type Error = WispError; + fn try_from(stream_type: u8) -> Result { + use StreamType::*; + match stream_type { + 0x01 => Ok(Tcp), + 0x02 => Ok(Udp), + _ => Err(Self::Error::InvalidStreamType), + } + } +} + +#[derive(Debug)] +pub struct ConnectPacket { + pub stream_type: StreamType, + pub destination_port: u16, + pub destination_hostname: String, +} + +impl ConnectPacket { + pub fn new(stream_type: StreamType, destination_port: u16, destination_hostname: String) -> Self { + Self { + stream_type, + destination_port, + destination_hostname, + } + } +} + +impl TryFrom for ConnectPacket { + type Error = WispError; + fn try_from(mut bytes: Bytes) -> Result { + if bytes.remaining() < (1 + 2) { + return Err(Self::Error::PacketTooSmall); + } + Ok(Self { + stream_type: bytes.get_u8().try_into()?, + destination_port: bytes.get_u16_le(), + destination_hostname: std::str::from_utf8(&bytes)?.to_string(), + }) + } +} + +impl From for Vec { + fn from(packet: ConnectPacket) -> Self { + let mut encoded = Self::with_capacity(1 + 2 + packet.destination_hostname.len()); + encoded.put_u8(packet.stream_type as u8); + encoded.put_u16_le(packet.destination_port); + encoded.extend(packet.destination_hostname.bytes()); + encoded + } +} + +#[derive(Debug)] +pub struct ContinuePacket { + pub buffer_remaining: u32, +} + +impl ContinuePacket { + pub fn new(buffer_remaining: u32) -> Self { + Self { buffer_remaining } + } +} + +impl TryFrom for ContinuePacket { + type Error = WispError; + fn try_from(mut bytes: Bytes) -> Result { + if bytes.remaining() < 4 { + return Err(Self::Error::PacketTooSmall); + } + Ok(Self { + buffer_remaining: bytes.get_u32_le(), + }) + } +} + +impl From for Vec { + fn from(packet: ContinuePacket) -> Self { + let mut encoded = Self::with_capacity(4); + encoded.put_u32_le(packet.buffer_remaining); + encoded + } +} + +#[derive(Debug)] +pub struct ClosePacket { + pub reason: u8, +} + +impl ClosePacket { + pub fn new(reason: u8) -> Self { + Self { reason } + } +} + +impl TryFrom for ClosePacket { + type Error = WispError; + fn try_from(mut bytes: Bytes) -> Result { + if bytes.remaining() < 1 { + return Err(Self::Error::PacketTooSmall); + } + Ok(Self { + reason: bytes.get_u8(), + }) + } +} + +impl From for Vec { + fn from(packet: ClosePacket) -> Self { + let mut encoded = Self::with_capacity(1); + encoded.put_u8(packet.reason); + encoded + } +} + +#[derive(Debug)] +pub enum PacketType { + Connect(ConnectPacket), + Data(Bytes), + Continue(ContinuePacket), + Close(ClosePacket), +} + +impl PacketType { + pub fn as_u8(&self) -> u8 { + use PacketType::*; + match self { + Connect(_) => 0x01, + Data(_) => 0x02, + Continue(_) => 0x03, + Close(_) => 0x04, + } + } +} + +impl From for Vec { + fn from(packet: PacketType) -> Self { + use PacketType::*; + match packet { + Connect(x) => x.into(), + Data(x) => x.to_vec(), + Continue(x) => x.into(), + Close(x) => x.into(), + } + } +} + +#[derive(Debug)] +pub struct Packet { + pub stream_id: u32, + pub packet: PacketType, +} + +impl Packet { + pub fn new(stream_id: u32, packet: PacketType) -> Self { + Self { stream_id, packet } + } + + pub fn new_connect( + stream_id: u32, + stream_type: StreamType, + destination_port: u16, + destination_hostname: String, + ) -> Self { + Self { + stream_id, + packet: PacketType::Connect(ConnectPacket::new( + stream_type, + destination_port, + destination_hostname, + )), + } + } + + pub fn new_data(stream_id: u32, data: Bytes) -> Self { + Self { + stream_id, + packet: PacketType::Data(data), + } + } + + pub fn new_continue(stream_id: u32, buffer_remaining: u32) -> Self { + Self { + stream_id, + packet: PacketType::Continue(ContinuePacket::new(buffer_remaining)), + } + } + + pub fn new_close(stream_id: u32, reason: u8) -> Self { + Self { + stream_id, + packet: PacketType::Close(ClosePacket::new(reason)), + } + } +} + +impl TryFrom for Packet { + type Error = WispError; + fn try_from(mut bytes: Bytes) -> Result { + if bytes.remaining() < 5 { + return Err(Self::Error::PacketTooSmall); + } + let packet_type = bytes.get_u8(); + use PacketType::*; + Ok(Self { + stream_id: bytes.get_u32_le(), + packet: match packet_type { + 0x01 => Connect(ConnectPacket::try_from(bytes)?), + 0x02 => Data(bytes), + 0x03 => Continue(ContinuePacket::try_from(bytes)?), + 0x04 => Close(ClosePacket::try_from(bytes)?), + _ => return Err(Self::Error::InvalidPacketType), + }, + }) + } +} + +impl From for Vec { + fn from(packet: Packet) -> Self { + let mut encoded = Self::with_capacity(1 + 4); + encoded.push(packet.packet.as_u8()); + encoded.put_u32_le(packet.stream_id); + encoded.extend(Vec::::from(packet.packet)); + encoded + } +} + +impl TryFrom for Packet { + type Error = WispError; + fn try_from(frame: ws::Frame) -> Result { + if !frame.finished { + return Err(Self::Error::WsFrameNotFinished); + } + if frame.opcode != ws::OpCode::Binary { + return Err(Self::Error::WsFrameInvalidType); + } + frame.payload.try_into() + } +} + +impl From for ws::Frame { + fn from(packet: Packet) -> Self { + Self::binary(Vec::::from(packet).into()) + } +} diff --git a/wisp/src/stream.rs b/wisp/src/stream.rs new file mode 100644 index 0000000..f561edb --- /dev/null +++ b/wisp/src/stream.rs @@ -0,0 +1,298 @@ +use async_io_stream::IoStream; +use bytes::Bytes; +use event_listener::Event; +use futures::{ + channel::{mpsc, oneshot}, + sink, stream, + task::{Context, Poll}, + Sink, Stream, StreamExt, +}; +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Arc, + }, +}; + +pub enum WsEvent { + Send(Bytes), + Close(crate::ClosePacket), +} + +pub enum MuxEvent { + Close(u32, u8, oneshot::Sender>), +} + +pub struct MuxStreamRead +where + W: crate::ws::WebSocketWrite, +{ + pub stream_id: u32, + role: crate::Role, + tx: crate::ws::LockedWebSocketWrite, + rx: mpsc::UnboundedReceiver, + is_closed: Arc, + flow_control: Arc, +} + +impl MuxStreamRead { + pub async fn read(&mut self) -> Option { + if self.is_closed.load(Ordering::Acquire) { + return None; + } + match self.rx.next().await? { + WsEvent::Send(bytes) => { + if self.role == crate::Role::Server { + let old_val = self.flow_control.fetch_add(1, Ordering::SeqCst); + self.tx + .write_frame( + crate::Packet::new_continue(self.stream_id, old_val + 1).into(), + ) + .await + .ok()?; + } + Some(WsEvent::Send(bytes)) + } + WsEvent::Close(packet) => { + self.is_closed.store(true, Ordering::Release); + Some(WsEvent::Close(packet)) + } + } + } + + pub(crate) fn into_stream(self) -> Pin + Send>> { + Box::pin(stream::unfold(self, |mut rx| async move { + let evt = rx.read().await?; + Some(( + match evt { + WsEvent::Send(bytes) => bytes, + WsEvent::Close(_) => return None, + }, + rx, + )) + })) + } +} + +pub struct MuxStreamWrite +where + W: crate::ws::WebSocketWrite, +{ + pub stream_id: u32, + role: crate::Role, + tx: crate::ws::LockedWebSocketWrite, + close_channel: mpsc::UnboundedSender, + is_closed: Arc, + continue_recieved: Arc, + flow_control: Arc, +} + +impl MuxStreamWrite { + pub async fn write(&self, data: Bytes) -> Result<(), crate::WispError> { + if self.is_closed.load(Ordering::Acquire) { + return Err(crate::WispError::StreamAlreadyClosed); + } + if self.role == crate::Role::Client && self.flow_control.load(Ordering::Acquire) <= 0 { + self.continue_recieved.listen().await; + } + self.tx + .write_frame(crate::Packet::new_data(self.stream_id, data).into()) + .await?; + if self.role == crate::Role::Client { + self.flow_control.store( + self.flow_control + .load(Ordering::Acquire) + .checked_add(1) + .unwrap_or(0), + Ordering::Release, + ); + } + Ok(()) + } + + pub fn get_close_handle(&self) -> MuxStreamCloser { + MuxStreamCloser { + stream_id: self.stream_id, + close_channel: self.close_channel.clone(), + is_closed: self.is_closed.clone(), + } + } + + pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> { + if self.is_closed.load(Ordering::Acquire) { + return Err(crate::WispError::StreamAlreadyClosed); + } + let (tx, rx) = oneshot::channel::>(); + self.close_channel + .unbounded_send(MuxEvent::Close(self.stream_id, reason, tx)) + .map_err(|x| crate::WispError::Other(Box::new(x)))?; + rx.await + .map_err(|x| crate::WispError::Other(Box::new(x)))??; + + self.is_closed.store(true, Ordering::Release); + Ok(()) + } + + pub(crate) fn into_sink(self) -> Pin + Send>> { + Box::pin(sink::unfold(self, |tx, data| async move { + tx.write(data).await?; + Ok(tx) + })) + } +} + +impl Drop for MuxStreamWrite { + fn drop(&mut self) { + let (tx, _) = oneshot::channel::>(); + let _ = self + .close_channel + .unbounded_send(MuxEvent::Close(self.stream_id, 0x01, tx)); + } +} + +pub struct MuxStream +where + W: crate::ws::WebSocketWrite, +{ + pub stream_id: u32, + rx: MuxStreamRead, + tx: MuxStreamWrite, +} + +impl MuxStream { + pub(crate) fn new( + stream_id: u32, + role: crate::Role, + rx: mpsc::UnboundedReceiver, + tx: crate::ws::LockedWebSocketWrite, + close_channel: mpsc::UnboundedSender, + is_closed: Arc, + flow_control: Arc, + continue_recieved: Arc + ) -> Self { + Self { + stream_id, + rx: MuxStreamRead { + stream_id, + role, + tx: tx.clone(), + rx, + is_closed: is_closed.clone(), + flow_control: flow_control.clone(), + }, + tx: MuxStreamWrite { + stream_id, + role, + tx, + close_channel, + is_closed: is_closed.clone(), + flow_control: flow_control.clone(), + continue_recieved: continue_recieved.clone(), + }, + } + } + + pub async fn read(&mut self) -> Option { + self.rx.read().await + } + + pub async fn write(&self, data: Bytes) -> Result<(), crate::WispError> { + self.tx.write(data).await + } + + pub fn get_close_handle(&self) -> MuxStreamCloser { + self.tx.get_close_handle() + } + + pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> { + self.tx.close(reason).await + } + + pub fn into_split(self) -> (MuxStreamRead, MuxStreamWrite) { + (self.rx, self.tx) + } + + pub fn into_io(self) -> MuxStreamIo { + MuxStreamIo { + rx: self.rx.into_stream(), + tx: self.tx.into_sink(), + } + } +} + +pub struct MuxStreamCloser { + stream_id: u32, + close_channel: mpsc::UnboundedSender, + is_closed: Arc, +} + +impl MuxStreamCloser { + pub async fn close(&self, reason: u8) -> Result<(), crate::WispError> { + if self.is_closed.load(Ordering::Acquire) { + return Err(crate::WispError::StreamAlreadyClosed); + } + let (tx, rx) = oneshot::channel::>(); + self.close_channel + .unbounded_send(MuxEvent::Close(self.stream_id, reason, tx)) + .map_err(|x| crate::WispError::Other(Box::new(x)))?; + rx.await + .map_err(|x| crate::WispError::Other(Box::new(x)))??; + self.is_closed.store(true, Ordering::Release); + Ok(()) + } +} + +pin_project! { + pub struct MuxStreamIo { + #[pin] + rx: Pin + Send>>, + #[pin] + tx: Pin + Send>>, + } +} + +impl MuxStreamIo { + pub fn into_asyncrw(self) -> IoStream> { + IoStream::new(self) + } +} + +impl Stream for MuxStreamIo { + type Item = Result, std::io::Error>; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .rx + .poll_next(cx) + .map(|x| x.map(|x| Ok(x.to_vec()))) + } +} + +impl Sink> for MuxStreamIo { + type Error = std::io::Error; + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .tx + .poll_ready(cx) + .map_err(std::io::Error::other) + } + fn start_send(self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { + self.project() + .tx + .start_send(item.into()) + .map_err(std::io::Error::other) + } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .tx + .poll_flush(cx) + .map_err(std::io::Error::other) + } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .tx + .poll_close(cx) + .map_err(std::io::Error::other) + } +} diff --git a/client/src/tokioio.rs b/wisp/src/tokioio.rs similarity index 95% rename from client/src/tokioio.rs rename to wisp/src/tokioio.rs index 7d6acc0..a3ca7be 100644 --- a/client/src/tokioio.rs +++ b/wisp/src/tokioio.rs @@ -1,7 +1,6 @@ #![allow(dead_code)] // Taken from https://github.com/hyperium/hyper-util/blob/master/src/rt/tokio.rs -// hyper-util fails to compile on WASM as it has a dependency on socket2, but I only need -// hyper-util for TokioIo. +// hyper-util fails to compile on WASM as it has a dependency on socket2 use std::{ pin::Pin, @@ -169,3 +168,9 @@ where hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) } } + +impl hyper_util::client::legacy::connect::Connection for TokioIo { + fn connected(&self) -> hyper_util::client::legacy::connect::Connected { + hyper_util::client::legacy::connect::Connected::new() + } +} diff --git a/wisp/src/tower.rs b/wisp/src/tower.rs new file mode 100644 index 0000000..06f3ebc --- /dev/null +++ b/wisp/src/tower.rs @@ -0,0 +1,41 @@ +use crate::{tokioio::TokioIo, ws::WebSocketWrite, ClientMux, MuxStreamIo, StreamType, WispError}; +use async_io_stream::IoStream; +use futures::{ + task::{Context, Poll}, + Future, +}; +use std::sync::Arc; + +pub struct ServiceWrapper(pub Arc>); + +impl tower_service::Service for ServiceWrapper { + type Response = TokioIo>>; + type Error = WispError; + type Future = impl Future>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: hyper::Uri) -> Self::Future { + let mux = self.0.clone(); + async move { + Ok(TokioIo::new( + mux.client_new_stream( + StreamType::Tcp, + req.host().ok_or(WispError::UriHasNoHost)?.to_string(), + req.port().ok_or(WispError::UriHasNoPort)?.into(), + ) + .await? + .into_io() + .into_asyncrw(), + )) + } + } +} + +impl Clone for ServiceWrapper { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} diff --git a/wisp/src/ws.rs b/wisp/src/ws.rs new file mode 100644 index 0000000..f75c526 --- /dev/null +++ b/wisp/src/ws.rs @@ -0,0 +1,76 @@ +use bytes::Bytes; +use futures::lock::Mutex; +use std::sync::Arc; + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum OpCode { + Text, + Binary, + Close, + Ping, + Pong, +} + +pub struct Frame { + pub finished: bool, + pub opcode: OpCode, + pub payload: Bytes, +} + +impl Frame { + pub fn text(payload: Bytes) -> Self { + Self { + finished: true, + opcode: OpCode::Text, + payload, + } + } + + pub fn binary(payload: Bytes) -> Self { + Self { + finished: true, + opcode: OpCode::Binary, + payload, + } + } + + pub fn close(payload: Bytes) -> Self { + Self { + finished: true, + opcode: OpCode::Close, + payload, + } + } +} + +pub trait WebSocketRead { + fn wisp_read_frame( + &mut self, + tx: &crate::ws::LockedWebSocketWrite, + ) -> impl std::future::Future> + Send; +} + +pub trait WebSocketWrite { + fn wisp_write_frame( + &mut self, + frame: Frame, + ) -> impl std::future::Future> + Send; +} + +pub struct LockedWebSocketWrite(Arc>); + +impl LockedWebSocketWrite { + pub fn new(ws: S) -> Self { + Self(Arc::new(Mutex::new(ws))) + } + + pub async fn write_frame(&self, frame: Frame) -> Result<(), crate::WispError> { + self.0.lock().await.wisp_write_frame(frame).await + } +} + +impl Clone for LockedWebSocketWrite { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} diff --git a/wisp/src/ws_stream_wasm.rs b/wisp/src/ws_stream_wasm.rs new file mode 100644 index 0000000..410b537 --- /dev/null +++ b/wisp/src/ws_stream_wasm.rs @@ -0,0 +1,60 @@ +use futures::{stream::{SplitStream, SplitSink}, SinkExt, StreamExt}; +use ws_stream_wasm::{WsErr, WsMessage, WsStream}; + +impl From for crate::ws::Frame { + fn from(msg: WsMessage) -> Self { + use crate::ws::OpCode; + match msg { + WsMessage::Text(str) => Self { + finished: true, + opcode: OpCode::Text, + payload: str.into(), + }, + WsMessage::Binary(bin) => Self { + finished: true, + opcode: OpCode::Binary, + payload: bin.into(), + }, + } + } +} + +impl TryFrom for WsMessage { + type Error = crate::WispError; + fn try_from(msg: crate::ws::Frame) -> Result { + use crate::ws::OpCode; + match msg.opcode { + OpCode::Text => Ok(Self::Text(std::str::from_utf8(&msg.payload)?.to_string())), + OpCode::Binary => Ok(Self::Binary(msg.payload.to_vec())), + _ => Err(Self::Error::WsImplNotSupported), + } + } +} + +impl From for crate::WispError { + fn from(err: WsErr) -> Self { + Self::WsImplError(Box::new(err)) + } +} + +impl crate::ws::WebSocketRead for SplitStream { + async fn wisp_read_frame( + &mut self, + _: &crate::ws::LockedWebSocketWrite, + ) -> Result { + Ok(self + .next() + .await + .ok_or(crate::WispError::WsImplSocketClosed)? + .into()) + } +} + +impl crate::ws::WebSocketWrite for SplitSink { + async fn wisp_write_frame(&mut self, frame: crate::ws::Frame) -> Result<(), crate::WispError> { + self + .send(frame.try_into()?) + .await + .map_err(|e| e.into()) + } +}