diff --git a/server/src/handle/wisp.rs b/server/src/handle/wisp.rs index 7bfac71..b00efb5 100644 --- a/server/src/handle/wisp.rs +++ b/server/src/handle/wisp.rs @@ -91,16 +91,6 @@ async fn handle_stream(connect: ConnectPacket, muxstream: MuxStream, id: String) let closer = muxstream.get_close_handle(); let ret: anyhow::Result<()> = async { - /* - let (muxread, muxwrite) = muxstream.into_io().into_asyncrw().into_split(); - let (mut tcpread, tcpwrite) = stream.into_split(); - let mut muxwrite = muxwrite.compat_write(); - select! { - x = copy_read_fast(muxread, tcpwrite) => x?, - x = copy(&mut tcpread, &mut muxwrite) => {x?;}, - } - */ - // TODO why is copy_write_fast not working? let (muxread, muxwrite) = muxstream.into_split(); let muxread = muxread.into_stream().into_asyncread(); let (tcpread, tcpwrite) = stream.into_split(); diff --git a/server/src/handle/wsproxy.rs b/server/src/handle/wsproxy.rs index a98955e..4353c28 100644 --- a/server/src/handle/wsproxy.rs +++ b/server/src/handle/wsproxy.rs @@ -91,12 +91,7 @@ pub async fn handle_wsproxy( ClientStream::Tcp(stream) => { let mut stream = BufReader::new(stream); let ret: anyhow::Result<()> = async { - let mut to_consume = 0usize; loop { - if to_consume != 0 { - stream.consume(to_consume); - to_consume = 0; - } select! { x = ws.read() => { match x? { @@ -112,7 +107,8 @@ pub async fn handle_wsproxy( x = stream.fill_buf() => { let x = x?; ws.write(x).await?; - to_consume += x.len(); + let len = x.len(); + stream.consume(len); } } } @@ -129,9 +125,36 @@ pub async fn handle_wsproxy( } } } - ClientStream::Udp(_stream) => { - // TODO - let _ = ws.close(CloseCode::Error.into(), b"coming soon").await; + ClientStream::Udp(stream) => { + let ret: anyhow::Result<()> = async { + let mut data = vec![0u8; 65507]; + loop { + select! { + x = ws.read() => { + match x? { + WebSocketFrame::Data(data) => { + stream.send(&data).await?; + } + WebSocketFrame::Close | WebSocketFrame::Ignore => {} + } + } + size = stream.recv(&mut data) => { + ws.write(&data[..size?]).await?; + } + } + } + } + .await; + match ret { + Ok(_) => { + let _ = ws.close(CloseCode::Normal.into(), b"").await; + } + Err(x) => { + let _ = ws + .close(CloseCode::Normal.into(), x.to_string().as_bytes()) + .await; + } + } } ClientStream::Blocked => { let _ = ws.close(CloseCode::Error.into(), b"host is blocked").await;