From 35431c3a41f8cc928c7f5defc995a1b30d2f7f89 Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Fri, 22 Nov 2024 21:41:03 -0800 Subject: [PATCH] add speed limits --- Cargo.lock | 19 +++++++++++++++++++ server/Cargo.toml | 2 ++ server/src/config.rs | 12 ++++++++++++ server/src/handle/wisp/mod.rs | 16 ++++++++++++++++ 4 files changed, 49 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index ad9e295..563e924 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-speed-limit" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97c1688bb8e4eb3dcd68a8b0e5a81deae887c67362bb482a902b785e83ac2edc" +dependencies = [ + "futures-core", + "futures-io", + "futures-timer", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -724,6 +736,7 @@ name = "epoxy-server" version = "2.0.0" dependencies = [ "anyhow", + "async-speed-limit", "async-trait", "bytes", "cfg-if", @@ -932,6 +945,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" diff --git a/server/Cargo.toml b/server/Cargo.toml index 58a5e5b..5328d8e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] anyhow = "1.0.86" +async-speed-limit = { version = "0.4.2", optional = true } async-trait = { version = "0.1.81", optional = true } bytes = "1.7.1" cfg-if = "1.0.0" @@ -47,6 +48,7 @@ yaml = ["dep:serde_yaml"] toml = ["dep:toml"] twisp = ["dep:pty-process", "dep:libc", "dep:async-trait", "dep:shell-words"] +speed-limit = ["dep:async-speed-limit"] [build-dependencies] vergen-git2 = { version = "1.0.0", features = ["rustc"] } diff --git a/server/src/config.rs b/server/src/config.rs index de6e02b..3725eff 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -181,6 +181,13 @@ pub struct StreamConfig { /// Buffer size of reads from TCP sockets. pub buffer_size: usize, + #[cfg(feature = "speed-limit")] + /// Remote stream read limit in bytes/second. + pub read_limit: f64, + #[cfg(feature = "speed-limit")] + /// Remote stream write limit in bytes/second. + pub write_limit: f64, + /// Whether or not to allow Wisp clients to create UDP streams. pub allow_udp: bool, /// Whether or not to enable nonstandard legacy wsproxy UDP streams. @@ -407,6 +414,11 @@ impl Default for StreamConfig { tcp_nodelay: false, buffer_size: 16384, + #[cfg(feature = "speed-limit")] + read_limit: f64::INFINITY, + #[cfg(feature = "speed-limit")] + write_limit: f64::INFINITY, + allow_udp: true, allow_wsproxy_udp: false, #[cfg(feature = "twisp")] diff --git a/server/src/handle/wisp/mod.rs b/server/src/handle/wisp/mod.rs index c48763d..761923c 100644 --- a/server/src/handle/wisp/mod.rs +++ b/server/src/handle/wisp/mod.rs @@ -34,6 +34,11 @@ async fn copy_read_fast( muxrx: MuxStreamAsyncRead, mut tcptx: OwnedWriteHalf, ) -> std::io::Result<()> { + #[cfg(feature = "speed-limit")] + let limiter = async_speed_limit::Limiter::builder(CONFIG.stream.write_limit) + .refill(Duration::from_secs(1)) + .clock(async_speed_limit::clock::StandardClock) + .build(); let mut muxrx = muxrx.compat(); loop { let buf = muxrx.fill_buf().await?; @@ -42,6 +47,9 @@ async fn copy_read_fast( return Ok(()); } + #[cfg(feature = "speed-limit")] + limiter.consume(buf.len()).await; + let i = tcptx.write(buf).await?; if i == 0 { return Err(std::io::ErrorKind::WriteZero.into()); @@ -52,6 +60,11 @@ async fn copy_read_fast( } async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow::Result<()> { + #[cfg(feature = "speed-limit")] + let limiter = async_speed_limit::Limiter::builder(CONFIG.stream.read_limit) + .refill(Duration::from_secs(1)) + .clock(async_speed_limit::clock::StandardClock) + .build(); let mut tcprx = BufReader::with_capacity(CONFIG.stream.buffer_size, tcprx); loop { let buf = tcprx.fill_buf().await?; @@ -61,6 +74,9 @@ async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow: return Ok(()); } + #[cfg(feature = "speed-limit")] + limiter.consume(buf.len()).await; + muxtx.write(&buf).await?; tcprx.consume(len); }