move speed limits to be global across streams

This commit is contained in:
Toshit Chawda 2024-11-22 22:02:26 -08:00
parent 35431c3a41
commit a554a5a761
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
2 changed files with 45 additions and 25 deletions

View file

@ -159,6 +159,13 @@ pub struct WispConfig {
/// Wisp version 2 authentication extension advertised. /// Wisp version 2 authentication extension advertised.
pub auth_extension: Option<ProtocolExtensionAuth>, pub auth_extension: Option<ProtocolExtensionAuth>,
#[cfg(feature = "speed-limit")]
/// Read limit in bytes/second for all streams.
pub read_limit: f64,
#[cfg(feature = "speed-limit")]
/// Write limit in bytes/second for all streams.
pub write_limit: f64,
#[serde(skip_serializing_if = "HashMap::is_empty")] #[serde(skip_serializing_if = "HashMap::is_empty")]
/// Wisp version 2 password authentication extension username/passwords. /// Wisp version 2 password authentication extension username/passwords.
pub password_extension_users: HashMap<String, String>, pub password_extension_users: HashMap<String, String>,
@ -181,13 +188,6 @@ pub struct StreamConfig {
/// Buffer size of reads from TCP sockets. /// Buffer size of reads from TCP sockets.
pub buffer_size: usize, pub buffer_size: usize,
#[cfg(feature = "speed-limit")]
/// Remote stream read limit in bytes/second.
pub read_limit: f64,
#[cfg(feature = "speed-limit")]
/// Remote stream write limit in bytes/second.
pub write_limit: f64,
/// Whether or not to allow Wisp clients to create UDP streams. /// Whether or not to allow Wisp clients to create UDP streams.
pub allow_udp: bool, pub allow_udp: bool,
/// Whether or not to enable nonstandard legacy wsproxy UDP streams. /// Whether or not to enable nonstandard legacy wsproxy UDP streams.
@ -338,6 +338,11 @@ impl Default for WispConfig {
allow_wsproxy: true, allow_wsproxy: true,
prefix: String::new(), prefix: String::new(),
#[cfg(feature = "speed-limit")]
read_limit: f64::INFINITY,
#[cfg(feature = "speed-limit")]
write_limit: f64::INFINITY,
wisp_v2: true, wisp_v2: true,
extensions: vec![ProtocolExtension::Udp, ProtocolExtension::Motd], extensions: vec![ProtocolExtension::Udp, ProtocolExtension::Motd],
auth_extension: None, auth_extension: None,
@ -414,11 +419,6 @@ impl Default for StreamConfig {
tcp_nodelay: false, tcp_nodelay: false,
buffer_size: 16384, buffer_size: 16384,
#[cfg(feature = "speed-limit")]
read_limit: f64::INFINITY,
#[cfg(feature = "speed-limit")]
write_limit: f64::INFINITY,
allow_udp: true, allow_udp: true,
allow_wsproxy_udp: false, allow_wsproxy_udp: false,
#[cfg(feature = "twisp")] #[cfg(feature = "twisp")]

View file

@ -33,12 +33,10 @@ use crate::{
async fn copy_read_fast( async fn copy_read_fast(
muxrx: MuxStreamAsyncRead, muxrx: MuxStreamAsyncRead,
mut tcptx: OwnedWriteHalf, mut tcptx: OwnedWriteHalf,
#[cfg(feature = "speed-limit")] limiter: async_speed_limit::Limiter<
async_speed_limit::clock::StandardClock,
>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
#[cfg(feature = "speed-limit")]
let limiter = async_speed_limit::Limiter::builder(CONFIG.stream.write_limit)
.refill(Duration::from_secs(1))
.clock(async_speed_limit::clock::StandardClock)
.build();
let mut muxrx = muxrx.compat(); let mut muxrx = muxrx.compat();
loop { loop {
let buf = muxrx.fill_buf().await?; let buf = muxrx.fill_buf().await?;
@ -59,12 +57,13 @@ async fn copy_read_fast(
} }
} }
async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow::Result<()> { async fn copy_write_fast(
#[cfg(feature = "speed-limit")] muxtx: MuxStreamWrite,
let limiter = async_speed_limit::Limiter::builder(CONFIG.stream.read_limit) tcprx: OwnedReadHalf,
.refill(Duration::from_secs(1)) #[cfg(feature = "speed-limit")] limiter: async_speed_limit::Limiter<
.clock(async_speed_limit::clock::StandardClock) async_speed_limit::clock::StandardClock,
.build(); >,
) -> anyhow::Result<()> {
let mut tcprx = BufReader::with_capacity(CONFIG.stream.buffer_size, tcprx); let mut tcprx = BufReader::with_capacity(CONFIG.stream.buffer_size, tcprx);
loop { loop {
let buf = tcprx.fill_buf().await?; let buf = tcprx.fill_buf().await?;
@ -88,6 +87,12 @@ async fn handle_stream(
id: String, id: String,
event: Arc<Event>, event: Arc<Event>,
#[cfg(feature = "twisp")] twisp_map: twisp::TwispMap, #[cfg(feature = "twisp")] twisp_map: twisp::TwispMap,
#[cfg(feature = "speed-limit")] read_limit: async_speed_limit::Limiter<
async_speed_limit::clock::StandardClock,
>,
#[cfg(feature = "speed-limit")] write_limit: async_speed_limit::Limiter<
async_speed_limit::clock::StandardClock,
>,
) { ) {
let requested_stream = connect.clone(); let requested_stream = connect.clone();
@ -141,8 +146,8 @@ async fn handle_stream(
let muxread = muxread.into_stream().into_asyncread(); let muxread = muxread.into_stream().into_asyncread();
let (tcpread, tcpwrite) = stream.into_split(); let (tcpread, tcpwrite) = stream.into_split();
select! { select! {
x = copy_read_fast(muxread, tcpwrite) => x?, x = copy_read_fast(muxread, tcpwrite, #[cfg(feature = "speed-limit")] write_limit) => x?,
x = copy_write_fast(muxwrite, tcpread) => x?, x = copy_write_fast(muxwrite, tcpread, #[cfg(feature = "speed-limit")] read_limit) => x?,
} }
Ok(()) Ok(())
} }
@ -248,6 +253,17 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow:
} }
} }
#[cfg(feature = "speed-limit")]
let read_limiter = async_speed_limit::Limiter::builder(CONFIG.wisp.read_limit)
.refill(Duration::from_secs(1))
.clock(async_speed_limit::clock::StandardClock)
.build();
#[cfg(feature = "speed-limit")]
let write_limiter = async_speed_limit::Limiter::builder(CONFIG.wisp.write_limit)
.refill(Duration::from_secs(1))
.clock(async_speed_limit::clock::StandardClock)
.build();
let (mux, fut) = ServerMux::create( let (mux, fut) = ServerMux::create(
read, read,
write, write,
@ -304,6 +320,10 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow:
event.clone(), event.clone(),
#[cfg(feature = "twisp")] #[cfg(feature = "twisp")]
twisp_map.clone(), twisp_map.clone(),
#[cfg(feature = "speed-limit")]
read_limiter.clone(),
#[cfg(feature = "speed-limit")]
write_limiter.clone(),
)); ));
} }