mirror of
https://github.com/MercuryWorkshop/epoxy-tls.git
synced 2025-05-12 05:50:01 -04:00
clippy pedantic
This commit is contained in:
parent
ed8d22a52f
commit
9e2e2a3056
12 changed files with 279 additions and 228 deletions
|
@ -19,3 +19,5 @@ must_use_candidate = "allow"
|
|||
missing_errors_doc = "allow"
|
||||
module_name_repetitions = "allow"
|
||||
struct_excessive_bools = "allow"
|
||||
missing_fields_in_debug = "allow"
|
||||
case_sensitive_file_extension_comparisons = "allow"
|
||||
|
|
1
clippy.toml
Normal file
1
clippy.toml
Normal file
|
@ -0,0 +1 @@
|
|||
future-size-threshold = 2048
|
|
@ -138,7 +138,7 @@ pub enum ProtocolExtensionAuth {
|
|||
|
||||
#[doc(hidden)]
|
||||
fn default_motd() -> String {
|
||||
format!("epoxy_server ({})", VERSION_STRING)
|
||||
format!("epoxy_server ({VERSION_STRING})")
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
|
@ -196,7 +196,7 @@ pub struct StreamConfig {
|
|||
pub allow_udp: bool,
|
||||
/// Whether or not to enable nonstandard legacy wsproxy UDP streams.
|
||||
pub allow_wsproxy_udp: bool,
|
||||
/// Whether or not to allow TWisp streams.
|
||||
/// Whether or not to allow `TWisp` streams.
|
||||
#[cfg(feature = "twisp")]
|
||||
pub allow_twisp: bool,
|
||||
|
||||
|
@ -519,13 +519,13 @@ impl Config {
|
|||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn de(string: String) -> anyhow::Result<Self> {
|
||||
pub fn de(string: &str) -> anyhow::Result<Self> {
|
||||
Ok(match CLI.format {
|
||||
ConfigFormat::Json => serde_json::from_str(&string)?,
|
||||
ConfigFormat::Json => serde_json::from_str(string)?,
|
||||
#[cfg(feature = "toml")]
|
||||
ConfigFormat::Toml => toml::from_str(&string)?,
|
||||
ConfigFormat::Toml => toml::from_str(string)?,
|
||||
#[cfg(feature = "yaml")]
|
||||
ConfigFormat::Yaml => serde_yaml::from_str(&string)?,
|
||||
ConfigFormat::Yaml => serde_yaml::from_str(string)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,6 +83,165 @@ async fn copy_write_fast(
|
|||
}
|
||||
}
|
||||
|
||||
async fn resolve_stream(
|
||||
connect: ConnectPacket,
|
||||
muxstream: &MuxStream<WispStreamWrite>,
|
||||
) -> Option<(ConnectPacket, ConnectPacket, ClientStream)> {
|
||||
let requested_stream = connect.clone();
|
||||
|
||||
let Ok(resolved) = ClientStream::resolve(connect).await else {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamUnreachable).await;
|
||||
return None;
|
||||
};
|
||||
let (stream, resolved_stream) = match resolved {
|
||||
ResolvedPacket::Valid(connect) => {
|
||||
let resolved = connect.clone();
|
||||
let Ok(stream) = ClientStream::connect(connect).await else {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamUnreachable).await;
|
||||
return None;
|
||||
};
|
||||
(stream, resolved)
|
||||
}
|
||||
ResolvedPacket::ValidWispnet(server, connect) => {
|
||||
let resolved = connect.clone();
|
||||
let Ok(stream) = route_wispnet(server, connect).await else {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamUnreachable).await;
|
||||
return None;
|
||||
};
|
||||
(stream, resolved)
|
||||
}
|
||||
ResolvedPacket::NoResolvedAddrs => {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamUnreachable).await;
|
||||
return None;
|
||||
}
|
||||
ResolvedPacket::Blocked => {
|
||||
let _ = muxstream
|
||||
.close(CloseReason::ServerStreamBlockedAddress)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
ResolvedPacket::Invalid => {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamInvalidInfo).await;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
Some((requested_stream, resolved_stream, stream))
|
||||
}
|
||||
|
||||
async fn forward_stream(
|
||||
muxstream: MuxStream<WispStreamWrite>,
|
||||
stream: ClientStream,
|
||||
resolved_stream: ConnectPacket,
|
||||
uuid: Uuid,
|
||||
#[cfg(feature = "twisp")] twisp_map: twisp::TwispMap,
|
||||
#[cfg(feature = "speed-limit")] read_limit: async_speed_limit::Limiter<
|
||||
async_speed_limit::clock::StandardClock,
|
||||
>,
|
||||
#[cfg(feature = "speed-limit")] write_limit: async_speed_limit::Limiter<
|
||||
async_speed_limit::clock::StandardClock,
|
||||
>,
|
||||
) {
|
||||
match stream {
|
||||
ClientStream::Tcp(stream) => {
|
||||
let closer = muxstream.get_close_handle();
|
||||
|
||||
let ret: anyhow::Result<()> = async {
|
||||
let (muxread, muxwrite) = muxstream.into_split();
|
||||
let muxread = muxread.into_stream().into_asyncread();
|
||||
let (tcpread, tcpwrite) = stream.into_split();
|
||||
select! {
|
||||
x = copy_read_fast(muxread, tcpwrite, #[cfg(feature = "speed-limit")] write_limit) => x?,
|
||||
x = copy_write_fast(muxwrite, tcpread, #[cfg(feature = "speed-limit")] read_limit) => x?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
.await;
|
||||
|
||||
match ret {
|
||||
Ok(()) => {
|
||||
let _ = closer.close(CloseReason::Voluntary).await;
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = closer.close(CloseReason::Unexpected).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
ClientStream::Udp(stream) => {
|
||||
let closer = muxstream.get_close_handle();
|
||||
|
||||
let ret: anyhow::Result<()> = async move {
|
||||
let mut data = vec![0u8; 65507];
|
||||
loop {
|
||||
select! {
|
||||
size = stream.recv(&mut data) => {
|
||||
let size = size?;
|
||||
muxstream.write(&data[..size]).await?;
|
||||
}
|
||||
data = muxstream.read() => {
|
||||
if let Some(data) = data? {
|
||||
stream.send(&data).await?;
|
||||
} else {
|
||||
break Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.await;
|
||||
|
||||
match ret {
|
||||
Ok(()) => {
|
||||
let _ = closer.close(CloseReason::Voluntary).await;
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = closer.close(CloseReason::Unexpected).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "twisp")]
|
||||
ClientStream::Pty(cmd, pty) => {
|
||||
let closer = muxstream.get_close_handle();
|
||||
let id = muxstream.stream_id;
|
||||
let (mut rx, mut tx) = muxstream.into_io().into_asyncrw().into_split();
|
||||
|
||||
match twisp::handle_twisp(id, &mut rx, &mut tx, twisp_map.clone(), pty, cmd).await {
|
||||
Ok(()) => {
|
||||
let _ = closer.close(CloseReason::Voluntary).await;
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = closer.close(CloseReason::Unexpected).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
ClientStream::Wispnet(stream, mux_id) => {
|
||||
Box::pin(wispnet::handle_stream(
|
||||
muxstream,
|
||||
stream,
|
||||
mux_id,
|
||||
uuid,
|
||||
resolved_stream,
|
||||
#[cfg(feature = "speed-limit")]
|
||||
read_limit,
|
||||
#[cfg(feature = "speed-limit")]
|
||||
write_limit,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
ClientStream::NoResolvedAddrs => {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamUnreachable).await;
|
||||
}
|
||||
ClientStream::Invalid => {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamInvalidInfo).await;
|
||||
}
|
||||
ClientStream::Blocked => {
|
||||
let _ = muxstream
|
||||
.close(CloseReason::ServerStreamBlockedAddress)
|
||||
.await;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
async fn handle_stream(
|
||||
connect: ConnectPacket,
|
||||
muxstream: MuxStream<WispStreamWrite>,
|
||||
|
@ -96,44 +255,12 @@ async fn handle_stream(
|
|||
async_speed_limit::clock::StandardClock,
|
||||
>,
|
||||
) {
|
||||
let requested_stream = connect.clone();
|
||||
|
||||
let Ok(resolved) = ClientStream::resolve(connect).await else {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamUnreachable).await;
|
||||
let Some((requested_stream, resolved_stream, stream)) =
|
||||
resolve_stream(connect, &muxstream).await
|
||||
else {
|
||||
// muxstream was closed
|
||||
return;
|
||||
};
|
||||
let (stream, resolved_stream) = match resolved {
|
||||
ResolvedPacket::Valid(connect) => {
|
||||
let resolved = connect.clone();
|
||||
let Ok(stream) = ClientStream::connect(connect).await else {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamUnreachable).await;
|
||||
return;
|
||||
};
|
||||
(stream, resolved)
|
||||
}
|
||||
ResolvedPacket::ValidWispnet(server, connect) => {
|
||||
let resolved = connect.clone();
|
||||
let Ok(stream) = route_wispnet(server, connect).await else {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamUnreachable).await;
|
||||
return;
|
||||
};
|
||||
(stream, resolved)
|
||||
}
|
||||
ResolvedPacket::NoResolvedAddrs => {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamUnreachable).await;
|
||||
return;
|
||||
}
|
||||
ResolvedPacket::Blocked => {
|
||||
let _ = muxstream
|
||||
.close(CloseReason::ServerStreamBlockedAddress)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
ResolvedPacket::Invalid => {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamInvalidInfo).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let uuid = Uuid::new_v4();
|
||||
|
||||
|
@ -150,106 +277,18 @@ async fn handle_stream(
|
|||
.insert(uuid, (requested_stream, resolved_stream.clone()));
|
||||
}
|
||||
|
||||
let forward_fut = async {
|
||||
match stream {
|
||||
ClientStream::Tcp(stream) => {
|
||||
let closer = muxstream.get_close_handle();
|
||||
|
||||
let ret: anyhow::Result<()> = async {
|
||||
let (muxread, muxwrite) = muxstream.into_split();
|
||||
let muxread = muxread.into_stream().into_asyncread();
|
||||
let (tcpread, tcpwrite) = stream.into_split();
|
||||
select! {
|
||||
x = copy_read_fast(muxread, tcpwrite, #[cfg(feature = "speed-limit")] write_limit) => x?,
|
||||
x = copy_write_fast(muxwrite, tcpread, #[cfg(feature = "speed-limit")] read_limit) => x?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
.await;
|
||||
|
||||
match ret {
|
||||
Ok(()) => {
|
||||
let _ = closer.close(CloseReason::Voluntary).await;
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = closer.close(CloseReason::Unexpected).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
ClientStream::Udp(stream) => {
|
||||
let closer = muxstream.get_close_handle();
|
||||
|
||||
let ret: anyhow::Result<()> = async move {
|
||||
let mut data = vec![0u8; 65507];
|
||||
loop {
|
||||
select! {
|
||||
size = stream.recv(&mut data) => {
|
||||
let size = size?;
|
||||
muxstream.write(&data[..size]).await?;
|
||||
}
|
||||
data = muxstream.read() => {
|
||||
if let Some(data) = data? {
|
||||
stream.send(&data).await?;
|
||||
} else {
|
||||
break Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.await;
|
||||
|
||||
match ret {
|
||||
Ok(()) => {
|
||||
let _ = closer.close(CloseReason::Voluntary).await;
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = closer.close(CloseReason::Unexpected).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "twisp")]
|
||||
ClientStream::Pty(cmd, pty) => {
|
||||
let closer = muxstream.get_close_handle();
|
||||
let id = muxstream.stream_id;
|
||||
let (mut rx, mut tx) = muxstream.into_io().into_asyncrw().into_split();
|
||||
|
||||
match twisp::handle_twisp(id, &mut rx, &mut tx, twisp_map.clone(), pty, cmd).await {
|
||||
Ok(()) => {
|
||||
let _ = closer.close(CloseReason::Voluntary).await;
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = closer.close(CloseReason::Unexpected).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
ClientStream::Wispnet(stream, mux_id) => {
|
||||
wispnet::handle_stream(
|
||||
muxstream,
|
||||
stream,
|
||||
mux_id,
|
||||
uuid,
|
||||
resolved_stream,
|
||||
#[cfg(feature = "speed-limit")]
|
||||
read_limit,
|
||||
#[cfg(feature = "speed-limit")]
|
||||
write_limit,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientStream::NoResolvedAddrs => {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamUnreachable).await;
|
||||
}
|
||||
ClientStream::Invalid => {
|
||||
let _ = muxstream.close(CloseReason::ServerStreamInvalidInfo).await;
|
||||
}
|
||||
ClientStream::Blocked => {
|
||||
let _ = muxstream
|
||||
.close(CloseReason::ServerStreamBlockedAddress)
|
||||
.await;
|
||||
}
|
||||
};
|
||||
};
|
||||
let forward_fut = forward_stream(
|
||||
muxstream,
|
||||
stream,
|
||||
resolved_stream,
|
||||
uuid,
|
||||
#[cfg(feature = "twisp")]
|
||||
twisp_map,
|
||||
#[cfg(feature = "speed-limit")]
|
||||
read_limit,
|
||||
#[cfg(feature = "speed-limit")]
|
||||
write_limit,
|
||||
);
|
||||
|
||||
select! {
|
||||
x = forward_fut => x,
|
||||
|
@ -295,15 +334,17 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow:
|
|||
.clock(async_speed_limit::clock::StandardClock)
|
||||
.build();
|
||||
|
||||
let (mux, fut) = ServerMux::create(
|
||||
read,
|
||||
write,
|
||||
buffer_size,
|
||||
if is_v2 { extensions } else { None },
|
||||
let (mux, fut) = Box::pin(
|
||||
Box::pin(ServerMux::create(
|
||||
read,
|
||||
write,
|
||||
buffer_size,
|
||||
if is_v2 { extensions } else { None },
|
||||
))
|
||||
.await
|
||||
.context("failed to create server multiplexor")?
|
||||
.with_required_extensions(&required_extensions),
|
||||
)
|
||||
.await
|
||||
.context("failed to create server multiplexor")?
|
||||
.with_required_extensions(&required_extensions)
|
||||
.await?;
|
||||
let mux = Arc::new(mux);
|
||||
|
||||
|
@ -336,7 +377,7 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow:
|
|||
trace!("sent ping to wisp client id {:?}", ping_id);
|
||||
select! {
|
||||
_ = interval.tick() => (),
|
||||
_ = ping_event.listen() => break,
|
||||
() = ping_event.listen() => break,
|
||||
};
|
||||
}
|
||||
});
|
||||
|
|
|
@ -218,12 +218,14 @@ pub async fn handle_wispnet(stream: WispResult, id: String) -> Result<()> {
|
|||
|
||||
let extensions = vec![WispnetServerProtocolExtensionBuilder(net_id).into()];
|
||||
|
||||
let (mux, fut) = ClientMux::create(read, write, Some(WispV2Handshake::new(extensions)))
|
||||
.await
|
||||
.context("failed to create client multiplexor")?
|
||||
.with_required_extensions(&[WispnetServerProtocolExtension::ID])
|
||||
.await
|
||||
.context("wispnet client did not have wispnet extension")?;
|
||||
let (mux, fut) = Box::pin(
|
||||
ClientMux::create(read, write, Some(WispV2Handshake::new(extensions)))
|
||||
.await
|
||||
.context("failed to create client multiplexor")?
|
||||
.with_required_extensions(&[WispnetServerProtocolExtension::ID]),
|
||||
)
|
||||
.await
|
||||
.context("wispnet client did not have wispnet extension")?;
|
||||
|
||||
let is_private = mux
|
||||
.supported_extensions
|
||||
|
|
|
@ -15,6 +15,9 @@ use crate::{
|
|||
CLIENTS, CONFIG,
|
||||
};
|
||||
|
||||
// TODO rewrite this whole thing
|
||||
// isn't even cancel safe i think
|
||||
#[allow(clippy::too_many_lines)]
|
||||
pub async fn handle_wsproxy(
|
||||
mut ws: WebSocketStreamWrapper,
|
||||
id: String,
|
||||
|
@ -26,7 +29,7 @@ pub async fn handle_wsproxy(
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
let vec: Vec<&str> = path.split("/").last().unwrap().split(":").collect();
|
||||
let vec: Vec<&str> = path.split('/').last().unwrap().split(':').collect();
|
||||
let Ok(port) = FromStr::from_str(vec[1]) else {
|
||||
let _ = ws.close(CloseCode::Error.into(), b"invalid port").await;
|
||||
return Ok(());
|
||||
|
@ -137,7 +140,7 @@ pub async fn handle_wsproxy(
|
|||
}
|
||||
.await;
|
||||
match ret {
|
||||
Ok(_) => {
|
||||
Ok(()) => {
|
||||
let _ = ws.close(CloseCode::Normal.into(), b"").await;
|
||||
}
|
||||
Err(x) => {
|
||||
|
@ -168,7 +171,7 @@ pub async fn handle_wsproxy(
|
|||
}
|
||||
.await;
|
||||
match ret {
|
||||
Ok(_) => {
|
||||
Ok(()) => {
|
||||
let _ = ws.close(CloseCode::Normal.into(), b"").await;
|
||||
}
|
||||
Err(x) => {
|
||||
|
@ -224,7 +227,7 @@ pub async fn handle_wsproxy(
|
|||
}
|
||||
|
||||
match ret {
|
||||
Ok(_) => {
|
||||
Ok(()) => {
|
||||
let _ = ws.close(CloseCode::Normal.into(), b"").await;
|
||||
}
|
||||
Err(x) => {
|
||||
|
|
|
@ -188,7 +188,7 @@ impl<A: AsyncBufRead + Unpin, B: Unpin> AsyncBufRead for Duplex<A, B> {
|
|||
}
|
||||
|
||||
fn consume(self: Pin<&mut Self>, amt: usize) {
|
||||
Pin::new(&mut self.get_mut().0).consume(amt)
|
||||
Pin::new(&mut self.get_mut().0).consume(amt);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -368,10 +368,10 @@ impl ServerListener {
|
|||
|
||||
Ok((
|
||||
stream,
|
||||
addr.as_pathname()
|
||||
.and_then(|x| x.to_str())
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_else(|| Uuid::new_v4().to_string() + "-unix_socket"),
|
||||
addr.as_pathname().and_then(|x| x.to_str()).map_or_else(
|
||||
|| Uuid::new_v4().to_string() + "-unix_socket",
|
||||
ToString::to_string,
|
||||
),
|
||||
))
|
||||
}
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ use std::{collections::HashMap, fs::read_to_string, net::IpAddr};
|
|||
|
||||
use anyhow::{Context, Result};
|
||||
use clap::Parser;
|
||||
use config::{validate_config_cache, Cli, Config, RuntimeFlavor};
|
||||
use config::{validate_config_cache, Cli, Config, RuntimeFlavor, StatsEndpoint};
|
||||
use handle::{handle_wisp, handle_wsproxy, wisp::wispnet::handle_wispnet};
|
||||
use hickory_resolver::{
|
||||
config::{NameServerConfigGroup, ResolverConfig, ResolverOpts},
|
||||
|
@ -38,9 +38,9 @@ mod stats;
|
|||
#[doc(hidden)]
|
||||
mod stream;
|
||||
#[doc(hidden)]
|
||||
mod util_chain;
|
||||
#[doc(hidden)]
|
||||
mod upgrade;
|
||||
#[doc(hidden)]
|
||||
mod util_chain;
|
||||
|
||||
#[doc(hidden)]
|
||||
type Client = (Mutex<HashMap<Uuid, (ConnectPacket, ConnectPacket)>>, String);
|
||||
|
@ -77,7 +77,7 @@ lazy_static! {
|
|||
pub static ref CONFIG: Config = {
|
||||
if let Some(path) = &CLI.config {
|
||||
Config::de(
|
||||
read_to_string(path)
|
||||
&read_to_string(path)
|
||||
.context("failed to read config")
|
||||
.unwrap(),
|
||||
)
|
||||
|
@ -177,7 +177,7 @@ async fn async_main() -> Result<()> {
|
|||
.server
|
||||
.stats_endpoint
|
||||
.as_ref()
|
||||
.and_then(|x| x.get_bindaddr())
|
||||
.and_then(StatsEndpoint::get_bindaddr)
|
||||
{
|
||||
info!("stats server listening on {:?}", bind_addr);
|
||||
let mut stats_listener = ServerListener::new(&bind_addr).await.with_context(|| {
|
||||
|
@ -189,7 +189,7 @@ async fn async_main() -> Result<()> {
|
|||
match stats_listener.accept().await {
|
||||
Ok((stream, _)) => {
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = route_stats(stream).await {
|
||||
if let Err(e) = Box::pin(route_stats(stream)).await {
|
||||
error!("error while routing stats client: {:?}", e);
|
||||
}
|
||||
});
|
||||
|
@ -204,23 +204,27 @@ async fn async_main() -> Result<()> {
|
|||
.server
|
||||
.stats_endpoint
|
||||
.as_ref()
|
||||
.and_then(|x| x.get_endpoint());
|
||||
.and_then(StatsEndpoint::get_endpoint);
|
||||
|
||||
loop {
|
||||
let stats_endpoint = stats_endpoint.clone();
|
||||
match listener.accept().await {
|
||||
Ok((stream, client_id)) => {
|
||||
tokio::spawn(async move {
|
||||
let res = route::route(stream, stats_endpoint, move |stream, maybe_ip| {
|
||||
let client_id = if let Some(ip) = maybe_ip {
|
||||
format!("{} ({})", client_id, ip)
|
||||
} else {
|
||||
client_id
|
||||
};
|
||||
let res = Box::pin(route::route(
|
||||
stream,
|
||||
stats_endpoint,
|
||||
move |stream, maybe_ip| {
|
||||
let client_id = if let Some(ip) = maybe_ip {
|
||||
format!("{client_id} ({ip})")
|
||||
} else {
|
||||
client_id
|
||||
};
|
||||
|
||||
trace!("routed {:?}: {}", client_id, stream);
|
||||
handle_stream(stream, client_id)
|
||||
})
|
||||
trace!("routed {:?}: {}", client_id, stream);
|
||||
handle_stream(stream, client_id);
|
||||
},
|
||||
))
|
||||
.await;
|
||||
|
||||
if let Err(e) = res {
|
||||
|
@ -238,16 +242,18 @@ fn handle_stream(stream: ServerRouteResult, id: String) {
|
|||
tokio::spawn(async move {
|
||||
CLIENTS.lock().await.insert(
|
||||
id.clone(),
|
||||
(Mutex::new(HashMap::new()), format!("{}", stream)),
|
||||
(Mutex::new(HashMap::new()), format!("{stream}")),
|
||||
);
|
||||
let res = match stream {
|
||||
ServerRouteResult::Wisp {
|
||||
stream,
|
||||
has_ws_protocol,
|
||||
} => handle_wisp(stream, has_ws_protocol, id.clone()).await,
|
||||
ServerRouteResult::Wispnet { stream } => handle_wispnet(stream, id.clone()).await,
|
||||
} => Box::pin(handle_wisp(stream, has_ws_protocol, id.clone())).await,
|
||||
ServerRouteResult::Wispnet { stream } => {
|
||||
Box::pin(handle_wispnet(stream, id.clone())).await
|
||||
}
|
||||
ServerRouteResult::WsProxy { stream, path, udp } => {
|
||||
handle_wsproxy(stream, id.clone(), path, udp).await
|
||||
Box::pin(handle_wsproxy(stream, id.clone(), path, udp)).await
|
||||
}
|
||||
};
|
||||
if let Err(e) = res {
|
||||
|
|
|
@ -17,12 +17,12 @@ fn format_stream_type(stream_type: StreamType) -> &'static str {
|
|||
|
||||
#[derive(Serialize)]
|
||||
struct MemoryStats {
|
||||
active: f64,
|
||||
allocated: f64,
|
||||
mapped: f64,
|
||||
metadata: f64,
|
||||
resident: f64,
|
||||
retained: f64,
|
||||
active: usize,
|
||||
allocated: usize,
|
||||
mapped: usize,
|
||||
metadata: usize,
|
||||
resident: usize,
|
||||
retained: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
@ -66,12 +66,12 @@ pub async fn generate_stats() -> anyhow::Result<String> {
|
|||
tikv_jemalloc_ctl::epoch::advance()?;
|
||||
|
||||
let memory = MemoryStats {
|
||||
active: active::read()? as f64 / (1024 * 1024) as f64,
|
||||
allocated: allocated::read()? as f64 / (1024 * 1024) as f64,
|
||||
mapped: mapped::read()? as f64 / (1024 * 1024) as f64,
|
||||
metadata: metadata::read()? as f64 / (1024 * 1024) as f64,
|
||||
resident: resident::read()? as f64 / (1024 * 1024) as f64,
|
||||
retained: retained::read()? as f64 / (1024 * 1024) as f64,
|
||||
active: active::read()?,
|
||||
allocated: allocated::read()?,
|
||||
mapped: mapped::read()?,
|
||||
metadata: metadata::read()?,
|
||||
resident: resident::read()?,
|
||||
retained: retained::read()?,
|
||||
};
|
||||
|
||||
let clients_locked = CLIENTS.lock().await;
|
||||
|
|
|
@ -50,7 +50,7 @@ pub enum ClientStream {
|
|||
}
|
||||
|
||||
// taken from rust 1.82.0
|
||||
fn ipv4_is_global(addr: &Ipv4Addr) -> bool {
|
||||
fn ipv4_is_global(addr: Ipv4Addr) -> bool {
|
||||
!(addr.octets()[0] == 0 // "This network"
|
||||
|| addr.is_private()
|
||||
|| (addr.octets()[0] == 100 && (addr.octets()[1] & 0b1100_0000 == 0b0100_0000)) // || addr.is_shared()
|
||||
|
@ -67,7 +67,7 @@ fn ipv4_is_global(addr: &Ipv4Addr) -> bool {
|
|||
|| (addr.octets()[0] & 240 == 240) // || addr.is_reserved()
|
||||
|| addr.is_broadcast())
|
||||
}
|
||||
fn ipv6_is_global(addr: &Ipv6Addr) -> bool {
|
||||
fn ipv6_is_global(addr: Ipv6Addr) -> bool {
|
||||
!(
|
||||
addr.is_unspecified()
|
||||
|| addr.is_loopback()
|
||||
|
@ -90,7 +90,7 @@ fn ipv6_is_global(addr: &Ipv6Addr) -> bool {
|
|||
|| matches!(addr.segments(), [0x2001, 4, 0x112, _, _, _, _, _])
|
||||
// ORCHIDv2 (`2001:20::/28`)
|
||||
// Drone Remote ID Protocol Entity Tags (DETs) Prefix (`2001:30::/28`)`
|
||||
|| matches!(addr.segments(), [0x2001, b, _, _, _, _, _, _] if b >= 0x20 && b <= 0x3F)
|
||||
|| matches!(addr.segments(), [0x2001, b, _, _, _, _, _, _] if (0x20..=0x3F).contains(&b))
|
||||
))
|
||||
// 6to4 (`2002::/16`) – it's not explicitly documented as globally reachable,
|
||||
// IANA says N/A.
|
||||
|
@ -101,7 +101,7 @@ fn ipv6_is_global(addr: &Ipv6Addr) -> bool {
|
|||
// || addr.is_unicast_link_local()
|
||||
)
|
||||
}
|
||||
fn is_global(addr: &IpAddr) -> bool {
|
||||
fn is_global(addr: IpAddr) -> bool {
|
||||
match addr {
|
||||
IpAddr::V4(x) => ipv4_is_global(x),
|
||||
IpAddr::V6(x) => ipv6_is_global(x),
|
||||
|
@ -137,9 +137,8 @@ impl ClientStream {
|
|||
if let StreamType::Unknown(ty) = packet.stream_type {
|
||||
if ty == crate::handle::wisp::twisp::STREAM_TYPE && CONFIG.stream.allow_twisp && CONFIG.wisp.wisp_v2 {
|
||||
return Ok(ResolvedPacket::Valid(packet));
|
||||
} else {
|
||||
return Ok(ResolvedPacket::Invalid);
|
||||
}
|
||||
return Ok(ResolvedPacket::Invalid);
|
||||
}
|
||||
} else {
|
||||
if matches!(packet.stream_type, StreamType::Unknown(_)) {
|
||||
|
@ -179,8 +178,8 @@ impl ClientStream {
|
|||
return Ok(ResolvedPacket::Blocked);
|
||||
}
|
||||
|
||||
if (is_global(&addr) && !CONFIG.stream.allow_global)
|
||||
|| (!is_global(&addr) && !CONFIG.stream.allow_non_global)
|
||||
if (is_global(addr) && !CONFIG.stream.allow_global)
|
||||
|| (!is_global(addr) && !CONFIG.stream.allow_non_global)
|
||||
{
|
||||
return Ok(ResolvedPacket::Blocked);
|
||||
}
|
||||
|
@ -216,9 +215,7 @@ impl ClientStream {
|
|||
})
|
||||
.next();
|
||||
|
||||
Ok(packet
|
||||
.map(ResolvedPacket::Valid)
|
||||
.unwrap_or(ResolvedPacket::NoResolvedAddrs))
|
||||
Ok(packet.map_or(ResolvedPacket::NoResolvedAddrs, ResolvedPacket::Valid))
|
||||
}
|
||||
|
||||
pub async fn connect(packet: ConnectPacket) -> anyhow::Result<Self> {
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
// taken from tokio io util
|
||||
#![allow(clippy::pedantic, clippy::all)]
|
||||
|
||||
use std::{
|
||||
fmt, io,
|
||||
|
|
|
@ -1,6 +1,4 @@
|
|||
//! WebSocketRead + WebSocketWrite implementation for the fastwebsockets library.
|
||||
|
||||
use std::ops::Deref;
|
||||
//! `WebSocketRead` + `WebSocketWrite` implementation for the fastwebsockets library.
|
||||
|
||||
use bytes::BytesMut;
|
||||
use fastwebsockets::{
|
||||
|
@ -14,7 +12,7 @@ use crate::{ws::LockingWebSocketWrite, WispError};
|
|||
fn match_payload(payload: Payload<'_>) -> crate::ws::Payload<'_> {
|
||||
match payload {
|
||||
Payload::Bytes(x) => crate::ws::Payload::Bytes(x),
|
||||
Payload::Owned(x) => crate::ws::Payload::Bytes(BytesMut::from(x.deref())),
|
||||
Payload::Owned(x) => crate::ws::Payload::Bytes(BytesMut::from(&*x)),
|
||||
Payload::BorrowedMut(x) => crate::ws::Payload::Borrowed(&*x),
|
||||
Payload::Borrowed(x) => crate::ws::Payload::Borrowed(x),
|
||||
}
|
||||
|
@ -38,16 +36,16 @@ fn payload_to_bytesmut(payload: Payload<'_>) -> BytesMut {
|
|||
|
||||
impl From<OpCode> for crate::ws::OpCode {
|
||||
fn from(opcode: OpCode) -> Self {
|
||||
use OpCode::*;
|
||||
use OpCode as O;
|
||||
match opcode {
|
||||
Continuation => {
|
||||
O::Continuation => {
|
||||
unreachable!("continuation should never be recieved when using a fragmentcollector")
|
||||
}
|
||||
Text => Self::Text,
|
||||
Binary => Self::Binary,
|
||||
Close => Self::Close,
|
||||
Ping => Self::Ping,
|
||||
Pong => Self::Pong,
|
||||
O::Text => Self::Text,
|
||||
O::Binary => Self::Binary,
|
||||
O::Close => Self::Close,
|
||||
O::Ping => Self::Ping,
|
||||
O::Pong => Self::Pong,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -64,14 +62,14 @@ impl<'a> From<Frame<'a>> for crate::ws::Frame<'a> {
|
|||
|
||||
impl<'a> From<crate::ws::Frame<'a>> for Frame<'a> {
|
||||
fn from(frame: crate::ws::Frame<'a>) -> Self {
|
||||
use crate::ws::OpCode::*;
|
||||
use crate::ws::OpCode as O;
|
||||
let payload = match_payload_reverse(frame.payload);
|
||||
match frame.opcode {
|
||||
Text => Self::text(payload),
|
||||
Binary => Self::binary(payload),
|
||||
Close => Self::close_raw(payload),
|
||||
Ping => Self::new(true, OpCode::Ping, None, payload),
|
||||
Pong => Self::pong(payload),
|
||||
O::Text => Self::text(payload),
|
||||
O::Binary => Self::binary(payload),
|
||||
O::Close => Self::close_raw(payload),
|
||||
O::Ping => Self::new(true, OpCode::Ping, None, payload),
|
||||
O::Pong => Self::pong(payload),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -196,7 +194,7 @@ impl<S: AsyncRead + Unpin + Send> crate::ws::WebSocketRead for WebSocketRead<S>
|
|||
|
||||
impl<S: AsyncWrite + Unpin + Send> crate::ws::WebSocketWrite for WebSocketWrite<S> {
|
||||
async fn wisp_write_frame(&mut self, frame: crate::ws::Frame<'_>) -> Result<(), WispError> {
|
||||
self.write_frame(frame.into()).await.map_err(|e| e.into())
|
||||
self.write_frame(frame.into()).await.map_err(Into::into)
|
||||
}
|
||||
|
||||
async fn wisp_write_split(
|
||||
|
@ -218,6 +216,6 @@ impl<S: AsyncWrite + Unpin + Send> crate::ws::WebSocketWrite for WebSocketWrite<
|
|||
async fn wisp_close(&mut self) -> Result<(), WispError> {
|
||||
self.write_frame(Frame::close(CloseCode::Normal.into(), b""))
|
||||
.await
|
||||
.map_err(|e| e.into())
|
||||
.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue