improve logging, separate stats out

This commit is contained in:
Toshit Chawda 2024-10-02 20:38:14 -07:00
parent bca8be0bd2
commit c02a83f185
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
5 changed files with 187 additions and 152 deletions

View file

@ -80,7 +80,7 @@ pub enum StatsEndpoint {
SeparateServer(BindAddr), SeparateServer(BindAddr),
} }
#[derive(Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(default)] #[serde(default)]
pub struct ServerConfig { pub struct ServerConfig {
/// Address and socket type to listen on. /// Address and socket type to listen on.
@ -113,7 +113,7 @@ pub struct ServerConfig {
pub runtime: RuntimeFlavor, pub runtime: RuntimeFlavor,
} }
#[derive(Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum ProtocolExtension { pub enum ProtocolExtension {
/// Wisp version 2 UDP protocol extension. /// Wisp version 2 UDP protocol extension.
@ -122,7 +122,7 @@ pub enum ProtocolExtension {
Motd, Motd,
} }
#[derive(Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum ProtocolExtensionAuth { pub enum ProtocolExtensionAuth {
/// Wisp version 2 password authentication protocol extension. /// Wisp version 2 password authentication protocol extension.
@ -139,7 +139,7 @@ fn is_default_motd(str: &String) -> bool {
*str == default_motd() *str == default_motd()
} }
#[derive(Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(default)] #[serde(default)]
pub struct WispConfig { pub struct WispConfig {
/// Allow legacy wsproxy connections. /// Allow legacy wsproxy connections.
@ -168,7 +168,7 @@ pub struct WispConfig {
pub motd_extension: String, pub motd_extension: String,
} }
#[derive(Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(default)] #[serde(default)]
pub struct StreamConfig { pub struct StreamConfig {
/// Whether or not to enable TCP nodelay. /// Whether or not to enable TCP nodelay.
@ -218,7 +218,7 @@ pub struct StreamConfig {
pub block_ports: Vec<Vec<u16>>, pub block_ports: Vec<Vec<u16>>,
} }
#[derive(Serialize, Deserialize, Default)] #[derive(Debug, Serialize, Deserialize, Default)]
#[serde(default)] #[serde(default)]
pub struct Config { pub struct Config {
pub server: ServerConfig, pub server: ServerConfig,
@ -226,6 +226,7 @@ pub struct Config {
pub stream: StreamConfig, pub stream: StreamConfig,
} }
#[derive(Debug)]
struct ConfigCache { struct ConfigCache {
pub blocked_ports: Vec<RangeInclusive<u16>>, pub blocked_ports: Vec<RangeInclusive<u16>>,
pub allowed_ports: Vec<RangeInclusive<u16>>, pub allowed_ports: Vec<RangeInclusive<u16>>,
@ -475,7 +476,7 @@ impl Config {
} }
} }
#[derive(Clone, Copy, Eq, PartialEq, ValueEnum)] #[derive(Debug, Clone, Copy, Eq, PartialEq, ValueEnum)]
pub enum ConfigFormat { pub enum ConfigFormat {
Json, Json,
#[cfg(feature = "toml")] #[cfg(feature = "toml")]
@ -499,7 +500,7 @@ impl Default for ConfigFormat {
} }
/// Performant server implementation of the Wisp protocol in Rust, made for epoxy. /// Performant server implementation of the Wisp protocol in Rust, made for epoxy.
#[derive(Parser)] #[derive(Parser, Debug)]
#[command(version = VERSION_STRING)] #[command(version = VERSION_STRING)]
pub struct Cli { pub struct Cli {
/// Config file to use. /// Config file to use.

View file

@ -14,12 +14,14 @@ use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::tcp::{OwnedReadHalf, OwnedWriteHalf}, net::tcp::{OwnedReadHalf, OwnedWriteHalf},
select, select,
task::JoinSet, time::interval, task::JoinSet,
time::interval,
}; };
use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::compat::FuturesAsyncReadCompatExt;
use uuid::Uuid; use uuid::Uuid;
use wisp_mux::{ use wisp_mux::{
ws::Payload, CloseReason, ConnectPacket, MuxStream, MuxStreamAsyncRead, MuxStreamWrite, ServerMux ws::Payload, CloseReason, ConnectPacket, MuxStream, MuxStreamAsyncRead, MuxStreamWrite,
ServerMux,
}; };
use crate::{ use crate::{
@ -104,12 +106,9 @@ async fn handle_stream(
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
trace!( debug!(
"new stream created for client id {:?}: (stream uuid {:?}) {:?} {:?}", "new stream created for client id {:?}: (stream uuid {:?}) {:?} {:?}",
id, id, uuid, requested_stream, resolved_stream
uuid,
requested_stream,
resolved_stream
); );
if let Some(client) = CLIENTS.get(&id) { if let Some(client) = CLIENTS.get(&id) {
@ -205,7 +204,7 @@ async fn handle_stream(
x = event.listen() => x, x = event.listen() => x,
}; };
trace!("stream uuid {:?} disconnected for client id {:?}", uuid, id); debug!("stream uuid {:?} disconnected for client id {:?}", uuid, id);
if let Some(client) = CLIENTS.get(&id) { if let Some(client) = CLIENTS.get(&id) {
client.0.remove(&uuid); client.0.remove(&uuid);
@ -241,12 +240,13 @@ pub async fn handle_wisp(stream: WispResult, id: String) -> anyhow::Result<()> {
let mux = Arc::new(mux); let mux = Arc::new(mux);
debug!( debug!(
"new wisp client id {:?} connected with extensions {:?}", "new wisp client id {:?} connected with extensions {:?}, downgraded {:?}",
id, id,
mux.supported_extensions mux.supported_extensions
.iter() .iter()
.map(|x| x.get_id()) .map(|x| x.get_id())
.collect::<Vec<_>>() .collect::<Vec<_>>(),
mux.downgraded
); );
let mut set: JoinSet<()> = JoinSet::new(); let mut set: JoinSet<()> = JoinSet::new();
@ -254,14 +254,20 @@ pub async fn handle_wisp(stream: WispResult, id: String) -> anyhow::Result<()> {
let mux_id = id.clone(); let mux_id = id.clone();
set.spawn(tokio::task::unconstrained(fut.map(move |x| { set.spawn(tokio::task::unconstrained(fut.map(move |x| {
trace!("wisp client id {:?} multiplexor result {:?}", mux_id, x) debug!("wisp client id {:?} multiplexor result {:?}", mux_id, x)
}))); })));
let ping_mux = mux.clone(); let ping_mux = mux.clone();
let ping_event = event.clone(); let ping_event = event.clone();
let ping_id = id.clone();
set.spawn(async move { set.spawn(async move {
let mut interval = interval(Duration::from_secs(30)); let mut interval = interval(Duration::from_secs(30));
while ping_mux.send_ping(Payload::Bytes(BytesMut::new())).await.is_ok() { while ping_mux
.send_ping(Payload::Bytes(BytesMut::new()))
.await
.is_ok()
{
trace!("sent ping to wisp client id {:?}", ping_id);
select! { select! {
_ = interval.tick() => (), _ = interval.tick() => (),
_ = ping_event.listen() => break, _ = ping_event.listen() => break,
@ -280,7 +286,7 @@ pub async fn handle_wisp(stream: WispResult, id: String) -> anyhow::Result<()> {
)); ));
} }
trace!("shutting down wisp client id {:?}", id); debug!("shutting down wisp client id {:?}", id);
let _ = mux.close().await; let _ = mux.close().await;
event.notify(usize::MAX); event.notify(usize::MAX);

View file

@ -1,7 +1,7 @@
#![feature(ip)] #![feature(ip)]
#![deny(clippy::todo)] #![deny(clippy::todo)]
use std::{collections::HashMap, fs::read_to_string, net::IpAddr}; use std::{fs::read_to_string, net::IpAddr};
use anyhow::Context; use anyhow::Context;
use clap::Parser; use clap::Parser;
@ -15,24 +15,26 @@ use hickory_resolver::{
}; };
use lazy_static::lazy_static; use lazy_static::lazy_static;
use listener::ServerListener; use listener::ServerListener;
use log::{error, info, warn}; use log::{error, info, trace, warn};
use route::{route_stats, ServerRouteResult}; use route::{route_stats, ServerRouteResult};
use serde::Serialize; use stats::generate_stats;
use tokio::{ use tokio::{
runtime, runtime,
signal::unix::{signal, SignalKind}, signal::unix::{signal, SignalKind},
}; };
use uuid::Uuid; use uuid::Uuid;
use wisp_mux::{ConnectPacket, StreamType}; use wisp_mux::ConnectPacket;
mod config; mod config;
mod handle; mod handle;
mod listener; mod listener;
mod route; mod route;
mod stats;
mod stream; mod stream;
type Client = (DashMap<Uuid, (ConnectPacket, ConnectPacket)>, bool); type Client = (DashMap<Uuid, (ConnectPacket, ConnectPacket)>, bool);
#[derive(Debug)]
pub enum Resolver { pub enum Resolver {
Hickory(TokioAsyncResolver), Hickory(TokioAsyncResolver),
System, System,
@ -93,118 +95,6 @@ lazy_static! {
}; };
} }
fn format_stream_type(stream_type: StreamType) -> &'static str {
match stream_type {
StreamType::Tcp => "tcp",
StreamType::Udp => "udp",
#[cfg(feature = "twisp")]
StreamType::Unknown(crate::handle::wisp::twisp::STREAM_TYPE) => "twisp",
StreamType::Unknown(_) => unreachable!(),
}
}
#[derive(Serialize)]
struct MemoryStats {
active: f64,
allocated: f64,
mapped: f64,
metadata: f64,
resident: f64,
retained: f64,
}
#[derive(Serialize)]
struct StreamStats {
stream_type: String,
requested: String,
resolved: String,
}
impl From<(ConnectPacket, ConnectPacket)> for StreamStats {
fn from(value: (ConnectPacket, ConnectPacket)) -> Self {
Self {
stream_type: format_stream_type(value.0.stream_type).to_string(),
requested: format!(
"{}:{}",
value.0.destination_hostname, value.0.destination_port
),
resolved: format!(
"{}:{}",
value.1.destination_hostname, value.1.destination_port
),
}
}
}
#[derive(Serialize)]
struct ClientStats {
wsproxy: bool,
streams: HashMap<String, StreamStats>,
}
#[derive(Serialize)]
struct ServerStats {
config: String,
clients: HashMap<String, ClientStats>,
memory: MemoryStats,
}
fn generate_stats() -> anyhow::Result<String> {
use tikv_jemalloc_ctl::stats::{active, allocated, mapped, metadata, resident, retained};
tikv_jemalloc_ctl::epoch::advance()?;
let memory = MemoryStats {
active: active::read()? as f64 / (1024 * 1024) as f64,
allocated: allocated::read()? as f64 / (1024 * 1024) as f64,
mapped: mapped::read()? as f64 / (1024 * 1024) as f64,
metadata: metadata::read()? as f64 / (1024 * 1024) as f64,
resident: resident::read()? as f64 / (1024 * 1024) as f64,
retained: retained::read()? as f64 / (1024 * 1024) as f64,
};
let clients = CLIENTS
.iter()
.map(|x| {
(
x.key().to_string(),
ClientStats {
wsproxy: x.value().1,
streams: x
.value()
.0
.iter()
.map(|x| (x.key().to_string(), StreamStats::from(x.value().clone())))
.collect(),
},
)
})
.collect();
let stats = ServerStats {
config: CONFIG.ser()?,
clients,
memory,
};
Ok(serde_json::to_string_pretty(&stats)?)
}
fn handle_stream(stream: ServerRouteResult, id: String) {
tokio::spawn(async move {
CLIENTS.insert(id.clone(), (DashMap::new(), false));
let res = match stream {
ServerRouteResult::Wisp(stream) => handle_wisp(stream, id.clone()).await,
ServerRouteResult::WsProxy(ws, path, udp) => {
handle_wsproxy(ws, id.clone(), path, udp).await
}
};
if let Err(e) = res {
error!("error while handling client: {:?}", e);
}
CLIENTS.remove(&id)
});
}
#[global_allocator] #[global_allocator]
static JEMALLOCATOR: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; static JEMALLOCATOR: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
@ -235,10 +125,17 @@ fn main() -> anyhow::Result<()> {
CONFIG.server.bind, CONFIG.server.runtime, CONFIG.server.transport CONFIG.server.bind, CONFIG.server.runtime, CONFIG.server.transport
); );
trace!("CLI: {:#?}", &*CLI);
trace!("CONFIG: {:#?}", &*CONFIG);
trace!("RESOLVER: {:?}", &*RESOLVER);
tokio::spawn(async { tokio::spawn(async {
let mut sig = signal(SignalKind::user_defined1()).unwrap(); let mut sig = signal(SignalKind::user_defined1()).unwrap();
while sig.recv().await.is_some() { while sig.recv().await.is_some() {
info!("Stats:\n{}", generate_stats().unwrap()); match generate_stats() {
Ok(stats) => info!("Stats:\n{}", stats),
Err(err) => error!("error while creating stats {:?}", err),
}
} }
}); });
@ -287,6 +184,8 @@ fn main() -> anyhow::Result<()> {
} else { } else {
client_id client_id
}; };
trace!("routed {:?}: {}", client_id, stream);
handle_stream(stream, client_id) handle_stream(stream, client_id)
}) })
.await; .await;
@ -301,3 +200,19 @@ fn main() -> anyhow::Result<()> {
} }
}) })
} }
fn handle_stream(stream: ServerRouteResult, id: String) {
tokio::spawn(async move {
CLIENTS.insert(id.clone(), (DashMap::new(), false));
let res = match stream {
ServerRouteResult::Wisp(stream) => handle_wisp(stream, id.clone()).await,
ServerRouteResult::WsProxy(ws, path, udp) => {
handle_wsproxy(ws, id.clone(), path, udp).await
}
};
if let Err(e) = res {
error!("error while handling client: {:?}", e);
}
CLIENTS.remove(&id)
});
}

