untested udp support (example client doesn't support udp)

This commit is contained in:
Toshit Chawda 2024-01-27 19:35:54 -08:00
parent 2a5684192a
commit 29adf77a2e
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D

View file

@ -12,7 +12,7 @@ use hyper::{
Response, StatusCode, Response, StatusCode,
}; };
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio_native_tls::{native_tls, TlsAcceptor}; use tokio_native_tls::{native_tls, TlsAcceptor};
use tokio_util::codec::{BytesCodec, Framed}; use tokio_util::codec::{BytesCodec, Framed};
@ -102,7 +102,7 @@ async fn accept_http(
async fn handle_mux( async fn handle_mux(
packet: ConnectPacket, packet: ConnectPacket,
mut stream: MuxStream<impl ws::WebSocketWrite>, mut stream: MuxStream<impl ws::WebSocketWrite>,
) -> Result<(), WispError> { ) -> Result<bool, WispError> {
let uri = format!( let uri = format!(
"{}:{}", "{}:{}",
packet.destination_hostname, packet.destination_port packet.destination_hostname, packet.destination_port
@ -122,23 +122,47 @@ async fn handle_mux(
WsEvent::Send(data) => { WsEvent::Send(data) => {
tcp_stream_framed.send(data).await.map_err(|x| WispError::Other(Box::new(x)))?; tcp_stream_framed.send(data).await.map_err(|x| WispError::Other(Box::new(x)))?;
} }
WsEvent::Close(_) => break, WsEvent::Close(_) => return Ok(false),
}, },
None => break None => break,
} }
}, },
event = tcp_stream_framed.next() => { event = tcp_stream_framed.next() => {
match event.and_then(|x| x.ok()) { match event.and_then(|x| x.ok()) {
Some(event) => stream.write(event.into()).await?, Some(event) => stream.write(event.into()).await?,
None => break None => return Ok(true),
}
}
}
}
}
StreamType::Udp => {
let udp_socket = UdpSocket::bind(uri)
.await
.map_err(|x| WispError::Other(Box::new(x)))?;
let mut data = vec![0u8; 65507]; // udp standard max datagram size
loop {
tokio::select! {
size = udp_socket.recv(&mut data).map_err(|x| WispError::Other(Box::new(x))) => {
let size = size?;
stream.write(Bytes::copy_from_slice(&data[..size])).await?
},
event = stream.read() => {
match event {
Some(event) => match event {
WsEvent::Send(data) => {
udp_socket.send(&data).await.map_err(|x| WispError::Other(Box::new(x)))?;
}
WsEvent::Close(_) => return Ok(false),
},
None => break,
} }
} }
} }
} }
} }
StreamType::Udp => todo!(),
} }
Ok(()) Ok(false)
} }
async fn accept_ws( async fn accept_ws(
@ -153,20 +177,25 @@ async fn accept_ws(
let mut mux = ServerMux::new(rx, tx); let mut mux = ServerMux::new(rx, tx);
mux.server_loop(&mut |packet, stream| async move { mux.server_loop(&mut |packet, stream| async move {
let tx_cloned = stream.get_write_half(); let tx_cloned_err = stream.get_write_half();
let tx_cloned_ok = stream.get_write_half();
let stream_id = stream.stream_id; let stream_id = stream.stream_id;
tokio::spawn(async move { tokio::spawn(async move {
let _ = handle_mux(packet, stream) let _ = handle_mux(packet, stream)
.or_else(|err| async { .or_else(|err| async move {
let _ = tx_cloned let _ = tx_cloned_err
.write_frame(ws::Frame::from(Packet::new_close(stream_id, 0x03))) .write_frame(ws::Frame::from(Packet::new_close(stream_id, 0x03)))
.await; .await;
Err(err) Err(err)
}) })
.and_then(|_| async { .and_then(|should_send| async move {
tx_cloned if should_send {
.write_frame(ws::Frame::from(Packet::new_close(stream_id, 0x02))) tx_cloned_ok
.await .write_frame(ws::Frame::from(Packet::new_close(stream_id, 0x02)))
.await
} else {
Ok(())
}
}) })
.await; .await;
}); });