main spawns the REAL main

This commit is contained in:
Toshit Chawda 2024-11-24 16:47:30 -08:00
parent 5a48c10cd9
commit 1b7f5a10c0
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
8 changed files with 122 additions and 90 deletions

View file

@ -1,5 +0,0 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
[target.wasm32-unknown-unknown]
runner = "wasm-bindgen-test-runner"

41
Cargo.lock generated
View file

@ -720,7 +720,7 @@ dependencies = [
"rustls-pki-types", "rustls-pki-types",
"rustls-webpki", "rustls-webpki",
"send_wrapper", "send_wrapper",
"thiserror", "thiserror 2.0.3",
"tokio", "tokio",
"wasm-bindgen", "wasm-bindgen",
"wasm-bindgen-futures", "wasm-bindgen-futures",
@ -740,6 +740,7 @@ dependencies = [
"bytes", "bytes",
"cfg-if", "cfg-if",
"clap", "clap",
"console-subscriber",
"dashmap", "dashmap",
"ed25519-dalek", "ed25519-dalek",
"env_logger", "env_logger",
@ -815,7 +816,7 @@ dependencies = [
"rand", "rand",
"sha1", "sha1",
"simdutf8", "simdutf8",
"thiserror", "thiserror 1.0.65",
"tokio", "tokio",
"utf-8", "utf-8",
] ]
@ -1090,7 +1091,7 @@ dependencies = [
"ipnet", "ipnet",
"once_cell", "once_cell",
"rand", "rand",
"thiserror", "thiserror 1.0.65",
"tinyvec", "tinyvec",
"tokio", "tokio",
"tracing", "tracing",
@ -1113,7 +1114,7 @@ dependencies = [
"rand", "rand",
"resolv-conf", "resolv-conf",
"smallvec", "smallvec",
"thiserror", "thiserror 1.0.65",
"tokio", "tokio",
"tracing", "tracing",
] ]
@ -1692,9 +1693,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.89" version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@ -2185,9 +2186,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.85" version = "2.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56" checksum = "44d46482f1c1c87acd84dea20c1bf5ebff4c757009ed6bf19cfd36fb10e92c4e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2212,7 +2213,16 @@ version = "1.0.65"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5" checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5"
dependencies = [ dependencies = [
"thiserror-impl", "thiserror-impl 1.0.65",
]
[[package]]
name = "thiserror"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa"
dependencies = [
"thiserror-impl 2.0.3",
] ]
[[package]] [[package]]
@ -2226,6 +2236,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "thiserror-impl"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "thread_local" name = "thread_local"
version = "1.1.8" version = "1.1.8"
@ -3022,7 +3043,7 @@ dependencies = [
"nohash-hasher", "nohash-hasher",
"pin-project-lite", "pin-project-lite",
"reusable-box-future", "reusable-box-future",
"thiserror", "thiserror 2.0.3",
"tokio", "tokio",
] ]

View file

@ -25,7 +25,7 @@ pin-project-lite = "0.2.14"
rustls-pemfile = { version = "2.1.3", optional = true } rustls-pemfile = { version = "2.1.3", optional = true }
rustls-webpki = { version = "0.102.7", optional = true } rustls-webpki = { version = "0.102.7", optional = true }
send_wrapper = { version = "0.6.0", features = ["futures"] } send_wrapper = { version = "0.6.0", features = ["futures"] }
thiserror = "1.0.63" thiserror = "2.0.3"
tokio = "1.39.3" tokio = "1.39.3"
wasm-bindgen = "0.2.93" wasm-bindgen = "0.2.93"
wasm-bindgen-futures = "0.4.43" wasm-bindgen-futures = "0.4.43"

View file

