This commit is contained in:
Toshit Chawda 2024-03-10 07:57:28 -07:00
parent 19d9544e83
commit 53a399856f
4 changed files with 86 additions and 75 deletions

View file

@ -1,5 +1,4 @@
#![deny(missing_docs)]
#![feature(impl_trait_in_assoc_type)]
#![cfg_attr(docsrs, feature(doc_cfg))]
//! A library for easily creating [Wisp] clients and servers.
//!
@ -109,6 +108,7 @@ impl std::error::Error for WispError {}
struct MuxMapValue {
stream: mpsc::UnboundedSender<MuxEvent>,
stream_type: StreamType,
flow_control: Arc<AtomicU32>,
flow_control_event: Arc<Event>,
}
@ -200,6 +200,7 @@ impl<W: ws::WebSocketWrite + Send + 'static> ServerMuxInner<W> {
packet.stream_id,
MuxMapValue {
stream: ch_tx,
stream_type,
flow_control: flow_control.clone(),
flow_control_event: flow_control_event.clone(),
},
@ -224,13 +225,15 @@ impl<W: ws::WebSocketWrite + Send + 'static> ServerMuxInner<W> {
Data(data) => {
if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) {
let _ = stream.stream.unbounded_send(MuxEvent::Send(data));
stream.flow_control.store(
stream
.flow_control
.load(Ordering::Acquire)
.saturating_sub(1),
Ordering::Release,
);
if stream.stream_type == StreamType::Tcp {
stream.flow_control.store(
stream
.flow_control
.load(Ordering::Acquire)
.saturating_sub(1),
Ordering::Release,
);
}
}
}
Continue(_) => unreachable!(),
@ -388,10 +391,12 @@ impl<W: ws::WebSocketWrite + Send> ClientMuxInner<W> {
}
Continue(inner_packet) => {
if let Some(stream) = self.stream_map.lock().await.get(&packet.stream_id) {
stream
.flow_control
.store(inner_packet.buffer_remaining, Ordering::Release);
let _ = stream.flow_control_event.notify(u32::MAX);
if stream.stream_type == StreamType::Tcp {
stream
.flow_control
.store(inner_packet.buffer_remaining, Ordering::Release);
let _ = stream.flow_control_event.notify(u32::MAX);
}
}
}
Close(inner_packet) => {
@ -490,6 +495,7 @@ impl<W: ws::WebSocketWrite + Send + 'static> ClientMux<W> {
stream_id,
MuxMapValue {
stream: ch_tx,
stream_type,
flow_control: flow_control.clone(),
flow_control_event: evt.clone(),
},