fix udp and tls tests, add send_bytes to ws

This commit is contained in:
Toshit Chawda 2024-03-09 22:11:50 -08:00
parent 55d40aa32d
commit 19d9544e83
7 changed files with 64 additions and 35 deletions

View file

@ -182,7 +182,12 @@ impl<W: ws::WebSocketWrite + Send + 'static> ServerMuxInner<W> {
.await?;
loop {
let packet: Packet = rx.wisp_read_frame(&self.tx).await?.try_into()?;
let frame = rx.wisp_read_frame(&self.tx).await?;
if frame.opcode == ws::OpCode::Close {
break Ok(());
}
let packet = Packet::try_from(frame)?;
use PacketType::*;
match packet.packet_type {
Connect(inner_packet) => {
@ -368,29 +373,32 @@ impl<W: ws::WebSocketWrite + Send> ClientMuxInner<W> {
{
loop {
let frame = rx.wisp_read_frame(&self.tx).await?;
if let Ok(packet) = Packet::try_from(frame) {
use PacketType::*;
match packet.packet_type {
Connect(_) => unreachable!(),
Data(data) => {
if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) {
let _ = stream.stream.unbounded_send(MuxEvent::Send(data));
}
if frame.opcode == ws::OpCode::Close {
break Ok(());
}
let packet = Packet::try_from(frame)?;
use PacketType::*;
match packet.packet_type {
Connect(_) => unreachable!(),
Data(data) => {
if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) {
let _ = stream.stream.unbounded_send(MuxEvent::Send(data));
}
Continue(inner_packet) => {
if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) {
stream
.flow_control
.store(inner_packet.buffer_remaining, Ordering::Release);
let _ = stream.flow_control_event.notify(u32::MAX);
}
}
Continue(inner_packet) => {
if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) {
stream
.flow_control
.store(inner_packet.buffer_remaining, Ordering::Release);
let _ = stream.flow_control_event.notify(u32::MAX);
}
Close(inner_packet) => {
if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) {
let _ = stream.stream.unbounded_send(MuxEvent::Close(inner_packet));
}
self.stream_map.lock().await.remove(&packet.stream_id);
}
Close(inner_packet) => {
if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) {
let _ = stream.stream.unbounded_send(MuxEvent::Close(inner_packet));
}
self.stream_map.lock().await.remove(&packet.stream_id);
}
}
}