From a554a5a761777f14be177090a8e9ad5134897cde Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Fri, 22 Nov 2024 22:02:26 -0800 Subject: [PATCH] move speed limits to be global across streams --- server/src/config.rs | 24 +++++++++--------- server/src/handle/wisp/mod.rs | 46 +++++++++++++++++++++++++---------- 2 files changed, 45 insertions(+), 25 deletions(-) diff --git a/server/src/config.rs b/server/src/config.rs index 3725eff..4ab8a39 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -159,6 +159,13 @@ pub struct WispConfig { /// Wisp version 2 authentication extension advertised. pub auth_extension: Option, + #[cfg(feature = "speed-limit")] + /// Read limit in bytes/second for all streams. + pub read_limit: f64, + #[cfg(feature = "speed-limit")] + /// Write limit in bytes/second for all streams. + pub write_limit: f64, + #[serde(skip_serializing_if = "HashMap::is_empty")] /// Wisp version 2 password authentication extension username/passwords. pub password_extension_users: HashMap, @@ -181,13 +188,6 @@ pub struct StreamConfig { /// Buffer size of reads from TCP sockets. pub buffer_size: usize, - #[cfg(feature = "speed-limit")] - /// Remote stream read limit in bytes/second. - pub read_limit: f64, - #[cfg(feature = "speed-limit")] - /// Remote stream write limit in bytes/second. - pub write_limit: f64, - /// Whether or not to allow Wisp clients to create UDP streams. pub allow_udp: bool, /// Whether or not to enable nonstandard legacy wsproxy UDP streams. @@ -338,6 +338,11 @@ impl Default for WispConfig { allow_wsproxy: true, prefix: String::new(), + #[cfg(feature = "speed-limit")] + read_limit: f64::INFINITY, + #[cfg(feature = "speed-limit")] + write_limit: f64::INFINITY, + wisp_v2: true, extensions: vec![ProtocolExtension::Udp, ProtocolExtension::Motd], auth_extension: None, @@ -414,11 +419,6 @@ impl Default for StreamConfig { tcp_nodelay: false, buffer_size: 16384, - #[cfg(feature = "speed-limit")] - read_limit: f64::INFINITY, - #[cfg(feature = "speed-limit")] - write_limit: f64::INFINITY, - allow_udp: true, allow_wsproxy_udp: false, #[cfg(feature = "twisp")] diff --git a/server/src/handle/wisp/mod.rs b/server/src/handle/wisp/mod.rs index 761923c..b49129e 100644 --- a/server/src/handle/wisp/mod.rs +++ b/server/src/handle/wisp/mod.rs @@ -33,12 +33,10 @@ use crate::{ async fn copy_read_fast( muxrx: MuxStreamAsyncRead, mut tcptx: OwnedWriteHalf, + #[cfg(feature = "speed-limit")] limiter: async_speed_limit::Limiter< + async_speed_limit::clock::StandardClock, + >, ) -> std::io::Result<()> { - #[cfg(feature = "speed-limit")] - let limiter = async_speed_limit::Limiter::builder(CONFIG.stream.write_limit) - .refill(Duration::from_secs(1)) - .clock(async_speed_limit::clock::StandardClock) - .build(); let mut muxrx = muxrx.compat(); loop { let buf = muxrx.fill_buf().await?; @@ -59,12 +57,13 @@ async fn copy_read_fast( } } -async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow::Result<()> { - #[cfg(feature = "speed-limit")] - let limiter = async_speed_limit::Limiter::builder(CONFIG.stream.read_limit) - .refill(Duration::from_secs(1)) - .clock(async_speed_limit::clock::StandardClock) - .build(); +async fn copy_write_fast( + muxtx: MuxStreamWrite, + tcprx: OwnedReadHalf, + #[cfg(feature = "speed-limit")] limiter: async_speed_limit::Limiter< + async_speed_limit::clock::StandardClock, + >, +) -> anyhow::Result<()> { let mut tcprx = BufReader::with_capacity(CONFIG.stream.buffer_size, tcprx); loop { let buf = tcprx.fill_buf().await?; @@ -88,6 +87,12 @@ async fn handle_stream( id: String, event: Arc, #[cfg(feature = "twisp")] twisp_map: twisp::TwispMap, + #[cfg(feature = "speed-limit")] read_limit: async_speed_limit::Limiter< + async_speed_limit::clock::StandardClock, + >, + #[cfg(feature = "speed-limit")] write_limit: async_speed_limit::Limiter< + async_speed_limit::clock::StandardClock, + >, ) { let requested_stream = connect.clone(); @@ -141,8 +146,8 @@ async fn handle_stream( let muxread = muxread.into_stream().into_asyncread(); let (tcpread, tcpwrite) = stream.into_split(); select! { - x = copy_read_fast(muxread, tcpwrite) => x?, - x = copy_write_fast(muxwrite, tcpread) => x?, + x = copy_read_fast(muxread, tcpwrite, #[cfg(feature = "speed-limit")] write_limit) => x?, + x = copy_write_fast(muxwrite, tcpread, #[cfg(feature = "speed-limit")] read_limit) => x?, } Ok(()) } @@ -248,6 +253,17 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow: } } + #[cfg(feature = "speed-limit")] + let read_limiter = async_speed_limit::Limiter::builder(CONFIG.wisp.read_limit) + .refill(Duration::from_secs(1)) + .clock(async_speed_limit::clock::StandardClock) + .build(); + #[cfg(feature = "speed-limit")] + let write_limiter = async_speed_limit::Limiter::builder(CONFIG.wisp.write_limit) + .refill(Duration::from_secs(1)) + .clock(async_speed_limit::clock::StandardClock) + .build(); + let (mux, fut) = ServerMux::create( read, write, @@ -304,6 +320,10 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow: event.clone(), #[cfg(feature = "twisp")] twisp_map.clone(), + #[cfg(feature = "speed-limit")] + read_limiter.clone(), + #[cfg(feature = "speed-limit")] + write_limiter.clone(), )); }