From a8709255b2c9364fa7f951a4d3f9750d8c369502 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Fri, 8 Mar 2024 22:40:15 -0800 Subject: [PATCH] add autoreconnect, wisp_mux 1.2.0 --- Cargo.lock | 215 ++++++++++++++++---------------- README.md | 2 +- client/Cargo.toml | 5 +- client/demo.js | 11 +- client/index.html | 4 +- client/package.json | 2 +- client/src/lib.rs | 32 ++--- client/src/tls_stream.rs | 2 - {wisp => client}/src/tokioio.rs | 0 client/src/udp_stream.rs | 3 +- client/src/utils.rs | 41 +++++- client/src/websocket.rs | 1 - client/src/wrappers.rs | 168 ++++++++++++++++--------- simple-wisp-client/src/main.rs | 2 +- wisp/Cargo.toml | 6 +- wisp/src/fastwebsockets.rs | 19 +-- wisp/src/lib.rs | 160 ++++++++++++++---------- wisp/src/packet.rs | 20 +-- wisp/src/stream.rs | 1 + wisp/src/tower.rs | 43 ------- 20 files changed, 404 insertions(+), 333 deletions(-) rename {wisp => client}/src/tokioio.rs (100%) delete mode 100644 wisp/src/tower.rs diff --git a/Cargo.lock b/Cargo.lock index a1bdf3e..cc2678a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,9 +34,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.11" +version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" dependencies = [ "anstyle", "anstyle-parse", @@ -176,9 +176,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.14.0" +version = "3.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" [[package]] name = "bytes" @@ -188,12 +188,9 @@ checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" [[package]] name = "cc" -version = "1.0.83" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" -dependencies = [ - "libc", -] +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" [[package]] name = "cfg-if" @@ -203,9 +200,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "4.4.18" +version = "4.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c" +checksum = "b230ab84b0ffdf890d5a10abdbc8b83ae1c4918275daea1ab8801f71536b2651" dependencies = [ "clap_builder", "clap_derive", @@ -213,9 +210,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.18" +version = "4.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7" +checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" dependencies = [ "anstream", "anstyle", @@ -226,9 +223,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.4.7" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" +checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47" dependencies = [ "heck", "proc-macro2", @@ -238,9 +235,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" +checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" [[package]] name = "clio" @@ -309,9 +306,9 @@ dependencies = [ [[package]] name = "crc32fast" -version = "1.3.2" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" dependencies = [ "cfg-if", ] @@ -357,7 +354,7 @@ dependencies = [ [[package]] name = "epoxy-client" -version = "1.2.1" +version = "1.3.0" dependencies = [ "async-compression", "async_io_stream", @@ -383,6 +380,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "wasm-streams", + "wasmtimer", "web-sys", "webpki-roots", "wisp-mux", @@ -640,15 +638,15 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" -version = "0.3.5" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c62115964e08cb8039170eb33c1d0e2388a256930279edca206fff675f82c3" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "http" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -692,9 +690,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" dependencies = [ "bytes", "futures-channel", @@ -706,6 +704,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", + "smallvec", "tokio", "want", ] @@ -748,9 +747,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.2" +version = "2.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" +checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" dependencies = [ "equivalent", "hashbrown", @@ -758,12 +757,12 @@ dependencies = [ [[package]] name = "is-terminal" -version = "0.4.10" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" dependencies = [ "hermit-abi", - "rustix", + "libc", "windows-sys 0.52.0", ] @@ -775,9 +774,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" -version = "0.3.68" +version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" dependencies = [ "wasm-bindgen", ] @@ -812,9 +811,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "memchr" @@ -833,9 +832,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "wasi", @@ -887,9 +886,9 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl" -version = "0.10.63" +version = "0.10.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" dependencies = [ "bitflags 2.4.2", "cfg-if", @@ -919,9 +918,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.99" +version = "0.9.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22e1bf214306098e4832460f797824c05d25aacdf896f64a985fb0fd992454ae" +checksum = "dda2b0f344e78efc2facf7d195d098df0dd72151b26ab98da807afc26c198dff" dependencies = [ "cc", "libc", @@ -960,18 +959,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", @@ -992,9 +991,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" [[package]] name = "ppv-lite86" @@ -1061,16 +1060,17 @@ dependencies = [ [[package]] name = "ring" -version = "0.17.7" +version = "0.17.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", + "cfg-if", "getrandom", "libc", "spin", "untrusted", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -1183,9 +1183,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" +checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" [[package]] name = "send_wrapper" @@ -1251,12 +1251,12 @@ checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" [[package]] name = "socket2" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -1267,9 +1267,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "strsim" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" [[package]] name = "subtle" @@ -1279,9 +1279,9 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "syn" -version = "2.0.48" +version = "2.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" dependencies = [ "proc-macro2", "quote", @@ -1290,9 +1290,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.10.0" +version = "3.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", "fastrand", @@ -1312,18 +1312,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", @@ -1492,9 +1492,9 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "walkdir" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" dependencies = [ "same-file", "winapi-util", @@ -1517,9 +1517,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1527,9 +1527,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" dependencies = [ "bumpalo", "log", @@ -1542,9 +1542,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" +checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" dependencies = [ "cfg-if", "js-sys", @@ -1554,9 +1554,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1564,9 +1564,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", @@ -1577,9 +1577,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" [[package]] name = "wasm-streams" @@ -1610,9 +1610,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.67" +version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed" +checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" dependencies = [ "js-sys", "wasm-bindgen", @@ -1688,7 +1688,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.4", ] [[package]] @@ -1708,17 +1708,17 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" dependencies = [ - "windows_aarch64_gnullvm 0.52.0", - "windows_aarch64_msvc 0.52.0", - "windows_i686_gnu 0.52.0", - "windows_i686_msvc 0.52.0", - "windows_x86_64_gnu 0.52.0", - "windows_x86_64_gnullvm 0.52.0", - "windows_x86_64_msvc 0.52.0", + "windows_aarch64_gnullvm 0.52.4", + "windows_aarch64_msvc 0.52.4", + "windows_i686_gnu 0.52.4", + "windows_i686_msvc 0.52.4", + "windows_x86_64_gnu 0.52.4", + "windows_x86_64_gnullvm 0.52.4", + "windows_x86_64_msvc 0.52.4", ] [[package]] @@ -1735,9 +1735,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" [[package]] name = "windows_aarch64_msvc" @@ -1753,9 +1753,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" [[package]] name = "windows_i686_gnu" @@ -1771,9 +1771,9 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" [[package]] name = "windows_i686_msvc" @@ -1789,9 +1789,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" [[package]] name = "windows_x86_64_gnu" @@ -1807,9 +1807,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" [[package]] name = "windows_x86_64_gnullvm" @@ -1825,9 +1825,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" [[package]] name = "windows_x86_64_msvc" @@ -1843,13 +1843,13 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" [[package]] name = "wisp-mux" -version = "1.1.3" +version = "1.2.0" dependencies = [ "async_io_stream", "bytes", @@ -1857,11 +1857,8 @@ dependencies = [ "fastwebsockets", "futures", "futures-util", - "hyper", - "hyper-util-wasm", "pin-project-lite", "tokio", - "tower-service", ] [[package]] diff --git a/README.md b/README.md index 8b8d636..5d12d16 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # epoxy -Epoxy is an encrypted proxy for browser javascript. It allows you to make requests that bypass cors without compromising security, by running SSL/TLS inside webassembly. +Epoxy is an encrypted proxy for browser javascript. It allows you to make requests that bypass CORS without compromising security, by running SSL/TLS inside webassembly. ## Using the client Epoxy must be run from within a web worker and must be served with the [security headers needed for `SharedArrayBuffer`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer#security_requirements). Here is a simple usage example: diff --git a/client/Cargo.toml b/client/Cargo.toml index e1238e7..d392d09 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "epoxy-client" -version = "1.2.1" +version = "1.3.0" edition = "2021" license = "LGPL-3.0-only" @@ -25,7 +25,7 @@ tokio-util = { version = "0.7.10", features = ["io"] } async-compression = { version = "0.4.5", features = ["tokio", "gzip", "brotli"] } fastwebsockets = { version = "0.6.0", features = ["unstable-split"] } base64 = "0.21.7" -wisp-mux = { path = "../wisp", features = ["tokio_io", "hyper_tower"] } +wisp-mux = { path = "../wisp", features = ["tokio_io"] } async_io_stream = { version = "0.3.3", features = ["tokio_io"] } getrandom = { version = "0.2.12", features = ["js"] } hyper-util-wasm = { version = "0.1.3", features = ["client", "client-legacy", "http1", "http2"] } @@ -34,6 +34,7 @@ tower-service = "0.3.2" console_error_panic_hook = "0.1.7" send_wrapper = "0.6.0" event-listener = "5.2.0" +wasmtimer = "0.2.0" [dependencies.ring] features = ["wasm32_unknown_unknown_js"] diff --git a/client/demo.js b/client/demo.js index 1fab06e..772dff6 100644 --- a/client/demo.js +++ b/client/demo.js @@ -10,7 +10,8 @@ onmessage = async (msg) => { should_perf_test, should_ws_test, should_tls_test, - should_udp_test + should_udp_test, + should_reconnect_test, ] = msg.data; console.log( "%cWASM is significantly slower with DevTools open!", @@ -197,6 +198,14 @@ onmessage = async (msg) => { ); await (new Promise((res, _) => setTimeout(res, 5000))); await ws.close(); + } else if (should_reconnect_test) { + while (true) { + try { + await epoxy_client.fetch("https://httpbin.org/get"); + } catch(e) {console.error(e)} + log("sent req"); + await (new Promise((res, _) => setTimeout(res, 500))); + } } else { let resp = await epoxy_client.fetch("https://httpbin.org/get"); console.log(resp, Object.fromEntries(resp.headers)); diff --git a/client/index.html b/client/index.html index c013161..fa5efc0 100644 --- a/client/index.html +++ b/client/index.html @@ -17,6 +17,7 @@ const should_ws_test = params.has("ws_test"); const should_tls_test = params.has("rawtls_test"); const should_udp_test = params.has("udp_test"); + const should_reconnect_test = params.has("reconnect_test"); const worker = new Worker("demo.js", {type:'module'}); worker.onmessage = (msg) => { let el = document.createElement("pre"); @@ -32,7 +33,8 @@ should_perf_test, should_ws_test, should_tls_test, - should_udp_test + should_udp_test, + should_reconnect_test, ]); diff --git a/client/package.json b/client/package.json index 51c35bb..99a5c52 100644 --- a/client/package.json +++ b/client/package.json @@ -1,6 +1,6 @@ { "name": "@mercuryworkshop/epoxy-tls", - "version": "1.2.1", + "version": "1.3.0", "description": "A wasm library for using raw encrypted tls/ssl/https/websocket streams on the browser", "scripts": { "build": "./build.sh" diff --git a/client/src/lib.rs b/client/src/lib.rs index 4a4584c..011dd7b 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -2,15 +2,17 @@ #[macro_use] mod utils; mod tls_stream; +mod tokioio; mod udp_stream; mod websocket; mod wrappers; use tls_stream::EpxTlsStream; +use tokioio::TokioIo; use udp_stream::EpxUdpStream; use utils::{Boolinator, ReplaceErr, UriExt}; use websocket::EpxWebSocket; -use wrappers::{IncomingBody, TlsWispService, WebSocketWrapper}; +use wrappers::{IncomingBody, ServiceWrapper, TlsWispService, WebSocketWrapper}; use std::sync::Arc; @@ -23,6 +25,7 @@ use hyper::{body::Incoming, Uri}; use hyper_util_wasm::client::legacy::Client; use js_sys::{Array, Function, Object, Reflect, Uint8Array}; use rustls::pki_types::TrustAnchor; +use tokio::sync::RwLock; use tokio_rustls::{client::TlsStream, rustls, rustls::RootCertStore, TlsConnector}; use tokio_util::{ either::Either, @@ -30,7 +33,7 @@ use tokio_util::{ }; use wasm_bindgen::{intern, prelude::*}; use web_sys::TextEncoder; -use wisp_mux::{tokioio::TokioIo, tower::ServiceWrapper, ClientMux, MuxStreamIo, StreamType}; +use wisp_mux::{ClientMux, MuxStreamIo, StreamType}; type HttpBody = http_body_util::Full; @@ -101,8 +104,8 @@ pub fn certs() -> Result { #[wasm_bindgen(inspectable)] pub struct EpoxyClient { rustls_config: Arc, - mux: Arc>, - hyper_client: Client, HttpBody>, + mux: Arc>>, + hyper_client: Client, #[wasm_bindgen(getter_with_clone)] pub useragent: String, #[wasm_bindgen(js_name = "redirectLimit")] @@ -128,20 +131,9 @@ impl EpoxyClient { return Err(JsError::new("Scheme must be either `ws` or `wss`")); } - debug!("connecting to ws {:?}", ws_url); - let (wtx, wrx) = WebSocketWrapper::connect(ws_url, vec![]) - .await - .replace_err("Failed to connect to websocket")?; - debug!("connected!"); - - let (mux, fut) = ClientMux::new(wrx, wtx).await?; - let mux = Arc::new(mux); - - wasm_bindgen_futures::spawn_local(async move { - if let Err(err) = fut.await { - error!("epoxy: error in mux future! {:?}", err); - } - }); + let (mux, fut) = utils::make_mux(&ws_url).await?; + let mux = Arc::new(RwLock::new(mux)); + utils::spawn_mux_fut(mux.clone(), fut, ws_url.clone()); let mut certstore = RootCertStore::empty(); certstore.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); @@ -160,7 +152,7 @@ impl EpoxyClient { .http1_preserve_header_case(true) .build(TlsWispService { rustls_config: rustls_config.clone(), - service: ServiceWrapper(mux), + service: ServiceWrapper(mux, ws_url), }), rustls_config, useragent, @@ -171,6 +163,8 @@ impl EpoxyClient { async fn get_tls_io(&self, url_host: &str, url_port: u16) -> Result { let channel = self .mux + .read() + .await .client_new_stream(StreamType::Tcp, url_host.to_string(), url_port) .await .replace_err("Failed to create multiplexor channel")? diff --git a/client/src/tls_stream.rs b/client/src/tls_stream.rs index 3975982..b624753 100644 --- a/client/src/tls_stream.rs +++ b/client/src/tls_stream.rs @@ -1,8 +1,6 @@ use crate::*; -use js_sys::Function; use tokio::io::{split, AsyncWriteExt, WriteHalf}; -use tokio_util::io::ReaderStream; #[wasm_bindgen(inspectable)] pub struct EpxTlsStream { diff --git a/wisp/src/tokioio.rs b/client/src/tokioio.rs similarity index 100% rename from wisp/src/tokioio.rs rename to client/src/tokioio.rs diff --git a/client/src/udp_stream.rs b/client/src/udp_stream.rs index 721fe48..4fad806 100644 --- a/client/src/udp_stream.rs +++ b/client/src/udp_stream.rs @@ -1,7 +1,6 @@ use crate::*; use futures_util::{stream::SplitSink, SinkExt}; -use js_sys::Function; #[wasm_bindgen(inspectable)] pub struct EpxUdpStream { @@ -34,6 +33,8 @@ impl EpxUdpStream { let io = tcp .mux + .read() + .await .client_new_stream(StreamType::Udp, url_host.to_string(), url_port) .await .replace_err("Failed to open multiplexor channel")? diff --git a/client/src/utils.rs b/client/src/utils.rs index 1b71b48..adae84c 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -1,9 +1,10 @@ +use crate::*; + use wasm_bindgen::prelude::*; use hyper::rt::Executor; -use hyper::{header::HeaderValue, Uri}; -use js_sys::{Array, Object}; use std::future::Future; +use wisp_mux::{CloseReason, WispError}; #[wasm_bindgen] extern "C" { @@ -186,3 +187,39 @@ pub fn get_url_port(url: &Uri) -> Result { Ok(80) } } + +pub async fn make_mux(url: &str) -> Result<(ClientMux, impl Future>), WispError> { + let (wtx, wrx) = WebSocketWrapper::connect(url, vec![]) + .await + .map_err(|_| WispError::WsImplSocketClosed)?; + wtx.wait_for_open().await; + let mux = ClientMux::new(wrx, wtx).await?; + + Ok(mux) +} + +pub fn spawn_mux_fut(mux: Arc>>, fut: impl Future> + 'static, url: String) { + wasm_bindgen_futures::spawn_local(async move { + if let Err(e) = fut.await { + error!("epoxy: error in mux future, restarting: {:?}", e); + while let Err(e) = replace_mux(mux.clone(), &url).await { + error!("epoxy: failed to restart mux future: {:?}", e); + wasmtimer::tokio::sleep(std::time::Duration::from_millis(500)).await; + } + } + debug!("epoxy: mux future exited"); + }); +} + +pub async fn replace_mux( + mux: Arc>>, + url: &str, +) -> Result<(), WispError> { + let (mux_replace, fut) = make_mux(url).await?; + let mut mux_write = mux.write().await; + mux_write.close(CloseReason::Unknown).await; + *mux_write = mux_replace; + drop(mux_write); + spawn_mux_fut(mux, fut, url.into()); + Ok(()) +} diff --git a/client/src/websocket.rs b/client/src/websocket.rs index 3c1d33f..63d949c 100644 --- a/client/src/websocket.rs +++ b/client/src/websocket.rs @@ -11,7 +11,6 @@ use hyper::{ upgrade::Upgraded, StatusCode, }; -use js_sys::Function; use std::str::from_utf8; use tokio::io::WriteHalf; diff --git a/client/src/wrappers.rs b/client/src/wrappers.rs index 8513108..24e4188 100644 --- a/client/src/wrappers.rs +++ b/client/src/wrappers.rs @@ -1,11 +1,12 @@ use crate::*; use std::{ pin::Pin, + sync::atomic::{AtomicBool, Ordering}, task::{Context, Poll}, }; use event_listener::Event; -use futures_util::Stream; +use futures_util::{FutureExt, Stream}; use hyper::body::Body; use js_sys::ArrayBuffer; use pin_project_lite::pin_project; @@ -14,8 +15,6 @@ use std::future::Future; use tokio::sync::mpsc; use web_sys::{BinaryType, MessageEvent, WebSocket}; use wisp_mux::{ - tokioio::TokioIo, - tower::ServiceWrapper, ws::{Frame, LockedWebSocketWrite, WebSocketRead, WebSocketWrite}, WispError, }; @@ -53,17 +52,49 @@ impl Stream for IncomingBody { } } -pub struct TlsWispService -where - W: wisp_mux::ws::WebSocketWrite + Send + 'static, -{ - pub service: ServiceWrapper, +#[derive(Clone)] +pub struct ServiceWrapper(pub Arc>>, pub String); + +impl tower_service::Service for ServiceWrapper { + type Response = TokioIo>>; + type Error = WispError; + type Future = impl Future>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: hyper::Uri) -> Self::Future { + let mux = self.0.clone(); + let mux_url = self.1.clone(); + async move { + let stream = mux + .read() + .await + .client_new_stream( + StreamType::Tcp, + req.host().ok_or(WispError::UriHasNoHost)?.to_string(), + req.port().ok_or(WispError::UriHasNoPort)?.into(), + ) + .await; + if stream + .as_ref() + .is_err_and(|e| matches!(e, WispError::WsImplSocketClosed)) + { + utils::replace_mux(mux, &mux_url).await?; + } + Ok(TokioIo::new(stream?.into_io().into_asyncrw())) + } + } +} + +#[derive(Clone)] +pub struct TlsWispService { + pub service: ServiceWrapper, pub rustls_config: Arc, } -impl tower_service::Service - for TlsWispService -{ +impl tower_service::Service for TlsWispService { type Response = TokioIo; type Error = WispError; type Future = Pin>>>; @@ -108,18 +139,8 @@ impl tower_service::Service Clone for TlsWispService { - fn clone(&self) -> Self { - Self { - rustls_config: self.rustls_config.clone(), - service: self.service.clone(), - } - } -} - #[derive(Debug)] pub enum WebSocketError { - Closed, Unknown, SendFailed, } @@ -128,7 +149,6 @@ impl std::fmt::Display for WebSocketError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { use WebSocketError::*; match self { - Closed => write!(f, "Websocket closed"), Unknown => write!(f, "Unknown error"), SendFailed => write!(f, "Send failed"), } @@ -144,13 +164,16 @@ impl From for WispError { } pub enum WebSocketMessage { - Close, + Closed, Error, Message(Vec), } pub struct WebSocketWrapper { inner: SendWrapper, + open_event: Arc, + error_event: Arc, + closed: Arc, // used to retain the closures #[allow(dead_code)] @@ -165,6 +188,8 @@ pub struct WebSocketWrapper { pub struct WebSocketReader { read_rx: mpsc::UnboundedReceiver, + closed: Arc, + close_event: Arc, } impl WebSocketRead for WebSocketReader { @@ -173,49 +198,36 @@ impl WebSocketRead for WebSocketReader { _: &LockedWebSocketWrite, ) -> Result { use WebSocketMessage::*; - match self - .read_rx - .recv() - .await - .ok_or(WispError::WsImplError(Box::new(WebSocketError::Closed)))? - { + if self.closed.load(Ordering::Acquire) { + return Err(WispError::WsImplSocketClosed); + } + let res = futures_util::select! { + data = self.read_rx.recv().fuse() => data, + _ = self.close_event.listen().fuse() => Some(Closed), + }; + match res.ok_or(WispError::WsImplSocketClosed)? { Message(bin) => Ok(Frame::binary(bin.into())), Error => Err(WebSocketError::Unknown.into()), - Close => Err(WebSocketError::Closed.into()), + Closed => Err(WispError::WsImplSocketClosed), } } } impl WebSocketWrapper { pub async fn connect( - url: String, + url: &str, protocols: Vec, ) -> Result<(Self, WebSocketReader), JsValue> { - let ws = if protocols.is_empty() { - WebSocket::new(&url) - } else { - WebSocket::new_with_str_sequence( - &url, - &protocols - .iter() - .fold(Array::new(), |acc, x| { - acc.push(&jval!(x)); - acc - }) - .into(), - ) - } - .replace_err("Failed to make websocket")?; - - ws.set_binary_type(BinaryType::Arraybuffer); - let (read_tx, read_rx) = mpsc::unbounded_channel(); + let closed = Arc::new(AtomicBool::new(false)); let open_event = Arc::new(Event::new()); + let close_event = Arc::new(Event::new()); + let error_event = Arc::new(Event::new()); - let open_event_tx = open_event.clone(); + let onopen_event = open_event.clone(); let onopen = Closure::wrap( - Box::new(move || while open_event_tx.notify(usize::MAX) == 0 {}) as Box, + Box::new(move || while onopen_event.notify(usize::MAX) == 0 {}) as Box, ); let onmessage_tx = read_tx.clone(); @@ -226,40 +238,78 @@ impl WebSocketWrapper { } }) as Box); - ws.set_onopen(Some(onopen.as_ref().unchecked_ref())); - ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); - - let onclose_tx = read_tx.clone(); + let onclose_closed = closed.clone(); + let onclose_event = close_event.clone(); let onclose = Closure::wrap(Box::new(move || { - let _ = onclose_tx.send(WebSocketMessage::Close); + onclose_closed.store(true, Ordering::Release); + onclose_event.notify(usize::MAX); }) as Box); let onerror_tx = read_tx.clone(); + let onerror_closed = closed.clone(); + let onerror_close = close_event.clone(); + let onerror_event = error_event.clone(); let onerror = Closure::wrap(Box::new(move || { let _ = onerror_tx.send(WebSocketMessage::Error); + onerror_closed.store(true, Ordering::Release); + onerror_close.notify(usize::MAX); + onerror_event.notify(usize::MAX); }) as Box); + let ws = if protocols.is_empty() { + WebSocket::new(url) + } else { + WebSocket::new_with_str_sequence( + url, + &protocols + .iter() + .fold(Array::new(), |acc, x| { + acc.push(&jval!(x)); + acc + }) + .into(), + ) + } + .replace_err("Failed to make websocket")?; + ws.set_binary_type(BinaryType::Arraybuffer); + ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); + ws.set_onopen(Some(onopen.as_ref().unchecked_ref())); ws.set_onclose(Some(onclose.as_ref().unchecked_ref())); ws.set_onerror(Some(onerror.as_ref().unchecked_ref())); - open_event.listen().await; - Ok(( Self { inner: SendWrapper::new(ws), + open_event, + error_event, + closed: closed.clone(), onopen: SendWrapper::new(onopen), onclose: SendWrapper::new(onclose), onerror: SendWrapper::new(onerror), onmessage: SendWrapper::new(onmessage), }, - WebSocketReader { read_rx }, + WebSocketReader { + read_rx, + closed, + close_event, + }, )) } + + pub async fn wait_for_open(&self) { + futures_util::select! { + _ = self.open_event.listen().fuse() => (), + _ = self.error_event.listen().fuse() => (), + } + } } impl WebSocketWrite for WebSocketWrapper { async fn wisp_write_frame(&mut self, frame: Frame) -> Result<(), WispError> { use wisp_mux::ws::OpCode::*; + if self.closed.load(Ordering::Acquire) { + return Err(WispError::WsImplSocketClosed); + } match frame.opcode { Binary => self .inner diff --git a/simple-wisp-client/src/main.rs b/simple-wisp-client/src/main.rs index d1e9f4c..a0f78e8 100644 --- a/simple-wisp-client/src/main.rs +++ b/simple-wisp-client/src/main.rs @@ -97,7 +97,7 @@ async fn main() -> Result<(), Box> { let (mux, fut) = ClientMux::new(rx, tx).await?; - tokio::task::spawn(fut); + tokio::task::spawn(async move { println!("err: {:?}", fut.await); }); let mut hi: u64 = 0; loop { diff --git a/wisp/Cargo.toml b/wisp/Cargo.toml index 48ab3b0..db960e3 100644 --- a/wisp/Cargo.toml +++ b/wisp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wisp-mux" -version = "1.1.3" +version = "1.2.0" license = "LGPL-3.0-only" description = "A library for easily creating Wisp servers and clients." homepage = "https://github.com/MercuryWorkshop/epoxy-tls/tree/multiplexed/wisp" @@ -15,16 +15,12 @@ event-listener = "5.0.0" fastwebsockets = { version = "0.6.0", features = ["unstable-split"], optional = true } futures = "0.3.30" futures-util = "0.3.30" -hyper = { version = "1.1.0", optional = true } -hyper-util-wasm = { version = "0.1.3", features = ["client", "client-legacy"], optional = true } pin-project-lite = "0.2.13" tokio = { version = "1.35.1", optional = true, default-features = false } -tower-service = { version = "0.3.2", optional = true } [features] fastwebsockets = ["dep:fastwebsockets", "dep:tokio"] tokio_io = ["async_io_stream/tokio_io"] -hyper_tower = ["dep:tower-service", "dep:hyper", "dep:tokio", "dep:hyper-util-wasm"] [package.metadata.docs.rs] all-features = true diff --git a/wisp/src/fastwebsockets.rs b/wisp/src/fastwebsockets.rs index fb31e4a..5a81e8d 100644 --- a/wisp/src/fastwebsockets.rs +++ b/wisp/src/fastwebsockets.rs @@ -28,11 +28,10 @@ impl From> for crate::ws::Frame { } } -impl TryFrom for Frame<'_> { - type Error = crate::WispError; - fn try_from(frame: crate::ws::Frame) -> Result { +impl From for Frame<'_> { + fn from(frame: crate::ws::Frame) -> Self { use crate::ws::OpCode::*; - Ok(match frame.opcode { + match frame.opcode { Text => Self::text(Payload::Owned(frame.payload.to_vec())), Binary => Self::binary(Payload::Owned(frame.payload.to_vec())), Close => Self::close_raw(Payload::Owned(frame.payload.to_vec())), @@ -43,13 +42,17 @@ impl TryFrom for Frame<'_> { Payload::Owned(frame.payload.to_vec()), ), Pong => Self::pong(Payload::Owned(frame.payload.to_vec())), - }) + } } } impl From for crate::WispError { fn from(err: WebSocketError) -> Self { - Self::WsImplError(Box::new(err)) + if let WebSocketError::ConnectionClosed = err { + Self::WsImplSocketClosed + } else { + Self::WsImplError(Box::new(err)) + } } } @@ -67,6 +70,8 @@ impl crate::ws::WebSocketRead for FragmentCollector impl crate::ws::WebSocketWrite for WebSocketWrite { async fn wisp_write_frame(&mut self, frame: crate::ws::Frame) -> Result<(), crate::WispError> { - self.write_frame(frame.try_into()?).await.map_err(|e| e.into()) + self.write_frame(frame.into()) + .await + .map_err(|e| e.into()) } } diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index 49dc45d..79971ba 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -11,12 +11,6 @@ mod fastwebsockets; mod packet; mod sink_unfold; mod stream; -#[cfg(feature = "hyper_tower")] -#[cfg_attr(docsrs, doc(cfg(feature = "hyper_tower")))] -pub mod tokioio; -#[cfg(feature = "hyper_tower")] -#[cfg_attr(docsrs, doc(cfg(feature = "hyper_tower")))] -pub mod tower; pub mod ws; pub use crate::packet::*; @@ -140,10 +134,10 @@ impl ServerMuxInner { R: ws::WebSocketRead, { let ret = futures::select! { - x = self.server_close_loop(close_rx).fuse() => x, + x = self.server_bg_loop(close_rx).fuse() => x, x = self.server_msg_loop(rx, muxstream_sender, buffer_size).fuse() => x }; - self.stream_map.lock().await.iter().for_each(|x| { + self.stream_map.lock().await.drain().for_each(|x| { let _ = x.1.stream .unbounded_send(MuxEvent::Close(ClosePacket::new(CloseReason::Unknown))); @@ -151,7 +145,7 @@ impl ServerMuxInner { ret } - async fn server_close_loop( + async fn server_bg_loop( &self, mut close_rx: mpsc::UnboundedReceiver, ) -> Result<(), WispError> { @@ -168,6 +162,7 @@ impl ServerMuxInner { let _ = channel.send(Err(WispError::InvalidStreamId)); } } + WsEvent::EndFut => break, } } Ok(()) @@ -186,66 +181,62 @@ impl ServerMuxInner { .write_frame(Packet::new_continue(0, buffer_size).into()) .await?; - while let Ok(frame) = rx.wisp_read_frame(&self.tx).await { - if let Ok(packet) = Packet::try_from(frame) { - use PacketType::*; - match packet.packet { - Connect(inner_packet) => { - let (ch_tx, ch_rx) = mpsc::unbounded(); - let stream_type = inner_packet.stream_type; - let flow_control: Arc = AtomicU32::new(buffer_size).into(); - let flow_control_event: Arc = Event::new().into(); + loop { + let packet: Packet = rx.wisp_read_frame(&self.tx).await?.try_into()?; + use PacketType::*; + match packet.packet_type { + Connect(inner_packet) => { + let (ch_tx, ch_rx) = mpsc::unbounded(); + let stream_type = inner_packet.stream_type; + let flow_control: Arc = AtomicU32::new(buffer_size).into(); + let flow_control_event: Arc = Event::new().into(); - self.stream_map.lock().await.insert( - packet.stream_id, - MuxMapValue { - stream: ch_tx, - flow_control: flow_control.clone(), - flow_control_event: flow_control_event.clone(), - }, + self.stream_map.lock().await.insert( + packet.stream_id, + MuxMapValue { + stream: ch_tx, + flow_control: flow_control.clone(), + flow_control_event: flow_control_event.clone(), + }, + ); + muxstream_sender + .unbounded_send(( + inner_packet, + MuxStream::new( + packet.stream_id, + Role::Server, + stream_type, + ch_rx, + self.tx.clone(), + self.close_tx.clone(), + AtomicBool::new(false).into(), + flow_control, + flow_control_event, + ), + )) + .map_err(|x| WispError::Other(Box::new(x)))?; + } + Data(data) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + let _ = stream.stream.unbounded_send(MuxEvent::Send(data)); + stream.flow_control.store( + stream + .flow_control + .load(Ordering::Acquire) + .saturating_sub(1), + Ordering::Release, ); - muxstream_sender - .unbounded_send(( - inner_packet, - MuxStream::new( - packet.stream_id, - Role::Server, - stream_type, - ch_rx, - self.tx.clone(), - self.close_tx.clone(), - AtomicBool::new(false).into(), - flow_control, - flow_control_event, - ), - )) - .map_err(|x| WispError::Other(Box::new(x)))?; - } - Data(data) => { - if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.stream.unbounded_send(MuxEvent::Send(data)); - stream.flow_control.store( - stream.flow_control - .load(Ordering::Acquire) - .saturating_sub(1), - Ordering::Release, - ); - } - } - Continue(_) => unreachable!(), - Close(inner_packet) => { - if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.stream.unbounded_send(MuxEvent::Close(inner_packet)); - } - self.stream_map.lock().await.remove(&packet.stream_id); } } - } else { - break; + Continue(_) => unreachable!(), + Close(inner_packet) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + let _ = stream.stream.unbounded_send(MuxEvent::Close(inner_packet)); + } + self.stream_map.lock().await.remove(&packet.stream_id); + } } } - drop(muxstream_sender); - Ok(()) } } @@ -272,6 +263,8 @@ pub struct ServerMux where W: ws::WebSocketWrite + Send + 'static, { + stream_map: Arc>>, + close_tx: mpsc::UnboundedSender, muxstream_recv: mpsc::UnboundedReceiver<(ConnectPacket, MuxStream)>, } @@ -290,7 +283,11 @@ impl ServerMux { let write = ws::LockedWebSocketWrite::new(write); let map = Arc::new(Mutex::new(HashMap::new())); ( - Self { muxstream_recv: rx }, + Self { + muxstream_recv: rx, + close_tx: close_tx.clone(), + stream_map: map.clone(), + }, ServerMuxInner { tx: write, close_tx, @@ -304,6 +301,19 @@ impl ServerMux { pub async fn server_new_stream(&mut self) -> Option<(ConnectPacket, MuxStream)> { self.muxstream_recv.next().await } + + /// Close all streams. + /// + /// Also terminates the multiplexor future. Waiting for a new stream will never succeed after + /// this function is called. + pub async fn close(&self, reason: CloseReason) { + self.stream_map.lock().await.drain().for_each(|x| { + let _ = + x.1.stream + .unbounded_send(MuxEvent::Close(ClosePacket::new(reason))); + }); + let _ = self.close_tx.unbounded_send(WsEvent::EndFut); + } } struct ClientMuxInner @@ -346,6 +356,7 @@ impl ClientMuxInner { let _ = channel.send(Err(WispError::InvalidStreamId)); } } + WsEvent::EndFut => break, } } Ok(()) @@ -355,10 +366,11 @@ impl ClientMuxInner { where R: ws::WebSocketRead, { - while let Ok(frame) = rx.wisp_read_frame(&self.tx).await { + loop { + let frame = rx.wisp_read_frame(&self.tx).await?; if let Ok(packet) = Packet::try_from(frame) { use PacketType::*; - match packet.packet { + match packet.packet_type { Connect(_) => unreachable!(), Data(data) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { @@ -382,7 +394,6 @@ impl ClientMuxInner { } } } - Ok(()) } } @@ -425,7 +436,7 @@ impl ClientMux { if first_packet.stream_id != 0 { return Err(WispError::InvalidStreamId); } - if let PacketType::Continue(packet) = first_packet.packet { + if let PacketType::Continue(packet) = first_packet.packet_type { let (tx, rx) = mpsc::unbounded::(); let map = Arc::new(Mutex::new(HashMap::new())); Ok(( @@ -487,4 +498,17 @@ impl ClientMux { evt, )) } + + /// Close all streams. + /// + /// Also terminates the multiplexor future. Creating a stream is UB after calling this + /// function. + pub async fn close(&self, reason: CloseReason) { + self.stream_map.lock().await.drain().for_each(|x| { + let _ = + x.1.stream + .unbounded_send(MuxEvent::Close(ClosePacket::new(reason))); + }); + let _ = self.close_tx.unbounded_send(WsEvent::EndFut); + } } diff --git a/wisp/src/packet.rs b/wisp/src/packet.rs index 513e037..fd59a4c 100644 --- a/wisp/src/packet.rs +++ b/wisp/src/packet.rs @@ -242,8 +242,8 @@ impl From for Vec { pub struct Packet { /// Stream this packet is associated with. pub stream_id: u32, - /// Packet recieved. - pub packet: PacketType, + /// Packet type recieved. + pub packet_type: PacketType, } impl Packet { @@ -251,7 +251,7 @@ impl Packet { /// /// The helper functions should be used for most use cases. pub fn new(stream_id: u32, packet: PacketType) -> Self { - Self { stream_id, packet } + Self { stream_id, packet_type: packet } } /// Create a new connect packet. @@ -263,7 +263,7 @@ impl Packet { ) -> Self { Self { stream_id, - packet: PacketType::Connect(ConnectPacket::new( + packet_type: PacketType::Connect(ConnectPacket::new( stream_type, destination_port, destination_hostname, @@ -275,7 +275,7 @@ impl Packet { pub fn new_data(stream_id: u32, data: Bytes) -> Self { Self { stream_id, - packet: PacketType::Data(data), + packet_type: PacketType::Data(data), } } @@ -283,7 +283,7 @@ impl Packet { pub fn new_continue(stream_id: u32, buffer_remaining: u32) -> Self { Self { stream_id, - packet: PacketType::Continue(ContinuePacket::new(buffer_remaining)), + packet_type: PacketType::Continue(ContinuePacket::new(buffer_remaining)), } } @@ -291,7 +291,7 @@ impl Packet { pub fn new_close(stream_id: u32, reason: CloseReason) -> Self { Self { stream_id, - packet: PacketType::Close(ClosePacket::new(reason)), + packet_type: PacketType::Close(ClosePacket::new(reason)), } } } @@ -306,7 +306,7 @@ impl TryFrom for Packet { use PacketType::*; Ok(Self { stream_id: bytes.get_u32_le(), - packet: match packet_type { + packet_type: match packet_type { 0x01 => Connect(ConnectPacket::try_from(bytes)?), 0x02 => Data(bytes), 0x03 => Continue(ContinuePacket::try_from(bytes)?), @@ -320,9 +320,9 @@ impl TryFrom for Packet { impl From for Vec { fn from(packet: Packet) -> Self { let mut encoded = Self::with_capacity(1 + 4); - encoded.push(packet.packet.as_u8()); + encoded.push(packet.packet_type.as_u8()); encoded.put_u32_le(packet.stream_id); - encoded.extend(Vec::::from(packet.packet)); + encoded.extend(Vec::::from(packet.packet_type)); encoded } } diff --git a/wisp/src/stream.rs b/wisp/src/stream.rs index 70ccad4..be6ef87 100644 --- a/wisp/src/stream.rs +++ b/wisp/src/stream.rs @@ -26,6 +26,7 @@ pub enum MuxEvent { pub(crate) enum WsEvent { Close(u32, crate::CloseReason, oneshot::Sender>), + EndFut, } /// Read side of a multiplexor stream. diff --git a/wisp/src/tower.rs b/wisp/src/tower.rs deleted file mode 100644 index 368de8f..0000000 --- a/wisp/src/tower.rs +++ /dev/null @@ -1,43 +0,0 @@ -//! Helper that implements a Tower Service for a client multiplexor. -use crate::{tokioio::TokioIo, ws::WebSocketWrite, ClientMux, MuxStreamIo, StreamType, WispError}; -use async_io_stream::IoStream; -use futures::{ - task::{Context, Poll}, - Future, -}; -use std::sync::Arc; - -/// Wrapper struct that implements a Tower Service sfor a client multiplexor. -pub struct ServiceWrapper(pub Arc>); - -impl tower_service::Service for ServiceWrapper { - type Response = TokioIo>>; - type Error = WispError; - type Future = impl Future>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: hyper::Uri) -> Self::Future { - let mux = self.0.clone(); - async move { - Ok(TokioIo::new( - mux.client_new_stream( - StreamType::Tcp, - req.host().ok_or(WispError::UriHasNoHost)?.to_string(), - req.port().ok_or(WispError::UriHasNoPort)?.into(), - ) - .await? - .into_io() - .into_asyncrw(), - )) - } - } -} - -impl Clone for ServiceWrapper { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -}