tiktok iouring attempt 2

This commit is contained in:
Toshit Chawda 2024-11-05 19:53:37 -08:00
parent fa06962d16
commit c903f58c25
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
11 changed files with 276 additions and 153 deletions

131
Cargo.lock generated
View file

@ -166,6 +166,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "auto-const-array"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62f7df18977a1ee03650ee4b31b4aefed6d56bac188760b6e37610400fe8d4bb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.4.0" version = "1.4.0"
@ -252,6 +263,12 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "2.6.0" version = "2.6.0"
@ -741,7 +758,9 @@ dependencies = [
"lazy_static", "lazy_static",
"libc", "libc",
"log", "log",
"nix", "monoio",
"monoio-compat",
"nix 0.29.0",
"pty-process", "pty-process",
"regex", "regex",
"rustls-pemfile", "rustls-pemfile",
@ -950,6 +969,15 @@ dependencies = [
"slab", "slab",
] ]
[[package]]
name = "fxhash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder",
]
[[package]] [[package]]
name = "generic-array" name = "generic-array"
version = "0.14.7" version = "0.14.7"
@ -985,7 +1013,7 @@ version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724" checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.6.0",
"libc", "libc",
"libgit2-sys", "libgit2-sys",
"log", "log",
@ -1289,6 +1317,16 @@ dependencies = [
"web-sys", "web-sys",
] ]
[[package]]
name = "io-uring"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]] [[package]]
name = "ipconfig" name = "ipconfig"
version = "0.3.2" version = "0.3.2"
@ -1449,6 +1487,15 @@ version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "memoffset"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "mime" name = "mime"
version = "0.3.17" version = "0.3.17"
@ -1470,6 +1517,18 @@ dependencies = [
"adler2", "adler2",
] ]
[[package]]
name = "mio"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys 0.48.0",
]
[[package]] [[package]]
name = "mio" name = "mio"
version = "1.0.2" version = "1.0.2"
@ -1482,6 +1541,45 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "monoio"
version = "0.2.4"
source = "git+https://github.com/bytedance/monoio#90c9c3e7bd1b3166e29bf198aad12fdc0fb63dbd"
dependencies = [
"auto-const-array",
"bytes",
"fxhash",
"io-uring",
"libc",
"memchr",
"mio 0.8.11",
"monoio-macros",
"nix 0.26.4",
"pin-project-lite",
"socket2",
"windows-sys 0.48.0",
]
[[package]]
name = "monoio-compat"
version = "0.2.2"
source = "git+https://github.com/bytedance/monoio#90c9c3e7bd1b3166e29bf198aad12fdc0fb63dbd"
dependencies = [
"monoio",
"reusable-box-future",
"tokio",
]
[[package]]
name = "monoio-macros"
version = "0.1.0"
source = "git+https://github.com/bytedance/monoio#90c9c3e7bd1b3166e29bf198aad12fdc0fb63dbd"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "nanorand" name = "nanorand"
version = "0.7.0" version = "0.7.0"
@ -1491,13 +1589,26 @@ dependencies = [
"getrandom", "getrandom",
] ]
[[package]]
name = "nix"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
dependencies = [
"bitflags 1.3.2",
"cfg-if",
"libc",
"memoffset",
"pin-utils",
]
[[package]] [[package]]
name = "nix" name = "nix"
version = "0.29.0" version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.6.0",
"cfg-if", "cfg-if",
"cfg_aliases", "cfg_aliases",
"libc", "libc",
@ -1774,7 +1885,7 @@ version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.6.0",
] ]
[[package]] [[package]]
@ -1831,6 +1942,12 @@ dependencies = [
"quick-error", "quick-error",
] ]
[[package]]
name = "reusable-box-future"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0e61cd21fbddd85fbd9367b775660a01d388c08a61c6d2824af480b0309bb9"
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.17.8" version = "0.17.8"
@ -1867,7 +1984,7 @@ version = "0.38.38"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa260229e6538e52293eeb577aabd09945a09d6d9cc0fc550ed7529056c2e32a" checksum = "aa260229e6538e52293eeb577aabd09945a09d6d9cc0fc550ed7529056c2e32a"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.6.0",
"errno", "errno",
"itoa", "itoa",
"libc", "libc",
@ -2299,7 +2416,7 @@ dependencies = [
"backtrace", "backtrace",
"bytes", "bytes",
"libc", "libc",
"mio", "mio 1.0.2",
"parking_lot", "parking_lot",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
@ -2986,7 +3103,7 @@ version = "6.0.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"atomic_enum", "atomic_enum",
"bitflags", "bitflags 2.6.0",
"bytes", "bytes",
"ed25519", "ed25519",
"event-listener", "event-listener",

View file

@ -5,7 +5,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.86" anyhow = "1.0.86"
async-trait = { version = "0.1.81", optional = true } async-trait = "0.1.81"
bytes = "1.7.1" bytes = "1.7.1"
cfg-if = "1.0.0" cfg-if = "1.0.0"
clap = { version = "4.5.16", features = ["cargo", "derive"] } clap = { version = "4.5.16", features = ["cargo", "derive"] }
@ -22,6 +22,8 @@ hyper-util = { version = "0.1.7", features = ["tokio"] }
lazy_static = "1.5.0" lazy_static = "1.5.0"
libc = { version = "0.2.158", optional = true } libc = { version = "0.2.158", optional = true }
log = { version = "0.4.22", features = ["serde", "std"] } log = { version = "0.4.22", features = ["serde", "std"] }
monoio = { version = "0.2.4", git = "https://github.com/bytedance/monoio" }
monoio-compat = { version = "0.2.2", git = "https://github.com/bytedance/monoio" }
nix = { version = "0.29.0", features = ["term"] } nix = { version = "0.29.0", features = ["term"] }
pty-process = { version = "0.4.0", features = ["async", "tokio"], optional = true } pty-process = { version = "0.4.0", features = ["async", "tokio"], optional = true }
regex = "1.10.6" regex = "1.10.6"
@ -33,7 +35,7 @@ sha2 = "0.10.8"
shell-words = { version = "1.1.0", optional = true } shell-words = { version = "1.1.0", optional = true }
tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats", "use_std"] } tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats", "use_std"] }
tikv-jemallocator = "0.6.0" tikv-jemallocator = "0.6.0"
tokio = { version = "1.39.3", features = ["full"] } tokio = { version = "1.39.3", features = ["macros"] }
tokio-rustls = { version = "0.26.0", features = ["ring", "tls12"], default-features = false } tokio-rustls = { version = "0.26.0", features = ["ring", "tls12"], default-features = false }
tokio-util = { version = "0.7.11", features = ["codec", "compat", "io-util", "net"] } tokio-util = { version = "0.7.11", features = ["codec", "compat", "io-util", "net"] }
toml = { version = "0.8.19", optional = true } toml = { version = "0.8.19", optional = true }
@ -46,7 +48,7 @@ default = ["toml"]
yaml = ["dep:serde_yaml"] yaml = ["dep:serde_yaml"]
toml = ["dep:toml"] toml = ["dep:toml"]
twisp = ["dep:pty-process", "dep:libc", "dep:async-trait", "dep:shell-words"] twisp = ["dep:pty-process", "dep:libc", "dep:shell-words"]
[build-dependencies] [build-dependencies]
vergen-git2 = { version = "1.0.0", features = ["rustc"] } vergen-git2 = { version = "1.0.0", features = ["rustc"] }

