From 1b7f5a10c08798db0974094af30b595270e9bd77 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Sun, 24 Nov 2024 16:47:30 -0800 Subject: [PATCH] main spawns the REAL main --- .cargo/config.toml | 5 -- Cargo.lock | 41 ++++++--- client/Cargo.toml | 2 +- server/Cargo.toml | 2 + server/src/handle/wisp/mod.rs | 4 +- server/src/main.rs | 152 ++++++++++++++++++--------------- simple-wisp-client/src/main.rs | 4 + wisp/Cargo.toml | 2 +- 8 files changed, 122 insertions(+), 90 deletions(-) delete mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml deleted file mode 100644 index 9d76a7f..0000000 --- a/.cargo/config.toml +++ /dev/null @@ -1,5 +0,0 @@ -[build] -rustflags = ["--cfg", "tokio_unstable"] - -[target.wasm32-unknown-unknown] -runner = "wasm-bindgen-test-runner" diff --git a/Cargo.lock b/Cargo.lock index e7df3e4..e01bb2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -720,7 +720,7 @@ dependencies = [ "rustls-pki-types", "rustls-webpki", "send_wrapper", - "thiserror", + "thiserror 2.0.3", "tokio", "wasm-bindgen", "wasm-bindgen-futures", @@ -740,6 +740,7 @@ dependencies = [ "bytes", "cfg-if", "clap", + "console-subscriber", "dashmap", "ed25519-dalek", "env_logger", @@ -815,7 +816,7 @@ dependencies = [ "rand", "sha1", "simdutf8", - "thiserror", + "thiserror 1.0.65", "tokio", "utf-8", ] @@ -1090,7 +1091,7 @@ dependencies = [ "ipnet", "once_cell", "rand", - "thiserror", + "thiserror 1.0.65", "tinyvec", "tokio", "tracing", @@ -1113,7 +1114,7 @@ dependencies = [ "rand", "resolv-conf", "smallvec", - "thiserror", + "thiserror 1.0.65", "tokio", "tracing", ] @@ -1692,9 +1693,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.89" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -2185,9 +2186,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.85" +version = "2.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56" +checksum = "44d46482f1c1c87acd84dea20c1bf5ebff4c757009ed6bf19cfd36fb10e92c4e" dependencies = [ "proc-macro2", "quote", @@ -2212,7 +2213,16 @@ version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.65", +] + +[[package]] +name = "thiserror" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +dependencies = [ + "thiserror-impl 2.0.3", ] [[package]] @@ -2226,6 +2236,17 @@ dependencies = [ "syn", ] +[[package]] +name = "thiserror-impl" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.8" @@ -3022,7 +3043,7 @@ dependencies = [ "nohash-hasher", "pin-project-lite", "reusable-box-future", - "thiserror", + "thiserror 2.0.3", "tokio", ] diff --git a/client/Cargo.toml b/client/Cargo.toml index c98a68a..07a61b5 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -25,7 +25,7 @@ pin-project-lite = "0.2.14" rustls-pemfile = { version = "2.1.3", optional = true } rustls-webpki = { version = "0.102.7", optional = true } send_wrapper = { version = "0.6.0", features = ["futures"] } -thiserror = "1.0.63" +thiserror = "2.0.3" tokio = "1.39.3" wasm-bindgen = "0.2.93" wasm-bindgen-futures = "0.4.43" diff --git a/server/Cargo.toml b/server/Cargo.toml index 98e0388..9788719 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,6 +10,7 @@ async-trait = { version = "0.1.81", optional = true } bytes = "1.7.1" cfg-if = "1.0.0" clap = { version = "4.5.16", features = ["cargo", "derive"] } +console-subscriber = { version = "0.4.1", optional = true } dashmap = "6.0.1" ed25519-dalek = { version = "2.1.1", features = ["pem"] } env_logger = "0.11.5" @@ -50,6 +51,7 @@ toml = ["dep:toml"] twisp = ["dep:pty-process", "dep:libc", "dep:async-trait", "dep:shell-words"] speed-limit = ["dep:async-speed-limit"] +tokio-console = ["dep:console-subscriber", "tokio/tracing"] [build-dependencies] vergen-git2 = { version = "1.0.0", features = ["rustc"] } diff --git a/server/src/handle/wisp/mod.rs b/server/src/handle/wisp/mod.rs index b7b5f3c..49fd7ef 100644 --- a/server/src/handle/wisp/mod.rs +++ b/server/src/handle/wisp/mod.rs @@ -290,9 +290,7 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow: let event: Arc = Event::new().into(); let mux_id = id.clone(); - set.spawn(tokio::task::unconstrained(fut.map(move |x| { - debug!("wisp client id {:?} multiplexor result {:?}", mux_id, x) - }))); + set.spawn(fut.map(move |x| debug!("wisp client id {:?} multiplexor result {:?}", mux_id, x))); let ping_mux = mux.clone(); let ping_event = event.clone(); diff --git a/server/src/main.rs b/server/src/main.rs index 3393bba..89a784e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -4,7 +4,7 @@ use std::{fs::read_to_string, net::IpAddr}; -use anyhow::Context; +use anyhow::{Context, Result}; use clap::Parser; use config::{validate_config_cache, Cli, Config, RuntimeFlavor}; use dashmap::DashMap; @@ -114,7 +114,7 @@ lazy_static! { static JEMALLOCATOR: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; #[doc(hidden)] -fn main() -> anyhow::Result<()> { +fn main() -> Result<()> { if CLI.default_config { println!("{}", Config::default().ser()?); return Ok(()); @@ -135,88 +135,100 @@ fn main() -> anyhow::Result<()> { builder.enable_all(); let rt = builder.build()?; - rt.block_on(async { - validate_config_cache().await; + rt.block_on(async move { + tokio::spawn(async_main()).await??; + Ok(()) + }) +} - info!( - "listening on {:?} with runtime flavor {:?} and socket transport {:?}", - CONFIG.server.bind, CONFIG.server.runtime, CONFIG.server.transport - ); +#[doc(hidden)] +async fn async_main() -> Result<()> { + #[cfg(feature = "tokio-console")] + console_subscriber::init(); - trace!("CLI: {:#?}", &*CLI); - trace!("CONFIG: {:#?}", &*CONFIG); - trace!("RESOLVER: {:?}", &*RESOLVER); + validate_config_cache().await; - tokio::spawn(async { - let mut sig = signal(SignalKind::user_defined1()).unwrap(); - while sig.recv().await.is_some() { - match generate_stats() { - Ok(stats) => info!("Stats:\n{}", stats), - Err(err) => error!("error while creating stats {:?}", err), - } + info!( + "listening on {:?} with runtime flavor {:?} and socket transport {:?}", + CONFIG.server.bind, CONFIG.server.runtime, CONFIG.server.transport + ); + + trace!("CLI: {:#?}", &*CLI); + trace!("CONFIG: {:#?}", &*CONFIG); + trace!("RESOLVER: {:?}", &*RESOLVER); + + tokio::spawn(async { + let mut sig = signal(SignalKind::user_defined1()).unwrap(); + while sig.recv().await.is_some() { + match generate_stats() { + Ok(stats) => info!("Stats:\n{}", stats), + Err(err) => error!("error while creating stats {:?}", err), } - }); + } + }); - let mut listener = ServerListener::new(&CONFIG.server.bind) - .await - .with_context(|| format!("failed to bind to address {}", CONFIG.server.bind.1))?; + let mut listener = ServerListener::new(&CONFIG.server.bind) + .await + .with_context(|| format!("failed to bind to address {}", CONFIG.server.bind.1))?; - if let Some(bind_addr) = CONFIG - .server - .stats_endpoint - .as_ref() - .and_then(|x| x.get_bindaddr()) - { - info!("stats server listening on {:?}", bind_addr); - let mut stats_listener = ServerListener::new(&bind_addr).await.with_context(|| { - format!("failed to bind to address {} for stats server", bind_addr.1) - })?; + if let Some(bind_addr) = CONFIG + .server + .stats_endpoint + .as_ref() + .and_then(|x| x.get_bindaddr()) + { + info!("stats server listening on {:?}", bind_addr); + let mut stats_listener = ServerListener::new(&bind_addr).await.with_context(|| { + format!("failed to bind to address {} for stats server", bind_addr.1) + })?; - tokio::spawn(async move { - loop { - match stats_listener.accept().await { - Ok((stream, _)) => { + tokio::spawn(async move { + loop { + match stats_listener.accept().await { + Ok((stream, _)) => { + tokio::spawn(async move { if let Err(e) = route_stats(stream).await { error!("error while routing stats client: {:?}", e); } - } - Err(e) => error!("error while accepting stats client: {:?}", e), + }); } + Err(e) => error!("error while accepting stats client: {:?}", e), } - }); - } - - let stats_endpoint = CONFIG - .server - .stats_endpoint - .as_ref() - .and_then(|x| x.get_endpoint()); - loop { - let stats_endpoint = stats_endpoint.clone(); - match listener.accept().await { - Ok((stream, client_id)) => { - tokio::spawn(async move { - let res = route::route(stream, stats_endpoint, move |stream, maybe_ip| { - let client_id = if let Some(ip) = maybe_ip { - format!("{} ({})", client_id, ip) - } else { - client_id - }; - - trace!("routed {:?}: {}", client_id, stream); - handle_stream(stream, client_id) - }) - .await; - - if let Err(e) = res { - error!("error while routing client: {:?}", e); - } - }); - } - Err(e) => error!("error while accepting client: {:?}", e), } + }); + } + + let stats_endpoint = CONFIG + .server + .stats_endpoint + .as_ref() + .and_then(|x| x.get_endpoint()); + + loop { + let stats_endpoint = stats_endpoint.clone(); + match listener.accept().await { + Ok((stream, client_id)) => { + tokio::spawn(async move { + let res = route::route(stream, stats_endpoint, move |stream, maybe_ip| { + let client_id = if let Some(ip) = maybe_ip { + format!("{} ({})", client_id, ip) + } else { + client_id + }; + + trace!("routed {:?}: {}", client_id, stream); + handle_stream(stream, client_id) + }) + .await; + + if let Err(e) = res { + error!("error while routing client: {:?}", e); + } + }); + } + Err(e) => error!("error while accepting client: {:?}", e), } - }) + } } #[doc(hidden)] diff --git a/simple-wisp-client/src/main.rs b/simple-wisp-client/src/main.rs index 58ae617..b2391cb 100644 --- a/simple-wisp-client/src/main.rs +++ b/simple-wisp-client/src/main.rs @@ -121,6 +121,10 @@ async fn get_cert(path: PathBuf) -> Result Result<(), Box> { + tokio::spawn(real_main()).await? +} + +async fn real_main() -> Result<(), Box> { #[cfg(feature = "tokio-console")] console_subscriber::init(); let opts = Cli::parse(); diff --git a/wisp/Cargo.toml b/wisp/Cargo.toml index a2a1159..f85edc6 100644 --- a/wisp/Cargo.toml +++ b/wisp/Cargo.toml @@ -24,7 +24,7 @@ getrandom = { version = "0.2.15", features = ["std"], optional = true } nohash-hasher = "0.2.0" pin-project-lite = "0.2.14" reusable-box-future = "0.2.0" -thiserror = "1.0.65" +thiserror = "2.0.3" tokio = { version = "1.39.3", optional = true, default-features = false } [features]