remove timer

This commit is contained in:
Toshit Chawda 2024-09-15 14:26:02 -07:00
parent 06cc16c692
commit 12a95658b9
No known key found for this signature in database
GPG key ID: 91480ED99E2B3D9D
3 changed files with 21 additions and 57 deletions

View file

@ -21,15 +21,11 @@ pub use crate::{packet::*, stream::*};
use extensions::{udp::UdpProtocolExtension, AnyProtocolExtension, ProtocolExtensionBuilder};
use flume as mpsc;
use futures::{channel::oneshot, select, Future, FutureExt};
use futures_timer::Delay;
use futures::{channel::oneshot, Future};
use inner::{MuxInner, WsEvent};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use ws::{AppendingWebSocketRead, LockedWebSocketWrite};
@ -176,6 +172,7 @@ impl std::error::Error for WispError {}
async fn maybe_wisp_v2<R>(
read: &mut R,
write: &LockedWebSocketWrite,
role: Role,
builders: &mut [Box<dyn ProtocolExtensionBuilder + Sync + Send>],
) -> Result<(Vec<AnyProtocolExtension>, Option<ws::Frame<'static>>, bool), WispError>
where
@ -186,21 +183,18 @@ where
let mut downgraded = true;
let extension_ids: Vec<_> = builders.iter().map(|x| x.get_id()).collect();
if let Some(frame) = select! {
x = read.wisp_read_frame(write).fuse() => Some(x?),
_ = Delay::new(Duration::from_secs(5)).fuse() => None
} {
let packet = Packet::maybe_parse_info(frame, Role::Client, builders)?;
if let PacketType::Info(info) = packet.packet_type {
supported_extensions = info
.extensions
.into_iter()
.filter(|x| extension_ids.contains(&x.get_id()))
.collect();
downgraded = false;
} else {
extra_packet.replace(ws::Frame::from(packet).clone());
}
let frame = read.wisp_read_frame(write).await?;
let packet = Packet::maybe_parse_info(frame, role, builders)?;
if let PacketType::Info(info) = packet.packet_type {
supported_extensions = info
.extensions
.into_iter()
.filter(|x| extension_ids.contains(&x.get_id()))
.collect();
downgraded = false;
} else {
extra_packet.replace(ws::Frame::from(packet).clone());
}
for extension in supported_extensions.iter_mut() {
@ -265,7 +259,7 @@ impl ServerMux {
let (supported_extensions, extra_packet, downgraded) =
if let Some(mut builders) = extension_builders {
send_info_packet(&tx, &mut builders).await?;
maybe_wisp_v2(&mut rx, &tx, &mut builders).await?
maybe_wisp_v2(&mut rx, &tx, Role::Server, &mut builders).await?
} else {
(Vec::new(), None, true)
};
@ -452,7 +446,7 @@ impl ClientMux {
if let PacketType::Continue(packet) = first_packet.packet_type {
let (supported_extensions, extra_packet, downgraded) =
if let Some(mut builders) = extension_builders {
let res = maybe_wisp_v2(&mut rx, &tx, &mut builders).await?;
let res = maybe_wisp_v2(&mut rx, &tx, Role::Client, &mut builders).await?;
// if not downgraded
if !res.2 {
send_info_packet(&tx, &mut builders).await?;