remove dashmap

This commit is contained in:
Toshit Chawda 2024-11-24 17:02:26 -08:00
parent 1b7f5a10c0
commit 3cef68d164
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
7 changed files with 55 additions and 60 deletions

21
Cargo.lock generated
View file

@ -546,20 +546,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]] [[package]]
name = "data-encoding" name = "data-encoding"
version = "2.6.0" version = "2.6.0"
@ -741,7 +727,6 @@ dependencies = [
"cfg-if", "cfg-if",
"clap", "clap",
"console-subscriber", "console-subscriber",
"dashmap",
"ed25519-dalek", "ed25519-dalek",
"env_logger", "env_logger",
"event-listener", "event-listener",
@ -1037,12 +1022,6 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.15.0" version = "0.15.0"

View file

@ -11,7 +11,6 @@ bytes = "1.7.1"
cfg-if = "1.0.0" cfg-if = "1.0.0"
clap = { version = "4.5.16", features = ["cargo", "derive"] } clap = { version = "4.5.16", features = ["cargo", "derive"] }
console-subscriber = { version = "0.4.1", optional = true } console-subscriber = { version = "0.4.1", optional = true }
dashmap = "6.0.1"
ed25519-dalek = { version = "2.1.1", features = ["pem"] } ed25519-dalek = { version = "2.1.1", features = ["pem"] }
env_logger = "0.11.5" env_logger = "0.11.5"
event-listener = "5.3.1" event-listener = "5.3.1"

View file

@ -132,8 +132,12 @@ async fn handle_stream(
id, uuid, requested_stream, resolved_stream id, uuid, requested_stream, resolved_stream
); );
if let Some(client) = CLIENTS.get(&id) { if let Some(client) = CLIENTS.lock().await.get(&id) {
client.0.insert(uuid, (requested_stream, resolved_stream)); client
.0
.lock()
.await
.insert(uuid, (requested_stream, resolved_stream));
} }
let forward_fut = async { let forward_fut = async {
@ -227,8 +231,8 @@ async fn handle_stream(
debug!("stream uuid {:?} disconnected for client id {:?}", uuid, id); debug!("stream uuid {:?} disconnected for client id {:?}", uuid, id);
if let Some(client) = CLIENTS.get(&id) { if let Some(client) = CLIENTS.lock().await.get(&id) {
client.0.remove(&uuid); client.0.lock().await.remove(&uuid);
} }
} }

View file

@ -90,11 +90,13 @@ pub async fn handle_wsproxy(
id, uuid, requested_stream, resolved_stream id, uuid, requested_stream, resolved_stream
); );
CLIENTS if let Some(client) = CLIENTS.lock().await.get(&id) {
.get(&id) client
.unwrap() .0
.0 .lock()
.insert(uuid, (requested_stream, resolved_stream)); .await
.insert(uuid, (requested_stream, resolved_stream));
}
match stream { match stream {
ClientStream::Tcp(stream) => { ClientStream::Tcp(stream) => {
@ -184,5 +186,9 @@ pub async fn handle_wsproxy(
id, uuid id, uuid
); );
if let Some(client) = CLIENTS.lock().await.get(&id) {
client.0.lock().await.remove(&uuid);
}
Ok(()) Ok(())
} }

View file

@ -2,12 +2,11 @@
#![deny(clippy::todo)] #![deny(clippy::todo)]
#![allow(unexpected_cfgs)] #![allow(unexpected_cfgs)]
use std::{fs::read_to_string, net::IpAddr}; use std::{collections::HashMap, fs::read_to_string, net::IpAddr};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use clap::Parser; use clap::Parser;
use config::{validate_config_cache, Cli, Config, RuntimeFlavor}; use config::{validate_config_cache, Cli, Config, RuntimeFlavor};
use dashmap::DashMap;
use handle::{handle_wisp, handle_wsproxy}; use handle::{handle_wisp, handle_wsproxy};
use hickory_resolver::{ use hickory_resolver::{
config::{NameServerConfigGroup, ResolverConfig, ResolverOpts}, config::{NameServerConfigGroup, ResolverConfig, ResolverOpts},
@ -22,6 +21,7 @@ use stats::generate_stats;
use tokio::{ use tokio::{
runtime, runtime,
signal::unix::{signal, SignalKind}, signal::unix::{signal, SignalKind},
sync::Mutex,
}; };
use uuid::Uuid; use uuid::Uuid;
use wisp_mux::ConnectPacket; use wisp_mux::ConnectPacket;
@ -41,7 +41,7 @@ mod stream;
mod util_chain; mod util_chain;
#[doc(hidden)] #[doc(hidden)]
type Client = (DashMap<Uuid, (ConnectPacket, ConnectPacket)>, bool); type Client = (Mutex<HashMap<Uuid, (ConnectPacket, ConnectPacket)>>, bool);
#[doc(hidden)] #[doc(hidden)]
#[derive(Debug)] #[derive(Debug)]
@ -86,7 +86,7 @@ lazy_static! {
} }
}; };
#[doc(hidden)] #[doc(hidden)]
pub static ref CLIENTS: DashMap<String, Client> = DashMap::new(); pub static ref CLIENTS: Mutex<HashMap<String, Client>> = Mutex::new(HashMap::new());
#[doc(hidden)] #[doc(hidden)]
pub static ref RESOLVER: Resolver = { pub static ref RESOLVER: Resolver = {
if CONFIG.stream.dns_servers.is_empty() { if CONFIG.stream.dns_servers.is_empty() {
@ -160,7 +160,7 @@ async fn async_main() -> Result<()> {
tokio::spawn(async { tokio::spawn(async {
let mut sig = signal(SignalKind::user_defined1()).unwrap(); let mut sig = signal(SignalKind::user_defined1()).unwrap();
while sig.recv().await.is_some() { while sig.recv().await.is_some() {
match generate_stats() { match generate_stats().await {
Ok(stats) => info!("Stats:\n{}", stats), Ok(stats) => info!("Stats:\n{}", stats),
Err(err) => error!("error while creating stats {:?}", err), Err(err) => error!("error while creating stats {:?}", err),
} }
@ -234,7 +234,10 @@ async fn async_main() -> Result<()> {
#[doc(hidden)] #[doc(hidden)]
fn handle_stream(stream: ServerRouteResult, id: String) { fn handle_stream(stream: ServerRouteResult, id: String) {
tokio::spawn(async move { tokio::spawn(async move {
CLIENTS.insert(id.clone(), (DashMap::new(), false)); CLIENTS
.lock()
.await
.insert(id.clone(), (Mutex::new(HashMap::new()), false));
let res = match stream { let res = match stream {
ServerRouteResult::Wisp(stream, is_v2) => handle_wisp(stream, is_v2, id.clone()).await, ServerRouteResult::Wisp(stream, is_v2) => handle_wisp(stream, is_v2, id.clone()).await,
ServerRouteResult::WsProxy(ws, path, udp) => { ServerRouteResult::WsProxy(ws, path, udp) => {
@ -244,6 +247,6 @@ fn handle_stream(stream: ServerRouteResult, id: String) {
if let Err(e) = res { if let Err(e) = res {
error!("error while handling client: {:?}", e); error!("error while handling client: {:?}", e);
} }
CLIENTS.remove(&id) CLIENTS.lock().await.remove(&id)
}); });
} }

View file

@ -56,8 +56,8 @@ fn non_ws_resp() -> anyhow::Result<Response<Body>> {
.body(Body::new(CONFIG.server.non_ws_response.as_bytes().into()))?) .body(Body::new(CONFIG.server.non_ws_response.as_bytes().into()))?)
} }
fn send_stats() -> anyhow::Result<Response<Body>> { async fn send_stats() -> anyhow::Result<Response<Body>> {
match generate_stats() { match generate_stats().await {
Ok(x) => { Ok(x) => {
debug!("sent server stats to http client"); debug!("sent server stats to http client");
Ok(Response::builder() Ok(Response::builder()
@ -99,7 +99,7 @@ where
if !is_upgrade { if !is_upgrade {
if let Some(stats_endpoint) = stats_endpoint { if let Some(stats_endpoint) = stats_endpoint {
if req.uri().path() == stats_endpoint { if req.uri().path() == stats_endpoint {
return send_stats(); return send_stats().await;
} else { } else {
debug!("sent non_ws_response to http client"); debug!("sent non_ws_response to http client");
return non_ws_resp(); return non_ws_resp();
@ -159,7 +159,7 @@ where
pub async fn route_stats(stream: ServerStream) -> anyhow::Result<()> { pub async fn route_stats(stream: ServerStream) -> anyhow::Result<()> {
let stream = TokioIo::new(stream); let stream = TokioIo::new(stream);
Builder::new() Builder::new()
.serve_connection(stream, service_fn(move |_| async { send_stats() })) .serve_connection(stream, service_fn(move |_| async { send_stats().await }))
.await?; .await?;
Ok(()) Ok(())
} }

View file

@ -61,7 +61,7 @@ struct ServerStats {
memory: MemoryStats, memory: MemoryStats,
} }
pub fn generate_stats() -> anyhow::Result<String> { pub async fn generate_stats() -> anyhow::Result<String> {
use tikv_jemalloc_ctl::stats::{active, allocated, mapped, metadata, resident, retained}; use tikv_jemalloc_ctl::stats::{active, allocated, mapped, metadata, resident, retained};
tikv_jemalloc_ctl::epoch::advance()?; tikv_jemalloc_ctl::epoch::advance()?;
@ -74,23 +74,27 @@ pub fn generate_stats() -> anyhow::Result<String> {
retained: retained::read()? as f64 / (1024 * 1024) as f64, retained: retained::read()? as f64 / (1024 * 1024) as f64,
}; };
let clients = CLIENTS let clients_locked = CLIENTS.lock().await;
.iter()
.map(|x| { let mut clients = HashMap::with_capacity(clients_locked.len());
( for client in clients_locked.iter() {
x.key().to_string(), clients.insert(
ClientStats { client.0.to_string(),
wsproxy: x.value().1, ClientStats {
streams: x wsproxy: client.1 .1,
.value() streams: client
.0 .1
.iter() .0
.map(|x| (x.key().to_string(), StreamStats::from(x.value().clone()))) .lock()
.collect(), .await
}, .iter()
) .map(|x| (x.0.to_string(), StreamStats::from(x.1.clone())))
}) .collect(),
.collect(); },
);
}
drop(clients_locked);
let stats = ServerStats { let stats = ServerStats {
config: CONFIG.ser()?, config: CONFIG.ser()?,