View file

@ -1,4 +1,4 @@
use std::{future::Future, io::Cursor}; use std::{fmt::Display, future::Future, io::Cursor};
use anyhow::Context; use anyhow::Context;
use bytes::Bytes; use bytes::Bytes;
@ -9,7 +9,7 @@ use hyper::{
Response, StatusCode, Response, StatusCode,
}; };
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use log::{debug, error}; use log::{debug, error, trace};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use wisp_mux::{ use wisp_mux::{
@ -26,11 +26,10 @@ use crate::{
}; };
type Body = Full<Bytes>; type Body = Full<Bytes>;
fn non_ws_resp() -> Response<Body> { fn non_ws_resp() -> anyhow::Result<Response<Body>> {
Response::builder() Ok(Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.body(Body::new(CONFIG.server.non_ws_response.as_bytes().into())) .body(Body::new(CONFIG.server.non_ws_response.as_bytes().into()))?)
.unwrap()
} }
fn send_stats() -> anyhow::Result<Response<Body>> { fn send_stats() -> anyhow::Result<Response<Body>> {
@ -39,21 +38,22 @@ fn send_stats() -> anyhow::Result<Response<Body>> {
debug!("sent server stats to http client"); debug!("sent server stats to http client");
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.body(Body::new(x.into())) .body(Body::new(x.into()))?)
.unwrap())
} }
Err(x) => { Err(x) => {
error!("failed to send stats to http client: {:?}", x); error!("failed to send stats to http client: {:?}", x);
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR) .status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::new(x.to_string().into())) .body(Body::new(x.to_string().into()))?)
.unwrap())
} }
} }
} }
fn get_header(headers: &HeaderMap, header: &str) -> Option<String> { fn get_header(headers: &HeaderMap, header: &str) -> Option<String> {
headers.get(header).and_then(|x| x.to_str().ok()).map(|x| x.to_string()) headers
.get(header)
.and_then(|x| x.to_str().ok())
.map(|x| x.to_string())
} }
enum HttpUpgradeResult { enum HttpUpgradeResult {
@ -78,14 +78,16 @@ where
return send_stats(); return send_stats();
} else { } else {
debug!("sent non_ws_response to http client"); debug!("sent non_ws_response to http client");
return Ok(non_ws_resp()); return non_ws_resp();
} }
} else { } else {
debug!("sent non_ws_response to http client"); debug!("sent non_ws_response to http client");
return Ok(non_ws_resp()); return non_ws_resp();
} }
} }
trace!("recieved request {:?}", req);
let (resp, fut) = fastwebsockets::upgrade::upgrade(&mut req)?; let (resp, fut) = fastwebsockets::upgrade::upgrade(&mut req)?;
// replace body of Empty<Bytes> with Full<Bytes> // replace body of Empty<Bytes> with Full<Bytes>
let resp = Response::from_parts(resp.into_parts().0, Body::new(Bytes::new())); let resp = Response::from_parts(resp.into_parts().0, Body::new(Bytes::new()));
@ -122,7 +124,7 @@ where
}); });
} else { } else {
debug!("sent non_ws_response to http client"); debug!("sent non_ws_response to http client");
return Ok(non_ws_resp()); return non_ws_resp();
} }
Ok(resp) Ok(resp)
@ -223,3 +225,12 @@ pub enum ServerRouteResult {
Wisp(WispResult), Wisp(WispResult),
WsProxy(WebSocketStreamWrapper, String, bool), WsProxy(WebSocketStreamWrapper, String, bool),
} }
impl Display for ServerRouteResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Wisp(_) => write!(f, "Wisp"),
Self::WsProxy(_, path, udp) => write!(f, "WsProxy path {:?} udp {:?}", path, udp),
}
}
}

