mirror of
https://github.com/MercuryWorkshop/epoxy-tls.git
synced 2025-05-17 16:10:01 -04:00
delete server
This commit is contained in:
parent
2872e02a2c
commit
3bf19be9f0
3 changed files with 1 additions and 588 deletions
|
@ -1,7 +1,6 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
members = ["server", "client", "wisp", "simple-wisp-client", "certs-grabber"]
|
members = ["client", "wisp", "simple-wisp-client", "certs-grabber"]
|
||||||
default-members = ["server"]
|
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
lto = true
|
lto = true
|
||||||
|
|
|
@ -1,23 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "epoxy-server"
|
|
||||||
version = "1.0.0"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
bytes = "1.5.0"
|
|
||||||
cfg-if = "1.0.0"
|
|
||||||
clap = { version = "4.4.18", features = ["derive", "help", "usage", "color", "wrap_help", "cargo"] }
|
|
||||||
clio = { version = "0.3.5", features = ["clap-parse"] }
|
|
||||||
console-subscriber = { version = "0.2.0", optional = true }
|
|
||||||
dashmap = "5.5.3"
|
|
||||||
fastwebsockets = { version = "0.8.0", features = ["upgrade", "simdutf8", "unstable-split"] }
|
|
||||||
futures-util = { version = "0.3.30", features = ["sink"] }
|
|
||||||
http-body-util = "0.1.0"
|
|
||||||
hyper = { version = "1.1.0", features = ["server", "http1"] }
|
|
||||||
hyper-util = { version = "0.1.2", features = ["tokio"] }
|
|
||||||
tokio = { version = "1.5.1", features = ["rt-multi-thread", "macros"] }
|
|
||||||
tokio-util = { version = "0.7.10", features = ["codec", "compat"] }
|
|
||||||
wisp-mux = { path = "../wisp", features = ["fastwebsockets"] }
|
|
||||||
|
|
||||||
[features]
|
|
||||||
tokio-console = ["tokio/tracing", "dep:console-subscriber"]
|
|
|
@ -1,563 +0,0 @@
|
||||||
#![feature(let_chains, ip)]
|
|
||||||
use std::{collections::HashMap, io::Error, path::PathBuf, sync::Arc};
|
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
use cfg_if::cfg_if;
|
|
||||||
use clap::Parser;
|
|
||||||
use fastwebsockets::{
|
|
||||||
upgrade::{self, UpgradeFut},
|
|
||||||
CloseCode, FragmentCollector, FragmentCollectorRead, Frame, OpCode, Payload, WebSocketError,
|
|
||||||
};
|
|
||||||
use futures_util::{SinkExt, StreamExt, TryFutureExt};
|
|
||||||
use hyper::{
|
|
||||||
body::Incoming, server::conn::http1, service::service_fn, upgrade::Parts, Request, Response,
|
|
||||||
StatusCode,
|
|
||||||
};
|
|
||||||
use hyper_util::rt::TokioIo;
|
|
||||||
#[cfg(unix)]
|
|
||||||
use tokio::net::{UnixListener, UnixStream};
|
|
||||||
use tokio::{
|
|
||||||
io::{copy, AsyncBufReadExt, AsyncWriteExt},
|
|
||||||
net::{lookup_host, TcpListener, TcpStream, UdpSocket},
|
|
||||||
select,
|
|
||||||
};
|
|
||||||
#[cfg(unix)]
|
|
||||||
use tokio_util::either::Either;
|
|
||||||
use tokio_util::{
|
|
||||||
codec::{BytesCodec, Framed},
|
|
||||||
compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt},
|
|
||||||
};
|
|
||||||
|
|
||||||
use wisp_mux::{
|
|
||||||
extensions::{
|
|
||||||
password::{PasswordProtocolExtension, PasswordProtocolExtensionBuilder},
|
|
||||||
udp::UdpProtocolExtensionBuilder,
|
|
||||||
ProtocolExtensionBuilder,
|
|
||||||
},
|
|
||||||
CloseReason, ConnectPacket, MuxStream, MuxStreamAsyncRW, ServerMux, StreamType, WispError,
|
|
||||||
};
|
|
||||||
|
|
||||||
type HttpBody = http_body_util::Full<hyper::body::Bytes>;
|
|
||||||
|
|
||||||
/// Server implementation of the Wisp protocol in Rust, made for epoxy
|
|
||||||
#[derive(Parser)]
|
|
||||||
#[command(version = clap::crate_version!())]
|
|
||||||
struct Cli {
|
|
||||||
/// URL prefix the server should serve on
|
|
||||||
#[arg(long)]
|
|
||||||
prefix: Option<String>,
|
|
||||||
/// Port the server should bind to
|
|
||||||
#[arg(long, short, default_value = "4000")]
|
|
||||||
port: String,
|
|
||||||
/// Host the server should bind to
|
|
||||||
#[arg(long = "host", short, value_name = "HOST", default_value = "0.0.0.0")]
|
|
||||||
bind_host: String,
|
|
||||||
/// Whether the server should listen on a Unix socket located at the value of the host argument
|
|
||||||
#[arg(long, short)]
|
|
||||||
unix_socket: bool,
|
|
||||||
/// Whether the server should block IP addresses that are not globally reachable
|
|
||||||
///
|
|
||||||
/// See https://doc.rust-lang.org/std/net/struct.Ipv4Addr.html#method.is_global for which IP
|
|
||||||
/// addresses are blocked
|
|
||||||
#[arg(long, short = 'B')]
|
|
||||||
block_local: bool,
|
|
||||||
/// Whether the server should block UDP
|
|
||||||
///
|
|
||||||
/// This does nothing for wsproxy as that is always TCP
|
|
||||||
#[arg(long)]
|
|
||||||
block_udp: bool,
|
|
||||||
/// Whether the server should block ports other than 80 or 443
|
|
||||||
#[arg(long)]
|
|
||||||
block_non_http: bool,
|
|
||||||
/// Path to a file containing `user:password` separated by newlines. This is plaintext!!!
|
|
||||||
///
|
|
||||||
/// `user` cannot contain `:`. Whitespace will be trimmed.
|
|
||||||
#[arg(long)]
|
|
||||||
auth: Option<PathBuf>,
|
|
||||||
/// Use Wisp V1.
|
|
||||||
#[arg(long)]
|
|
||||||
wisp_v1: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct MuxOptions {
|
|
||||||
pub block_local: bool,
|
|
||||||
pub block_udp: bool,
|
|
||||||
pub block_non_http: bool,
|
|
||||||
pub enforce_auth: bool,
|
|
||||||
pub auth: Arc<Vec<Box<(dyn ProtocolExtensionBuilder + Send + Sync)>>>,
|
|
||||||
pub wisp_v1: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(unix)] {
|
|
||||||
type ListenerStream = Either<TcpStream, UnixStream>;
|
|
||||||
} else {
|
|
||||||
type ListenerStream = TcpStream;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum Listener {
|
|
||||||
Tcp(TcpListener),
|
|
||||||
#[cfg(unix)]
|
|
||||||
Unix(UnixListener),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Listener {
|
|
||||||
pub async fn accept(&self) -> Result<(ListenerStream, String), std::io::Error> {
|
|
||||||
Ok(match self {
|
|
||||||
Listener::Tcp(listener) => {
|
|
||||||
let (stream, addr) = listener.accept().await?;
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(unix)] {
|
|
||||||
(Either::Left(stream), addr.to_string())
|
|
||||||
} else {
|
|
||||||
(stream, addr.to_string())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#[cfg(unix)]
|
|
||||||
Listener::Unix(listener) => {
|
|
||||||
let (stream, addr) = listener.accept().await?;
|
|
||||||
(
|
|
||||||
Either::Right(stream),
|
|
||||||
addr.as_pathname()
|
|
||||||
.map(|x| x.to_string_lossy().into())
|
|
||||||
.unwrap_or("unknown_unix_socket".into()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn bind(addr: &str, unix: bool) -> Result<Listener, std::io::Error> {
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(unix)] {
|
|
||||||
if unix {
|
|
||||||
if std::fs::metadata(addr).is_ok() {
|
|
||||||
println!("attempting to remove old socket {:?}", addr);
|
|
||||||
std::fs::remove_file(addr)?;
|
|
||||||
}
|
|
||||||
return Ok(Listener::Unix(UnixListener::bind(addr)?));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if unix {
|
|
||||||
panic!("Unix sockets are only supported on Unix.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Listener::Tcp(TcpListener::bind(addr).await?))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
|
||||||
async fn main() -> Result<(), Error> {
|
|
||||||
#[cfg(feature = "tokio-console")]
|
|
||||||
console_subscriber::init();
|
|
||||||
let opt = Cli::parse();
|
|
||||||
let addr = if opt.unix_socket {
|
|
||||||
opt.bind_host
|
|
||||||
} else {
|
|
||||||
format!("{}:{}", opt.bind_host, opt.port)
|
|
||||||
};
|
|
||||||
|
|
||||||
let socket = bind(&addr, opt.unix_socket).await?;
|
|
||||||
|
|
||||||
let prefix = if let Some(prefix) = opt.prefix {
|
|
||||||
match (prefix.starts_with('/'), prefix.ends_with('/')) {
|
|
||||||
(true, true) => prefix,
|
|
||||||
(true, false) => prefix + "/",
|
|
||||||
(false, true) => "/".to_string() + &prefix,
|
|
||||||
(false, false) => "/".to_string() + &prefix + "/",
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
"/".to_string()
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut auth = HashMap::new();
|
|
||||||
let enforce_auth = opt.auth.is_some();
|
|
||||||
if let Some(file) = opt.auth {
|
|
||||||
let file = std::fs::read_to_string(file)?;
|
|
||||||
for entry in file.split('\n').filter_map(|x| {
|
|
||||||
if x.contains(':') {
|
|
||||||
Some(x.trim())
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}) {
|
|
||||||
let split: Vec<_> = entry.split(':').collect();
|
|
||||||
let username = split[0];
|
|
||||||
let password = split[1..].join(":");
|
|
||||||
println!(
|
|
||||||
"adding username {:?} password {:?} to allowed auth",
|
|
||||||
username, password
|
|
||||||
);
|
|
||||||
auth.insert(username.to_string(), password.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let pw_ext = PasswordProtocolExtensionBuilder::new_server(auth);
|
|
||||||
|
|
||||||
let mux_options = MuxOptions {
|
|
||||||
block_local: opt.block_local,
|
|
||||||
block_non_http: opt.block_non_http,
|
|
||||||
block_udp: opt.block_udp,
|
|
||||||
auth: Arc::new(vec![
|
|
||||||
Box::new(UdpProtocolExtensionBuilder),
|
|
||||||
Box::new(pw_ext),
|
|
||||||
]),
|
|
||||||
enforce_auth,
|
|
||||||
wisp_v1: opt.wisp_v1,
|
|
||||||
};
|
|
||||||
|
|
||||||
println!("listening on `{}` with prefix `{}`", addr, prefix);
|
|
||||||
while let Ok((stream, addr)) = socket.accept().await {
|
|
||||||
let prefix = prefix.clone();
|
|
||||||
let mux_options = mux_options.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let service = service_fn(move |res| {
|
|
||||||
accept_http(res, addr.clone(), prefix.clone(), mux_options.clone())
|
|
||||||
});
|
|
||||||
let conn = http1::Builder::new()
|
|
||||||
.serve_connection(TokioIo::new(stream), service)
|
|
||||||
.with_upgrades();
|
|
||||||
if let Err(err) = conn.await {
|
|
||||||
println!("failed to serve conn: {:?}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn accept_http(
|
|
||||||
mut req: Request<Incoming>,
|
|
||||||
addr: String,
|
|
||||||
prefix: String,
|
|
||||||
mux_options: MuxOptions,
|
|
||||||
) -> Result<Response<HttpBody>, WebSocketError> {
|
|
||||||
let uri = req.uri().path().to_string();
|
|
||||||
if upgrade::is_upgrade_request(&req)
|
|
||||||
&& let Some(uri) = uri.strip_prefix(&prefix)
|
|
||||||
{
|
|
||||||
let (res, fut) = upgrade::upgrade(&mut req)?;
|
|
||||||
|
|
||||||
if uri.is_empty() {
|
|
||||||
tokio::spawn(async move { accept_ws(fut, addr.clone(), mux_options).await });
|
|
||||||
} else if let Some(uri) = uri.strip_prefix('/').map(|x| x.to_string()) {
|
|
||||||
tokio::spawn(async move {
|
|
||||||
accept_wsproxy(
|
|
||||||
fut,
|
|
||||||
uri,
|
|
||||||
addr.clone(),
|
|
||||||
mux_options.block_local,
|
|
||||||
mux_options.block_non_http,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Response::from_parts(
|
|
||||||
res.into_parts().0,
|
|
||||||
HttpBody::new(Bytes::new()),
|
|
||||||
))
|
|
||||||
} else {
|
|
||||||
println!("random request to path {:?}", uri);
|
|
||||||
Ok(Response::builder()
|
|
||||||
.status(StatusCode::OK)
|
|
||||||
.body(HttpBody::new(":3".into()))
|
|
||||||
.unwrap())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn copy_buf(mux: MuxStreamAsyncRW, tcp: TcpStream) -> std::io::Result<()> {
|
|
||||||
let (muxrx, muxtx) = mux.into_split();
|
|
||||||
let mut muxrx = muxrx.compat();
|
|
||||||
let mut muxtx = muxtx.compat_write();
|
|
||||||
|
|
||||||
let (mut tcprx, mut tcptx) = tcp.into_split();
|
|
||||||
|
|
||||||
let fast_fut = async {
|
|
||||||
loop {
|
|
||||||
let buf = muxrx.fill_buf().await?;
|
|
||||||
if buf.is_empty() {
|
|
||||||
tcptx.flush().await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let i = tcptx.write(buf).await?;
|
|
||||||
if i == 0 {
|
|
||||||
return Err(std::io::ErrorKind::WriteZero.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
muxrx.consume(i);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let slow_fut = copy(&mut tcprx, &mut muxtx);
|
|
||||||
|
|
||||||
select! {
|
|
||||||
x = fast_fut => x,
|
|
||||||
x = slow_fut => x.map(|_| ()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_mux(
|
|
||||||
packet: ConnectPacket,
|
|
||||||
stream: MuxStream,
|
|
||||||
) -> Result<bool, Box<dyn std::error::Error + Sync + Send>> {
|
|
||||||
let uri = format!(
|
|
||||||
"{}:{}",
|
|
||||||
packet.destination_hostname, packet.destination_port
|
|
||||||
);
|
|
||||||
match packet.stream_type {
|
|
||||||
StreamType::Tcp => {
|
|
||||||
let tcp_stream = TcpStream::connect(uri).await?;
|
|
||||||
let mux = stream.into_io().into_asyncrw();
|
|
||||||
copy_buf(mux, tcp_stream).await?;
|
|
||||||
}
|
|
||||||
StreamType::Udp => {
|
|
||||||
let uri = lookup_host(uri)
|
|
||||||
.await?
|
|
||||||
.next()
|
|
||||||
.ok_or(WispError::InvalidUri)?;
|
|
||||||
let udp_socket =
|
|
||||||
UdpSocket::bind(if uri.is_ipv4() { "0.0.0.0:0" } else { "[::]:0" }).await?;
|
|
||||||
udp_socket.connect(uri).await?;
|
|
||||||
let mut data = vec![0u8; 65507]; // udp standard max datagram size
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
size = udp_socket.recv(&mut data) => {
|
|
||||||
let size = size?;
|
|
||||||
stream.write(Bytes::copy_from_slice(&data[..size])).await?
|
|
||||||
},
|
|
||||||
event = stream.read() => {
|
|
||||||
match event {
|
|
||||||
Some(event) => {
|
|
||||||
let _ = udp_socket.send(&event).await?;
|
|
||||||
}
|
|
||||||
None => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
StreamType::Unknown(_) => {
|
|
||||||
stream.close(CloseReason::ServerStreamInvalidInfo).await?;
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn accept_ws(
|
|
||||||
ws: UpgradeFut,
|
|
||||||
addr: String,
|
|
||||||
mux_options: MuxOptions,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
|
|
||||||
let mut ws = ws.await?;
|
|
||||||
// to prevent memory ""leaks"" because users are sending in packets way too fast the message
|
|
||||||
// size is set to 1M
|
|
||||||
ws.set_max_message_size(1024 * 1024);
|
|
||||||
let (rx, tx) = ws.split(|x| {
|
|
||||||
let Parts {
|
|
||||||
io, read_buf: buf, ..
|
|
||||||
} = x.into_inner()
|
|
||||||
.downcast::<TokioIo<ListenerStream>>()
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(buf.len(), 0);
|
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(unix)] {
|
|
||||||
match io.into_inner() {
|
|
||||||
Either::Left(x) => {
|
|
||||||
let (rx, tx) = x.into_split();
|
|
||||||
(Either::Left(rx), Either::Left(tx))
|
|
||||||
}
|
|
||||||
Either::Right(x) => {
|
|
||||||
let (rx, tx) = x.into_split();
|
|
||||||
(Either::Right(rx), Either::Right(tx))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
io.into_inner().into_split()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
let rx = FragmentCollectorRead::new(rx);
|
|
||||||
|
|
||||||
println!("{:?}: connected", addr);
|
|
||||||
// to prevent memory ""leaks"" because users are sending in packets way too fast the buffer
|
|
||||||
// size is set to 512
|
|
||||||
let (mux, fut) = if mux_options.wisp_v1 {
|
|
||||||
ServerMux::create(rx, tx, 512, None)
|
|
||||||
.await?
|
|
||||||
.with_no_required_extensions()
|
|
||||||
} else if mux_options.enforce_auth {
|
|
||||||
ServerMux::create(rx, tx, 512, Some(mux_options.auth.as_slice()))
|
|
||||||
.await?
|
|
||||||
.with_required_extensions(&[PasswordProtocolExtension::ID])
|
|
||||||
.await?
|
|
||||||
} else {
|
|
||||||
ServerMux::create(rx, tx, 512, Some(&[Box::new(UdpProtocolExtensionBuilder)]))
|
|
||||||
.await?
|
|
||||||
.with_no_required_extensions()
|
|
||||||
};
|
|
||||||
|
|
||||||
// this results in one stream ""leaking"" a maximum of ~512M
|
|
||||||
|
|
||||||
println!(
|
|
||||||
"{:?}: downgraded: {} extensions supported: {:?}",
|
|
||||||
addr, mux.downgraded, mux.supported_extension_ids
|
|
||||||
);
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = fut.await {
|
|
||||||
println!("err in mux: {:?}", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
while let Some((packet, stream)) = mux.server_new_stream().await {
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if (mux_options.block_non_http
|
|
||||||
&& !(packet.destination_port == 80 || packet.destination_port == 443))
|
|
||||||
|| (mux_options.block_udp && packet.stream_type == StreamType::Udp)
|
|
||||||
{
|
|
||||||
let _ = stream.close(CloseReason::ServerStreamBlockedAddress).await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if mux_options.block_local {
|
|
||||||
match lookup_host(format!(
|
|
||||||
"{}:{}",
|
|
||||||
packet.destination_hostname, packet.destination_port
|
|
||||||
))
|
|
||||||
.await
|
|
||||||
.ok()
|
|
||||||
.and_then(|mut x| x.next())
|
|
||||||
.map(|x| !x.ip().is_global())
|
|
||||||
{
|
|
||||||
Some(true) => {
|
|
||||||
let _ = stream.close(CloseReason::ServerStreamBlockedAddress).await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Some(false) => {}
|
|
||||||
None => {
|
|
||||||
let _ = stream
|
|
||||||
.close(CloseReason::ServerStreamConnectionRefused)
|
|
||||||
.await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let close_err = stream.get_close_handle();
|
|
||||||
let close_ok = stream.get_close_handle();
|
|
||||||
let _ = handle_mux(packet, stream)
|
|
||||||
.or_else(|err| async move {
|
|
||||||
let _ = close_err.close(CloseReason::Unexpected).await;
|
|
||||||
Err(err)
|
|
||||||
})
|
|
||||||
.and_then(|should_send| async move {
|
|
||||||
if should_send {
|
|
||||||
let _ = close_ok.close(CloseReason::Voluntary).await;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("{:?}: disconnected", addr);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn accept_wsproxy(
|
|
||||||
ws: UpgradeFut,
|
|
||||||
incoming_uri: String,
|
|
||||||
addr: String,
|
|
||||||
block_local: bool,
|
|
||||||
block_non_http: bool,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
|
|
||||||
let mut ws_stream = FragmentCollector::new(ws.await?);
|
|
||||||
|
|
||||||
println!("{:?}: connected (wsproxy): {:?}", addr, incoming_uri);
|
|
||||||
|
|
||||||
let Some(host) = lookup_host(&incoming_uri)
|
|
||||||
.await
|
|
||||||
.ok()
|
|
||||||
.and_then(|mut x| x.next())
|
|
||||||
else {
|
|
||||||
ws_stream
|
|
||||||
.write_frame(Frame::close(
|
|
||||||
CloseCode::Error.into(),
|
|
||||||
b"failed to resolve uri",
|
|
||||||
))
|
|
||||||
.await?;
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
if block_local && !host.ip().is_global() {
|
|
||||||
ws_stream
|
|
||||||
.write_frame(Frame::close(CloseCode::Error.into(), b"blocked uri"))
|
|
||||||
.await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if block_non_http && !(host.port() == 80 || host.port() == 443) {
|
|
||||||
ws_stream
|
|
||||||
.write_frame(Frame::close(CloseCode::Error.into(), b"blocked uri"))
|
|
||||||
.await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let tcp_stream = match TcpStream::connect(incoming_uri).await {
|
|
||||||
Ok(stream) => stream,
|
|
||||||
Err(err) => {
|
|
||||||
ws_stream
|
|
||||||
.write_frame(Frame::close(CloseCode::Error.into(), b"failed to connect"))
|
|
||||||
.await?;
|
|
||||||
return Err(Box::new(err));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let mut tcp_stream_framed = Framed::new(tcp_stream, BytesCodec::new());
|
|
||||||
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
event = ws_stream.read_frame() => {
|
|
||||||
match event {
|
|
||||||
Ok(frame) => {
|
|
||||||
match frame.opcode {
|
|
||||||
OpCode::Text | OpCode::Binary => {
|
|
||||||
let _ = tcp_stream_framed.send(Bytes::from(frame.payload.to_vec())).await;
|
|
||||||
}
|
|
||||||
OpCode::Close => {
|
|
||||||
// tokio closes the stream for us
|
|
||||||
drop(tcp_stream_framed);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(_) => {
|
|
||||||
// tokio closes the stream for us
|
|
||||||
drop(tcp_stream_framed);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
event = tcp_stream_framed.next() => {
|
|
||||||
if let Some(res) = event {
|
|
||||||
match res {
|
|
||||||
Ok(buf) => {
|
|
||||||
let _ = ws_stream.write_frame(Frame::binary(Payload::Borrowed(&buf))).await;
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
let _ = ws_stream.write_frame(Frame::close(CloseCode::Away.into(), b"tcp side is going away")).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("{:?}: disconnected (wsproxy)", addr);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
Loading…
Add table
Add a link
Reference in a new issue