add speed limits

This commit is contained in:
Toshit Chawda 2024-11-22 21:41:03 -08:00
parent 90936adb2c
commit 35431c3a41
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
4 changed files with 49 additions and 0 deletions

19
Cargo.lock generated
View file

@ -110,6 +110,18 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "async-speed-limit"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97c1688bb8e4eb3dcd68a8b0e5a81deae887c67362bb482a902b785e83ac2edc"
dependencies = [
"futures-core",
"futures-io",
"futures-timer",
"pin-project-lite",
]
[[package]] [[package]]
name = "async-stream" name = "async-stream"
version = "0.3.6" version = "0.3.6"
@ -724,6 +736,7 @@ name = "epoxy-server"
version = "2.0.0" version = "2.0.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-speed-limit",
"async-trait", "async-trait",
"bytes", "bytes",
"cfg-if", "cfg-if",
@ -932,6 +945,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.31" version = "0.3.31"

View file

@ -5,6 +5,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.86" anyhow = "1.0.86"
async-speed-limit = { version = "0.4.2", optional = true }
async-trait = { version = "0.1.81", optional = true } async-trait = { version = "0.1.81", optional = true }
bytes = "1.7.1" bytes = "1.7.1"
cfg-if = "1.0.0" cfg-if = "1.0.0"
@ -47,6 +48,7 @@ yaml = ["dep:serde_yaml"]
toml = ["dep:toml"] toml = ["dep:toml"]
twisp = ["dep:pty-process", "dep:libc", "dep:async-trait", "dep:shell-words"] twisp = ["dep:pty-process", "dep:libc", "dep:async-trait", "dep:shell-words"]
speed-limit = ["dep:async-speed-limit"]
[build-dependencies] [build-dependencies]
vergen-git2 = { version = "1.0.0", features = ["rustc"] } vergen-git2 = { version = "1.0.0", features = ["rustc"] }

View file

@ -181,6 +181,13 @@ pub struct StreamConfig {
/// Buffer size of reads from TCP sockets. /// Buffer size of reads from TCP sockets.
pub buffer_size: usize, pub buffer_size: usize,
#[cfg(feature = "speed-limit")]
/// Remote stream read limit in bytes/second.
pub read_limit: f64,
#[cfg(feature = "speed-limit")]
/// Remote stream write limit in bytes/second.
pub write_limit: f64,
/// Whether or not to allow Wisp clients to create UDP streams. /// Whether or not to allow Wisp clients to create UDP streams.
pub allow_udp: bool, pub allow_udp: bool,
/// Whether or not to enable nonstandard legacy wsproxy UDP streams. /// Whether or not to enable nonstandard legacy wsproxy UDP streams.
@ -407,6 +414,11 @@ impl Default for StreamConfig {
tcp_nodelay: false, tcp_nodelay: false,
buffer_size: 16384, buffer_size: 16384,
#[cfg(feature = "speed-limit")]
read_limit: f64::INFINITY,
#[cfg(feature = "speed-limit")]
write_limit: f64::INFINITY,
allow_udp: true, allow_udp: true,
allow_wsproxy_udp: false, allow_wsproxy_udp: false,
#[cfg(feature = "twisp")] #[cfg(feature = "twisp")]

View file

@ -34,6 +34,11 @@ async fn copy_read_fast(
muxrx: MuxStreamAsyncRead, muxrx: MuxStreamAsyncRead,
mut tcptx: OwnedWriteHalf, mut tcptx: OwnedWriteHalf,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
#[cfg(feature = "speed-limit")]
let limiter = async_speed_limit::Limiter::builder(CONFIG.stream.write_limit)
.refill(Duration::from_secs(1))
.clock(async_speed_limit::clock::StandardClock)
.build();
let mut muxrx = muxrx.compat(); let mut muxrx = muxrx.compat();
loop { loop {
let buf = muxrx.fill_buf().await?; let buf = muxrx.fill_buf().await?;
@ -42,6 +47,9 @@ async fn copy_read_fast(
return Ok(()); return Ok(());
} }
#[cfg(feature = "speed-limit")]
limiter.consume(buf.len()).await;
let i = tcptx.write(buf).await?; let i = tcptx.write(buf).await?;
if i == 0 { if i == 0 {
return Err(std::io::ErrorKind::WriteZero.into()); return Err(std::io::ErrorKind::WriteZero.into());
@ -52,6 +60,11 @@ async fn copy_read_fast(
} }
async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow::Result<()> { async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow::Result<()> {
#[cfg(feature = "speed-limit")]
let limiter = async_speed_limit::Limiter::builder(CONFIG.stream.read_limit)
.refill(Duration::from_secs(1))
.clock(async_speed_limit::clock::StandardClock)
.build();
let mut tcprx = BufReader::with_capacity(CONFIG.stream.buffer_size, tcprx); let mut tcprx = BufReader::with_capacity(CONFIG.stream.buffer_size, tcprx);
loop { loop {
let buf = tcprx.fill_buf().await?; let buf = tcprx.fill_buf().await?;
@ -61,6 +74,9 @@ async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow:
return Ok(()); return Ok(());
} }
#[cfg(feature = "speed-limit")]
limiter.consume(buf.len()).await;
muxtx.write(&buf).await?; muxtx.write(&buf).await?;
tcprx.consume(len); tcprx.consume(len);
} }