diff --git a/Cargo.lock b/Cargo.lock index ad9e295..ad4061d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -166,6 +166,17 @@ dependencies = [ "syn", ] +[[package]] +name = "auto-const-array" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f7df18977a1ee03650ee4b31b4aefed6d56bac188760b6e37610400fe8d4bb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -252,6 +263,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.6.0" @@ -741,7 +758,9 @@ dependencies = [ "lazy_static", "libc", "log", - "nix", + "monoio", + "monoio-compat", + "nix 0.29.0", "pty-process", "regex", "rustls-pemfile", @@ -950,6 +969,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -985,7 +1013,7 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724" dependencies = [ - "bitflags", + "bitflags 2.6.0", "libc", "libgit2-sys", "log", @@ -1289,6 +1317,16 @@ dependencies = [ "web-sys", ] +[[package]] +name = "io-uring" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "ipconfig" version = "0.3.2" @@ -1449,6 +1487,15 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1470,6 +1517,18 @@ dependencies = [ "adler2", ] +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.0.2" @@ -1482,6 +1541,45 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "monoio" +version = "0.2.4" +source = "git+https://github.com/bytedance/monoio#90c9c3e7bd1b3166e29bf198aad12fdc0fb63dbd" +dependencies = [ + "auto-const-array", + "bytes", + "fxhash", + "io-uring", + "libc", + "memchr", + "mio 0.8.11", + "monoio-macros", + "nix 0.26.4", + "pin-project-lite", + "socket2", + "windows-sys 0.48.0", +] + +[[package]] +name = "monoio-compat" +version = "0.2.2" +source = "git+https://github.com/bytedance/monoio#90c9c3e7bd1b3166e29bf198aad12fdc0fb63dbd" +dependencies = [ + "monoio", + "reusable-box-future", + "tokio", +] + +[[package]] +name = "monoio-macros" +version = "0.1.0" +source = "git+https://github.com/bytedance/monoio#90c9c3e7bd1b3166e29bf198aad12fdc0fb63dbd" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "nanorand" version = "0.7.0" @@ -1491,13 +1589,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", + "memoffset", + "pin-utils", +] + [[package]] name = "nix" version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags", + "bitflags 2.6.0", "cfg-if", "cfg_aliases", "libc", @@ -1774,7 +1885,7 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" dependencies = [ - "bitflags", + "bitflags 2.6.0", ] [[package]] @@ -1831,6 +1942,12 @@ dependencies = [ "quick-error", ] +[[package]] +name = "reusable-box-future" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0e61cd21fbddd85fbd9367b775660a01d388c08a61c6d2824af480b0309bb9" + [[package]] name = "ring" version = "0.17.8" @@ -1867,7 +1984,7 @@ version = "0.38.38" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa260229e6538e52293eeb577aabd09945a09d6d9cc0fc550ed7529056c2e32a" dependencies = [ - "bitflags", + "bitflags 2.6.0", "errno", "itoa", "libc", @@ -2299,7 +2416,7 @@ dependencies = [ "backtrace", "bytes", "libc", - "mio", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -2986,7 +3103,7 @@ version = "6.0.0" dependencies = [ "async-trait", "atomic_enum", - "bitflags", + "bitflags 2.6.0", "bytes", "ed25519", "event-listener", diff --git a/server/Cargo.toml b/server/Cargo.toml index 58a5e5b..669d7d1 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] anyhow = "1.0.86" -async-trait = { version = "0.1.81", optional = true } +async-trait = "0.1.81" bytes = "1.7.1" cfg-if = "1.0.0" clap = { version = "4.5.16", features = ["cargo", "derive"] } @@ -22,6 +22,8 @@ hyper-util = { version = "0.1.7", features = ["tokio"] } lazy_static = "1.5.0" libc = { version = "0.2.158", optional = true } log = { version = "0.4.22", features = ["serde", "std"] } +monoio = { version = "0.2.4", git = "https://github.com/bytedance/monoio" } +monoio-compat = { version = "0.2.2", git = "https://github.com/bytedance/monoio" } nix = { version = "0.29.0", features = ["term"] } pty-process = { version = "0.4.0", features = ["async", "tokio"], optional = true } regex = "1.10.6" @@ -33,7 +35,7 @@ sha2 = "0.10.8" shell-words = { version = "1.1.0", optional = true } tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats", "use_std"] } tikv-jemallocator = "0.6.0" -tokio = { version = "1.39.3", features = ["full"] } +tokio = { version = "1.39.3", features = ["macros"] } tokio-rustls = { version = "0.26.0", features = ["ring", "tls12"], default-features = false } tokio-util = { version = "0.7.11", features = ["codec", "compat", "io-util", "net"] } toml = { version = "0.8.19", optional = true } @@ -46,7 +48,7 @@ default = ["toml"] yaml = ["dep:serde_yaml"] toml = ["dep:toml"] -twisp = ["dep:pty-process", "dep:libc", "dep:async-trait", "dep:shell-words"] +twisp = ["dep:pty-process", "dep:libc", "dep:shell-words"] [build-dependencies] vergen-git2 = { version = "1.0.0", features = ["rustc"] } diff --git a/server/src/config.rs b/server/src/config.rs index de6e02b..ca15379 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -59,19 +59,6 @@ pub enum SocketTransport { LengthDelimitedLe, } -#[derive(Serialize, Deserialize, Default, Debug)] -#[serde(rename_all = "lowercase")] -pub enum RuntimeFlavor { - /// Single-threaded tokio runtime. - SingleThread, - /// Multi-threaded tokio runtime. - #[default] - MultiThread, - /// Alternate multi-threaded tokio runtime. - #[cfg(tokio_unstable)] - MultiThreadAlt, -} - pub type BindAddr = (SocketType, String); #[derive(Serialize, Deserialize, Debug)] @@ -112,8 +99,6 @@ pub struct ServerConfig { /// Server log level. pub log_level: LevelFilter, - /// Runtime type. - pub runtime: RuntimeFlavor, } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -319,7 +304,6 @@ impl Default for ServerConfig { max_message_size: 64 * 1024, log_level: LevelFilter::Info, - runtime: RuntimeFlavor::default(), } } } diff --git a/server/src/handle/mod.rs b/server/src/handle/mod.rs index 80106ea..683480e 100644 --- a/server/src/handle/mod.rs +++ b/server/src/handle/mod.rs @@ -1,5 +1,13 @@ pub mod wisp; -pub mod wsproxy; +//pub mod wsproxy; pub use wisp::handle_wisp; -pub use wsproxy::handle_wsproxy; +//pub use wsproxy::handle_wsproxy; +pub async fn handle_wsproxy( + mut ws: crate::stream::WebSocketStreamWrapper, + id: String, + path: String, + udp: bool, +) -> anyhow::Result<()> { + todo!(); +} diff --git a/server/src/handle/wisp/mod.rs b/server/src/handle/wisp/mod.rs index c48763d..fe5400a 100644 --- a/server/src/handle/wisp/mod.rs +++ b/server/src/handle/wisp/mod.rs @@ -10,18 +10,16 @@ use cfg_if::cfg_if; use event_listener::Event; use futures_util::FutureExt; use log::{debug, trace}; -use tokio::{ - io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, - net::tcp::{OwnedReadHalf, OwnedWriteHalf}, - select, - task::JoinSet, +use monoio::{ + io::{AsyncReadRent, AsyncWriteRentExt, Splitable}, + net::tcp::{TcpOwnedReadHalf, TcpOwnedWriteHalf}, + task::JoinHandle, time::interval, }; -use tokio_util::compat::FuturesAsyncReadCompatExt; +use tokio::select; use uuid::Uuid; use wisp_mux::{ - ws::Payload, CloseReason, ConnectPacket, MuxStream, MuxStreamAsyncRead, MuxStreamWrite, - ServerMux, + ws::Payload, CloseReason, ConnectPacket, MuxStream, MuxStreamRead, MuxStreamWrite, ServerMux, }; use crate::{ @@ -30,39 +28,26 @@ use crate::{ CLIENTS, CONFIG, }; -async fn copy_read_fast( - muxrx: MuxStreamAsyncRead, - mut tcptx: OwnedWriteHalf, -) -> std::io::Result<()> { - let mut muxrx = muxrx.compat(); - loop { - let buf = muxrx.fill_buf().await?; - if buf.is_empty() { - tcptx.flush().await?; - return Ok(()); - } - - let i = tcptx.write(buf).await?; - if i == 0 { - return Err(std::io::ErrorKind::WriteZero.into()); - } - - muxrx.consume(i); +async fn copy_read_fast(rx: MuxStreamRead, mut tx: TcpOwnedWriteHalf) -> anyhow::Result<()> { + let mut res; + while let Some(x) = rx.read().await? { + (res, _) = tx.write_all(x).await; + res?; } + + Ok(()) } -async fn copy_write_fast(muxtx: MuxStreamWrite, tcprx: OwnedReadHalf) -> anyhow::Result<()> { - let mut tcprx = BufReader::with_capacity(CONFIG.stream.buffer_size, tcprx); +async fn copy_write_fast(tx: MuxStreamWrite, mut rx: TcpOwnedReadHalf) -> anyhow::Result<()> { + let mut buf = Vec::with_capacity(CONFIG.stream.buffer_size); + let mut res; loop { - let buf = tcprx.fill_buf().await?; - - let len = buf.len(); - if len == 0 { - return Ok(()); + (res, buf) = rx.read(buf).await; + let cnt = res?; + if cnt == 0 { + break Ok(()); } - - muxtx.write(&buf).await?; - tcprx.consume(len); + tx.write_payload(Payload::Borrowed(&buf[0..cnt])).await?; } } @@ -122,7 +107,6 @@ async fn handle_stream( let ret: anyhow::Result<()> = async { let (muxread, muxwrite) = muxstream.into_split(); - let muxread = muxread.into_stream().into_asyncread(); let (tcpread, tcpwrite) = stream.into_split(); select! { x = copy_read_fast(muxread, tcpwrite) => x?, @@ -145,21 +129,32 @@ async fn handle_stream( let closer = muxstream.get_close_handle(); let ret: anyhow::Result<()> = async move { - let mut data = vec![0u8; 65507]; - loop { - select! { - size = stream.recv(&mut data) => { - let size = size?; - muxstream.write(&data[..size]).await?; - } - data = muxstream.read() => { - if let Some(data) = data? { - stream.send(&data).await?; - } else { - break Ok(()); - } + let read = async { + let mut data = vec![0u8; 65507]; + let mut ret; + loop { + (ret, data) = stream.recv(data).await; + let size = ret?; + if size != 0 { + muxstream + .write_payload(Payload::Borrowed(&data[..size])) + .await?; + } else { + break Ok(()); } } + }; + let write = async { + let mut ret; + while let Some(data) = muxstream.read().await? { + (ret, _) = stream.send(data).await; + ret?; + } + Ok(()) + }; + select! { + x = read => x, + x = write => x, } } .await; @@ -254,18 +249,18 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow: mux.downgraded ); - let mut set: JoinSet<()> = JoinSet::new(); + let mut set: Vec> = Vec::new(); let event: Arc = Event::new().into(); let mux_id = id.clone(); - set.spawn(tokio::task::unconstrained(fut.map(move |x| { + set.push(monoio::spawn(fut.map(move |x| { debug!("wisp client id {:?} multiplexor result {:?}", mux_id, x) }))); let ping_mux = mux.clone(); let ping_event = event.clone(); let ping_id = id.clone(); - set.spawn(async move { + set.push(monoio::spawn(async move { let mut interval = interval(Duration::from_secs(30)); while ping_mux .send_ping(Payload::Bytes(BytesMut::new())) @@ -278,17 +273,17 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow: _ = ping_event.listen() => break, }; } - }); + })); while let Some((connect, stream)) = mux.server_new_stream().await { - set.spawn(handle_stream( + set.push(monoio::spawn(handle_stream( connect, stream, id.clone(), event.clone(), #[cfg(feature = "twisp")] twisp_map.clone(), - )); + ))); } debug!("shutting down wisp client id {:?}", id); @@ -298,7 +293,9 @@ pub async fn handle_wisp(stream: WispResult, is_v2: bool, id: String) -> anyhow: trace!("waiting for tasks to close for wisp client id {:?}", id); - while set.join_next().await.is_some() {} + for task in set { + task.await; + } debug!("wisp client id {:?} disconnected", id); diff --git a/server/src/handle/wisp/utils.rs b/server/src/handle/wisp/utils.rs index f763dda..7076587 100644 --- a/server/src/handle/wisp/utils.rs +++ b/server/src/handle/wisp/utils.rs @@ -7,7 +7,7 @@ use wisp_mux::extensions::cert::VerifyKey; pub async fn get_certificates_from_paths(paths: Vec) -> anyhow::Result> { let mut out = Vec::new(); for path in paths { - let data = tokio::fs::read_to_string(path).await?; + let data = String::from_utf8(monoio::fs::read(path).await?)?; let verifier = VerifyingKey::from_public_key_pem(&data)?; let binary_key = verifier.to_bytes(); diff --git a/server/src/handle/wsproxy.rs b/server/src/handle/wsproxy.rs index e066f6a..f65f512 100644 --- a/server/src/handle/wsproxy.rs +++ b/server/src/handle/wsproxy.rs @@ -2,10 +2,7 @@ use std::str::FromStr; use fastwebsockets::CloseCode; use log::debug; -use tokio::{ - io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, - select, -}; +use tokio::{io::BufReader, select}; use uuid::Uuid; use wisp_mux::{ConnectPacket, StreamType}; diff --git a/server/src/listener.rs b/server/src/listener.rs index ff83ca8..3be9ade 100644 --- a/server/src/listener.rs +++ b/server/src/listener.rs @@ -1,18 +1,24 @@ use std::{ + fs::remove_file, io::{BufReader, Cursor}, - os::fd::AsFd, + os::fd::{AsRawFd, BorrowedFd}, path::PathBuf, pin::Pin, sync::Arc, }; use anyhow::Context; -use rustls_pemfile::{certs, private_key}; -use tokio::{ - fs::{remove_file, try_exists, File}, - io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadHalf, WriteHalf}, - net::{tcp, unix, TcpListener, TcpStream, UnixListener, UnixStream}, +use monoio::{ + fs::{File, OpenOptions}, + io::Splitable, + net::{ + tcp::{TcpListener, TcpOwnedReadHalf, TcpOwnedWriteHalf, TcpStream}, + unix::{UnixListener, UnixOwnedReadHalf, UnixOwnedWriteHalf, UnixStream}, + }, }; +use monoio_compat::StreamWrapper; +use rustls_pemfile::{certs, private_key}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadHalf, WriteHalf}; use tokio_rustls::{rustls, server::TlsStream, TlsAcceptor}; use uuid::Uuid; @@ -157,6 +163,8 @@ impl< } } +unsafe impl Send for Quintet {} + pub struct Duplex(A, B); impl Duplex { @@ -228,21 +236,26 @@ impl AsyncWrite for Duplex { } } -pub type ServerStream = - Quintet, UnixStream, TlsStream, Duplex>; +pub type ServerStream = Quintet< + StreamWrapper, + TlsStream>, + StreamWrapper, + TlsStream>, + Duplex, StreamWrapper>, +>; pub type ServerStreamRead = Quintet< - tcp::OwnedReadHalf, - ReadHalf>, - unix::OwnedReadHalf, - ReadHalf>, - File, + StreamWrapper, + ReadHalf>>, + StreamWrapper, + ReadHalf>>, + StreamWrapper, >; pub type ServerStreamWrite = Quintet< - tcp::OwnedWriteHalf, - WriteHalf>, - unix::OwnedWriteHalf, - WriteHalf>, - File, + StreamWrapper, + WriteHalf>>, + StreamWrapper, + WriteHalf>>, + StreamWrapper, >; pub trait ServerStreamExt { @@ -253,16 +266,22 @@ impl ServerStreamExt for ServerStream { fn split(self) -> (ServerStreamRead, ServerStreamWrite) { match self { Self::One(x) => { - let (r, w) = x.into_split(); - (Quintet::One(r), Quintet::One(w)) + let (r, w) = x.into_inner().into_split(); + ( + Quintet::One(StreamWrapper::new(r)), + Quintet::One(StreamWrapper::new(w)), + ) } Self::Two(x) => { let (r, w) = tokio::io::split(x); (Quintet::Two(r), Quintet::Two(w)) } Self::Three(x) => { - let (r, w) = x.into_split(); - (Quintet::Three(r), Quintet::Three(w)) + let (r, w) = x.into_inner().into_split(); + ( + Quintet::Three(StreamWrapper::new(r)), + Quintet::Three(StreamWrapper::new(w)), + ) } Self::Four(x) => { let (r, w) = tokio::io::split(x); @@ -287,13 +306,13 @@ pub enum ServerListener { impl ServerListener { async fn bind_tcp(bind: &BindAddr) -> anyhow::Result { TcpListener::bind(&bind.1) - .await .with_context(|| format!("failed to bind to tcp address `{}`", bind.1)) } async fn bind_unix(bind: &BindAddr) -> anyhow::Result { - if try_exists(&bind.1).await? { - remove_file(&bind.1).await?; + todo!("this uses sync"); + if PathBuf::from(&bind.1).try_exists()? { + remove_file(&bind.1)?; } UnixListener::bind(&bind.1) .with_context(|| format!("failed to bind to unix socket at `{}`", bind.1)) @@ -307,7 +326,7 @@ impl ServerListener { .context("no tls keypair provided")?; let mut public = BufReader::new(Cursor::new( - tokio::fs::read(&tls_keypair[0]) + monoio::fs::read(&tls_keypair[0]) .await .context("failed to read public key")?, )); @@ -315,7 +334,7 @@ impl ServerListener { .collect::, _>>() .context("failed to parse public key")?; let mut private = BufReader::new(Cursor::new( - tokio::fs::read(&tls_keypair[1]) + monoio::fs::read(&tls_keypair[1]) .await .context("failed to read private key")?, )); @@ -379,25 +398,25 @@ impl ServerListener { match self { Self::Tcp(x) => { let (x, y) = Self::accept_tcp(x).await?; - Ok((Quintet::One(x), y)) + Ok((Quintet::One(StreamWrapper::new(x)), y)) } Self::TlsTcp(tcp, tls) => { let (x, y) = Self::accept_tcp(tcp).await?; - let x = tls.accept(x).await?; + let x = tls.accept(StreamWrapper::new(x)).await?; Ok((Quintet::Two(x), y)) } Self::Unix(x) => { let (x, y) = Self::accept_unix(x).await?; - Ok((Quintet::Three(x), y)) + Ok((Quintet::Three(StreamWrapper::new(x)), y)) } Self::TlsUnix(unix, tls) => { let (x, y) = Self::accept_unix(unix).await?; - let x = tls.accept(x).await?; + let x = tls.accept(StreamWrapper::new(x)).await?; Ok((Quintet::Four(x), y)) } Self::File(path) => { if let Some(path) = path.take() { - let rx = File::options() + let rx = OpenOptions::new() .read(true) .write(false) .open(&path) @@ -405,19 +424,20 @@ impl ServerListener { .context("failed to open read file")?; if CONFIG.server.file_raw_mode { - let mut termios = nix::sys::termios::tcgetattr(rx.as_fd()) + let fd = unsafe { BorrowedFd::borrow_raw(rx.as_raw_fd()) }; + let mut termios = nix::sys::termios::tcgetattr(fd) .context("failed to get termios for read file")? .clone(); nix::sys::termios::cfmakeraw(&mut termios); nix::sys::termios::tcsetattr( - rx.as_fd(), + fd, nix::sys::termios::SetArg::TCSANOW, &termios, ) .context("failed to set raw mode for read file")?; } - let tx = File::options() + let tx = OpenOptions::new() .read(false) .write(true) .open(&path) @@ -425,12 +445,13 @@ impl ServerListener { .context("failed to open write file")?; if CONFIG.server.file_raw_mode { - let mut termios = nix::sys::termios::tcgetattr(tx.as_fd()) + let fd = unsafe { BorrowedFd::borrow_raw(tx.as_raw_fd()) }; + let mut termios = nix::sys::termios::tcgetattr(fd) .context("failed to get termios for write file")? .clone(); nix::sys::termios::cfmakeraw(&mut termios); nix::sys::termios::tcsetattr( - tx.as_fd(), + fd, nix::sys::termios::SetArg::TCSANOW, &termios, ) @@ -438,7 +459,7 @@ impl ServerListener { } Ok(( - Quintet::Five(Duplex::new(rx, tx)), + Quintet::Five(Duplex::new(StreamWrapper::new(rx), StreamWrapper::new(tx))), path.to_string_lossy().to_string(), )) } else { diff --git a/server/src/main.rs b/server/src/main.rs index a98b122..3497164 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,11 +2,14 @@ #![deny(clippy::todo)] #![allow(unexpected_cfgs)] -use std::{fs::read_to_string, net::IpAddr}; +use std::{ + fs::read_to_string, + net::{IpAddr, ToSocketAddrs}, +}; use anyhow::Context; use clap::Parser; -use config::{validate_config_cache, Cli, Config, RuntimeFlavor}; +use config::{validate_config_cache, Cli, Config}; use dashmap::DashMap; use handle::{handle_wisp, handle_wsproxy}; use hickory_resolver::{ @@ -17,12 +20,9 @@ use hickory_resolver::{ use lazy_static::lazy_static; use listener::ServerListener; use log::{error, info, trace, warn}; +use monoio::{time::TimeDriver, IoUringDriver, RuntimeBuilder}; use route::{route_stats, ServerRouteResult}; use stats::generate_stats; -use tokio::{ - runtime, - signal::unix::{signal, SignalKind}, -}; use uuid::Uuid; use wisp_mux::ConnectPacket; @@ -43,10 +43,11 @@ pub enum Resolver { impl Resolver { pub async fn resolve(&self, host: String) -> anyhow::Result>> { + // TODO this uses sync match self { Self::Hickory(resolver) => Ok(Box::new(resolver.lookup_ip(host).await?.into_iter())), Self::System => Ok(Box::new( - tokio::net::lookup_host(host + ":0").await?.map(|x| x.ip()), + (host.as_str(), 0).to_socket_addrs()?.map(|x| x.ip()), )), } } @@ -76,6 +77,7 @@ lazy_static! { }; pub static ref CLIENTS: DashMap = DashMap::new(); pub static ref RESOLVER: Resolver = { + return Resolver::System; if CONFIG.stream.dns_servers.is_empty() { if let Ok((config, opts)) = read_system_conf() { Resolver::Hickory(TokioAsyncResolver::tokio(config, opts)) @@ -110,28 +112,22 @@ fn main() -> anyhow::Result<()> { .parse_default_env() .init(); - let mut builder: runtime::Builder = match CONFIG.server.runtime { - RuntimeFlavor::SingleThread => runtime::Builder::new_current_thread(), - RuntimeFlavor::MultiThread => runtime::Builder::new_multi_thread(), - #[cfg(tokio_unstable)] - RuntimeFlavor::MultiThreadAlt => runtime::Builder::new_multi_thread_alt(), - }; - - builder.enable_all(); - let rt = builder.build()?; + let builder = RuntimeBuilder::>::new(); + let mut rt = builder.build().context("failed to create monoio driver")?; rt.block_on(async { validate_config_cache().await; info!( - "listening on {:?} with runtime flavor {:?} and socket transport {:?}", - CONFIG.server.bind, CONFIG.server.runtime, CONFIG.server.transport + "listening on {:?} with socket transport {:?}", + CONFIG.server.bind, CONFIG.server.transport ); trace!("CLI: {:#?}", &*CLI); trace!("CONFIG: {:#?}", &*CONFIG); trace!("RESOLVER: {:?}", &*RESOLVER); + /* tokio::spawn(async { let mut sig = signal(SignalKind::user_defined1()).unwrap(); while sig.recv().await.is_some() { @@ -141,6 +137,7 @@ fn main() -> anyhow::Result<()> { } } }); + */ let mut listener = ServerListener::new(&CONFIG.server.bind) .await @@ -157,7 +154,7 @@ fn main() -> anyhow::Result<()> { format!("failed to bind to address {} for stats server", bind_addr.1) })?; - tokio::spawn(async move { + monoio::spawn(async move { loop { match stats_listener.accept().await { Ok((stream, _)) => { @@ -180,7 +177,7 @@ fn main() -> anyhow::Result<()> { let stats_endpoint = stats_endpoint.clone(); match listener.accept().await { Ok((stream, client_id)) => { - tokio::spawn(async move { + monoio::spawn(async move { let res = route::route(stream, stats_endpoint, move |stream, maybe_ip| { let client_id = if let Some(ip) = maybe_ip { format!("{} ({})", client_id, ip) @@ -205,7 +202,7 @@ fn main() -> anyhow::Result<()> { } fn handle_stream(stream: ServerRouteResult, id: String) { - tokio::spawn(async move { + monoio::spawn(async move { CLIENTS.insert(id.clone(), (DashMap::new(), false)); let res = match stream { ServerRouteResult::Wisp(stream, is_v2) => handle_wisp(stream, is_v2, id.clone()).await, diff --git a/server/src/route.rs b/server/src/route.rs index c6fb601..2d2923a 100644 --- a/server/src/route.rs +++ b/server/src/route.rs @@ -123,7 +123,7 @@ where if req_path.starts_with(&(CONFIG.wisp.prefix.clone() + "/")) { let has_ws_protocol = ws_protocol.is_some(); - tokio::spawn(async move { + monoio::spawn(async move { if let Err(err) = (callback)(fut, HttpUpgradeResult::Wisp(has_ws_protocol), ip_header).await { @@ -136,7 +136,7 @@ where } } else if CONFIG.wisp.allow_wsproxy { let udp = req.uri().query().unwrap_or_default() == "?udp"; - tokio::spawn(async move { + monoio::spawn(async move { if let Err(err) = (callback)(fut, HttpUpgradeResult::WsProxy(req_path, udp), ip_header).await { diff --git a/server/src/stream.rs b/server/src/stream.rs index a483b6b..6938917 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -9,8 +9,8 @@ use cfg_if::cfg_if; use fastwebsockets::{FragmentCollector, Frame, OpCode, Payload, WebSocketError}; use hyper::upgrade::Upgraded; use hyper_util::rt::TokioIo; +use monoio::net::{udp::UdpSocket, TcpStream}; use regex::RegexSet; -use tokio::net::{TcpStream, UdpSocket}; use wisp_mux::{ConnectPacket, StreamType}; use crate::{CONFIG, RESOLVER}; @@ -175,7 +175,7 @@ impl ClientStream { SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), 0) }; - let stream = UdpSocket::bind(bind_addr).await?; + let stream = UdpSocket::bind(bind_addr)?; stream .connect(SocketAddr::new(ipaddr, packet.destination_port))