runtime flavors

This commit is contained in:
Toshit Chawda 2024-09-27 16:59:46 -07:00
parent 4c69b55a0a
commit acb863a661
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
2 changed files with 81 additions and 52 deletions

View file

@ -59,6 +59,18 @@ pub enum SocketTransport {
LengthDelimitedLe, LengthDelimitedLe,
} }
#[derive(Serialize, Deserialize, Default, Debug)]
#[serde(rename_all = "lowercase")]
pub enum RuntimeFlavor {
/// Single-threaded tokio runtime.
SingleThread,
/// Multi-threaded tokio runtime.
#[default]
MultiThread,
/// Multi-threaded tokio runtime with an alternate work in progress scheduler.
MultiThreadAlt,
}
pub type BindAddr = (SocketType, String); pub type BindAddr = (SocketType, String);
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -106,6 +118,8 @@ pub struct ServerConfig {
/// Server log level. /// Server log level.
pub log_level: LevelFilter, pub log_level: LevelFilter,
/// Runtime type.
pub runtime: RuntimeFlavor,
} }
#[derive(Serialize, Deserialize, PartialEq, Eq)] #[derive(Serialize, Deserialize, PartialEq, Eq)]
@ -324,6 +338,7 @@ impl Default for ServerConfig {
max_message_size: 64 * 1024, max_message_size: 64 * 1024,
log_level: LevelFilter::Info, log_level: LevelFilter::Info,
runtime: RuntimeFlavor::default(),
} }
} }
} }

View file

@ -5,7 +5,7 @@ use std::{collections::HashMap, fs::read_to_string, net::IpAddr};
use anyhow::Context; use anyhow::Context;
use clap::Parser; use clap::Parser;
use config::{validate_config_cache, Cli, Config}; use config::{validate_config_cache, Cli, Config, RuntimeFlavor};
use dashmap::DashMap; use dashmap::DashMap;
use handle::{handle_wisp, handle_wsproxy}; use handle::{handle_wisp, handle_wsproxy};
use hickory_resolver::{ use hickory_resolver::{
@ -17,7 +17,10 @@ use listener::ServerListener;
use log::{error, info}; use log::{error, info};
use route::{route_stats, ServerRouteResult}; use route::{route_stats, ServerRouteResult};
use serde::Serialize; use serde::Serialize;
use tokio::signal::unix::{signal, SignalKind}; use tokio::{
runtime,
signal::unix::{signal, SignalKind},
};
use uuid::Uuid; use uuid::Uuid;
use wisp_mux::{ConnectPacket, StreamType}; use wisp_mux::{ConnectPacket, StreamType};
@ -199,8 +202,7 @@ fn handle_stream(stream: ServerRouteResult, id: String) {
#[global_allocator] #[global_allocator]
static JEMALLOCATOR: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; static JEMALLOCATOR: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[tokio::main(flavor = "multi_thread")] fn main() -> anyhow::Result<()> {
async fn main() -> anyhow::Result<()> {
if CLI.default_config { if CLI.default_config {
println!("{}", Config::default().ser()?); println!("{}", Config::default().ser()?);
return Ok(()); return Ok(());
@ -211,63 +213,75 @@ async fn main() -> anyhow::Result<()> {
.parse_default_env() .parse_default_env()
.init(); .init();
validate_config_cache().await; let mut builder: runtime::Builder = match CONFIG.server.runtime {
RuntimeFlavor::SingleThread => runtime::Builder::new_current_thread(),
RuntimeFlavor::MultiThread => runtime::Builder::new_multi_thread(),
RuntimeFlavor::MultiThreadAlt => runtime::Builder::new_multi_thread_alt(),
};
info!( builder.enable_all();
"listening on {:?} with socket transport {:?}", let rt = builder.build()?;
CONFIG.server.bind, CONFIG.server.transport
);
tokio::spawn(async { rt.block_on(async {
let mut sig = signal(SignalKind::user_defined1()).unwrap(); validate_config_cache().await;
while sig.recv().await.is_some() {
info!("Stats:\n{}", generate_stats().unwrap());
}
});
let mut listener = ServerListener::new(&CONFIG.server.bind) info!(
.await "listening on {:?} with runtime flavor {:?} and socket transport {:?}",
.with_context(|| format!("failed to bind to address {}", CONFIG.server.bind.1))?; CONFIG.server.bind, CONFIG.server.runtime, CONFIG.server.transport
);
if CONFIG.server.enable_stats_endpoint { tokio::spawn(async {
if let Some(bind_addr) = CONFIG.server.stats_endpoint.get_bindaddr() { let mut sig = signal(SignalKind::user_defined1()).unwrap();
info!("stats server listening on {:?}", bind_addr); while sig.recv().await.is_some() {
let mut stats_listener = ServerListener::new(&bind_addr).await.with_context(|| { info!("Stats:\n{}", generate_stats().unwrap());
format!("failed to bind to address {} for stats server", bind_addr.1) }
})?; });
tokio::spawn(async move { let mut listener = ServerListener::new(&CONFIG.server.bind)
loop { .await
match stats_listener.accept().await { .with_context(|| format!("failed to bind to address {}", CONFIG.server.bind.1))?;
Ok((stream, _)) => {
if let Err(e) = route_stats(stream).await { if CONFIG.server.enable_stats_endpoint {
error!("error while routing stats client: {:?}", e); if let Some(bind_addr) = CONFIG.server.stats_endpoint.get_bindaddr() {
} info!("stats server listening on {:?}", bind_addr);
} let mut stats_listener =
Err(e) => error!("error while accepting stats client: {:?}", e), ServerListener::new(&bind_addr).await.with_context(|| {
} format!("failed to bind to address {} for stats server", bind_addr.1)
} })?;
});
}
}
let stats_endpoint = CONFIG.server.stats_endpoint.get_endpoint();
loop {
let stats_endpoint = stats_endpoint.clone();
match listener.accept().await {
Ok((stream, id)) => {
tokio::spawn(async move { tokio::spawn(async move {
let res = route::route(stream, stats_endpoint, move |stream| { loop {
handle_stream(stream, id) match stats_listener.accept().await {
}) Ok((stream, _)) => {
.await; if let Err(e) = route_stats(stream).await {
error!("error while routing stats client: {:?}", e);
if let Err(e) = res { }
error!("error while routing client: {:?}", e); }
Err(e) => error!("error while accepting stats client: {:?}", e),
}
} }
}); });
} }
Err(e) => error!("error while accepting client: {:?}", e),
} }
}
let stats_endpoint = CONFIG.server.stats_endpoint.get_endpoint();
loop {
let stats_endpoint = stats_endpoint.clone();
match listener.accept().await {
Ok((stream, id)) => {
tokio::spawn(async move {
let res = route::route(stream, stats_endpoint, move |stream| {
handle_stream(stream, id)
})
.await;
if let Err(e) = res {
error!("error while routing client: {:?}", e);
}
});
}
Err(e) => error!("error while accepting client: {:?}", e),
}
}
})
} }