From c02a83f1856979939653f15076ca2d92c39fde20 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Wed, 2 Oct 2024 20:38:14 -0700 Subject: [PATCH] improve logging, separate stats out --- server/src/config.rs | 17 ++-- server/src/handle/wisp/mod.rs | 32 +++++--- server/src/main.rs | 149 ++++++++-------------------------- server/src/route.rs | 39 +++++---- server/src/stats.rs | 102 +++++++++++++++++++++++ 5 files changed, 187 insertions(+), 152 deletions(-) create mode 100644 server/src/stats.rs diff --git a/server/src/config.rs b/server/src/config.rs index 8187b08..1145c51 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -80,7 +80,7 @@ pub enum StatsEndpoint { SeparateServer(BindAddr), } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] #[serde(default)] pub struct ServerConfig { /// Address and socket type to listen on. @@ -113,7 +113,7 @@ pub struct ServerConfig { pub runtime: RuntimeFlavor, } -#[derive(Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum ProtocolExtension { /// Wisp version 2 UDP protocol extension. @@ -122,7 +122,7 @@ pub enum ProtocolExtension { Motd, } -#[derive(Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum ProtocolExtensionAuth { /// Wisp version 2 password authentication protocol extension. @@ -139,7 +139,7 @@ fn is_default_motd(str: &String) -> bool { *str == default_motd() } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] #[serde(default)] pub struct WispConfig { /// Allow legacy wsproxy connections. @@ -168,7 +168,7 @@ pub struct WispConfig { pub motd_extension: String, } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] #[serde(default)] pub struct StreamConfig { /// Whether or not to enable TCP nodelay. @@ -218,7 +218,7 @@ pub struct StreamConfig { pub block_ports: Vec>, } -#[derive(Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default)] #[serde(default)] pub struct Config { pub server: ServerConfig, @@ -226,6 +226,7 @@ pub struct Config { pub stream: StreamConfig, } +#[derive(Debug)] struct ConfigCache { pub blocked_ports: Vec>, pub allowed_ports: Vec>, @@ -475,7 +476,7 @@ impl Config { } } -#[derive(Clone, Copy, Eq, PartialEq, ValueEnum)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, ValueEnum)] pub enum ConfigFormat { Json, #[cfg(feature = "toml")] @@ -499,7 +500,7 @@ impl Default for ConfigFormat { } /// Performant server implementation of the Wisp protocol in Rust, made for epoxy. -#[derive(Parser)] +#[derive(Parser, Debug)] #[command(version = VERSION_STRING)] pub struct Cli { /// Config file to use. diff --git a/server/src/handle/wisp/mod.rs b/server/src/handle/wisp/mod.rs index 11687ec..37dbda3 100644 --- a/server/src/handle/wisp/mod.rs +++ b/server/src/handle/wisp/mod.rs @@ -14,12 +14,14 @@ use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::tcp::{OwnedReadHalf, OwnedWriteHalf}, select, - task::JoinSet, time::interval, + task::JoinSet, + time::interval, }; use tokio_util::compat::FuturesAsyncReadCompatExt; use uuid::Uuid; use wisp_mux::{ - ws::Payload, CloseReason, ConnectPacket, MuxStream, MuxStreamAsyncRead, MuxStreamWrite, ServerMux + ws::Payload, CloseReason, ConnectPacket, MuxStream, MuxStreamAsyncRead, MuxStreamWrite, + ServerMux, }; use crate::{ @@ -104,12 +106,9 @@ async fn handle_stream( let uuid = Uuid::new_v4(); - trace!( + debug!( "new stream created for client id {:?}: (stream uuid {:?}) {:?} {:?}", - id, - uuid, - requested_stream, - resolved_stream + id, uuid, requested_stream, resolved_stream ); if let Some(client) = CLIENTS.get(&id) { @@ -205,7 +204,7 @@ async fn handle_stream( x = event.listen() => x, }; - trace!("stream uuid {:?} disconnected for client id {:?}", uuid, id); + debug!("stream uuid {:?} disconnected for client id {:?}", uuid, id); if let Some(client) = CLIENTS.get(&id) { client.0.remove(&uuid); @@ -241,12 +240,13 @@ pub async fn handle_wisp(stream: WispResult, id: String) -> anyhow::Result<()> { let mux = Arc::new(mux); debug!( - "new wisp client id {:?} connected with extensions {:?}", + "new wisp client id {:?} connected with extensions {:?}, downgraded {:?}", id, mux.supported_extensions .iter() .map(|x| x.get_id()) - .collect::>() + .collect::>(), + mux.downgraded ); let mut set: JoinSet<()> = JoinSet::new(); @@ -254,14 +254,20 @@ pub async fn handle_wisp(stream: WispResult, id: String) -> anyhow::Result<()> { let mux_id = id.clone(); set.spawn(tokio::task::unconstrained(fut.map(move |x| { - trace!("wisp client id {:?} multiplexor result {:?}", mux_id, x) + debug!("wisp client id {:?} multiplexor result {:?}", mux_id, x) }))); let ping_mux = mux.clone(); let ping_event = event.clone(); + let ping_id = id.clone(); set.spawn(async move { let mut interval = interval(Duration::from_secs(30)); - while ping_mux.send_ping(Payload::Bytes(BytesMut::new())).await.is_ok() { + while ping_mux + .send_ping(Payload::Bytes(BytesMut::new())) + .await + .is_ok() + { + trace!("sent ping to wisp client id {:?}", ping_id); select! { _ = interval.tick() => (), _ = ping_event.listen() => break, @@ -280,7 +286,7 @@ pub async fn handle_wisp(stream: WispResult, id: String) -> anyhow::Result<()> { )); } - trace!("shutting down wisp client id {:?}", id); + debug!("shutting down wisp client id {:?}", id); let _ = mux.close().await; event.notify(usize::MAX); diff --git a/server/src/main.rs b/server/src/main.rs index f1fa6c4..9076636 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,7 +1,7 @@ #![feature(ip)] #![deny(clippy::todo)] -use std::{collections::HashMap, fs::read_to_string, net::IpAddr}; +use std::{fs::read_to_string, net::IpAddr}; use anyhow::Context; use clap::Parser; @@ -15,24 +15,26 @@ use hickory_resolver::{ }; use lazy_static::lazy_static; use listener::ServerListener; -use log::{error, info, warn}; +use log::{error, info, trace, warn}; use route::{route_stats, ServerRouteResult}; -use serde::Serialize; +use stats::generate_stats; use tokio::{ runtime, signal::unix::{signal, SignalKind}, }; use uuid::Uuid; -use wisp_mux::{ConnectPacket, StreamType}; +use wisp_mux::ConnectPacket; mod config; mod handle; mod listener; mod route; +mod stats; mod stream; type Client = (DashMap, bool); +#[derive(Debug)] pub enum Resolver { Hickory(TokioAsyncResolver), System, @@ -93,118 +95,6 @@ lazy_static! { }; } -fn format_stream_type(stream_type: StreamType) -> &'static str { - match stream_type { - StreamType::Tcp => "tcp", - StreamType::Udp => "udp", - #[cfg(feature = "twisp")] - StreamType::Unknown(crate::handle::wisp::twisp::STREAM_TYPE) => "twisp", - StreamType::Unknown(_) => unreachable!(), - } -} - -#[derive(Serialize)] -struct MemoryStats { - active: f64, - allocated: f64, - mapped: f64, - metadata: f64, - resident: f64, - retained: f64, -} - -#[derive(Serialize)] -struct StreamStats { - stream_type: String, - requested: String, - resolved: String, -} - -impl From<(ConnectPacket, ConnectPacket)> for StreamStats { - fn from(value: (ConnectPacket, ConnectPacket)) -> Self { - Self { - stream_type: format_stream_type(value.0.stream_type).to_string(), - requested: format!( - "{}:{}", - value.0.destination_hostname, value.0.destination_port - ), - resolved: format!( - "{}:{}", - value.1.destination_hostname, value.1.destination_port - ), - } - } -} - -#[derive(Serialize)] -struct ClientStats { - wsproxy: bool, - streams: HashMap, -} - -#[derive(Serialize)] -struct ServerStats { - config: String, - clients: HashMap, - memory: MemoryStats, -} - -fn generate_stats() -> anyhow::Result { - use tikv_jemalloc_ctl::stats::{active, allocated, mapped, metadata, resident, retained}; - tikv_jemalloc_ctl::epoch::advance()?; - - let memory = MemoryStats { - active: active::read()? as f64 / (1024 * 1024) as f64, - allocated: allocated::read()? as f64 / (1024 * 1024) as f64, - mapped: mapped::read()? as f64 / (1024 * 1024) as f64, - metadata: metadata::read()? as f64 / (1024 * 1024) as f64, - resident: resident::read()? as f64 / (1024 * 1024) as f64, - retained: retained::read()? as f64 / (1024 * 1024) as f64, - }; - - let clients = CLIENTS - .iter() - .map(|x| { - ( - x.key().to_string(), - ClientStats { - wsproxy: x.value().1, - streams: x - .value() - .0 - .iter() - .map(|x| (x.key().to_string(), StreamStats::from(x.value().clone()))) - .collect(), - }, - ) - }) - .collect(); - - let stats = ServerStats { - config: CONFIG.ser()?, - clients, - memory, - }; - - Ok(serde_json::to_string_pretty(&stats)?) -} - -fn handle_stream(stream: ServerRouteResult, id: String) { - tokio::spawn(async move { - CLIENTS.insert(id.clone(), (DashMap::new(), false)); - let res = match stream { - ServerRouteResult::Wisp(stream) => handle_wisp(stream, id.clone()).await, - ServerRouteResult::WsProxy(ws, path, udp) => { - handle_wsproxy(ws, id.clone(), path, udp).await - } - }; - if let Err(e) = res { - error!("error while handling client: {:?}", e); - } - CLIENTS.remove(&id) - }); -} - #[global_allocator] static JEMALLOCATOR: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; @@ -235,10 +125,17 @@ fn main() -> anyhow::Result<()> { CONFIG.server.bind, CONFIG.server.runtime, CONFIG.server.transport ); + trace!("CLI: {:#?}", &*CLI); + trace!("CONFIG: {:#?}", &*CONFIG); + trace!("RESOLVER: {:?}", &*RESOLVER); + tokio::spawn(async { let mut sig = signal(SignalKind::user_defined1()).unwrap(); while sig.recv().await.is_some() { - info!("Stats:\n{}", generate_stats().unwrap()); + match generate_stats() { + Ok(stats) => info!("Stats:\n{}", stats), + Err(err) => error!("error while creating stats {:?}", err), + } } }); @@ -287,6 +184,8 @@ fn main() -> anyhow::Result<()> { } else { client_id }; + + trace!("routed {:?}: {}", client_id, stream); handle_stream(stream, client_id) }) .await; @@ -301,3 +200,19 @@ fn main() -> anyhow::Result<()> { } }) } + +fn handle_stream(stream: ServerRouteResult, id: String) { + tokio::spawn(async move { + CLIENTS.insert(id.clone(), (DashMap::new(), false)); + let res = match stream { + ServerRouteResult::Wisp(stream) => handle_wisp(stream, id.clone()).await, + ServerRouteResult::WsProxy(ws, path, udp) => { + handle_wsproxy(ws, id.clone(), path, udp).await + } + }; + if let Err(e) = res { + error!("error while handling client: {:?}", e); + } + CLIENTS.remove(&id) + }); +} diff --git a/server/src/route.rs b/server/src/route.rs index 1efc978..fc8e662 100644 --- a/server/src/route.rs +++ b/server/src/route.rs @@ -1,4 +1,4 @@ -use std::{future::Future, io::Cursor}; +use std::{fmt::Display, future::Future, io::Cursor}; use anyhow::Context; use bytes::Bytes; @@ -9,7 +9,7 @@ use hyper::{ Response, StatusCode, }; use hyper_util::rt::TokioIo; -use log::{debug, error}; +use log::{debug, error, trace}; use tokio::io::AsyncReadExt; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use wisp_mux::{ @@ -26,11 +26,10 @@ use crate::{ }; type Body = Full; -fn non_ws_resp() -> Response { - Response::builder() +fn non_ws_resp() -> anyhow::Result> { + Ok(Response::builder() .status(StatusCode::OK) - .body(Body::new(CONFIG.server.non_ws_response.as_bytes().into())) - .unwrap() + .body(Body::new(CONFIG.server.non_ws_response.as_bytes().into()))?) } fn send_stats() -> anyhow::Result> { @@ -39,21 +38,22 @@ fn send_stats() -> anyhow::Result> { debug!("sent server stats to http client"); Ok(Response::builder() .status(StatusCode::OK) - .body(Body::new(x.into())) - .unwrap()) + .body(Body::new(x.into()))?) } Err(x) => { error!("failed to send stats to http client: {:?}", x); Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::new(x.to_string().into())) - .unwrap()) + .body(Body::new(x.to_string().into()))?) } } } fn get_header(headers: &HeaderMap, header: &str) -> Option { - headers.get(header).and_then(|x| x.to_str().ok()).map(|x| x.to_string()) + headers + .get(header) + .and_then(|x| x.to_str().ok()) + .map(|x| x.to_string()) } enum HttpUpgradeResult { @@ -78,14 +78,16 @@ where return send_stats(); } else { debug!("sent non_ws_response to http client"); - return Ok(non_ws_resp()); + return non_ws_resp(); } } else { debug!("sent non_ws_response to http client"); - return Ok(non_ws_resp()); + return non_ws_resp(); } } + trace!("recieved request {:?}", req); + let (resp, fut) = fastwebsockets::upgrade::upgrade(&mut req)?; // replace body of Empty with Full let resp = Response::from_parts(resp.into_parts().0, Body::new(Bytes::new())); @@ -122,7 +124,7 @@ where }); } else { debug!("sent non_ws_response to http client"); - return Ok(non_ws_resp()); + return non_ws_resp(); } Ok(resp) @@ -223,3 +225,12 @@ pub enum ServerRouteResult { Wisp(WispResult), WsProxy(WebSocketStreamWrapper, String, bool), } + +impl Display for ServerRouteResult { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Wisp(_) => write!(f, "Wisp"), + Self::WsProxy(_, path, udp) => write!(f, "WsProxy path {:?} udp {:?}", path, udp), + } + } +} diff --git a/server/src/stats.rs b/server/src/stats.rs new file mode 100644 index 0000000..467c869 --- /dev/null +++ b/server/src/stats.rs @@ -0,0 +1,102 @@ +use std::collections::HashMap; + +use serde::Serialize; +use wisp_mux::{ConnectPacket, StreamType}; + +use crate::{CLIENTS, CONFIG}; + +fn format_stream_type(stream_type: StreamType) -> &'static str { + match stream_type { + StreamType::Tcp => "tcp", + StreamType::Udp => "udp", + #[cfg(feature = "twisp")] + StreamType::Unknown(crate::handle::wisp::twisp::STREAM_TYPE) => "twisp", + StreamType::Unknown(_) => unreachable!(), + } +} + +#[derive(Serialize)] +struct MemoryStats { + active: f64, + allocated: f64, + mapped: f64, + metadata: f64, + resident: f64, + retained: f64, +} + +#[derive(Serialize)] +struct StreamStats { + stream_type: String, + requested: String, + resolved: String, +} + +impl From<(ConnectPacket, ConnectPacket)> for StreamStats { + fn from(value: (ConnectPacket, ConnectPacket)) -> Self { + Self { + stream_type: format_stream_type(value.0.stream_type).to_string(), + requested: format!( + "{}:{}", + value.0.destination_hostname, value.0.destination_port + ), + resolved: format!( + "{}:{}", + value.1.destination_hostname, value.1.destination_port + ), + } + } +} + +#[derive(Serialize)] +struct ClientStats { + wsproxy: bool, + streams: HashMap, +} + +#[derive(Serialize)] +struct ServerStats { + config: String, + clients: HashMap, + memory: MemoryStats, +} + +pub fn generate_stats() -> anyhow::Result { + use tikv_jemalloc_ctl::stats::{active, allocated, mapped, metadata, resident, retained}; + tikv_jemalloc_ctl::epoch::advance()?; + + let memory = MemoryStats { + active: active::read()? as f64 / (1024 * 1024) as f64, + allocated: allocated::read()? as f64 / (1024 * 1024) as f64, + mapped: mapped::read()? as f64 / (1024 * 1024) as f64, + metadata: metadata::read()? as f64 / (1024 * 1024) as f64, + resident: resident::read()? as f64 / (1024 * 1024) as f64, + retained: retained::read()? as f64 / (1024 * 1024) as f64, + }; + + let clients = CLIENTS + .iter() + .map(|x| { + ( + x.key().to_string(), + ClientStats { + wsproxy: x.value().1, + streams: x + .value() + .0 + .iter() + .map(|x| (x.key().to_string(), StreamStats::from(x.value().clone()))) + .collect(), + }, + ) + }) + .collect(); + + let stats = ServerStats { + config: CONFIG.ser()?, + clients, + memory, + }; + + Ok(serde_json::to_string_pretty(&stats)?) +}