@ -10,6 +10,7 @@ async-trait = { version = "0.1.81", optional = true }
bytes = "1.7.1" bytes = "1.7.1"
cfg-if = "1.0.0" cfg-if = "1.0.0"
clap = { version = "4.5.16", features = ["cargo", "derive"] } clap = { version = "4.5.16", features = ["cargo", "derive"] }
console-subscriber = { version = "0.4.1", optional = true }
dashmap = "6.0.1" dashmap = "6.0.1"
ed25519-dalek = { version = "2.1.1", features = ["pem"] } ed25519-dalek = { version = "2.1.1", features = ["pem"] }
env_logger = "0.11.5" env_logger = "0.11.5"
@ -50,6 +51,7 @@ toml = ["dep:toml"]
twisp = ["dep:pty-process", "dep:libc", "dep:async-trait", "dep:shell-words"] twisp = ["dep:pty-process", "dep:libc", "dep:async-trait", "dep:shell-words"]
speed-limit = ["dep:async-speed-limit"] speed-limit = ["dep:async-speed-limit"]
tokio-console = ["dep:console-subscriber", "tokio/tracing"]
[build-dependencies] [build-dependencies]
vergen-git2 = { version = "1.0.0", features = ["rustc"] } vergen-git2 = { version = "1.0.0", features = ["rustc"] }

View file

@ -290,9 +290,7 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow:
let event: Arc<Event> = Event::new().into(); let event: Arc<Event> = Event::new().into();
let mux_id = id.clone(); let mux_id = id.clone();
set.spawn(tokio::task::unconstrained(fut.map(move |x| { set.spawn(fut.map(move |x| debug!("wisp client id {:?} multiplexor result {:?}", mux_id, x)));
debug!("wisp client id {:?} multiplexor result {:?}", mux_id, x)
})));
let ping_mux = mux.clone(); let ping_mux = mux.clone();
let ping_event = event.clone(); let ping_event = event.clone();

View file