102
server/src/stats.rs Normal file
View file

@ -0,0 +1,102 @@
use std::collections::HashMap;
use serde::Serialize;
use wisp_mux::{ConnectPacket, StreamType};
use crate::{CLIENTS, CONFIG};
fn format_stream_type(stream_type: StreamType) -> &'static str {
match stream_type {
StreamType::Tcp => "tcp",
StreamType::Udp => "udp",
#[cfg(feature = "twisp")]
StreamType::Unknown(crate::handle::wisp::twisp::STREAM_TYPE) => "twisp",
StreamType::Unknown(_) => unreachable!(),
}
}
#[derive(Serialize)]
struct MemoryStats {
active: f64,
allocated: f64,
mapped: f64,
metadata: f64,
resident: f64,
retained: f64,
}
#[derive(Serialize)]
struct StreamStats {
stream_type: String,
requested: String,
resolved: String,
}
impl From<(ConnectPacket, ConnectPacket)> for StreamStats {
fn from(value: (ConnectPacket, ConnectPacket)) -> Self {
Self {
stream_type: format_stream_type(value.0.stream_type).to_string(),
requested: format!(
"{}:{}",
value.0.destination_hostname, value.0.destination_port
),
resolved: format!(
"{}:{}",
value.1.destination_hostname, value.1.destination_port
),
}
}
}
#[derive(Serialize)]
struct ClientStats {
wsproxy: bool,
streams: HashMap<String, StreamStats>,
}
#[derive(Serialize)]
struct ServerStats {
config: String,
clients: HashMap<String, ClientStats>,
memory: MemoryStats,
}
pub fn generate_stats() -> anyhow::Result<String> {
use tikv_jemalloc_ctl::stats::{active, allocated, mapped, metadata, resident, retained};
tikv_jemalloc_ctl::epoch::advance()?;
let memory = MemoryStats {
active: active::read()? as f64 / (1024 * 1024) as f64,
allocated: allocated::read()? as f64 / (1024 * 1024) as f64,
mapped: mapped::read()? as f64 / (1024 * 1024) as f64,
metadata: metadata::read()? as f64 / (1024 * 1024) as f64,
resident: resident::read()? as f64 / (1024 * 1024) as f64,
retained: retained::read()? as f64 / (1024 * 1024) as f64,
};
let clients = CLIENTS
.iter()
.map(|x| {
(
x.key().to_string(),
ClientStats {
wsproxy: x.value().1,
streams: x
.value()
.0
.iter()
.map(|x| (x.key().to_string(), StreamStats::from(x.value().clone())))
.collect(),
},
)
})
.collect();
let stats = ServerStats {
config: CONFIG.ser()?,
clients,
memory,
};
Ok(serde_json::to_string_pretty(&stats)?)
}