ping every 30 seconds

This commit is contained in:
r58Playz 2024-09-23 17:33:16 -07:00
parent fdd641c67f
commit 09b15e3c43
4 changed files with 59 additions and 4 deletions

View file

@ -2,9 +2,10 @@
pub mod twisp; pub mod twisp;
pub mod utils; pub mod utils;
use std::sync::Arc; use std::{sync::Arc, time::Duration};
use anyhow::Context; use anyhow::Context;
use bytes::BytesMut;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use event_listener::Event; use event_listener::Event;
use futures_util::FutureExt; use futures_util::FutureExt;
@ -13,12 +14,12 @@ use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::tcp::{OwnedReadHalf, OwnedWriteHalf}, net::tcp::{OwnedReadHalf, OwnedWriteHalf},
select, select,
task::JoinSet, task::JoinSet, time::interval,
}; };
use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::compat::FuturesAsyncReadCompatExt;
use uuid::Uuid; use uuid::Uuid;
use wisp_mux::{ use wisp_mux::{
CloseReason, ConnectPacket, MuxStream, MuxStreamAsyncRead, MuxStreamWrite, ServerMux, ws::Payload, CloseReason, ConnectPacket, MuxStream, MuxStreamAsyncRead, MuxStreamWrite, ServerMux
}; };
use crate::{ use crate::{
@ -237,6 +238,7 @@ pub async fn handle_wisp(stream: WispResult, id: String) -> anyhow::Result<()> {
.context("failed to create server multiplexor")? .context("failed to create server multiplexor")?
.with_required_extensions(&required_extensions) .with_required_extensions(&required_extensions)
.await?; .await?;
let mux = Arc::new(mux);
debug!( debug!(
"new wisp client id {:?} connected with extensions {:?}", "new wisp client id {:?} connected with extensions {:?}",
@ -255,6 +257,14 @@ pub async fn handle_wisp(stream: WispResult, id: String) -> anyhow::Result<()> {
trace!("wisp client id {:?} multiplexor result {:?}", mux_id, x) trace!("wisp client id {:?} multiplexor result {:?}", mux_id, x)
}))); })));
let ping_mux = mux.clone();
set.spawn(async move {
let mut interval = interval(Duration::from_secs(30));
while ping_mux.send_ping(Payload::Bytes(BytesMut::new())).await.is_ok() {
interval.tick().await;
}
});
while let Some((connect, stream)) = mux.server_new_stream().await { while let Some((connect, stream)) = mux.server_new_stream().await {
set.spawn(handle_stream( set.spawn(handle_stream(
connect, connect,

View file

@ -23,6 +23,8 @@ pub(crate) enum WsEvent {
u16, u16,
oneshot::Sender<Result<MuxStream, WispError>>, oneshot::Sender<Result<MuxStream, WispError>>,
), ),
SendPing(Payload<'static>, oneshot::Sender<Result<(), WispError>>),
SendPong(Payload<'static>),
WispMessage(Option<Packet<'static>>, Option<Frame<'static>>), WispMessage(Option<Packet<'static>>, Option<Frame<'static>>),
EndFut(Option<CloseReason>), EndFut(Option<CloseReason>),
} }
@ -234,6 +236,8 @@ impl<R: WebSocketRead + Send> MuxInner<R> {
let (mut frame, optional_frame) = msg?; let (mut frame, optional_frame) = msg?;
if frame.opcode == OpCode::Close { if frame.opcode == OpCode::Close {
return Ok(None); return Ok(None);
} else if frame.opcode == OpCode::Ping {
return Ok(Some(WsEvent::SendPong(frame.payload)));
} }
if let Some(ref extra_frame) = optional_frame { if let Some(ref extra_frame) = optional_frame {
@ -308,6 +312,12 @@ impl<R: WebSocketRead + Send> MuxInner<R> {
let _ = channel.send(Err(WispError::InvalidStreamId)); let _ = channel.send(Err(WispError::InvalidStreamId));
} }
} }
WsEvent::SendPing(payload, channel) => {
let _ = channel.send(self.tx.write_frame(Frame::new(OpCode::Ping, payload, true)).await);
}
WsEvent::SendPong(payload) => {
self.tx.write_frame(Frame::new(OpCode::Pong, payload, true)).await?;
}
WsEvent::EndFut(x) => { WsEvent::EndFut(x) => {
if let Some(reason) = x { if let Some(reason) = x {
let _ = self let _ = self

View file

@ -31,7 +31,7 @@ use std::{
Arc, Arc,
}, },
}; };
use ws::{AppendingWebSocketRead, LockedWebSocketWrite}; use ws::{AppendingWebSocketRead, LockedWebSocketWrite, Payload};
/// Wisp version supported by this crate. /// Wisp version supported by this crate.
pub const WISP_VERSION: WispVersion = WispVersion { major: 2, minor: 0 }; pub const WISP_VERSION: WispVersion = WispVersion { major: 2, minor: 0 };
@ -363,6 +363,19 @@ impl ServerMux {
self.muxstream_recv.recv_async().await.ok() self.muxstream_recv.recv_async().await.ok()
} }
/// Send a ping to the client.
pub async fn send_ping(&self, payload: Payload<'static>) -> Result<(), WispError> {
if self.actor_exited.load(Ordering::Acquire) {
return Err(WispError::MuxTaskEnded);
}
let (tx, rx) = oneshot::channel();
self.actor_tx
.send_async(WsEvent::SendPing(payload, tx))
.await
.map_err(|_| WispError::MuxMessageFailedToSend)?;
rx.await.map_err(|_| WispError::MuxMessageFailedToRecv)?
}
async fn close_internal(&self, reason: Option<CloseReason>) -> Result<(), WispError> { async fn close_internal(&self, reason: Option<CloseReason>) -> Result<(), WispError> {
if self.actor_exited.load(Ordering::Acquire) { if self.actor_exited.load(Ordering::Acquire) {
return Err(WispError::MuxTaskEnded); return Err(WispError::MuxTaskEnded);
@ -554,6 +567,19 @@ impl ClientMux {
rx.await.map_err(|_| WispError::MuxMessageFailedToRecv)? rx.await.map_err(|_| WispError::MuxMessageFailedToRecv)?
} }
/// Send a ping to the server.
pub async fn send_ping(&self, payload: Payload<'static>) -> Result<(), WispError> {
if self.actor_exited.load(Ordering::Acquire) {
return Err(WispError::MuxTaskEnded);
}
let (tx, rx) = oneshot::channel();
self.actor_tx
.send_async(WsEvent::SendPing(payload, tx))
.await
.map_err(|_| WispError::MuxMessageFailedToSend)?;
rx.await.map_err(|_| WispError::MuxMessageFailedToRecv)?
}
async fn close_internal(&self, reason: Option<CloseReason>) -> Result<(), WispError> { async fn close_internal(&self, reason: Option<CloseReason>) -> Result<(), WispError> {
if self.actor_exited.load(Ordering::Acquire) { if self.actor_exited.load(Ordering::Acquire) {
return Err(WispError::MuxTaskEnded); return Err(WispError::MuxTaskEnded);

View file

@ -120,6 +120,15 @@ pub struct Frame<'a> {
} }
impl<'a> Frame<'a> { impl<'a> Frame<'a> {
/// Create a new frame.
pub fn new(opcode: OpCode, payload: Payload<'a>, finished: bool) -> Self {
Self {
finished,
opcode,
payload,
}
}
/// Create a new text frame. /// Create a new text frame.
pub fn text(payload: Payload<'a>) -> Self { pub fn text(payload: Payload<'a>) -> Self {
Self { Self {