@ -4,7 +4,7 @@
use std::{fs::read_to_string, net::IpAddr}; use std::{fs::read_to_string, net::IpAddr};
use anyhow::Context; use anyhow::{Context, Result};
use clap::Parser; use clap::Parser;
use config::{validate_config_cache, Cli, Config, RuntimeFlavor}; use config::{validate_config_cache, Cli, Config, RuntimeFlavor};
use dashmap::DashMap; use dashmap::DashMap;
@ -114,7 +114,7 @@ lazy_static! {
static JEMALLOCATOR: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; static JEMALLOCATOR: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[doc(hidden)] #[doc(hidden)]
fn main() -> anyhow::Result<()> { fn main() -> Result<()> {
if CLI.default_config { if CLI.default_config {
println!("{}", Config::default().ser()?); println!("{}", Config::default().ser()?);
return Ok(()); return Ok(());
@ -135,88 +135,100 @@ fn main() -> anyhow::Result<()> {
builder.enable_all(); builder.enable_all();
let rt = builder.build()?; let rt = builder.build()?;
rt.block_on(async { rt.block_on(async move {
validate_config_cache().await; tokio::spawn(async_main()).await??;
Ok(())
})
}
info!( #[doc(hidden)]
"listening on {:?} with runtime flavor {:?} and socket transport {:?}", async fn async_main() -> Result<()> {
CONFIG.server.bind, CONFIG.server.runtime, CONFIG.server.transport #[cfg(feature = "tokio-console")]
); console_subscriber::init();
trace!("CLI: {:#?}", &*CLI); validate_config_cache().await;
trace!("CONFIG: {:#?}", &*CONFIG);
trace!("RESOLVER: {:?}", &*RESOLVER);
tokio::spawn(async { info!(
let mut sig = signal(SignalKind::user_defined1()).unwrap(); "listening on {:?} with runtime flavor {:?} and socket transport {:?}",
while sig.recv().await.is_some() { CONFIG.server.bind, CONFIG.server.runtime, CONFIG.server.transport
match generate_stats() { );
Ok(stats) => info!("Stats:\n{}", stats),
Err(err) => error!("error while creating stats {:?}", err), trace!("CLI: {:#?}", &*CLI);
} trace!("CONFIG: {:#?}", &*CONFIG);
trace!("RESOLVER: {:?}", &*RESOLVER);
tokio::spawn(async {
let mut sig = signal(SignalKind::user_defined1()).unwrap();
while sig.recv().await.is_some() {
match generate_stats() {
Ok(stats) => info!("Stats:\n{}", stats),
Err(err) => error!("error while creating stats {:?}", err),
} }
}); }
});
let mut listener = ServerListener::new(&CONFIG.server.bind) let mut listener = ServerListener::new(&CONFIG.server.bind)
.await .await
.with_context(|| format!("failed to bind to address {}", CONFIG.server.bind.1))?; .with_context(|| format!("failed to bind to address {}", CONFIG.server.bind.1))?;
if let Some(bind_addr) = CONFIG if let Some(bind_addr) = CONFIG
.server .server
.stats_endpoint .stats_endpoint
.as_ref() .as_ref()
.and_then(|x| x.get_bindaddr()) .and_then(|x| x.get_bindaddr())
{ {
info!("stats server listening on {:?}", bind_addr); info!("stats server listening on {:?}", bind_addr);
let mut stats_listener = ServerListener::new(&bind_addr).await.with_context(|| { let mut stats_listener = ServerListener::new(&bind_addr).await.with_context(|| {
format!("failed to bind to address {} for stats server", bind_addr.1) format!("failed to bind to address {} for stats server", bind_addr.1)
})?; })?;
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
match stats_listener.accept().await { match stats_listener.accept().await {
Ok((stream, _)) => { Ok((stream, _)) => {
tokio::spawn(async move {
if let Err(e) = route_stats(stream).await { if let Err(e) = route_stats(stream).await {
error!("error while routing stats client: {:?}", e); error!("error while routing stats client: {:?}", e);
} }
} });
Err(e) => error!("error while accepting stats client: {:?}", e),
} }
Err(e) => error!("error while accepting stats client: {:?}", e),
} }
});
}
let stats_endpoint = CONFIG
.server
.stats_endpoint
.as_ref()
.and_then(|x| x.get_endpoint());
loop {
let stats_endpoint = stats_endpoint.clone();
match listener.accept().await {
Ok((stream, client_id)) => {
tokio::spawn(async move {
let res = route::route(stream, stats_endpoint, move |stream, maybe_ip| {
let client_id = if let Some(ip) = maybe_ip {
format!("{} ({})", client_id, ip)
} else {
client_id
};
trace!("routed {:?}: {}", client_id, stream);
handle_stream(stream, client_id)
})
.await;
if let Err(e) = res {
error!("error while routing client: {:?}", e);
}
});
}
Err(e) => error!("error while accepting client: {:?}", e),
} }
});
}
let stats_endpoint = CONFIG
.server
.stats_endpoint
.as_ref()
.and_then(|x| x.get_endpoint());
loop {
let stats_endpoint = stats_endpoint.clone();
match listener.accept().await {
Ok((stream, client_id)) => {
tokio::spawn(async move {
let res = route::route(stream, stats_endpoint, move |stream, maybe_ip| {
let client_id = if let Some(ip) = maybe_ip {
format!("{} ({})", client_id, ip)
} else {
client_id
};
trace!("routed {:?}: {}", client_id, stream);
handle_stream(stream, client_id)
})
.await;
if let Err(e) = res {
error!("error while routing client: {:?}", e);
}
});
}
Err(e) => error!("error while accepting client: {:?}", e),
} }
}) }
} }
#[doc(hidden)] #[doc(hidden)]

View file

@ -121,6 +121,10 @@ async fn get_cert(path: PathBuf) -> Result<SigningKey, Box<dyn Error + Sync + Se
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
tokio::spawn(real_main()).await?
}
async fn real_main() -> Result<(), Box<dyn Error + Send + Sync>> {
#[cfg(feature = "tokio-console")] #[cfg(feature = "tokio-console")]
console_subscriber::init(); console_subscriber::init();
let opts = Cli::parse(); let opts = Cli::parse();

View file

@ -24,7 +24,7 @@ getrandom = { version = "0.2.15", features = ["std"], optional = true }
nohash-hasher = "0.2.0" nohash-hasher = "0.2.0"
pin-project-lite = "0.2.14" pin-project-lite = "0.2.14"
reusable-box-future = "0.2.0" reusable-box-future = "0.2.0"
thiserror = "1.0.65" thiserror = "2.0.3"
tokio = { version = "1.39.3", optional = true, default-features = false } tokio = { version = "1.39.3", optional = true, default-features = false }
[features] [features]