diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..bbfc28a --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] + diff --git a/Cargo.lock b/Cargo.lock index 45851ff..2434d76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aho-corasick" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +dependencies = [ + "memchr", +] + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -80,6 +89,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "anyhow" +version = "1.0.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" + [[package]] name = "async-compression" version = "0.4.6" @@ -94,6 +109,39 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "async-trait" +version = "0.1.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async_io_stream" version = "0.3.3" @@ -111,6 +159,51 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -155,9 +248,9 @@ dependencies = [ [[package]] name = "brotli" -version = "3.4.0" +version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "516074a47ef4bce09577a3b379392300159ce5b1ba2e501ff1c819950066100f" +checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -180,6 +273,12 @@ version = "3.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.5.0" @@ -200,9 +299,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "4.5.2" +version = "4.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b230ab84b0ffdf890d5a10abdbc8b83ae1c4918275daea1ab8801f71536b2651" +checksum = "949626d00e063efc93b6dca932419ceb5432f99769911c0b995f7e884c778813" dependencies = [ "clap_builder", "clap_derive", @@ -223,9 +322,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.0" +version = "4.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47" +checksum = "90239a040c80f5e14809ca132ddc4176ab33d5e17e49691793296e3fcb34d72f" dependencies = [ "heck", "proc-macro2", @@ -269,6 +368,43 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console-api" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -313,6 +449,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -336,7 +481,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown", + "hashbrown 0.14.3", "lock_api", "once_cell", "parking_lot_core", @@ -352,9 +497,15 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "either" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" + [[package]] name = "epoxy-client" -version = "1.4.0" +version = "1.4.2" dependencies = [ "async-compression", "async_io_stream", @@ -365,9 +516,9 @@ dependencies = [ "fastwebsockets", "futures-util", "getrandom", - "http", + "http 1.1.0", "http-body-util", - "hyper", + "hyper 1.2.0", "hyper-util-wasm", "js-sys", "pin-project-lite", @@ -393,11 +544,12 @@ dependencies = [ "bytes", "clap", "clio", + "console-subscriber", "dashmap", "fastwebsockets", "futures-util", "http-body-util", - "hyper", + "hyper 1.2.0", "hyper-util", "tokio", "tokio-util", @@ -445,7 +597,7 @@ checksum = "f63dd7b57f9b33b1741fa631c9522eb35d43e96dcca4a6a91d5e4ca7c93acdc1" dependencies = [ "base64", "http-body-util", - "hyper", + "hyper 1.2.0", "hyper-util", "pin-project", "rand", @@ -607,23 +759,48 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "h2" -version = "0.4.2" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb" dependencies = [ "bytes", "fnv", "futures-core", "futures-sink", "futures-util", - "http", - "indexmap", + "http 0.2.12", + "indexmap 2.2.5", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51ee2dd2e4f378392eeff5d51618cd9a63166a2513846bbc55f21cfacd9199d4" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.1.0", + "indexmap 2.2.5", "slab", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.3" @@ -631,10 +808,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" [[package]] -name = "heck" -version = "0.4.1" +name = "hdrhistogram" +version = "7.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64", + "byteorder", + "flate2", + "nom", + "num-traits", +] + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" @@ -642,6 +832,17 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.1.0" @@ -653,6 +854,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.0" @@ -660,19 +872,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http", + "http 1.1.0", ] [[package]] name = "http-body-util" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" dependencies = [ "bytes", - "futures-util", - "http", - "http-body", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -688,6 +900,36 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "hyper" +version = "0.14.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.25", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.2.0" @@ -697,9 +939,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", - "http", - "http-body", + "h2 0.4.3", + "http 1.1.0", + "http-body 1.0.0", "httparse", "httpdate", "itoa", @@ -709,6 +951,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper 0.14.28", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-util" version = "0.1.3" @@ -717,9 +971,9 @@ checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" dependencies = [ "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.2.0", "pin-project-lite", "socket2", "tokio", @@ -734,9 +988,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http", - "http-body", - "hyper", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.2.0", "pin-project-lite", "tower", "tower-service", @@ -745,6 +999,16 @@ dependencies = [ "wasmtimer", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.2.5" @@ -752,7 +1016,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.3", ] [[package]] @@ -766,6 +1030,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -815,12 +1088,39 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.2" @@ -859,6 +1159,25 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "num-traits" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -957,6 +1276,12 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + [[package]] name = "pin-project" version = "1.1.5" @@ -1003,13 +1328,45 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" +dependencies = [ + "prost", +] + [[package]] name = "quote" version = "1.0.35" @@ -1058,6 +1415,50 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "regex" +version = "1.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata 0.4.6", + "regex-syntax 0.8.2", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax 0.8.2", +] + +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + [[package]] name = "ring" version = "0.17.8" @@ -1117,8 +1518,8 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.2.0" -source = "git+https://github.com/r58Playz/rustls-pki-types#7bc22404e91ac909ef0e6ac11e6e316aefacde75" +version = "1.3.1" +source = "git+https://github.com/r58Playz/rustls-pki-types#944afed109e6874e1482392c7ab5333d7112ef47" dependencies = [ "wasm-bindgen", ] @@ -1134,6 +1535,18 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + +[[package]] +name = "ryu" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" + [[package]] name = "same-file" version = "1.0.6" @@ -1193,6 +1606,37 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" +[[package]] +name = "serde" +version = "1.0.197" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.197" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.10.6" @@ -1204,6 +1648,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -1224,10 +1677,11 @@ name = "simple-wisp-client" version = "1.0.0" dependencies = [ "bytes", + "console-subscriber", "fastwebsockets", "futures", "http-body-util", - "hyper", + "hyper 1.2.0", "tokio", "tokio-native-tls", "tokio-util", @@ -1279,15 +1733,21 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "syn" -version = "2.0.52" +version = "2.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" +checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "tempfile" version = "3.10.1" @@ -1312,24 +1772,34 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" +checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" +checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", "syn", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "tokio" version = "1.36.0" @@ -1346,9 +1816,20 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -1381,6 +1862,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -1395,6 +1887,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2 0.3.25", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -1403,8 +1922,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -1430,9 +1954,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -1440,6 +1976,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "once_cell", + "regex", + "sharded-slab", + "thread_local", + "tracing", + "tracing-core", ] [[package]] @@ -1478,6 +2030,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1849,7 +2407,7 @@ checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" [[package]] name = "wisp-mux" -version = "1.2.2" +version = "2.0.0" dependencies = [ "async_io_stream", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 2fcf5e1..3f17cef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] resolver = "2" members = ["server", "client", "wisp", "simple-wisp-client"] +default-members = ["server"] [patch.crates-io] rustls-pki-types = { git = "https://github.com/r58Playz/rustls-pki-types" } @@ -8,4 +9,7 @@ rustls-pki-types = { git = "https://github.com/r58Playz/rustls-pki-types" } [profile.release] lto = true opt-level = 'z' +strip = true +panic = "abort" codegen-units = 1 + diff --git a/client/.npmignore b/client/.npmignore index de95065..f1bf2aa 100644 --- a/client/.npmignore +++ b/client/.npmignore @@ -2,5 +2,3 @@ build.sh Cargo.toml serve.py src -pkg/epoxy.wasm - diff --git a/client/Cargo.toml b/client/Cargo.toml index ff380cd..895d526 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "epoxy-client" -version = "1.4.1" +version = "1.4.2" edition = "2021" license = "LGPL-3.0-only" @@ -38,4 +38,3 @@ wasmtimer = "0.2.0" [dependencies.ring] features = ["wasm32_unknown_unknown_js"] - diff --git a/client/build.sh b/client/build.sh index f420ffa..6bc90fe 100755 --- a/client/build.sh +++ b/client/build.sh @@ -19,6 +19,7 @@ echo "[epx] wasm-opt finished" AUTOGENERATED_SOURCE=$(<"out/epoxy_client.js") # patch for websocket sharedarraybuffer error AUTOGENERATED_SOURCE=${AUTOGENERATED_SOURCE//getObject(arg0).send(getArrayU8FromWasm0(arg1, arg2)/getObject(arg0).send(new Uint8Array(getArrayU8FromWasm0(arg1, arg2))} +echo "$AUTOGENERATED_SOURCE" > pkg/epoxy.js WASM_BASE64=$(base64 -w0 out/epoxy_client_bg.wasm) AUTOGENERATED_SOURCE=${AUTOGENERATED_SOURCE//__wbg_init(input, maybe_memory) \{/__wbg_init(maybe_memory) \{$'\n'let input=\'data:application/wasm;base64,$WASM_BASE64\'} @@ -37,6 +38,7 @@ echo "}\nexport default function epoxy(maybe_memory?: WebAssembly.Memory): Promi echo "$AUTOGENERATED_TYPEDEFS" > pkg/epoxy-bundled.d.ts echo "}\ndeclare function epoxy(maybe_memory?: WebAssembly.Memory): Promise;" >> pkg/epoxy-bundled.d.ts +cp out/epoxy_client.d.ts pkg/epoxy.d.ts cp out/epoxy_client_bg.wasm pkg/epoxy.wasm rm -r out/ diff --git a/client/demo.js b/client/demo.js index 8a7e54f..67549d0 100644 --- a/client/demo.js +++ b/client/demo.js @@ -12,6 +12,7 @@ onmessage = async (msg) => { should_tls_test, should_udp_test, should_reconnect_test, + should_perf2_test, ] = msg.data; console.log( "%cWASM is significantly slower with DevTools open!", @@ -217,6 +218,24 @@ onmessage = async (msg) => { log("sent req"); await (new Promise((res, _) => setTimeout(res, 500))); } + } else if (should_perf2_test) { + const num_outer_tests = 10; + const num_inner_tests = 50; + let total_mux_multi = 0; + for (const _ of Array(num_outer_tests).keys()) { + let total_mux = 0; + await Promise.all([...Array(num_inner_tests).keys()].map(async i => { + log(`running mux test ${i}`); + return await test_mux("https://httpbin.org/get"); + })).then((vals) => { total_mux = vals.reduce((acc, x) => acc + x, 0) }); + total_mux = total_mux / num_inner_tests; + + log(`avg mux (${num_inner_tests}) took ${total_mux} ms or ${total_mux / 1000} s`); + total_mux_multi += total_mux; + } + total_mux_multi = total_mux_multi / num_outer_tests; + log(`total avg mux (${num_outer_tests} tests of ${num_inner_tests} reqs): ${total_mux_multi} ms or ${total_mux_multi / 1000} s`); + } else { let resp = await epoxy_client.fetch("https://httpbin.org/get"); console.log(resp, Object.fromEntries(resp.headers)); diff --git a/client/index.html b/client/index.html index fa5efc0..dfea541 100644 --- a/client/index.html +++ b/client/index.html @@ -18,6 +18,7 @@ const should_tls_test = params.has("rawtls_test"); const should_udp_test = params.has("udp_test"); const should_reconnect_test = params.has("reconnect_test"); + const should_perf2_test = params.has("perf2_test"); const worker = new Worker("demo.js", {type:'module'}); worker.onmessage = (msg) => { let el = document.createElement("pre"); @@ -35,6 +36,7 @@ should_tls_test, should_udp_test, should_reconnect_test, + should_perf2_test, ]); diff --git a/client/package.json b/client/package.json index 703d366..b5b6ffd 100644 --- a/client/package.json +++ b/client/package.json @@ -1,6 +1,6 @@ { "name": "@mercuryworkshop/epoxy-tls", - "version": "1.4.1", + "version": "1.4.2", "description": "A wasm library for using raw encrypted tls/ssl/https/websocket streams on the browser", "scripts": { "build": "./build.sh" diff --git a/client/src/utils.rs b/client/src/utils.rs index bd3245c..11d77c9 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -6,7 +6,7 @@ use wasm_bindgen_futures::JsFuture; use hyper::rt::Executor; use js_sys::ArrayBuffer; use std::future::Future; -use wisp_mux::{CloseReason, WispError}; +use wisp_mux::WispError; #[wasm_bindgen] extern "C" { @@ -22,11 +22,11 @@ macro_rules! debug { ($($t:tt)*) => (utils::console_debug(&format_args!($($t)*).to_string())) } -#[allow(unused_macros)] macro_rules! log { ($($t:tt)*) => (utils::console_log(&format_args!($($t)*).to_string())) } +#[allow(unused_macros)] macro_rules! error { ($($t:tt)*) => (utils::console_error(&format_args!($($t)*).to_string())) } @@ -215,9 +215,9 @@ pub fn spawn_mux_fut( ) { wasm_bindgen_futures::spawn_local(async move { if let Err(e) = fut.await { - error!("epoxy: error in mux future, restarting: {:?}", e); + log!("epoxy: error in mux future, restarting: {:?}", e); while let Err(e) = replace_mux(mux.clone(), &url).await { - error!("epoxy: failed to restart mux future: {:?}", e); + log!("epoxy: failed to restart mux future: {:?}", e); wasmtimer::tokio::sleep(std::time::Duration::from_millis(500)).await; } } @@ -231,7 +231,7 @@ pub async fn replace_mux( ) -> Result<(), WispError> { let (mux_replace, fut) = make_mux(url).await?; let mut mux_write = mux.write().await; - mux_write.close(CloseReason::Unknown).await; + mux_write.close().await; *mux_write = mux_replace; drop(mux_write); spawn_mux_fut(mux, fut, url.into()); diff --git a/server/Cargo.toml b/server/Cargo.toml index 6679a10..7d3d46e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" bytes = "1.5.0" clap = { version = "4.4.18", features = ["derive", "help", "usage", "color", "wrap_help", "cargo"] } clio = { version = "0.3.5", features = ["clap-parse"] } +console-subscriber = { version = "0.2.0", optional = true } dashmap = "5.5.3" fastwebsockets = { version = "0.6.0", features = ["upgrade", "simdutf8", "unstable-split"] } futures-util = { version = "0.3.30", features = ["sink"] } @@ -16,3 +17,6 @@ hyper-util = { version = "0.1.2", features = ["tokio"] } tokio = { version = "1.5.1", features = ["rt-multi-thread", "macros"] } tokio-util = { version = "0.7.10", features = ["codec"] } wisp-mux = { path = "../wisp", features = ["fastwebsockets", "tokio_io"] } + +[features] +tokio-console = ["tokio/tracing", "dep:console-subscriber"] diff --git a/server/src/main.rs b/server/src/main.rs index c0b78e5..6364c79 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -19,14 +19,12 @@ use tokio_util::codec::{BytesCodec, Framed}; #[cfg(unix)] use tokio_util::either::Either; -use wisp_mux::{ - ws, CloseReason, ConnectPacket, MuxEvent, MuxStream, ServerMux, StreamType, WispError, -}; +use wisp_mux::{CloseReason, ConnectPacket, MuxStream, ServerMux, StreamType, WispError}; type HttpBody = http_body_util::Full; #[derive(Parser)] -#[command(version = clap::crate_version!(), about = "Implementation of the Wisp protocol in Rust, made for epoxy.")] +#[command(version = clap::crate_version!(), about = "Server implementation of the Wisp protocol in Rust, made for epoxy.")] struct Cli { #[arg(long, default_value = "")] prefix: String, @@ -96,6 +94,8 @@ async fn bind(addr: &str, unix: bool) -> Result { #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Error> { + #[cfg(feature = "tokio-console")] + console_subscriber::init(); let opt = Cli::parse(); let addr = if opt.unix_socket { opt.bind_host @@ -137,8 +137,7 @@ async fn accept_http( if uri.is_empty() || uri == "/" { tokio::spawn(async move { accept_ws(fut, addr.clone()).await }); - } else { - let uri = uri.strip_prefix('/').unwrap_or(uri).to_string(); + } else if let Some(uri) = uri.strip_prefix('/').map(|x| x.to_string()) { tokio::spawn(async move { accept_wsproxy(fut, uri, addr.clone()).await }); } @@ -155,10 +154,7 @@ async fn accept_http( } } -async fn handle_mux( - packet: ConnectPacket, - mut stream: MuxStream, -) -> Result { +async fn handle_mux(packet: ConnectPacket, mut stream: MuxStream) -> Result { let uri = format!( "{}:{}", packet.destination_hostname, packet.destination_port @@ -190,12 +186,9 @@ async fn handle_mux( }, event = stream.read() => { match event { - Some(event) => match event { - MuxEvent::Send(data) => { - udp_socket.send(&data).await.map_err(|x| WispError::Other(Box::new(x)))?; - } - MuxEvent::Close(_) => return Ok(false), - }, + Some(event) => { + let _ = udp_socket.send(&event).await.map_err(|x| WispError::Other(Box::new(x)))?; + } None => break, } } diff --git a/simple-wisp-client/Cargo.toml b/simple-wisp-client/Cargo.toml index 0a3eb20..6d02aab 100644 --- a/simple-wisp-client/Cargo.toml +++ b/simple-wisp-client/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] bytes = "1.5.0" +console-subscriber = { version = "0.2.0", optional = true } fastwebsockets = { version = "0.6.0", features = ["unstable-split", "upgrade"] } futures = "0.3.30" http-body-util = "0.1.0" @@ -14,3 +15,6 @@ tokio-native-tls = "0.3.1" tokio-util = "0.7.10" wisp-mux = { path = "../wisp", features = ["fastwebsockets"]} +[features] +tokio-console = ["tokio/tracing", "dep:console-subscriber"] + diff --git a/simple-wisp-client/src/main.rs b/simple-wisp-client/src/main.rs index a0f78e8..39a5c26 100644 --- a/simple-wisp-client/src/main.rs +++ b/simple-wisp-client/src/main.rs @@ -41,8 +41,10 @@ where } } -#[tokio::main] +#[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { + #[cfg(feature = "tokio-console")] + console_subscriber::init(); let addr = std::env::args() .nth(1) .ok_or(StrError::new("no src addr"))?; @@ -106,7 +108,7 @@ async fn main() -> Result<(), Box> { .await? .into_io() .into_asyncrw(); - for _ in 0..10 { + for _ in 0..256 { channel.write_all(b"hiiiiiiii").await?; hi += 1; println!("said hi {}", hi); diff --git a/wisp/Cargo.toml b/wisp/Cargo.toml index aa56b57..1c38574 100644 --- a/wisp/Cargo.toml +++ b/wisp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wisp-mux" -version = "1.2.2" +version = "2.0.0" license = "LGPL-3.0-only" description = "A library for easily creating Wisp servers and clients." homepage = "https://github.com/MercuryWorkshop/epoxy-tls/tree/multiplexed/wisp" diff --git a/wisp/src/fastwebsockets.rs b/wisp/src/fastwebsockets.rs index 5a81e8d..98dcc55 100644 --- a/wisp/src/fastwebsockets.rs +++ b/wisp/src/fastwebsockets.rs @@ -56,10 +56,10 @@ impl From for crate::WispError { } } -impl crate::ws::WebSocketRead for FragmentCollectorRead { +impl crate::ws::WebSocketRead for FragmentCollectorRead { async fn wisp_read_frame( &mut self, - tx: &crate::ws::LockedWebSocketWrite, + tx: &crate::ws::LockedWebSocketWrite, ) -> Result { Ok(self .read_frame(&mut |frame| async { tx.write_frame(frame.into()).await }) @@ -68,7 +68,7 @@ impl crate::ws::WebSocketRead for FragmentCollector } } -impl crate::ws::WebSocketWrite for WebSocketWrite { +impl crate::ws::WebSocketWrite for WebSocketWrite { async fn wisp_write_frame(&mut self, frame: crate::ws::Frame) -> Result<(), crate::WispError> { self.write_frame(frame.into()) .await diff --git a/wisp/src/lib.rs b/wisp/src/lib.rs index bf07ea2..9c211d7 100644 --- a/wisp/src/lib.rs +++ b/wisp/src/lib.rs @@ -15,6 +15,7 @@ pub mod ws; pub use crate::packet::*; pub use crate::stream::*; +use bytes::Bytes; use event_listener::Event; use futures::{channel::mpsc, lock::Mutex, Future, FutureExt, StreamExt}; use std::{ @@ -95,11 +96,11 @@ impl std::fmt::Display for WispError { StreamAlreadyClosed => write!(f, "Stream already closed"), WsFrameInvalidType => write!(f, "Invalid websocket frame type"), WsFrameNotFinished => write!(f, "Unfinished websocket frame"), - WsImplError(err) => write!(f, "Websocket implementation error: {:?}", err), + WsImplError(err) => write!(f, "Websocket implementation error: {}", err), WsImplSocketClosed => write!(f, "Websocket implementation error: websocket closed"), WsImplNotSupported => write!(f, "Websocket implementation error: unsupported feature"), - Utf8Error(err) => write!(f, "UTF-8 error: {:?}", err), - Other(err) => write!(f, "Other error: {:?}", err), + Utf8Error(err) => write!(f, "UTF-8 error: {}", err), + Other(err) => write!(f, "Other error: {}", err), } } } @@ -107,27 +108,28 @@ impl std::fmt::Display for WispError { impl std::error::Error for WispError {} struct MuxMapValue { - stream: mpsc::UnboundedSender, + stream: mpsc::UnboundedSender, stream_type: StreamType, flow_control: Arc, flow_control_event: Arc, + is_closed: Arc, } struct ServerMuxInner where - W: ws::WebSocketWrite + Send + 'static, + W: ws::WebSocketWrite, { tx: ws::LockedWebSocketWrite, stream_map: Arc>>, close_tx: mpsc::UnboundedSender, } -impl ServerMuxInner { +impl ServerMuxInner { pub async fn into_future( self, rx: R, close_rx: mpsc::UnboundedReceiver, - muxstream_sender: mpsc::UnboundedSender<(ConnectPacket, MuxStream)>, + muxstream_sender: mpsc::UnboundedSender<(ConnectPacket, MuxStream)>, buffer_size: u32, ) -> Result<(), WispError> where @@ -137,10 +139,10 @@ impl ServerMuxInner { x = self.server_bg_loop(close_rx).fuse() => x, x = self.server_msg_loop(rx, muxstream_sender, buffer_size).fuse() => x }; - self.stream_map.lock().await.drain().for_each(|x| { - let _ = - x.1.stream - .unbounded_send(MuxEvent::Close(ClosePacket::new(CloseReason::Unknown))); + self.stream_map.lock().await.drain().for_each(|mut x| { + x.1.is_closed.store(true, Ordering::Release); + x.1.stream.disconnect(); + x.1.stream.close_channel(); }); ret } @@ -151,13 +153,26 @@ impl ServerMuxInner { ) -> Result<(), WispError> { while let Some(msg) = close_rx.next().await { match msg { - WsEvent::Close(stream_id, reason, channel) => { - if self.stream_map.lock().await.remove(&stream_id).is_some() { - let _ = channel.send( - self.tx - .write_frame(Packet::new_close(stream_id, reason).into()) - .await, - ); + WsEvent::SendPacket(packet, channel) => { + if self + .stream_map + .lock() + .await + .get(&packet.stream_id) + .is_some() + { + let _ = channel.send(self.tx.write_frame(packet.into()).await); + } else { + let _ = channel.send(Err(WispError::InvalidStreamId)); + } + } + WsEvent::Close(packet, channel) => { + if let Some(mut stream) = + self.stream_map.lock().await.remove(&packet.stream_id) + { + stream.stream.disconnect(); + stream.stream.close_channel(); + let _ = channel.send(self.tx.write_frame(packet.into()).await); } else { let _ = channel.send(Err(WispError::InvalidStreamId)); } @@ -171,12 +186,14 @@ impl ServerMuxInner { async fn server_msg_loop( &self, mut rx: R, - muxstream_sender: mpsc::UnboundedSender<(ConnectPacket, MuxStream)>, + muxstream_sender: mpsc::UnboundedSender<(ConnectPacket, MuxStream)>, buffer_size: u32, ) -> Result<(), WispError> where R: ws::WebSocketRead, { + // will send continues once flow_control is at 10% of max + let target_buffer_size = buffer_size * 90 / 100; self.tx .write_frame(Packet::new_continue(0, buffer_size).into()) .await?; @@ -195,6 +212,7 @@ impl ServerMuxInner { let stream_type = inner_packet.stream_type; let flow_control: Arc = AtomicU32::new(buffer_size).into(); let flow_control_event: Arc = Event::new().into(); + let is_closed: Arc = AtomicBool::new(false).into(); self.stream_map.lock().await.insert( packet.stream_id, @@ -203,6 +221,7 @@ impl ServerMuxInner { stream_type, flow_control: flow_control.clone(), flow_control_event: flow_control_event.clone(), + is_closed: is_closed.clone(), }, ); muxstream_sender @@ -213,18 +232,18 @@ impl ServerMuxInner { Role::Server, stream_type, ch_rx, - self.tx.clone(), self.close_tx.clone(), - AtomicBool::new(false).into(), + is_closed, flow_control, flow_control_event, + target_buffer_size, ), )) .map_err(|x| WispError::Other(Box::new(x)))?; } Data(data) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.stream.unbounded_send(MuxEvent::Send(data)); + let _ = stream.stream.unbounded_send(data); if stream.stream_type == StreamType::Tcp { stream.flow_control.store( stream @@ -237,11 +256,14 @@ impl ServerMuxInner { } } Continue(_) => unreachable!(), - Close(inner_packet) => { - if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.stream.unbounded_send(MuxEvent::Close(inner_packet)); + Close(_) => { + if let Some(mut stream) = + self.stream_map.lock().await.remove(&packet.stream_id) + { + stream.is_closed.store(true, Ordering::Release); + stream.stream.disconnect(); + stream.stream.close_channel(); } - self.stream_map.lock().await.remove(&packet.stream_id); } } } @@ -267,18 +289,15 @@ impl ServerMuxInner { /// }); /// } /// ``` -pub struct ServerMux -where - W: ws::WebSocketWrite + Send + 'static, -{ +pub struct ServerMux { stream_map: Arc>>, close_tx: mpsc::UnboundedSender, - muxstream_recv: mpsc::UnboundedReceiver<(ConnectPacket, MuxStream)>, + muxstream_recv: mpsc::UnboundedReceiver<(ConnectPacket, MuxStream)>, } -impl ServerMux { +impl ServerMux { /// Create a new server-side multiplexor. - pub fn new( + pub fn new( read: R, write: W, buffer_size: u32, @@ -287,7 +306,7 @@ impl ServerMux { R: ws::WebSocketRead, { let (close_tx, close_rx) = mpsc::unbounded::(); - let (tx, rx) = mpsc::unbounded::<(ConnectPacket, MuxStream)>(); + let (tx, rx) = mpsc::unbounded::<(ConnectPacket, MuxStream)>(); let write = ws::LockedWebSocketWrite::new(write); let map = Arc::new(Mutex::new(HashMap::new())); ( @@ -306,7 +325,7 @@ impl ServerMux { } /// Wait for a stream to be created. - pub async fn server_new_stream(&mut self) -> Option<(ConnectPacket, MuxStream)> { + pub async fn server_new_stream(&mut self) -> Option<(ConnectPacket, MuxStream)> { self.muxstream_recv.next().await } @@ -314,11 +333,11 @@ impl ServerMux { /// /// Also terminates the multiplexor future. Waiting for a new stream will never succeed after /// this function is called. - pub async fn close(&self, reason: CloseReason) { - self.stream_map.lock().await.drain().for_each(|x| { - let _ = - x.1.stream - .unbounded_send(MuxEvent::Close(ClosePacket::new(reason))); + pub async fn close(&self) { + self.stream_map.lock().await.drain().for_each(|mut x| { + x.1.is_closed.store(true, Ordering::Release); + x.1.stream.disconnect(); + x.1.stream.close_channel(); }); let _ = self.close_tx.unbounded_send(WsEvent::EndFut); } @@ -332,7 +351,7 @@ where stream_map: Arc>>, } -impl ClientMuxInner { +impl ClientMuxInner { pub(crate) async fn into_future( self, rx: R, @@ -341,10 +360,16 @@ impl ClientMuxInner { where R: ws::WebSocketRead, { - futures::select! { + let ret = futures::select! { x = self.client_bg_loop(close_rx).fuse() => x, x = self.client_loop(rx).fuse() => x - } + }; + self.stream_map.lock().await.drain().for_each(|mut x| { + x.1.is_closed.store(true, Ordering::Release); + x.1.stream.disconnect(); + x.1.stream.close_channel(); + }); + ret } async fn client_bg_loop( @@ -353,13 +378,26 @@ impl ClientMuxInner { ) -> Result<(), WispError> { while let Some(msg) = close_rx.next().await { match msg { - WsEvent::Close(stream_id, reason, channel) => { - if self.stream_map.lock().await.remove(&stream_id).is_some() { - let _ = channel.send( - self.tx - .write_frame(Packet::new_close(stream_id, reason).into()) - .await, - ); + WsEvent::SendPacket(packet, channel) => { + if self + .stream_map + .lock() + .await + .get(&packet.stream_id) + .is_some() + { + let _ = channel.send(self.tx.write_frame(packet.into()).await); + } else { + let _ = channel.send(Err(WispError::InvalidStreamId)); + } + } + WsEvent::Close(packet, channel) => { + if let Some(mut stream) = + self.stream_map.lock().await.remove(&packet.stream_id) + { + stream.stream.disconnect(); + stream.stream.close_channel(); + let _ = channel.send(self.tx.write_frame(packet.into()).await); } else { let _ = channel.send(Err(WispError::InvalidStreamId)); } @@ -386,7 +424,7 @@ impl ClientMuxInner { Connect(_) => unreachable!(), Data(data) => { if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.stream.unbounded_send(MuxEvent::Send(data)); + let _ = stream.stream.unbounded_send(data); } } Continue(inner_packet) => { @@ -399,11 +437,14 @@ impl ClientMuxInner { } } } - Close(inner_packet) => { - if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) { - let _ = stream.stream.unbounded_send(MuxEvent::Close(inner_packet)); + Close(_) => { + if let Some(mut stream) = + self.stream_map.lock().await.remove(&packet.stream_id) + { + stream.is_closed.store(true, Ordering::Release); + stream.stream.disconnect(); + stream.stream.close_channel(); } - self.stream_map.lock().await.remove(&packet.stream_id); } } } @@ -433,9 +474,10 @@ where next_free_stream_id: AtomicU32, close_tx: mpsc::UnboundedSender, buf_size: u32, + target_buf_size: u32, } -impl ClientMux { +impl ClientMux { /// Create a new client side multiplexor. pub async fn new( mut read: R, @@ -459,6 +501,8 @@ impl ClientMux { next_free_stream_id: AtomicU32::new(1), close_tx: tx, buf_size: packet.buffer_remaining, + // server-only + target_buf_size: 0, }, ClientMuxInner { tx: write.clone(), @@ -477,39 +521,46 @@ impl ClientMux { stream_type: StreamType, host: String, port: u16, - ) -> Result, WispError> { + ) -> Result { let (ch_tx, ch_rx) = mpsc::unbounded(); - let evt: Arc = Event::new().into(); - let flow_control: Arc = AtomicU32::new(self.buf_size).into(); let stream_id = self.next_free_stream_id.load(Ordering::Acquire); + let next_stream_id = stream_id + .checked_add(1) + .ok_or(WispError::MaxStreamCountReached)?; + + let flow_control_event: Arc = Event::new().into(); + let flow_control: Arc = AtomicU32::new(self.buf_size).into(); + + let is_closed: Arc = AtomicBool::new(false).into(); + self.tx .write_frame(Packet::new_connect(stream_id, stream_type, port, host).into()) .await?; - self.next_free_stream_id.store( - stream_id - .checked_add(1) - .ok_or(WispError::MaxStreamCountReached)?, - Ordering::Release, - ); + + self.next_free_stream_id + .store(next_stream_id, Ordering::Release); + self.stream_map.lock().await.insert( stream_id, MuxMapValue { stream: ch_tx, stream_type, flow_control: flow_control.clone(), - flow_control_event: evt.clone(), + flow_control_event: flow_control_event.clone(), + is_closed: is_closed.clone(), }, ); + Ok(MuxStream::new( stream_id, Role::Client, stream_type, ch_rx, - self.tx.clone(), self.close_tx.clone(), - AtomicBool::new(false).into(), + is_closed, flow_control, - evt, + flow_control_event, + self.target_buf_size, )) } @@ -517,11 +568,11 @@ impl ClientMux { /// /// Also terminates the multiplexor future. Creating a stream is UB after calling this /// function. - pub async fn close(&self, reason: CloseReason) { - self.stream_map.lock().await.drain().for_each(|x| { - let _ = - x.1.stream - .unbounded_send(MuxEvent::Close(ClosePacket::new(reason))); + pub async fn close(&self) { + self.stream_map.lock().await.drain().for_each(|mut x| { + x.1.is_closed.store(true, Ordering::Release); + x.1.stream.disconnect(); + x.1.stream.close_channel(); }); let _ = self.close_tx.unbounded_send(WsEvent::EndFut); } diff --git a/wisp/src/stream.rs b/wisp/src/stream.rs index 3364101..524c04d 100644 --- a/wisp/src/stream.rs +++ b/wisp/src/stream.rs @@ -1,4 +1,4 @@ -use crate::{sink_unfold, ws, ClosePacket, CloseReason, Packet, Role, StreamType, WispError}; +use crate::{sink_unfold, CloseReason, Packet, Role, StreamType, WispError}; use async_io_stream::IoStream; use bytes::Bytes; @@ -18,91 +18,75 @@ use std::{ }, }; -/// Multiplexor event recieved from a Wisp stream. -pub enum MuxEvent { - /// The other side has sent data. - Send(Bytes), - /// The other side has closed. - Close(ClosePacket), -} - pub(crate) enum WsEvent { - Close(u32, CloseReason, oneshot::Sender>), + SendPacket(Packet, oneshot::Sender>), + Close(Packet, oneshot::Sender>), EndFut, } /// Read side of a multiplexor stream. -pub struct MuxStreamRead -where - W: ws::WebSocketWrite, -{ +pub struct MuxStreamRead { /// ID of the stream. pub stream_id: u32, /// Type of the stream. pub stream_type: StreamType, role: Role, - tx: ws::LockedWebSocketWrite, - rx: mpsc::UnboundedReceiver, + tx: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver, is_closed: Arc, flow_control: Arc, + flow_control_read: AtomicU32, + target_flow_control: u32, } -impl MuxStreamRead { +impl MuxStreamRead { /// Read an event from the stream. - pub async fn read(&mut self) -> Option { + pub async fn read(&mut self) -> Option { if self.is_closed.load(Ordering::Acquire) { return None; } - match self.rx.next().await? { - MuxEvent::Send(bytes) => { - if self.role == Role::Server && self.stream_type == StreamType::Tcp { - let old_val = self.flow_control.fetch_add(1, Ordering::AcqRel); - self.tx - .write_frame(Packet::new_continue(self.stream_id, old_val + 1).into()) - .await - .ok()?; - } - Some(MuxEvent::Send(bytes)) - } - MuxEvent::Close(packet) => { - self.is_closed.store(true, Ordering::Release); - Some(MuxEvent::Close(packet)) + let bytes = self.rx.next().await?; + if self.role == Role::Server && self.stream_type == StreamType::Tcp { + let val = self.flow_control_read.fetch_add(1, Ordering::AcqRel) + 1; + if val > self.target_flow_control { + let (tx, rx) = oneshot::channel::>(); + self.tx + .unbounded_send(WsEvent::SendPacket( + Packet::new_continue( + self.stream_id, + self.flow_control.fetch_add(val, Ordering::AcqRel) + val, + ), + tx, + )) + .ok()?; + rx.await.ok()?.ok()?; + self.flow_control_read.store(0, Ordering::Release); } } + Some(bytes) } pub(crate) fn into_stream(self) -> Pin + Send>> { Box::pin(stream::unfold(self, |mut rx| async move { - let evt = rx.read().await?; - Some(( - match evt { - MuxEvent::Send(bytes) => bytes, - MuxEvent::Close(_) => return None, - }, - rx, - )) + Some((rx.read().await?, rx)) })) } } /// Write side of a multiplexor stream. -pub struct MuxStreamWrite -where - W: ws::WebSocketWrite, -{ +pub struct MuxStreamWrite { /// ID of the stream. pub stream_id: u32, /// Type of the stream. pub stream_type: StreamType, role: Role, - tx: ws::LockedWebSocketWrite, - close_channel: mpsc::UnboundedSender, + tx: mpsc::UnboundedSender, is_closed: Arc, continue_recieved: Arc, flow_control: Arc, } -impl MuxStreamWrite { +impl MuxStreamWrite { /// Write data to the stream. pub async fn write(&self, data: Bytes) -> Result<(), WispError> { if self.is_closed.load(Ordering::Acquire) { @@ -114,9 +98,14 @@ impl MuxStreamWrite { { self.continue_recieved.listen().await; } + let (tx, rx) = oneshot::channel::>(); self.tx - .write_frame(Packet::new_data(self.stream_id, data).into()) - .await?; + .unbounded_send(WsEvent::SendPacket( + Packet::new_data(self.stream_id, data), + tx, + )) + .map_err(|x| WispError::Other(Box::new(x)))?; + rx.await.map_err(|x| WispError::Other(Box::new(x)))??; if self.role == Role::Client && self.stream_type == StreamType::Tcp { self.flow_control.store( self.flow_control.load(Ordering::Acquire).saturating_sub(1), @@ -140,7 +129,7 @@ impl MuxStreamWrite { pub fn get_close_handle(&self) -> MuxStreamCloser { MuxStreamCloser { stream_id: self.stream_id, - close_channel: self.close_channel.clone(), + close_channel: self.tx.clone(), is_closed: self.is_closed.clone(), } } @@ -150,13 +139,17 @@ impl MuxStreamWrite { if self.is_closed.load(Ordering::Acquire) { return Err(WispError::StreamAlreadyClosed); } + self.is_closed.store(true, Ordering::Release); + let (tx, rx) = oneshot::channel::>(); - self.close_channel - .unbounded_send(WsEvent::Close(self.stream_id, reason, tx)) + self.tx + .unbounded_send(WsEvent::Close( + Packet::new_close(self.stream_id, reason), + tx, + )) .map_err(|x| WispError::Other(Box::new(x)))?; rx.await.map_err(|x| WispError::Other(Box::new(x)))??; - self.is_closed.store(true, Ordering::Release); Ok(()) } @@ -173,40 +166,36 @@ impl MuxStreamWrite { } } -impl Drop for MuxStreamWrite { +impl Drop for MuxStreamWrite { fn drop(&mut self) { let (tx, _) = oneshot::channel::>(); - let _ = self.close_channel.unbounded_send(WsEvent::Close( - self.stream_id, - CloseReason::Unknown, + let _ = self.tx.unbounded_send(WsEvent::Close( + Packet::new_close(self.stream_id, CloseReason::Unknown), tx, )); } } /// Multiplexor stream. -pub struct MuxStream -where - W: ws::WebSocketWrite, -{ +pub struct MuxStream { /// ID of the stream. pub stream_id: u32, - rx: MuxStreamRead, - tx: MuxStreamWrite, + rx: MuxStreamRead, + tx: MuxStreamWrite, } -impl MuxStream { +impl MuxStream { #[allow(clippy::too_many_arguments)] pub(crate) fn new( stream_id: u32, role: Role, stream_type: StreamType, - rx: mpsc::UnboundedReceiver, - tx: ws::LockedWebSocketWrite, - close_channel: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver, + tx: mpsc::UnboundedSender, is_closed: Arc, flow_control: Arc, continue_recieved: Arc, + target_flow_control: u32, ) -> Self { Self { stream_id, @@ -218,13 +207,14 @@ impl MuxStream { rx, is_closed: is_closed.clone(), flow_control: flow_control.clone(), + flow_control_read: AtomicU32::new(0), + target_flow_control, }, tx: MuxStreamWrite { stream_id, stream_type, role, tx, - close_channel, is_closed: is_closed.clone(), flow_control: flow_control.clone(), continue_recieved: continue_recieved.clone(), @@ -233,7 +223,7 @@ impl MuxStream { } /// Read an event from the stream. - pub async fn read(&mut self) -> Option { + pub async fn read(&mut self) -> Option { self.rx.read().await } @@ -263,7 +253,7 @@ impl MuxStream { } /// Split the stream into read and write parts, consuming it. - pub fn into_split(self) -> (MuxStreamRead, MuxStreamWrite) { + pub fn into_split(self) -> (MuxStreamRead, MuxStreamWrite) { (self.rx, self.tx) } @@ -291,25 +281,34 @@ impl MuxStreamCloser { if self.is_closed.load(Ordering::Acquire) { return Err(WispError::StreamAlreadyClosed); } + self.is_closed.store(true, Ordering::Release); + let (tx, rx) = oneshot::channel::>(); self.close_channel - .unbounded_send(WsEvent::Close(self.stream_id, reason, tx)) + .unbounded_send(WsEvent::Close( + Packet::new_close(self.stream_id, reason), + tx, + )) .map_err(|x| WispError::Other(Box::new(x)))?; rx.await.map_err(|x| WispError::Other(Box::new(x)))??; - self.is_closed.store(true, Ordering::Release); + Ok(()) } - /// Close the stream. This function does not check if it was actually closed. pub(crate) fn close_sync(&self, reason: CloseReason) -> Result<(), WispError> { if self.is_closed.load(Ordering::Acquire) { return Err(WispError::StreamAlreadyClosed); } + self.is_closed.store(true, Ordering::Release); + let (tx, _) = oneshot::channel::>(); self.close_channel - .unbounded_send(WsEvent::Close(self.stream_id, reason, tx)) + .unbounded_send(WsEvent::Close( + Packet::new_close(self.stream_id, reason), + tx, + )) .map_err(|x| WispError::Other(Box::new(x)))?; - self.is_closed.store(true, Ordering::Release); + Ok(()) } } diff --git a/wisp/src/ws.rs b/wisp/src/ws.rs index 610351c..c65cfd9 100644 --- a/wisp/src/ws.rs +++ b/wisp/src/ws.rs @@ -1,10 +1,9 @@ //! Abstraction over WebSocket implementations. //! -//! Use the [`fastwebsockets`] and [`ws_stream_wasm`] implementations of these traits as an example -//! for implementing them for other WebSocket implementations. +//! Use the [`fastwebsockets`] implementation of these traits as an example for implementing them +//! for other WebSocket implementations. //! //! [`fastwebsockets`]: https://github.com/MercuryWorkshop/epoxy-tls/blob/multiplexed/wisp/src/fastwebsockets.rs -//! [`ws_stream_wasm`]: https://github.com/MercuryWorkshop/epoxy-tls/blob/multiplexed/wisp/src/ws_stream_wasm.rs use bytes::Bytes; use futures::lock::Mutex; use std::sync::Arc; @@ -68,8 +67,8 @@ pub trait WebSocketRead { /// Read a frame from the socket. fn wisp_read_frame( &mut self, - tx: &crate::ws::LockedWebSocketWrite, - ) -> impl std::future::Future> + Send; + tx: &crate::ws::LockedWebSocketWrite, + ) -> impl std::future::Future>; } /// Generic WebSocket write trait. @@ -78,13 +77,13 @@ pub trait WebSocketWrite { fn wisp_write_frame( &mut self, frame: Frame, - ) -> impl std::future::Future> + Send; + ) -> impl std::future::Future>; } /// Locked WebSocket that can be shared between threads. pub struct LockedWebSocketWrite(Arc>); -impl LockedWebSocketWrite { +impl LockedWebSocketWrite { /// Create a new locked websocket. pub fn new(ws: S) -> Self { Self(Arc::new(Mutex::new(ws)))