View file

@ -59,19 +59,6 @@ pub enum SocketTransport {
LengthDelimitedLe, LengthDelimitedLe,
} }
#[derive(Serialize, Deserialize, Default, Debug)]
#[serde(rename_all = "lowercase")]
pub enum RuntimeFlavor {
/// Single-threaded tokio runtime.
SingleThread,
/// Multi-threaded tokio runtime.
#[default]
MultiThread,
/// Alternate multi-threaded tokio runtime.
#[cfg(tokio_unstable)]
MultiThreadAlt,
}
pub type BindAddr = (SocketType, String); pub type BindAddr = (SocketType, String);
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -112,8 +99,6 @@ pub struct ServerConfig {
/// Server log level. /// Server log level.
pub log_level: LevelFilter, pub log_level: LevelFilter,
/// Runtime type.
pub runtime: RuntimeFlavor,
} }
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
@ -319,7 +304,6 @@ impl Default for ServerConfig {
max_message_size: 64 * 1024, max_message_size: 64 * 1024,
log_level: LevelFilter::Info, log_level: LevelFilter::Info,
runtime: RuntimeFlavor::default(),
} }
} }
} }

View file

@ -1,5 +1,13 @@
pub mod wisp; pub mod wisp;
pub mod wsproxy; //pub mod wsproxy;
pub use wisp::handle_wisp; pub use wisp::handle_wisp;
pub use wsproxy::handle_wsproxy; //pub use wsproxy::handle_wsproxy;
pub async fn handle_wsproxy(
mut ws: crate::stream::WebSocketStreamWrapper,
id: String,
path: String,
udp: bool,
) -> anyhow::Result<()> {
todo!();
}

View file

