actually add wsproxy udp, clean up some stuff

This commit is contained in:
Toshit Chawda 2024-07-21 21:38:25 -07:00
parent 04b8feaaf3
commit 5400ae32cc
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
2 changed files with 32 additions and 19 deletions

View file

@ -91,16 +91,6 @@ async fn handle_stream(connect: ConnectPacket, muxstream: MuxStream, id: String)
let closer = muxstream.get_close_handle(); let closer = muxstream.get_close_handle();
let ret: anyhow::Result<()> = async { let ret: anyhow::Result<()> = async {
/*
let (muxread, muxwrite) = muxstream.into_io().into_asyncrw().into_split();
let (mut tcpread, tcpwrite) = stream.into_split();
let mut muxwrite = muxwrite.compat_write();
select! {
x = copy_read_fast(muxread, tcpwrite) => x?,
x = copy(&mut tcpread, &mut muxwrite) => {x?;},
}
*/
// TODO why is copy_write_fast not working?
let (muxread, muxwrite) = muxstream.into_split(); let (muxread, muxwrite) = muxstream.into_split();
let muxread = muxread.into_stream().into_asyncread(); let muxread = muxread.into_stream().into_asyncread();
let (tcpread, tcpwrite) = stream.into_split(); let (tcpread, tcpwrite) = stream.into_split();

View file

@ -91,12 +91,7 @@ pub async fn handle_wsproxy(
ClientStream::Tcp(stream) => { ClientStream::Tcp(stream) => {
let mut stream = BufReader::new(stream); let mut stream = BufReader::new(stream);
let ret: anyhow::Result<()> = async { let ret: anyhow::Result<()> = async {
let mut to_consume = 0usize;
loop { loop {
if to_consume != 0 {
stream.consume(to_consume);
to_consume = 0;
}
select! { select! {
x = ws.read() => { x = ws.read() => {
match x? { match x? {
@ -112,7 +107,8 @@ pub async fn handle_wsproxy(
x = stream.fill_buf() => { x = stream.fill_buf() => {
let x = x?; let x = x?;
ws.write(x).await?; ws.write(x).await?;
to_consume += x.len(); let len = x.len();
stream.consume(len);
} }
} }
} }
@ -129,9 +125,36 @@ pub async fn handle_wsproxy(
} }
} }
} }
ClientStream::Udp(_stream) => { ClientStream::Udp(stream) => {
// TODO let ret: anyhow::Result<()> = async {
let _ = ws.close(CloseCode::Error.into(), b"coming soon").await; let mut data = vec![0u8; 65507];
loop {
select! {
x = ws.read() => {
match x? {
WebSocketFrame::Data(data) => {
stream.send(&data).await?;
}
WebSocketFrame::Close | WebSocketFrame::Ignore => {}
}
}
size = stream.recv(&mut data) => {
ws.write(&data[..size?]).await?;
}
}
}
}
.await;
match ret {
Ok(_) => {
let _ = ws.close(CloseCode::Normal.into(), b"").await;
}
Err(x) => {
let _ = ws
.close(CloseCode::Normal.into(), x.to_string().as_bytes())
.await;
}
}
} }
ClientStream::Blocked => { ClientStream::Blocked => {
let _ = ws.close(CloseCode::Error.into(), b"host is blocked").await; let _ = ws.close(CloseCode::Error.into(), b"host is blocked").await;