From d0ef7b476cd106c1556cd8b6b068e8ff7118f69f Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Thu, 26 Sep 2024 18:06:18 -0700 Subject: [PATCH] json stats --- server/Cargo.toml | 3 +- server/e.toml | 3 + server/src/config.rs | 25 ++---- server/src/main.rs | 189 ++++++++++++++++++++++--------------------- 4 files changed, 104 insertions(+), 116 deletions(-) create mode 100644 server/e.toml diff --git a/server/Cargo.toml b/server/Cargo.toml index 5ef80d5..dc3b7e2 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -27,7 +27,7 @@ pty-process = { version = "0.4.0", features = ["async", "tokio"], optional = tru regex = "1.10.6" rustls-pemfile = "2.1.3" serde = { version = "1.0.208", features = ["derive"] } -serde_json = { version = "1.0.125", optional = true } +serde_json = "1.0.125" serde_yaml = { version = "0.9.34", optional = true } sha2 = "0.10.8" shell-words = { version = "1.1.0", optional = true } @@ -43,7 +43,6 @@ wisp-mux = { version = "5.0.0", path = "../wisp", features = ["fastwebsockets", [features] default = ["toml"] -json = ["dep:serde_json"] yaml = ["dep:serde_yaml"] toml = ["dep:toml"] diff --git a/server/e.toml b/server/e.toml new file mode 100644 index 0000000..dca5b69 --- /dev/null +++ b/server/e.toml @@ -0,0 +1,3 @@ +[server] +enable_stats_endpoint = true +stats_endpoint = ["tcp", "127.0.0.1:5000"] diff --git a/server/src/config.rs b/server/src/config.rs index ef8920a..ae4b00b 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -67,7 +67,7 @@ pub enum StatsEndpoint { /// Stats on the same listener as the Wisp server. SameServer(String), /// Stats on this address and socket type. - SeparateServer((SocketType, String)), + SeparateServer(BindAddr), } #[derive(Serialize, Deserialize)] @@ -467,44 +467,32 @@ impl StreamConfig { impl Config { pub fn ser(&self) -> anyhow::Result { Ok(match CLI.format { + ConfigFormat::Json => serde_json::to_string_pretty(self)?, #[cfg(feature = "toml")] ConfigFormat::Toml => toml::to_string_pretty(self)?, - #[cfg(feature = "json")] - ConfigFormat::Json => serde_json::to_string_pretty(self)?, #[cfg(feature = "yaml")] ConfigFormat::Yaml => serde_yaml::to_string(self)?, - #[cfg(not(any(feature = "toml", feature = "json", feature = "yaml")))] - ConfigFormat::None => String::new(), }) } pub fn de(string: String) -> anyhow::Result { Ok(match CLI.format { + ConfigFormat::Json => serde_json::from_str(&string)?, #[cfg(feature = "toml")] ConfigFormat::Toml => toml::from_str(&string)?, - #[cfg(feature = "json")] - ConfigFormat::Json => serde_json::from_str(&string)?, #[cfg(feature = "yaml")] ConfigFormat::Yaml => serde_yaml::from_str(&string)?, - #[cfg(not(any(feature = "toml", feature = "json", feature = "yaml")))] - ConfigFormat::None => { - let _ = string; - Self::default() - } }) } } #[derive(Clone, Copy, Eq, PartialEq, ValueEnum)] pub enum ConfigFormat { + Json, #[cfg(feature = "toml")] Toml, - #[cfg(feature = "json")] - Json, #[cfg(feature = "yaml")] Yaml, - #[cfg(not(any(feature = "toml", feature = "json", feature = "yaml")))] - None, } impl Default for ConfigFormat { @@ -512,13 +500,10 @@ impl Default for ConfigFormat { cfg_if! { if #[cfg(feature = "toml")] { Self::Toml - } else if #[cfg(feature = "json")] { - Self::Json } else if #[cfg(feature = "yaml")] { Self::Yaml } else { - compile_error!("no config format feature enabled!"); - Self::None + Self::Json } } } diff --git a/server/src/main.rs b/server/src/main.rs index bcf9f4a..6e89762 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,7 +1,7 @@ #![feature(ip)] #![deny(clippy::todo)] -use std::{fmt::Write, fs::read_to_string, net::IpAddr}; +use std::{collections::HashMap, fs::read_to_string, net::IpAddr}; use anyhow::Context; use clap::Parser; @@ -16,6 +16,7 @@ use lazy_static::lazy_static; use listener::ServerListener; use log::{error, info}; use route::{route_stats, ServerRouteResult}; +use serde::Serialize; use tokio::signal::unix::{signal, SignalKind}; use uuid::Uuid; use wisp_mux::{ConnectPacket, StreamType}; @@ -93,92 +94,90 @@ fn format_stream_type(stream_type: StreamType) -> &'static str { } } -fn generate_stats() -> anyhow::Result { - let mut out = String::new(); - let len = CLIENTS.len(); +#[derive(Serialize)] +struct MemoryStats { + active: f64, + allocated: f64, + mapped: f64, + metadata: f64, + resident: f64, + retained: f64, +} +#[derive(Serialize)] +struct StreamStats { + stream_type: String, + requested: String, + resolved: String, +} + +impl From<(ConnectPacket, ConnectPacket)> for StreamStats { + fn from(value: (ConnectPacket, ConnectPacket)) -> Self { + Self { + stream_type: format_stream_type(value.0.stream_type).to_string(), + requested: format!( + "{}:{}", + value.0.destination_hostname, value.0.destination_port + ), + resolved: format!( + "{}:{}", + value.1.destination_hostname, value.1.destination_port + ), + } + } +} + +#[derive(Serialize)] +struct ClientStats { + wsproxy: bool, + streams: HashMap, +} + +#[derive(Serialize)] +struct ServerStats { + config: String, + clients: HashMap, + memory: MemoryStats, +} + +fn generate_stats() -> anyhow::Result { use tikv_jemalloc_ctl::stats::{active, allocated, mapped, metadata, resident, retained}; tikv_jemalloc_ctl::epoch::advance()?; - writeln!(&mut out, "Memory usage:")?; - writeln!( - &mut out, - "\tActive: {:?} MiB", - active::read()? as f64 / (1024 * 1024) as f64 - )?; - writeln!( - &mut out, - "\tAllocated: {:?} MiB", - allocated::read()? as f64 / (1024 * 1024) as f64 - )?; - writeln!( - &mut out, - "\tMapped: {:?} MiB", - mapped::read()? as f64 / (1024 * 1024) as f64 - )?; - writeln!( - &mut out, - "\tMetadata: {:?} MiB", - metadata::read()? as f64 / (1024 * 1024) as f64 - )?; - writeln!( - &mut out, - "\tResident: {:?} MiB", - resident::read()? as f64 / (1024 * 1024) as f64 - )?; - writeln!( - &mut out, - "\tRetained: {:?} MiB", - retained::read()? as f64 / (1024 * 1024) as f64 - )?; + let memory = MemoryStats { + active: active::read()? as f64 / (1024 * 1024) as f64, + allocated: allocated::read()? as f64 / (1024 * 1024) as f64, + mapped: mapped::read()? as f64 / (1024 * 1024) as f64, + metadata: metadata::read()? as f64 / (1024 * 1024) as f64, + resident: resident::read()? as f64 / (1024 * 1024) as f64, + retained: retained::read()? as f64 / (1024 * 1024) as f64, + }; - writeln!( - &mut out, - "{} clients connected{}", - len, - if len != 0 { ":" } else { "" } - )?; + let clients = CLIENTS + .iter() + .map(|x| { + ( + x.key().to_string(), + ClientStats { + wsproxy: x.value().1, + streams: x + .value() + .0 + .iter() + .map(|x| (x.key().to_string(), StreamStats::from(x.value().clone()))) + .collect(), + }, + ) + }) + .collect(); - for client in CLIENTS.iter() { - let len = client.value().0.len(); + let stats = ServerStats { + config: CONFIG.ser()?, + clients, + memory, + }; - writeln!( - &mut out, - "\tClient \"{}\"{}: {} streams connected{}", - client.key(), - if client.value().1 { " (wsproxy)" } else { "" }, - len, - if len != 0 && CONFIG.server.verbose_stats { - ":" - } else { - "" - } - )?; - - if CONFIG.server.verbose_stats { - for stream in client.value().0.iter() { - writeln!( - &mut out, - "\t\tStream \"{}\": {}", - stream.key(), - format_stream_type(stream.value().0.stream_type) - )?; - writeln!( - &mut out, - "\t\t\tRequested: {}:{}", - stream.value().0.destination_hostname, - stream.value().0.destination_port - )?; - writeln!( - &mut out, - "\t\t\tResolved: {}:{}", - stream.value().1.destination_hostname, - stream.value().1.destination_port - )?; - } - } - } - Ok(out) + Ok(serde_json::to_string_pretty(&stats)?) } fn handle_stream(stream: ServerRouteResult, id: String) { @@ -230,24 +229,26 @@ async fn main() -> anyhow::Result<()> { .await .with_context(|| format!("failed to bind to address {}", CONFIG.server.bind.1))?; - if let Some(bind_addr) = CONFIG.server.stats_endpoint.get_bindaddr() { - info!("stats server listening on {:?}", bind_addr); - let mut stats_listener = ServerListener::new(&bind_addr).await.with_context(|| { - format!("failed to bind to address {} for stats server", bind_addr.1) - })?; + if CONFIG.server.enable_stats_endpoint { + if let Some(bind_addr) = CONFIG.server.stats_endpoint.get_bindaddr() { + info!("stats server listening on {:?}", bind_addr); + let mut stats_listener = ServerListener::new(&bind_addr).await.with_context(|| { + format!("failed to bind to address {} for stats server", bind_addr.1) + })?; - tokio::spawn(async move { - loop { - match stats_listener.accept().await { - Ok((stream, _)) => { - if let Err(e) = route_stats(stream).await { - error!("error while routing stats client: {:?}", e); + tokio::spawn(async move { + loop { + match stats_listener.accept().await { + Ok((stream, _)) => { + if let Err(e) = route_stats(stream).await { + error!("error while routing stats client: {:?}", e); + } } + Err(e) => error!("error while accepting stats client: {:?}", e), } - Err(e) => error!("error while accepting stats client: {:?}", e), } - } - }); + }); + } } let stats_endpoint = CONFIG.server.stats_endpoint.get_endpoint();