From 85a30aeec570177c453b483d7d540e092477efa3 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Wed, 7 Feb 2024 08:38:37 -0800 Subject: [PATCH] more improvements and fix wisp impl --- .gitignore | 2 +- Cargo.lock | 319 ++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 2 +- client/build.sh | 2 +- client/src/lib.rs | 10 +- server/Cargo.toml | 2 + server/src/main.rs | 71 ++++++---- wisp/Cargo.lock | 320 --------------------------------------------- wisp/Cargo.toml | 1 + wisp/src/lib.rs | 101 +++++++++----- wisp/src/packet.rs | 4 +- wisp/src/stream.rs | 57 +++++++- 12 files changed, 478 insertions(+), 413 deletions(-) delete mode 100644 wisp/Cargo.lock diff --git a/.gitignore b/.gitignore index dcb2577..1cd2570 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ /target -server/src/*.pem +**/*.pem client/pkg client/out .direnv diff --git a/Cargo.lock b/Cargo.lock index 6e6dfe3..cc4a287 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,54 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "anstream" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" + +[[package]] +name = "anstyle-parse" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "async-compression" version = "0.4.6" @@ -154,6 +202,77 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "clap" +version = "4.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", + "terminal_size", +] + +[[package]] +name = "clap_derive" +version = "4.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" + +[[package]] +name = "clio" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7fc6734af48458f72f5a3fa7b840903606427d98a710256e808f76a965047d9" +dependencies = [ + "cfg-if", + "clap", + "is-terminal", + "libc", + "tempfile", + "walkdir", + "windows-sys 0.42.0", +] + +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -198,6 +317,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + [[package]] name = "crypto-common" version = "0.1.6" @@ -268,6 +393,8 @@ name = "epoxy-server" version = "1.0.0" dependencies = [ "bytes", + "clap", + "clio", "dashmap", "fastwebsockets", "futures-util", @@ -296,6 +423,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b72557800024fabbaa2449dd4bf24e37b93702d457a4d4f2b0dd1f0f039f20c1" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -495,6 +633,12 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.3.5" @@ -612,6 +756,17 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "is-terminal" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" +dependencies = [ + "hermit-abi", + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "itoa" version = "1.0.10" @@ -620,9 +775,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" dependencies = [ "wasm-bindgen", ] @@ -774,6 +929,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -983,6 +1144,15 @@ dependencies = [ "untrusted", ] +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.23" @@ -1105,6 +1275,12 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "subtle" version = "2.5.0" @@ -1134,6 +1310,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "terminal_size" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7" +dependencies = [ + "rustix", + "windows-sys 0.48.0", +] + [[package]] name = "thiserror" version = "1.0.56" @@ -1296,6 +1482,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1308,6 +1500,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -1325,9 +1527,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1335,9 +1537,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" dependencies = [ "bumpalo", "log", @@ -1350,9 +1552,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bde2032aeb86bdfaecc8b261eef3cba735cc426c1f3a3416d1e0791be95fc461" +checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" dependencies = [ "cfg-if", "js-sys", @@ -1362,9 +1564,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1372,9 +1574,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2", "quote", @@ -1385,9 +1587,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" [[package]] name = "wasm-streams" @@ -1435,6 +1637,52 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -1483,6 +1731,12 @@ dependencies = [ "windows_x86_64_msvc 0.52.0", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -1495,6 +1749,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -1507,6 +1767,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -1519,6 +1785,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -1531,6 +1803,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -1543,6 +1821,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -1555,6 +1839,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -1573,6 +1863,7 @@ version = "0.1.0" dependencies = [ "async_io_stream", "bytes", + "event-listener", "fastwebsockets", "futures", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index 2e8971b..2fcf5e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,5 +7,5 @@ rustls-pki-types = { git = "https://github.com/r58Playz/rustls-pki-types" } [profile.release] lto = true -opt-level = 3 +opt-level = 'z' codegen-units = 1 diff --git a/client/build.sh b/client/build.sh index 7c2725b..7f402f0 100755 --- a/client/build.sh +++ b/client/build.sh @@ -11,7 +11,7 @@ wasm-bindgen --weak-refs --target no-modules --no-modules-global epoxy --out-dir echo "[ws] wasm-bindgen finished" mv out/epoxy_client_bg.wasm out/epoxy_client_unoptimized.wasm -time wasm-opt -O4 --vacuum --dce --enable-threads --enable-bulk-memory --enable-simd out/epoxy_client_unoptimized.wasm -o out/epoxy_client_bg.wasm +time wasm-opt -Oz --vacuum --dce --enable-threads --enable-bulk-memory --enable-simd out/epoxy_client_unoptimized.wasm -o out/epoxy_client_bg.wasm echo "[ws] wasm-opt finished" AUTOGENERATED_SOURCE=$(<"out/epoxy_client.js") diff --git a/client/src/lib.rs b/client/src/lib.rs index 2a40a51..4da9fee 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -49,7 +49,7 @@ type EpxIoStream = Either; #[wasm_bindgen(start)] fn init() { - std::panic::set_hook(Box::new(console_error_panic_hook::hook)); + console_error_panic_hook::set_once(); } #[wasm_bindgen] @@ -86,7 +86,7 @@ impl EpoxyClient { .replace_err("Failed to connect to websocket")?; debug!("connected!"); let (wtx, wrx) = ws.split(); - let (mux, fut) = ClientMux::new(wrx, wtx); + let (mux, fut) = ClientMux::new(wrx, wtx).await?; let mux = Arc::new(mux); wasm_bindgen_futures::spawn_local(async move { @@ -174,12 +174,6 @@ impl EpoxyClient { .replace_err("Redirect URL must have an authority") .ok() { - let should_strip = new_req.uri().is_same_host(&redirect_url); - if should_strip { - new_req.headers_mut().remove("authorization"); - new_req.headers_mut().remove("cookie"); - new_req.headers_mut().remove("www-authenticate"); - } *new_req.uri_mut() = redirect_url; new_req.headers_mut().insert( "Host", diff --git a/server/Cargo.toml b/server/Cargo.toml index a0a64c1..5da8ac0 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -5,6 +5,8 @@ edition = "2021" [dependencies] bytes = "1.5.0" +clap = { version = "4.4.18", features = ["derive", "help", "usage", "color", "wrap_help", "cargo"] } +clio = { version = "0.3.5", features = ["clap-parse"] } dashmap = "5.5.3" fastwebsockets = { version = "0.6.0", features = ["upgrade", "simdutf8", "unstable-split"] } futures-util = { version = "0.3.30", features = ["sink"] } diff --git a/server/src/main.rs b/server/src/main.rs index 04c39e7..58ef7c2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,7 +1,8 @@ #![feature(let_chains)] -use std::io::Error; +use std::io::{Error, Read}; use bytes::Bytes; +use clap::Parser; use fastwebsockets::{ upgrade, CloseCode, FragmentCollector, FragmentCollectorRead, Frame, OpCode, Payload, WebSocketError, @@ -18,25 +19,36 @@ use tokio_util::codec::{BytesCodec, Framed}; use wisp_mux::{ws, ConnectPacket, MuxStream, ServerMux, StreamType, WispError, WsEvent}; -type HttpBody = http_body_util::Empty; +type HttpBody = http_body_util::Full; -#[tokio::main] +#[derive(Parser)] +#[command(version = clap::crate_version!(), about = "Implementation of the Wisp protocol in Rust, made for epoxy.")] +struct Cli { + #[arg(long, default_value = "/")] + prefix: String, + #[arg( + long = "port", + short = 'l', + value_name = "PORT", + default_value = "4000" + )] + listen_port: String, + #[arg(long, short, value_parser)] + pubkey: clio::Input, + #[arg(long, short = 'P', value_parser)] + privkey: clio::Input, +} + +#[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Error> { - let pem = include_bytes!("./pem.pem"); - let key = include_bytes!("./key.pem"); - let identity = native_tls::Identity::from_pkcs8(pem, key).expect("failed to make identity"); - let prefix = if let Some(prefix) = std::env::args().nth(1) { - prefix - } else { - "/".to_string() - }; - let port = if let Some(prefix) = std::env::args().nth(2) { - prefix - } else { - "4000".to_string() - }; + let mut opt = Cli::parse(); + let mut pem = Vec::new(); + opt.pubkey.read_to_end(&mut pem)?; + let mut key = Vec::new(); + opt.privkey.read_to_end(&mut key)?; + let identity = native_tls::Identity::from_pkcs8(&pem, &key).expect("failed to make identity"); - let socket = TcpListener::bind(format!("0.0.0.0:{}", port)) + let socket = TcpListener::bind(format!("0.0.0.0:{}", opt.listen_port)) .await .expect("failed to bind"); let acceptor = TlsAcceptor::from( @@ -47,7 +59,7 @@ async fn main() -> Result<(), Error> { println!("listening on 0.0.0.0:4000"); while let Ok((stream, addr)) = socket.accept().await { let acceptor_cloned = acceptor.clone(); - let prefix_cloned = prefix.clone(); + let prefix_cloned = opt.prefix.clone(); tokio::spawn(async move { let stream = acceptor_cloned.accept(stream).await.expect("not tls"); let io = TokioIo::new(stream); @@ -72,15 +84,23 @@ async fn accept_http( ) -> Result, WebSocketError> { if upgrade::is_upgrade_request(&req) && req.uri().path().to_string().starts_with(&prefix) - && let Some(protocol) = req.headers().get("Sec-Websocket-Protocol") - && protocol == "wisp-v1" + && let Some(protocols) = req.headers().get("Sec-Websocket-Protocol").and_then(|x| { + Some( + x.to_str() + .ok()? + .split(',') + .map(|x| x.trim()) + .collect::>(), + ) + }) + && protocols.contains(&"wisp-v1") { let uri = req.uri().clone(); let (mut res, fut) = upgrade::upgrade(&mut req)?; println!("{:?} {:?}", uri.path(), prefix); - if *uri.path() != prefix { + if uri.path().starts_with(&prefix) { tokio::spawn(async move { accept_wsproxy(fut, uri.path().strip_prefix(&prefix).unwrap(), addr.clone()).await }); @@ -92,11 +112,14 @@ async fn accept_http( "Sec-Websocket-Protocol", HeaderValue::from_str("wisp-v1").unwrap(), ); - Ok(res) + Ok(Response::from_parts( + res.into_parts().0, + HttpBody::new(Bytes::new()), + )) } else { Ok(Response::builder() .status(StatusCode::OK) - .body(HttpBody::new()) + .body(HttpBody::new(":3".to_string().into())) .unwrap()) } } @@ -176,7 +199,7 @@ async fn accept_ws( println!("{:?}: connected", addr); - let (mut mux, fut) = ServerMux::new(rx, tx); + let (mut mux, fut) = ServerMux::new(rx, tx, 128); tokio::spawn(async move { if let Err(e) = fut.await { diff --git a/wisp/Cargo.lock b/wisp/Cargo.lock deleted file mode 100644 index 19bc2ba..0000000 --- a/wisp/Cargo.lock +++ /dev/null @@ -1,320 +0,0 @@ -# This file is automatically @generated by Cargo. -# It is not intended for manual editing. -version = 3 - -[[package]] -name = "autocfg" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" - -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - -[[package]] -name = "bytes" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" - -[[package]] -name = "cfg-if" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" - -[[package]] -name = "dashmap" -version = "5.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" -dependencies = [ - "cfg-if", - "hashbrown", - "lock_api", - "once_cell", - "parking_lot_core", -] - -[[package]] -name = "futures" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-channel" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" -dependencies = [ - "futures-core", - "futures-sink", -] - -[[package]] -name = "futures-core" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" - -[[package]] -name = "futures-executor" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" - -[[package]] -name = "futures-macro" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "futures-sink" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" - -[[package]] -name = "futures-task" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" - -[[package]] -name = "futures-util" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" -dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-macro", - "futures-sink", - "futures-task", - "memchr", - "pin-project-lite", - "pin-utils", - "slab", -] - -[[package]] -name = "hashbrown" -version = "0.14.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" - -[[package]] -name = "libc" -version = "0.2.152" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" - -[[package]] -name = "lock_api" -version = "0.4.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" -dependencies = [ - "autocfg", - "scopeguard", -] - -[[package]] -name = "memchr" -version = "2.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" - -[[package]] -name = "once_cell" -version = "1.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" - -[[package]] -name = "parking_lot_core" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-targets", -] - -[[package]] -name = "pin-project-lite" -version = "0.2.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" - -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - -[[package]] -name = "proc-macro2" -version = "1.0.76" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "quote" -version = "1.0.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" -dependencies = [ - "proc-macro2", -] - -[[package]] -name = "redox_syscall" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" -dependencies = [ - "bitflags", -] - -[[package]] -name = "scopeguard" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" - -[[package]] -name = "slab" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] - -[[package]] -name = "smallvec" -version = "1.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" - -[[package]] -name = "syn" -version = "2.0.48" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - -[[package]] -name = "unicode-ident" -version = "1.0.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" - -[[package]] -name = "windows-targets" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" -dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", -] - -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" - -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - -[[package]] -name = "wisp-mux" -version = "0.1.0" -dependencies = [ - "bytes", - "dashmap", - "futures", - "futures-util", -] diff --git a/wisp/Cargo.toml b/wisp/Cargo.toml index 42f65e2..da0a601 100644 --- a/wisp/Cargo.toml +++ b/wisp/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] async_io_stream = "0.3.3" bytes = "1.5.0" +event-listener = "5.0.0" fastwebsockets = { version = "0.6.0", features = ["unstable-split"], optional = true } futures = "0.3.30" futures-util = "0.3.30" diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index 9ee6785..7ad9931 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -3,17 +3,18 @@ mod fastwebsockets; mod packet; mod stream; -pub mod ws; -#[cfg(feature = "ws_stream_wasm")] -mod ws_stream_wasm; #[cfg(feature = "hyper_tower")] pub mod tokioio; #[cfg(feature = "hyper_tower")] pub mod tower; +pub mod ws; +#[cfg(feature = "ws_stream_wasm")] +mod ws_stream_wasm; pub use crate::packet::*; pub use crate::stream::*; +use event_listener::Event; use futures::{channel::mpsc, lock::Mutex, Future, FutureExt, StreamExt}; use std::{ collections::HashMap, @@ -23,7 +24,7 @@ use std::{ }, }; -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Copy, Clone)] pub enum Role { Client, Server, @@ -96,13 +97,14 @@ impl ServerMuxInner { rx: R, close_rx: mpsc::UnboundedReceiver, muxstream_sender: mpsc::UnboundedSender<(ConnectPacket, MuxStream)>, + buffer_size: u32 ) -> Result<(), WispError> where R: ws::WebSocketRead, { let ret = futures::select! { x = self.server_close_loop(close_rx, self.stream_map.clone(), self.tx.clone()).fuse() => x, - x = self.server_msg_loop(rx, muxstream_sender).fuse() => x + x = self.server_msg_loop(rx, muxstream_sender, buffer_size).fuse() => x }; self.stream_map.lock().await.iter().for_each(|x| { let _ = x.1.unbounded_send(WsEvent::Close(ClosePacket::new(0x01))); @@ -137,12 +139,13 @@ impl ServerMuxInner { &self, mut rx: R, muxstream_sender: mpsc::UnboundedSender<(ConnectPacket, MuxStream)>, + buffer_size: u32, ) -> Result<(), WispError> where R: ws::WebSocketRead, { self.tx - .write_frame(Packet::new_continue(0, u32::MAX).into()) + .write_frame(Packet::new_continue(0, buffer_size).into()) .await?; while let Ok(frame) = rx.wisp_read_frame(&self.tx).await { @@ -157,10 +160,13 @@ impl ServerMuxInner { inner_packet, MuxStream::new( packet.stream_id, + Role::Server, ch_rx, self.tx.clone(), self.close_tx.clone(), AtomicBool::new(false).into(), + AtomicU32::new(buffer_size).into(), + Event::new().into(), ), )) .map_err(|x| WispError::Other(Box::new(x)))?; @@ -168,11 +174,6 @@ impl ServerMuxInner { Data(data) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { let _ = stream.unbounded_send(WsEvent::Send(data)); - self.tx - .write_frame( - Packet::new_continue(packet.stream_id, u32::MAX).into(), - ) - .await?; } } Continue(_) => unreachable!(), @@ -200,7 +201,7 @@ where } impl ServerMux { - pub fn new(read: R, write: W) -> (Self, impl Future>) + pub fn new(read: R, write: W, buffer_size: u32) -> (Self, impl Future>) where R: ws::WebSocketRead, { @@ -215,7 +216,7 @@ impl ServerMux { close_tx, stream_map: map.clone(), } - .into_future(read, close_rx, tx), + .into_future(read, close_rx, tx, buffer_size), ) } @@ -229,7 +230,8 @@ where W: ws::WebSocketWrite, { tx: ws::LockedWebSocketWrite, - stream_map: Arc>>>, + stream_map: + Arc, Arc, Arc)>>>, } impl ClientMuxInner { @@ -280,13 +282,20 @@ impl ClientMuxInner { Connect(_) => unreachable!(), Data(data) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.unbounded_send(WsEvent::Send(data)); + let _ = stream.0.unbounded_send(WsEvent::Send(data)); + } + } + Continue(inner_packet) => { + if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { + stream + .1 + .store(inner_packet.buffer_remaining, Ordering::Release); + let _ = stream.2.notify(u32::MAX); } } - Continue(_) => {} Close(inner_packet) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.unbounded_send(WsEvent::Close(inner_packet)); + let _ = stream.0.unbounded_send(WsEvent::Close(inner_packet)); } self.stream_map.lock().await.remove(&packet.stream_id); } @@ -302,32 +311,46 @@ where W: ws::WebSocketWrite, { tx: ws::LockedWebSocketWrite, - stream_map: Arc>>>, + stream_map: + Arc, Arc, Arc)>>>, next_free_stream_id: AtomicU32, close_tx: mpsc::UnboundedSender, + buf_size: u32, } impl ClientMux { - pub fn new(read: R, write: W) -> (Self, impl Future>) + pub async fn new( + mut read: R, + write: W, + ) -> Result<(Self, impl Future>), WispError> where R: ws::WebSocketRead, { - let (tx, rx) = mpsc::unbounded::(); - let map = Arc::new(Mutex::new(HashMap::new())); let write = ws::LockedWebSocketWrite::new(write); - ( - Self { - tx: write.clone(), - stream_map: map.clone(), - next_free_stream_id: AtomicU32::new(1), - close_tx: tx, - }, - ClientMuxInner { - tx: write.clone(), - stream_map: map.clone(), - } - .into_future(read, rx), - ) + let first_packet = Packet::try_from(read.wisp_read_frame(&write).await?)?; + if first_packet.stream_id != 0 { + return Err(WispError::InvalidStreamId); + } + if let PacketType::Continue(packet) = first_packet.packet { + let (tx, rx) = mpsc::unbounded::(); + let map = Arc::new(Mutex::new(HashMap::new())); + Ok(( + Self { + tx: write.clone(), + stream_map: map.clone(), + next_free_stream_id: AtomicU32::new(1), + close_tx: tx, + buf_size: packet.buffer_remaining, + }, + ClientMuxInner { + tx: write.clone(), + stream_map: map.clone(), + } + .into_future(read, rx), + )) + } else { + Err(WispError::InvalidPacketType) + } } pub async fn client_new_stream( @@ -337,6 +360,8 @@ impl ClientMux { port: u16, ) -> Result, WispError> { let (ch_tx, ch_rx) = mpsc::unbounded(); + let evt: Arc = Event::new().into(); + let flow_control: Arc = AtomicU32::new(self.buf_size).into(); let stream_id = self.next_free_stream_id.load(Ordering::Acquire); self.tx .write_frame(Packet::new_connect(stream_id, stream_type, port, host).into()) @@ -347,13 +372,19 @@ impl ClientMux { .ok_or(WispError::MaxStreamCountReached)?, Ordering::Release, ); - self.stream_map.lock().await.insert(stream_id, ch_tx); + self.stream_map + .lock() + .await + .insert(stream_id, (ch_tx, flow_control.clone(), evt.clone())); Ok(MuxStream::new( stream_id, + Role::Client, ch_rx, self.tx.clone(), self.close_tx.clone(), AtomicBool::new(false).into(), + flow_control, + evt, )) } } diff --git a/wisp/src/packet.rs b/wisp/src/packet.rs index 98eb20e..505bbe6 100644 --- a/wisp/src/packet.rs +++ b/wisp/src/packet.rs @@ -63,7 +63,7 @@ impl From for Vec { #[derive(Debug)] pub struct ContinuePacket { - buffer_remaining: u32, + pub buffer_remaining: u32, } impl ContinuePacket { @@ -94,7 +94,7 @@ impl From for Vec { #[derive(Debug)] pub struct ClosePacket { - reason: u8, + pub reason: u8, } impl ClosePacket { diff --git a/wisp/src/stream.rs b/wisp/src/stream.rs index 3998c9d..f561edb 100644 --- a/wisp/src/stream.rs +++ b/wisp/src/stream.rs @@ -1,5 +1,6 @@ use async_io_stream::IoStream; use bytes::Bytes; +use event_listener::Event; use futures::{ channel::{mpsc, oneshot}, sink, stream, @@ -10,7 +11,7 @@ use pin_project_lite::pin_project; use std::{ pin::Pin, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU32, Ordering}, Arc, }, }; @@ -24,19 +25,36 @@ pub enum MuxEvent { Close(u32, u8, oneshot::Sender>), } -pub struct MuxStreamRead { +pub struct MuxStreamRead +where + W: crate::ws::WebSocketWrite, +{ pub stream_id: u32, + role: crate::Role, + tx: crate::ws::LockedWebSocketWrite, rx: mpsc::UnboundedReceiver, is_closed: Arc, + flow_control: Arc, } -impl MuxStreamRead { +impl MuxStreamRead { pub async fn read(&mut self) -> Option { if self.is_closed.load(Ordering::Acquire) { return None; } match self.rx.next().await? { - WsEvent::Send(bytes) => Some(WsEvent::Send(bytes)), + WsEvent::Send(bytes) => { + if self.role == crate::Role::Server { + let old_val = self.flow_control.fetch_add(1, Ordering::SeqCst); + self.tx + .write_frame( + crate::Packet::new_continue(self.stream_id, old_val + 1).into(), + ) + .await + .ok()?; + } + Some(WsEvent::Send(bytes)) + } WsEvent::Close(packet) => { self.is_closed.store(true, Ordering::Release); Some(WsEvent::Close(packet)) @@ -63,9 +81,12 @@ where W: crate::ws::WebSocketWrite, { pub stream_id: u32, + role: crate::Role, tx: crate::ws::LockedWebSocketWrite, close_channel: mpsc::UnboundedSender, is_closed: Arc, + continue_recieved: Arc, + flow_control: Arc, } impl MuxStreamWrite { @@ -73,9 +94,22 @@ impl MuxStreamWrite { if self.is_closed.load(Ordering::Acquire) { return Err(crate::WispError::StreamAlreadyClosed); } + if self.role == crate::Role::Client && self.flow_control.load(Ordering::Acquire) <= 0 { + self.continue_recieved.listen().await; + } self.tx .write_frame(crate::Packet::new_data(self.stream_id, data).into()) - .await + .await?; + if self.role == crate::Role::Client { + self.flow_control.store( + self.flow_control + .load(Ordering::Acquire) + .checked_add(1) + .unwrap_or(0), + Ordering::Release, + ); + } + Ok(()) } pub fn get_close_handle(&self) -> MuxStreamCloser { @@ -123,30 +157,39 @@ where W: crate::ws::WebSocketWrite, { pub stream_id: u32, - rx: MuxStreamRead, + rx: MuxStreamRead, tx: MuxStreamWrite, } impl MuxStream { pub(crate) fn new( stream_id: u32, + role: crate::Role, rx: mpsc::UnboundedReceiver, tx: crate::ws::LockedWebSocketWrite, close_channel: mpsc::UnboundedSender, is_closed: Arc, + flow_control: Arc, + continue_recieved: Arc ) -> Self { Self { stream_id, rx: MuxStreamRead { stream_id, + role, + tx: tx.clone(), rx, is_closed: is_closed.clone(), + flow_control: flow_control.clone(), }, tx: MuxStreamWrite { stream_id, + role, tx, close_channel, is_closed: is_closed.clone(), + flow_control: flow_control.clone(), + continue_recieved: continue_recieved.clone(), }, } } @@ -167,7 +210,7 @@ impl MuxStream { self.tx.close(reason).await } - pub fn into_split(self) -> (MuxStreamRead, MuxStreamWrite) { + pub fn into_split(self) -> (MuxStreamRead, MuxStreamWrite) { (self.rx, self.tx) }