From 29adf77a2e547f7d02104b04f068d33994f5503f Mon Sep 17 00:00:00 2001 From: Toshit Chawda Date: Sat, 27 Jan 2024 19:35:54 -0800 Subject: [PATCH] untested udp support (example client doesn't support udp) --- server/src/main.rs | 57 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 4dcbf0f..f254b7f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -12,7 +12,7 @@ use hyper::{ Response, StatusCode, }; use hyper_util::rt::TokioIo; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio_native_tls::{native_tls, TlsAcceptor}; use tokio_util::codec::{BytesCodec, Framed}; @@ -102,7 +102,7 @@ async fn accept_http( async fn handle_mux( packet: ConnectPacket, mut stream: MuxStream, -) -> Result<(), WispError> { +) -> Result { let uri = format!( "{}:{}", packet.destination_hostname, packet.destination_port @@ -122,23 +122,47 @@ async fn handle_mux( WsEvent::Send(data) => { tcp_stream_framed.send(data).await.map_err(|x| WispError::Other(Box::new(x)))?; } - WsEvent::Close(_) => break, + WsEvent::Close(_) => return Ok(false), }, - None => break + None => break, } }, event = tcp_stream_framed.next() => { match event.and_then(|x| x.ok()) { Some(event) => stream.write(event.into()).await?, - None => break + None => return Ok(true), + } + } + } + } + } + StreamType::Udp => { + let udp_socket = UdpSocket::bind(uri) + .await + .map_err(|x| WispError::Other(Box::new(x)))?; + let mut data = vec![0u8; 65507]; // udp standard max datagram size + loop { + tokio::select! { + size = udp_socket.recv(&mut data).map_err(|x| WispError::Other(Box::new(x))) => { + let size = size?; + stream.write(Bytes::copy_from_slice(&data[..size])).await? + }, + event = stream.read() => { + match event { + Some(event) => match event { + WsEvent::Send(data) => { + udp_socket.send(&data).await.map_err(|x| WispError::Other(Box::new(x)))?; + } + WsEvent::Close(_) => return Ok(false), + }, + None => break, } } } } } - StreamType::Udp => todo!(), } - Ok(()) + Ok(false) } async fn accept_ws( @@ -153,20 +177,25 @@ async fn accept_ws( let mut mux = ServerMux::new(rx, tx); mux.server_loop(&mut |packet, stream| async move { - let tx_cloned = stream.get_write_half(); + let tx_cloned_err = stream.get_write_half(); + let tx_cloned_ok = stream.get_write_half(); let stream_id = stream.stream_id; tokio::spawn(async move { let _ = handle_mux(packet, stream) - .or_else(|err| async { - let _ = tx_cloned + .or_else(|err| async move { + let _ = tx_cloned_err .write_frame(ws::Frame::from(Packet::new_close(stream_id, 0x03))) .await; Err(err) }) - .and_then(|_| async { - tx_cloned - .write_frame(ws::Frame::from(Packet::new_close(stream_id, 0x02))) - .await + .and_then(|should_send| async move { + if should_send { + tx_cloned_ok + .write_frame(ws::Frame::from(Packet::new_close(stream_id, 0x02))) + .await + } else { + Ok(()) + } }) .await; });