@ -10,18 +10,16 @@ use cfg_if::cfg_if;
use event_listener::Event; use event_listener::Event;
use futures_util::FutureExt; use futures_util::FutureExt;
use log::{debug, trace}; use log::{debug, trace};
use tokio::{ use monoio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, io::{AsyncReadRent, AsyncWriteRentExt, Splitable},
net::tcp::{OwnedReadHalf, OwnedWriteHalf}, net::tcp::{TcpOwnedReadHalf, TcpOwnedWriteHalf},
select, task::JoinHandle,
task::JoinSet,
time::interval, time::interval,
}; };
use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio::select;
use uuid::Uuid; use uuid::Uuid;
use wisp_mux::{ use wisp_mux::{
ws::Payload, CloseReason, ConnectPacket, MuxStream, MuxStreamAsyncRead, MuxStreamWrite, ws::Payload, CloseReason, ConnectPacket, MuxStream, MuxStreamRead, MuxStreamWrite, ServerMux,
ServerMux,
}; };
use crate::{ use crate::{
@ -30,39 +28,26 @@ use crate::{
CLIENTS, CONFIG, CLIENTS, CONFIG,
}; };
async fn copy_read_fast( async fn copy_read_fast(rx: MuxStreamRead, mut tx: TcpOwnedWriteHalf) -> anyhow::Result<()> {
muxrx: MuxStreamAsyncRead, let mut res;
mut tcptx: OwnedWriteHalf, while let Some(x) = rx.read().await? {
) -> std::io::Result<()> { (res, _) = tx.write_all(x).await;
let mut muxrx = muxrx.compat(); res?;
}
Ok(())
}
async fn copy_write_fast(tx: MuxStreamWrite, mut rx: TcpOwnedReadHalf) -> anyhow::Result<()> {
let mut buf = Vec::with_capacity(CONFIG.stream.buffer_size);
let mut res;
loop { loop {
let buf = muxrx.fill_buf().await?; (res, buf) = rx.read(buf).await;
if buf.is_empty() { let cnt = res?;
tcptx.flush().await?; if cnt == 0 {
return Ok(()); break Ok(());
} }
tx.write_payload(Payload::Borrowed(&buf[0..cnt])).await?;
let i = tcptx.write(buf).await?;
if i == 0 {
return Err(std::io::ErrorKind::WriteZero.into());
}
muxrx.consume(i);
}
}
async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow::Result<()> {
let mut tcprx = BufReader::with_capacity(CONFIG.stream.buffer_size, tcprx);
loop {
let buf = tcprx.fill_buf().await?;
let len = buf.len();
if len == 0 {
return Ok(());
}
muxtx.write(&buf).await?;
tcprx.consume(len);
} }
} }
@ -122,7 +107,6 @@ async fn handle_stream(
let ret: anyhow::Result<()> = async { let ret: anyhow::Result<()> = async {
let (muxread, muxwrite) = muxstream.into_split(); let (muxread, muxwrite) = muxstream.into_split();
let muxread = muxread.into_stream().into_asyncread();
let (tcpread, tcpwrite) = stream.into_split(); let (tcpread, tcpwrite) = stream.into_split();
select! { select! {
x = copy_read_fast(muxread, tcpwrite) => x?, x = copy_read_fast(muxread, tcpwrite) => x?,
@ -145,21 +129,32 @@ async fn handle_stream(
let closer = muxstream.get_close_handle(); let closer = muxstream.get_close_handle();
let ret: anyhow::Result<()> = async move { let ret: anyhow::Result<()> = async move {
let read = async {
let mut data = vec![0u8; 65507]; let mut data = vec![0u8; 65507];
let mut ret;
loop { loop {
select! { (ret, data) = stream.recv(data).await;
size = stream.recv(&mut data) => { let size = ret?;
let size = size?; if size != 0 {
muxstream.write(&data[..size]).await?; muxstream
} .write_payload(Payload::Borrowed(&data[..size]))
data = muxstream.read() => { .await?;
if let Some(data) = data? {
stream.send(&data).await?;
} else { } else {
break Ok(()); break Ok(());
} }
} }
};
let write = async {
let mut ret;
while let Some(data) = muxstream.read().await? {
(ret, _) = stream.send(data).await;
ret?;
} }
Ok(())
};
select! {
x = read => x,
x = write => x,
} }
} }
.await; .await;
@ -254,18 +249,18 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow:
mux.downgraded mux.downgraded
); );
let mut set: JoinSet<()> = JoinSet::new(); let mut set: Vec<JoinHandle<()>> = Vec::new();
let event: Arc<Event> = Event::new().into(); let event: Arc<Event> = Event::new().into();
let mux_id = id.clone(); let mux_id = id.clone();
set.spawn(tokio::task::unconstrained(fut.map(move |x| { set.push(monoio::spawn(fut.map(move |x| {
debug!("wisp client id {:?} multiplexor result {:?}", mux_id, x) debug!("wisp client id {:?} multiplexor result {:?}", mux_id, x)
}))); })));
let ping_mux = mux.clone(); let ping_mux = mux.clone();
let ping_event = event.clone(); let ping_event = event.clone();
let ping_id = id.clone(); let ping_id = id.clone();
set.spawn(async move { set.push(monoio::spawn(async move {
let mut interval = interval(Duration::from_secs(30)); let mut interval = interval(Duration::from_secs(30));
while ping_mux while ping_mux
.send_ping(Payload::Bytes(BytesMut::new())) .send_ping(Payload::Bytes(BytesMut::new()))
@ -278,17 +273,17 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow:
_ = ping_event.listen() => break, _ = ping_event.listen() => break,
}; };
} }
}); }));
while let Some((connect, stream)) = mux.server_new_stream().await { while let Some((connect, stream)) = mux.server_new_stream().await {
set.spawn(handle_stream( set.push(monoio::spawn(handle_stream(
connect, connect,
stream, stream,
id.clone(), id.clone(),
event.clone(), event.clone(),
#[cfg(feature = "twisp")] #[cfg(feature = "twisp")]
twisp_map.clone(), twisp_map.clone(),
)); )));
} }
debug!("shutting down wisp client id {:?}", id); debug!("shutting down wisp client id {:?}", id);
@ -298,7 +293,9 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow:
trace!("waiting for tasks to close for wisp client id {:?}", id); trace!("waiting for tasks to close for wisp client id {:?}", id);
while set.join_next().await.is_some() {} for task in set {
task.await;
}
debug!("wisp client id {:?} disconnected", id); debug!("wisp client id {:?} disconnected", id);

View file

@ -7,7 +7,7 @@ use wisp_mux::extensions::cert::VerifyKey;
pub async fn get_certificates_from_paths(paths: Vec<PathBuf>) -> anyhow::Result<Vec<VerifyKey>> { pub async fn get_certificates_from_paths(paths: Vec<PathBuf>) -> anyhow::Result<Vec<VerifyKey>> {
let mut out = Vec::new(); let mut out = Vec::new();
for path in paths { for path in paths {
let data = tokio::fs::read_to_string(path).await?; let data = String::from_utf8(monoio::fs::read(path).await?)?;
let verifier = VerifyingKey::from_public_key_pem(&data)?; let verifier = VerifyingKey::from_public_key_pem(&data)?;
let binary_key = verifier.to_bytes(); let binary_key = verifier.to_bytes();

View file

@ -2,10 +2,7 @@ use std::str::FromStr;
use fastwebsockets::CloseCode; use fastwebsockets::CloseCode;
use log::debug; use log::debug;
use tokio::{ use tokio::{io::BufReader, select};
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
select,
};
use uuid::Uuid; use uuid::Uuid;
use wisp_mux::{ConnectPacket, StreamType}; use wisp_mux::{ConnectPacket, StreamType};

View file

@ -1,18 +1,24 @@
use std::{ use std::{
fs::remove_file,
io::{BufReader, Cursor}, io::{BufReader, Cursor},
os::fd::AsFd, os::fd::{AsRawFd, BorrowedFd},
path::PathBuf, path::PathBuf,
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
}; };
use anyhow::Context; use anyhow::Context;
use rustls_pemfile::{certs, private_key}; use monoio::{
use tokio::{ fs::{File, OpenOptions},
fs::{remove_file, try_exists, File}, io::Splitable,
io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadHalf, WriteHalf}, net::{
net::{tcp, unix, TcpListener, TcpStream, UnixListener, UnixStream}, tcp::{TcpListener, TcpOwnedReadHalf, TcpOwnedWriteHalf, TcpStream},
unix::{UnixListener, UnixOwnedReadHalf, UnixOwnedWriteHalf, UnixStream},
},
}; };
use monoio_compat::StreamWrapper;
use rustls_pemfile::{certs, private_key};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
use tokio_rustls::{rustls, server::TlsStream, TlsAcceptor}; use tokio_rustls::{rustls, server::TlsStream, TlsAcceptor};
use uuid::Uuid; use uuid::Uuid;
@ -157,6 +163,8 @@ impl<
} }
} }
unsafe impl<A, B, C, D, E> Send for Quintet<A, B, C, D, E> {}
pub struct Duplex<A, B>(A, B); pub struct Duplex<A, B>(A, B);
impl<A, B> Duplex<A, B> { impl<A, B> Duplex<A, B> {
@ -228,21 +236,26 @@ impl<A: Unpin, B: AsyncWrite + Unpin> AsyncWrite for Duplex<A, B> {
} }
} }
pub type ServerStream = pub type ServerStream = Quintet<
Quintet<TcpStream, TlsStream<TcpStream>, UnixStream, TlsStream<UnixStream>, Duplex<File, File>>; StreamWrapper<TcpStream>,
TlsStream<StreamWrapper<TcpStream>>,
StreamWrapper<UnixStream>,
TlsStream<StreamWrapper<UnixStream>>,
Duplex<StreamWrapper<File>, StreamWrapper<File>>,
>;
pub type ServerStreamRead = Quintet< pub type ServerStreamRead = Quintet<
tcp::OwnedReadHalf, StreamWrapper<TcpOwnedReadHalf>,
ReadHalf<TlsStream<TcpStream>>, ReadHalf<TlsStream<StreamWrapper<TcpStream>>>,
unix::OwnedReadHalf, StreamWrapper<UnixOwnedReadHalf>,
ReadHalf<TlsStream<UnixStream>>, ReadHalf<TlsStream<StreamWrapper<UnixStream>>>,
File, StreamWrapper<File>,
>; >;
pub type ServerStreamWrite = Quintet< pub type ServerStreamWrite = Quintet<
tcp::OwnedWriteHalf, StreamWrapper<TcpOwnedWriteHalf>,
WriteHalf<TlsStream<TcpStream>>, WriteHalf<TlsStream<StreamWrapper<TcpStream>>>,
unix::OwnedWriteHalf, StreamWrapper<UnixOwnedWriteHalf>,
WriteHalf<TlsStream<UnixStream>>, WriteHalf<TlsStream<StreamWrapper<UnixStream>>>,
File, StreamWrapper<File>,
>; >;
pub trait ServerStreamExt { pub trait ServerStreamExt {
@ -253,16 +266,22 @@ impl ServerStreamExt for ServerStream {
fn split(self) -> (ServerStreamRead, ServerStreamWrite) { fn split(self) -> (ServerStreamRead, ServerStreamWrite) {
match self { match self {
Self::One(x) => { Self::One(x) => {
let (r, w) = x.into_split(); let (r, w) = x.into_inner().into_split();
(Quintet::One(r), Quintet::One(w)) (
Quintet::One(StreamWrapper::new(r)),
Quintet::One(StreamWrapper::new(w)),
)
} }
Self::Two(x) => { Self::Two(x) => {
let (r, w) = tokio::io::split(x); let (r, w) = tokio::io::split(x);
(Quintet::Two(r), Quintet::Two(w)) (Quintet::Two(r), Quintet::Two(w))
} }
Self::Three(x) => { Self::Three(x) => {
let (r, w) = x.into_split(); let (r, w) = x.into_inner().into_split();
(Quintet::Three(r), Quintet::Three(w)) (
Quintet::Three(StreamWrapper::new(r)),
Quintet::Three(StreamWrapper::new(w)),
)
} }
Self::Four(x) => { Self::Four(x) => {
let (r, w) = tokio::io::split(x); let (r, w) = tokio::io::split(x);
@ -287,13 +306,13 @@ pub enum ServerListener {
impl ServerListener { impl ServerListener {
async fn bind_tcp(bind: &BindAddr) -> anyhow::Result<TcpListener> { async fn bind_tcp(bind: &BindAddr) -> anyhow::Result<TcpListener> {
TcpListener::bind(&bind.1) TcpListener::bind(&bind.1)
.await
.with_context(|| format!("failed to bind to tcp address `{}`", bind.1)) .with_context(|| format!("failed to bind to tcp address `{}`", bind.1))
} }
async fn bind_unix(bind: &BindAddr) -> anyhow::Result<UnixListener> { async fn bind_unix(bind: &BindAddr) -> anyhow::Result<UnixListener> {
if try_exists(&bind.1).await? { todo!("this uses sync");
remove_file(&bind.1).await?; if PathBuf::from(&bind.1).try_exists()? {
remove_file(&bind.1)?;
} }
UnixListener::bind(&bind.1) UnixListener::bind(&bind.1)
.with_context(|| format!("failed to bind to unix socket at `{}`", bind.1)) .with_context(|| format!("failed to bind to unix socket at `{}`", bind.1))
@ -307,7 +326,7 @@ impl ServerListener {
.context("no tls keypair provided")?; .context("no tls keypair provided")?;
let mut public = BufReader::new(Cursor::new( let mut public = BufReader::new(Cursor::new(
tokio::fs::read(&tls_keypair[0]) monoio::fs::read(&tls_keypair[0])
.await .await
.context("failed to read public key")?, .context("failed to read public key")?,
)); ));
@ -315,7 +334,7 @@ impl ServerListener {
.collect::<Result<Vec<_>, _>>() .collect::<Result<Vec<_>, _>>()
.context("failed to parse public key")?; .context("failed to parse public key")?;
let mut private = BufReader::new(Cursor::new( let mut private = BufReader::new(Cursor::new(
tokio::fs::read(&tls_keypair[1]) monoio::fs::read(&tls_keypair[1])
.await .await
.context("failed to read private key")?, .context("failed to read private key")?,
)); ));
@ -379,25 +398,25 @@ impl ServerListener {
match self { match self {
Self::Tcp(x) => { Self::Tcp(x) => {
let (x, y) = Self::accept_tcp(x).await?; let (x, y) = Self::accept_tcp(x).await?;
Ok((Quintet::One(x), y)) Ok((Quintet::One(StreamWrapper::new(x)), y))
} }
Self::TlsTcp(tcp, tls) => { Self::TlsTcp(tcp, tls) => {
let (x, y) = Self::accept_tcp(tcp).await?; let (x, y) = Self::accept_tcp(tcp).await?;
let x = tls.accept(x).await?; let x = tls.accept(StreamWrapper::new(x)).await?;
Ok((Quintet::Two(x), y)) Ok((Quintet::Two(x), y))
} }
Self::Unix(x) => { Self::Unix(x) => {
let (x, y) = Self::accept_unix(x).await?; let (x, y) = Self::accept_unix(x).await?;
Ok((Quintet::Three(x), y)) Ok((Quintet::Three(StreamWrapper::new(x)), y))
} }
Self::TlsUnix(unix, tls) => { Self::TlsUnix(unix, tls) => {
let (x, y) = Self::accept_unix(unix).await?; let (x, y) = Self::accept_unix(unix).await?;
let x = tls.accept(x).await?; let x = tls.accept(StreamWrapper::new(x)).await?;
Ok((Quintet::Four(x), y)) Ok((Quintet::Four(x), y))
} }
Self::File(path) => { Self::File(path) => {
if let Some(path) = path.take() { if let Some(path) = path.take() {
let rx = File::options() let rx = OpenOptions::new()
.read(true) .read(true)
.write(false) .write(false)
.open(&path) .open(&path)
@ -405,19 +424,20 @@ impl ServerListener {
.context("failed to open read file")?; .context("failed to open read file")?;
if CONFIG.server.file_raw_mode { if CONFIG.server.file_raw_mode {
let mut termios = nix::sys::termios::tcgetattr(rx.as_fd()) let fd = unsafe { BorrowedFd::borrow_raw(rx.as_raw_fd()) };
let mut termios = nix::sys::termios::tcgetattr(fd)
.context("failed to get termios for read file")? .context("failed to get termios for read file")?
.clone(); .clone();
nix::sys::termios::cfmakeraw(&mut termios); nix::sys::termios::cfmakeraw(&mut termios);
nix::sys::termios::tcsetattr( nix::sys::termios::tcsetattr(
rx.as_fd(), fd,
nix::sys::termios::SetArg::TCSANOW, nix::sys::termios::SetArg::TCSANOW,
&termios, &termios,
) )
.context("failed to set raw mode for read file")?; .context("failed to set raw mode for read file")?;
} }
let tx = File::options() let tx = OpenOptions::new()
.read(false) .read(false)
.write(true) .write(true)
.open(&path) .open(&path)
@ -425,12 +445,13 @@ impl ServerListener {
.context("failed to open write file")?; .context("failed to open write file")?;
if CONFIG.server.file_raw_mode { if CONFIG.server.file_raw_mode {
let mut termios = nix::sys::termios::tcgetattr(tx.as_fd()) let fd = unsafe { BorrowedFd::borrow_raw(tx.as_raw_fd()) };
let mut termios = nix::sys::termios::tcgetattr(fd)
.context("failed to get termios for write file")? .context("failed to get termios for write file")?
.clone(); .clone();
nix::sys::termios::cfmakeraw(&mut termios); nix::sys::termios::cfmakeraw(&mut termios);
nix::sys::termios::tcsetattr( nix::sys::termios::tcsetattr(
tx.as_fd(), fd,
nix::sys::termios::SetArg::TCSANOW, nix::sys::termios::SetArg::TCSANOW,
&termios, &termios,
) )
@ -438,7 +459,7 @@ impl ServerListener {
} }
Ok(( Ok((
Quintet::Five(Duplex::new(rx, tx)), Quintet::Five(Duplex::new(StreamWrapper::new(rx), StreamWrapper::new(tx))),
path.to_string_lossy().to_string(), path.to_string_lossy().to_string(),
)) ))
} else { } else {

View file

@ -2,11 +2,14 @@
#![deny(clippy::todo)] #![deny(clippy::todo)]
#![allow(unexpected_cfgs)] #![allow(unexpected_cfgs)]
use std::{fs::read_to_string, net::IpAddr}; use std::{
fs::read_to_string,
net::{IpAddr, ToSocketAddrs},
};
use anyhow::Context; use anyhow::Context;
use clap::Parser; use clap::Parser;
use config::{validate_config_cache, Cli, Config, RuntimeFlavor}; use config::{validate_config_cache, Cli, Config};
use dashmap::DashMap; use dashmap::DashMap;
use handle::{handle_wisp, handle_wsproxy}; use handle::{handle_wisp, handle_wsproxy};
use hickory_resolver::{ use hickory_resolver::{
@ -17,12 +20,9 @@ use hickory_resolver::{
use lazy_static::lazy_static; use lazy_static::lazy_static;
use listener::ServerListener; use listener::ServerListener;
use log::{error, info, trace, warn}; use log::{error, info, trace, warn};
use monoio::{time::TimeDriver, IoUringDriver, RuntimeBuilder};
use route::{route_stats, ServerRouteResult}; use route::{route_stats, ServerRouteResult};
use stats::generate_stats; use stats::generate_stats;
use tokio::{
runtime,
signal::unix::{signal, SignalKind},
};
use uuid::Uuid; use uuid::Uuid;
use wisp_mux::ConnectPacket; use wisp_mux::ConnectPacket;
@ -43,10 +43,11 @@ pub enum Resolver {
impl Resolver { impl Resolver {
pub async fn resolve(&self, host: String) -> anyhow::Result<Box<dyn Iterator<Item = IpAddr>>> { pub async fn resolve(&self, host: String) -> anyhow::Result<Box<dyn Iterator<Item = IpAddr>>> {
// TODO this uses sync
match self { match self {
Self::Hickory(resolver) => Ok(Box::new(resolver.lookup_ip(host).await?.into_iter())), Self::Hickory(resolver) => Ok(Box::new(resolver.lookup_ip(host).await?.into_iter())),
Self::System => Ok(Box::new( Self::System => Ok(Box::new(
tokio::net::lookup_host(host + ":0").await?.map(|x| x.ip()), (host.as_str(), 0).to_socket_addrs()?.map(|x| x.ip()),
)), )),
} }
} }
@ -76,6 +77,7 @@ lazy_static! {
}; };
pub static ref CLIENTS: DashMap<String, Client> = DashMap::new(); pub static ref CLIENTS: DashMap<String, Client> = DashMap::new();
pub static ref RESOLVER: Resolver = { pub static ref RESOLVER: Resolver = {
return Resolver::System;
if CONFIG.stream.dns_servers.is_empty() { if CONFIG.stream.dns_servers.is_empty() {
if let Ok((config, opts)) = read_system_conf() { if let Ok((config, opts)) = read_system_conf() {
Resolver::Hickory(TokioAsyncResolver::tokio(config, opts)) Resolver::Hickory(TokioAsyncResolver::tokio(config, opts))
@ -110,28 +112,22 @@ fn main() -> anyhow::Result<()> {
.parse_default_env() .parse_default_env()
.init(); .init();
let mut builder: runtime::Builder = match CONFIG.server.runtime { let builder = RuntimeBuilder::<TimeDriver<IoUringDriver>>::new();
RuntimeFlavor::SingleThread => runtime::Builder::new_current_thread(), let mut rt = builder.build().context("failed to create monoio driver")?;
RuntimeFlavor::MultiThread => runtime::Builder::new_multi_thread(),
#[cfg(tokio_unstable)]
RuntimeFlavor::MultiThreadAlt => runtime::Builder::new_multi_thread_alt(),
};
builder.enable_all();
let rt = builder.build()?;
rt.block_on(async { rt.block_on(async {
validate_config_cache().await; validate_config_cache().await;
info!( info!(
"listening on {:?} with runtime flavor {:?} and socket transport {:?}", "listening on {:?} with socket transport {:?}",
CONFIG.server.bind, CONFIG.server.runtime, CONFIG.server.transport CONFIG.server.bind, CONFIG.server.transport
); );
trace!("CLI: {:#?}", &*CLI); trace!("CLI: {:#?}", &*CLI);
trace!("CONFIG: {:#?}", &*CONFIG); trace!("CONFIG: {:#?}", &*CONFIG);
trace!("RESOLVER: {:?}", &*RESOLVER); trace!("RESOLVER: {:?}", &*RESOLVER);
/*
tokio::spawn(async { tokio::spawn(async {
let mut sig = signal(SignalKind::user_defined1()).unwrap(); let mut sig = signal(SignalKind::user_defined1()).unwrap();
while sig.recv().await.is_some() { while sig.recv().await.is_some() {
@ -141,6 +137,7 @@ fn main() -> anyhow::Result<()> {
} }
} }
}); });
*/
let mut listener = ServerListener::new(&CONFIG.server.bind) let mut listener = ServerListener::new(&CONFIG.server.bind)
.await .await
@ -157,7 +154,7 @@ fn main() -> anyhow::Result<()> {
format!("failed to bind to address {} for stats server", bind_addr.1) format!("failed to bind to address {} for stats server", bind_addr.1)
})?; })?;
tokio::spawn(async move { monoio::spawn(async move {
loop { loop {
match stats_listener.accept().await { match stats_listener.accept().await {
Ok((stream, _)) => { Ok((stream, _)) => {
@ -180,7 +177,7 @@ fn main() -> anyhow::Result<()> {
let stats_endpoint = stats_endpoint.clone(); let stats_endpoint = stats_endpoint.clone();
match listener.accept().await { match listener.accept().await {
Ok((stream, client_id)) => { Ok((stream, client_id)) => {
tokio::spawn(async move { monoio::spawn(async move {
let res = route::route(stream, stats_endpoint, move |stream, maybe_ip| { let res = route::route(stream, stats_endpoint, move |stream, maybe_ip| {
let client_id = if let Some(ip) = maybe_ip { let client_id = if let Some(ip) = maybe_ip {
format!("{} ({})", client_id, ip) format!("{} ({})", client_id, ip)
@ -205,7 +202,7 @@ fn main() -> anyhow::Result<()> {
} }
fn handle_stream(stream: ServerRouteResult, id: String) { fn handle_stream(stream: ServerRouteResult, id: String) {
tokio::spawn(async move { monoio::spawn(async move {
CLIENTS.insert(id.clone(), (DashMap::new(), false)); CLIENTS.insert(id.clone(), (DashMap::new(), false));
let res = match stream { let res = match stream {
ServerRouteResult::Wisp(stream, is_v2) => handle_wisp(stream, is_v2, id.clone()).await, ServerRouteResult::Wisp(stream, is_v2) => handle_wisp(stream, is_v2, id.clone()).await,

View file

@ -123,7 +123,7 @@ where
if req_path.starts_with(&(CONFIG.wisp.prefix.clone() + "/")) { if req_path.starts_with(&(CONFIG.wisp.prefix.clone() + "/")) {
let has_ws_protocol = ws_protocol.is_some(); let has_ws_protocol = ws_protocol.is_some();
tokio::spawn(async move { monoio::spawn(async move {
if let Err(err) = if let Err(err) =
(callback)(fut, HttpUpgradeResult::Wisp(has_ws_protocol), ip_header).await (callback)(fut, HttpUpgradeResult::Wisp(has_ws_protocol), ip_header).await
{ {
@ -136,7 +136,7 @@ where
} }
} else if CONFIG.wisp.allow_wsproxy { } else if CONFIG.wisp.allow_wsproxy {
let udp = req.uri().query().unwrap_or_default() == "?udp"; let udp = req.uri().query().unwrap_or_default() == "?udp";
tokio::spawn(async move { monoio::spawn(async move {
if let Err(err) = if let Err(err) =
(callback)(fut, HttpUpgradeResult::WsProxy(req_path, udp), ip_header).await (callback)(fut, HttpUpgradeResult::WsProxy(req_path, udp), ip_header).await
{ {

View file

@ -9,8 +9,8 @@ use cfg_if::cfg_if;
use fastwebsockets::{FragmentCollector, Frame, OpCode, Payload, WebSocketError}; use fastwebsockets::{FragmentCollector, Frame, OpCode, Payload, WebSocketError};
use hyper::upgrade::Upgraded; use hyper::upgrade::Upgraded;
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use monoio::net::{udp::UdpSocket, TcpStream};
use regex::RegexSet; use regex::RegexSet;
use tokio::net::{TcpStream, UdpSocket};
use wisp_mux::{ConnectPacket, StreamType}; use wisp_mux::{ConnectPacket, StreamType};
use crate::{CONFIG, RESOLVER}; use crate::{CONFIG, RESOLVER};
@ -175,7 +175,7 @@ impl ClientStream {
SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), 0) SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), 0)
}; };
let stream = UdpSocket::bind(bind_addr).await?; let stream = UdpSocket::bind(bind_addr)?;
stream stream
.connect(SocketAddr::new(ipaddr, packet.destination_port)) .connect(SocketAddr::new(ipaddr, packet.destination_port))