From 3cef68d1647abcdc58e43c060d6b04ed8caef121 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Sun, 24 Nov 2024 17:02:26 -0800 Subject: [PATCH] remove dashmap --- Cargo.lock | 21 ------------------ server/Cargo.toml | 1 - server/src/handle/wisp/mod.rs | 12 +++++++---- server/src/handle/wsproxy.rs | 16 +++++++++----- server/src/main.rs | 17 +++++++++------ server/src/route.rs | 8 +++---- server/src/stats.rs | 40 +++++++++++++++++++---------------- 7 files changed, 55 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e01bb2d..48287db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -546,20 +546,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dashmap" -version = "6.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" -dependencies = [ - "cfg-if", - "crossbeam-utils", - "hashbrown 0.14.5", - "lock_api", - "once_cell", - "parking_lot_core", -] - [[package]] name = "data-encoding" version = "2.6.0" @@ -741,7 +727,6 @@ dependencies = [ "cfg-if", "clap", "console-subscriber", - "dashmap", "ed25519-dalek", "env_logger", "event-listener", @@ -1037,12 +1022,6 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -[[package]] -name = "hashbrown" -version = "0.14.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" - [[package]] name = "hashbrown" version = "0.15.0" diff --git a/server/Cargo.toml b/server/Cargo.toml index 9788719..26ec28e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,7 +11,6 @@ bytes = "1.7.1" cfg-if = "1.0.0" clap = { version = "4.5.16", features = ["cargo", "derive"] } console-subscriber = { version = "0.4.1", optional = true } -dashmap = "6.0.1" ed25519-dalek = { version = "2.1.1", features = ["pem"] } env_logger = "0.11.5" event-listener = "5.3.1" diff --git a/server/src/handle/wisp/mod.rs b/server/src/handle/wisp/mod.rs index 49fd7ef..70810b5 100644 --- a/server/src/handle/wisp/mod.rs +++ b/server/src/handle/wisp/mod.rs @@ -132,8 +132,12 @@ async fn handle_stream( id, uuid, requested_stream, resolved_stream ); - if let Some(client) = CLIENTS.get(&id) { - client.0.insert(uuid, (requested_stream, resolved_stream)); + if let Some(client) = CLIENTS.lock().await.get(&id) { + client + .0 + .lock() + .await + .insert(uuid, (requested_stream, resolved_stream)); } let forward_fut = async { @@ -227,8 +231,8 @@ async fn handle_stream( debug!("stream uuid {:?} disconnected for client id {:?}", uuid, id); - if let Some(client) = CLIENTS.get(&id) { - client.0.remove(&uuid); + if let Some(client) = CLIENTS.lock().await.get(&id) { + client.0.lock().await.remove(&uuid); } } diff --git a/server/src/handle/wsproxy.rs b/server/src/handle/wsproxy.rs index e066f6a..6d2c216 100644 --- a/server/src/handle/wsproxy.rs +++ b/server/src/handle/wsproxy.rs @@ -90,11 +90,13 @@ pub async fn handle_wsproxy( id, uuid, requested_stream, resolved_stream ); - CLIENTS - .get(&id) - .unwrap() - .0 - .insert(uuid, (requested_stream, resolved_stream)); + if let Some(client) = CLIENTS.lock().await.get(&id) { + client + .0 + .lock() + .await + .insert(uuid, (requested_stream, resolved_stream)); + } match stream { ClientStream::Tcp(stream) => { @@ -184,5 +186,9 @@ pub async fn handle_wsproxy( id, uuid ); + if let Some(client) = CLIENTS.lock().await.get(&id) { + client.0.lock().await.remove(&uuid); + } + Ok(()) } diff --git a/server/src/main.rs b/server/src/main.rs index 89a784e..93e0168 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,12 +2,11 @@ #![deny(clippy::todo)] #![allow(unexpected_cfgs)] -use std::{fs::read_to_string, net::IpAddr}; +use std::{collections::HashMap, fs::read_to_string, net::IpAddr}; use anyhow::{Context, Result}; use clap::Parser; use config::{validate_config_cache, Cli, Config, RuntimeFlavor}; -use dashmap::DashMap; use handle::{handle_wisp, handle_wsproxy}; use hickory_resolver::{ config::{NameServerConfigGroup, ResolverConfig, ResolverOpts}, @@ -22,6 +21,7 @@ use stats::generate_stats; use tokio::{ runtime, signal::unix::{signal, SignalKind}, + sync::Mutex, }; use uuid::Uuid; use wisp_mux::ConnectPacket; @@ -41,7 +41,7 @@ mod stream; mod util_chain; #[doc(hidden)] -type Client = (DashMap, bool); +type Client = (Mutex>, bool); #[doc(hidden)] #[derive(Debug)] @@ -86,7 +86,7 @@ lazy_static! { } }; #[doc(hidden)] - pub static ref CLIENTS: DashMap = DashMap::new(); + pub static ref CLIENTS: Mutex> = Mutex::new(HashMap::new()); #[doc(hidden)] pub static ref RESOLVER: Resolver = { if CONFIG.stream.dns_servers.is_empty() { @@ -160,7 +160,7 @@ async fn async_main() -> Result<()> { tokio::spawn(async { let mut sig = signal(SignalKind::user_defined1()).unwrap(); while sig.recv().await.is_some() { - match generate_stats() { + match generate_stats().await { Ok(stats) => info!("Stats:\n{}", stats), Err(err) => error!("error while creating stats {:?}", err), } @@ -234,7 +234,10 @@ async fn async_main() -> Result<()> { #[doc(hidden)] fn handle_stream(stream: ServerRouteResult, id: String) { tokio::spawn(async move { - CLIENTS.insert(id.clone(), (DashMap::new(), false)); + CLIENTS + .lock() + .await + .insert(id.clone(), (Mutex::new(HashMap::new()), false)); let res = match stream { ServerRouteResult::Wisp(stream, is_v2) => handle_wisp(stream, is_v2, id.clone()).await, ServerRouteResult::WsProxy(ws, path, udp) => { @@ -244,6 +247,6 @@ fn handle_stream(stream: ServerRouteResult, id: String) { if let Err(e) = res { error!("error while handling client: {:?}", e); } - CLIENTS.remove(&id) + CLIENTS.lock().await.remove(&id) }); } diff --git a/server/src/route.rs b/server/src/route.rs index 8642177..bd83003 100644 --- a/server/src/route.rs +++ b/server/src/route.rs @@ -56,8 +56,8 @@ fn non_ws_resp() -> anyhow::Result> { .body(Body::new(CONFIG.server.non_ws_response.as_bytes().into()))?) } -fn send_stats() -> anyhow::Result> { - match generate_stats() { +async fn send_stats() -> anyhow::Result> { + match generate_stats().await { Ok(x) => { debug!("sent server stats to http client"); Ok(Response::builder() @@ -99,7 +99,7 @@ where if !is_upgrade { if let Some(stats_endpoint) = stats_endpoint { if req.uri().path() == stats_endpoint { - return send_stats(); + return send_stats().await; } else { debug!("sent non_ws_response to http client"); return non_ws_resp(); @@ -159,7 +159,7 @@ where pub async fn route_stats(stream: ServerStream) -> anyhow::Result<()> { let stream = TokioIo::new(stream); Builder::new() - .serve_connection(stream, service_fn(move |_| async { send_stats() })) + .serve_connection(stream, service_fn(move |_| async { send_stats().await })) .await?; Ok(()) } diff --git a/server/src/stats.rs b/server/src/stats.rs index 467c869..c26a93b 100644 --- a/server/src/stats.rs +++ b/server/src/stats.rs @@ -61,7 +61,7 @@ struct ServerStats { memory: MemoryStats, } -pub fn generate_stats() -> anyhow::Result { +pub async fn generate_stats() -> anyhow::Result { use tikv_jemalloc_ctl::stats::{active, allocated, mapped, metadata, resident, retained}; tikv_jemalloc_ctl::epoch::advance()?; @@ -74,23 +74,27 @@ pub fn generate_stats() -> anyhow::Result { retained: retained::read()? as f64 / (1024 * 1024) as f64, }; - let clients = CLIENTS - .iter() - .map(|x| { - ( - x.key().to_string(), - ClientStats { - wsproxy: x.value().1, - streams: x - .value() - .0 - .iter() - .map(|x| (x.key().to_string(), StreamStats::from(x.value().clone()))) - .collect(), - }, - ) - }) - .collect(); + let clients_locked = CLIENTS.lock().await; + + let mut clients = HashMap::with_capacity(clients_locked.len()); + for client in clients_locked.iter() { + clients.insert( + client.0.to_string(), + ClientStats { + wsproxy: client.1 .1, + streams: client + .1 + .0 + .lock() + .await + .iter() + .map(|x| (x.0.to_string(), StreamStats::from(x.1.clone()))) + .collect(), + }, + ); + } + + drop(clients_locked); let stats = ServerStats { config: CONFIG.ser()?,