rewrite in sync back

This commit is contained in:
MeexReay 2025-04-10 21:42:16 +03:00
parent 7f2e84256a
commit 5538820e03
6 changed files with 125 additions and 324 deletions

214
Cargo.lock generated
View File

@ -2,21 +2,6 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 4 version = 4
[[package]]
name = "addr2line"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
dependencies = [
"gimli",
]
[[package]]
name = "adler2"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
version = "1.1.3" version = "1.1.3"
@ -81,12 +66,6 @@ version = "1.0.88"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e1496f8fb1fbf272686b8d37f523dab3e4a7443300055e74cdaa449f3114356" checksum = "4e1496f8fb1fbf272686b8d37f523dab3e4a7443300055e74cdaa449f3114356"
[[package]]
name = "autocfg"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]] [[package]]
name = "aws-lc-rs" name = "aws-lc-rs"
version = "1.13.0" version = "1.13.0"
@ -110,21 +89,6 @@ dependencies = [
"fs_extra", "fs_extra",
] ]
[[package]]
name = "backtrace"
version = "0.3.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
dependencies = [
"addr2line",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
"windows-targets 0.52.6",
]
[[package]] [[package]]
name = "bindgen" name = "bindgen"
version = "0.69.5" version = "0.69.5"
@ -154,12 +118,6 @@ version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
name = "bytes"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.2.18" version = "1.2.18"
@ -294,9 +252,6 @@ dependencies = [
"rustls", "rustls",
"serde_json", "serde_json",
"serde_yml", "serde_yml",
"tokio",
"tokio-io-timeout",
"tokio-rustls",
"wildmatch", "wildmatch",
] ]
@ -329,12 +284,6 @@ dependencies = [
"wasi 0.14.2+wasi-0.2.4", "wasi 0.14.2+wasi-0.2.4",
] ]
[[package]]
name = "gimli"
version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]] [[package]]
name = "glob" name = "glob"
version = "0.3.2" version = "0.3.2"
@ -471,16 +420,6 @@ version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab"
[[package]]
name = "lock_api"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.27" version = "0.4.27"
@ -499,26 +438,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff70ce3e48ae43fa075863cef62e8b43b71a4f2382229920e0df362592919430"
dependencies = [
"adler2",
]
[[package]]
name = "mio"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.3" version = "7.1.3"
@ -529,50 +448,12 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "object"
version = "0.36.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.21.3" version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "parking_lot"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.52.6",
]
[[package]]
name = "pin-project-lite"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]] [[package]]
name = "portable-atomic" name = "portable-atomic"
version = "1.11.0" version = "1.11.0"
@ -622,15 +503,6 @@ version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
[[package]]
name = "redox_syscall"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2f103c6d277498fbceb16e84d317e2a400f160f46904d5f5410848c829511a3"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.10.6" version = "1.10.6"
@ -674,12 +546,6 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]] [[package]]
name = "rustc-hash" name = "rustc-hash"
version = "1.1.0" version = "1.1.0"
@ -738,12 +604,6 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.219" version = "1.0.219"
@ -797,31 +657,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
dependencies = [
"libc",
]
[[package]]
name = "smallvec"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9"
[[package]]
name = "socket2"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "subtle" name = "subtle"
version = "2.6.1" version = "2.6.1"
@ -839,55 +674,6 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "tokio"
version = "1.44.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-macros"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
dependencies = [
"rustls",
"tokio",
]
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.13" version = "1.0.13"

View File

@ -4,9 +4,6 @@ version = "0.1.3"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
tokio = { version = "1.44.2", features = ["full"] }
tokio-io-timeout = "1.2.0"
tokio-rustls = "0.26.2"
rustls = "0.23.25" rustls = "0.23.25"
wildmatch = "2.4.0" wildmatch = "2.4.0"
serde_yml = "0.0.12" serde_yml = "0.0.12"

View File

@ -1,6 +1,6 @@
use std::{fs, time::Duration}; use std::{fs, time::Duration};
use tokio::net::TcpStream; use std::net::TcpStream;
use serde_yml::{Number, Value}; use serde_yml::{Number, Value};
use wildmatch::WildMatch; use wildmatch::WildMatch;
@ -19,8 +19,8 @@ pub struct SiteConfig {
} }
impl SiteConfig { impl SiteConfig {
pub async fn connect(&self) -> Option<TcpStream> { pub fn connect(&self) -> Option<TcpStream> {
TcpStream::connect(self.host.clone()).await.ok() TcpStream::connect(self.host.clone()).ok()
} }
} }

View File

@ -1,16 +1,12 @@
use std::{ use std::{
error::Error, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, str::FromStr, sync::Arc, time::Duration error::Error, io::{BufRead, BufReader, Read, Write},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, TcpListener, TcpStream},
str::FromStr, sync::Arc, thread::{sleep, spawn}, time::Duration
}; };
use ignore_result::Ignore; use ignore_result::Ignore;
use log::{debug, info}; use log::{debug, info};
use rustls::{ServerConnection, StreamOwned};
use tokio::{
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
net::{TcpListener, TcpStream}, time::sleep
};
use tokio_io_timeout::TimeoutStream;
use tokio_rustls::TlsAcceptor;
use super::{ use super::{
tls::create_server_config, tls::create_server_config,
@ -37,124 +33,125 @@ impl FlowgateServer {
FlowgateServer { config } FlowgateServer { config }
} }
pub async fn run(self) { pub fn run(self) {
self.start().await; self.start();
loop { loop {
sleep(Duration::from_secs(60)).await; sleep(Duration::from_secs(60));
} }
} }
pub async fn start(self) { pub fn start(self) {
let local_self = Arc::new(self); let local_self = Arc::new(self);
local_self.clone().start_http().await; local_self.clone().start_http();
local_self.clone().start_https().await; local_self.clone().start_https();
} }
pub async fn start_http(self: Arc<Self>) { pub fn start_http(self: Arc<Self>) {
tokio::spawn({ spawn({
let local_self = self.clone(); let local_self = self.clone();
async move { local_self.run_http().await.ignore(); } move || { local_self.run_http().ignore(); }
}); });
} }
pub async fn start_https(self: Arc<Self>) { pub fn start_https(self: Arc<Self>) {
tokio::spawn({ spawn({
let local_self = self.clone(); let local_self = self.clone();
async move { local_self.run_https().await.ignore(); } move || { local_self.run_https().ignore(); }
}); });
} }
pub async fn run_http(self: Arc<Self>) -> Result<(), Box<dyn Error>> { pub fn run_http(self: Arc<Self>) -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind(&self.config.http_host).await?; let listener = TcpListener::bind(&self.config.http_host)?;
info!("HTTP server runned on {}", &self.config.http_host); info!("HTTP server runned on {}", &self.config.http_host);
loop { for stream in listener.incoming() {
let Ok((stream, addr)) = listener.accept().await else { break }; if let Ok(mut stream) = stream {
let local_self = self.clone();
let local_self = self.clone(); let Ok(addr) = stream.peer_addr() else { continue };
tokio::spawn(async move { spawn(move || {
let mut stream = TimeoutStream::new(stream); let Ok(_) = stream.set_write_timeout(Some(local_self.config.connection_timeout)) else { return };
let Ok(_) = stream.set_read_timeout(Some(local_self.config.connection_timeout)) else { return };
stream.set_write_timeout(Some(local_self.config.connection_timeout)); local_self.accept_stream(
stream.set_read_timeout(Some(local_self.config.connection_timeout)); &mut stream,
addr,
let mut stream = Box::pin(stream); false
);
local_self.accept_stream( });
&mut stream, }
addr,
false
).await;
});
} }
Ok(()) Ok(())
} }
pub async fn run_https(self: Arc<Self>) -> Result<(), Box<dyn Error>> { pub fn run_https(self: Arc<Self>) -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind(&self.config.https_host).await?; let listener = TcpListener::bind(&self.config.https_host)?;
let acceptor = TlsAcceptor::from(Arc::new(create_server_config(self.config.clone()).await));
info!("HTTPS server runned on {}", &self.config.https_host); info!("HTTPS server runned on {}", &self.config.https_host);
loop { for stream in listener.incoming() {
let Ok((stream, addr)) = listener.accept().await else { break }; if let Ok(stream) = stream {
let local_self = self.clone();
let connection = ServerConnection::new(Arc::new(create_server_config(self.config.clone())))?;
let local_self = self.clone(); let Ok(addr) = stream.peer_addr() else { continue };
let acceptor = acceptor.clone();
tokio::spawn(async move { spawn(move || {
let mut stream = TimeoutStream::new(stream); let Ok(_) = stream.set_write_timeout(Some(local_self.config.connection_timeout)) else { return };
let Ok(_) = stream.set_read_timeout(Some(local_self.config.connection_timeout)) else { return };
stream.set_write_timeout(Some(local_self.config.connection_timeout)); let mut stream = StreamOwned::new(connection, stream);
stream.set_read_timeout(Some(local_self.config.connection_timeout));
let Ok(mut stream) = acceptor.accept(Box::pin(stream)).await else { return }; while stream.conn.is_handshaking() {
let Ok(_) = stream.conn.complete_io(&mut stream.sock) else { return };
}
local_self.accept_stream( local_self.accept_stream(
&mut stream, &mut stream,
addr, addr,
true true
).await.map(|_| { ).map(|_| {
debug!("{} close connection", addr); debug!("{} close connection", addr);
});
}); });
}); }
} }
Ok(()) Ok(())
} }
async fn accept_stream( fn accept_stream(
self: Arc<Self>, self: Arc<Self>,
stream: &mut (impl AsyncReadExt + AsyncWriteExt + Unpin), stream: &mut (impl Read + Write + Shutdown),
addr: SocketAddr, addr: SocketAddr,
https: bool https: bool
) -> Option<()> { ) -> Option<()> {
let mut conn = self.clone().read_request(stream, addr, https, None).await?; let mut conn = self.clone().read_request(stream, addr, https, None)?;
if conn.keep_alive && conn.config.enable_keep_alive { if conn.keep_alive && conn.config.enable_keep_alive {
loop { loop {
if !conn.config.support_keep_alive { if !conn.config.support_keep_alive {
conn.stream.shutdown().await.ignore(); conn.stream.shutdown();
conn.stream = BufReader::new(conn.config.connect().await?); conn.stream = BufReader::new(conn.config.connect()?);
} }
conn = self.clone().read_request(stream, addr, https, Some(conn)).await?; conn = self.clone().read_request(stream, addr, https, Some(conn))?;
} }
} }
conn.stream.shutdown().await.ignore(); conn.stream.shutdown();
stream.shutdown().await.ignore(); stream.shutdown();
Some(()) Some(())
} }
async fn read_request( fn read_request(
self: Arc<Self>, self: Arc<Self>,
stream: &mut (impl AsyncReadExt + AsyncWriteExt + Unpin), stream: &mut (impl Read + Write + Shutdown),
addr: SocketAddr, addr: SocketAddr,
https: bool, https: bool,
conn: Option<Connection> conn: Option<Connection>
@ -166,31 +163,31 @@ impl FlowgateServer {
match &self.config.incoming_ip_forwarding { match &self.config.incoming_ip_forwarding {
IpForwarding::Simple => { IpForwarding::Simple => {
let mut header = Vec::new(); let mut header = Vec::new();
stream.read_until(b'\n', &mut header).await.ok()?; stream.read_until(b'\n', &mut header).ok()?;
header.truncate(header.len()-1); header.truncate(header.len()-1);
addr = SocketAddr::from_str(&String::from_utf8(header).ok()?).ok()?; addr = SocketAddr::from_str(&String::from_utf8(header).ok()?).ok()?;
}, },
IpForwarding::Modern => { IpForwarding::Modern => {
let mut header = [0]; let mut header = [0];
stream.read(&mut header).await.ok()?; stream.read(&mut header).ok()?;
addr = match header[0] { addr = match header[0] {
0x01 => { 0x01 => {
let mut octets = [0; 4]; let mut octets = [0; 4];
stream.read(&mut octets).await.ok()?; stream.read(&mut octets).ok()?;
let mut port = [0; 2]; let mut port = [0; 2];
stream.read(&mut port).await.ok()?; stream.read(&mut port).ok()?;
let port = u16::from_be_bytes(port); let port = u16::from_be_bytes(port);
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(octets), port)) SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(octets), port))
}, 0x02 => { }, 0x02 => {
let mut octets = [0; 16]; let mut octets = [0; 16];
stream.read(&mut octets).await.ok()?; stream.read(&mut octets).ok()?;
let mut port = [0; 2]; let mut port = [0; 2];
stream.read(&mut port).await.ok()?; stream.read(&mut port).ok()?;
let port = u16::from_be_bytes(port); let port = u16::from_be_bytes(port);
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(octets), port, 0, 0)) SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(octets), port, 0, 0))
@ -200,7 +197,7 @@ impl FlowgateServer {
_ => {} _ => {}
} }
let mut raw_status = read_until(&mut stream, b"\r\n").await?; let mut raw_status = read_until(&mut stream, b"\r\n")?;
let mut request = Vec::new(); let mut request = Vec::new();
request.append(&mut raw_status.clone()); request.append(&mut raw_status.clone());
@ -220,7 +217,7 @@ impl FlowgateServer {
let mut headers = Vec::new(); let mut headers = Vec::new();
loop { loop {
let mut header = read_until(&mut stream, b"\r\n").await?; let mut header = read_until(&mut stream, b"\r\n")?;
header.truncate(header.len() - 2); header.truncate(header.len() - 2);
if header.is_empty() { if header.is_empty() {
@ -264,7 +261,7 @@ impl FlowgateServer {
let site = self.config.get_site(&host)?.clone(); let site = self.config.get_site(&host)?.clone();
Connection { Connection {
stream: BufReader::new(site.connect().await?), stream: BufReader::new(site.connect()?),
config: site, config: site,
keep_alive, keep_alive,
host host
@ -331,14 +328,14 @@ impl FlowgateServer {
debug!("{:?}", String::from_utf8_lossy(&request)); debug!("{:?}", String::from_utf8_lossy(&request));
conn.stream.write_all(&request).await.ok()?; conn.stream.get_mut().write_all(&request).ok()?;
debug!("{} {} sent request to server", addr, status[1]); debug!("{} {} sent request to server", addr, status[1]);
if content_length > 0 { if content_length > 0 {
let buffer = stream.buffer().to_vec(); let buffer = stream.buffer().to_vec();
conn.stream.write_all(&buffer).await.ok()?; conn.stream.get_mut().write_all(&buffer).ok()?;
stream.consume(buffer.len()); stream.consume(buffer.len());
let mut read = buffer.len(); let mut read = buffer.len();
@ -347,7 +344,7 @@ impl FlowgateServer {
while read < content_length { while read < content_length {
let mut buf = vec![0; 4096]; let mut buf = vec![0; 4096];
let Ok(size) = conn.stream.get_mut().read(&mut buf).await else { break }; let Ok(size) = conn.stream.get_mut().read(&mut buf) else { break };
if size == 0 { break } if size == 0 { break }
@ -356,10 +353,10 @@ impl FlowgateServer {
debug!("{} {} send response body part {} to clientr", addr, status[1], size); debug!("{} {} send response body part {} to clientr", addr, status[1], size);
stream.write_all(&buf).await.ok()?; stream.get_mut().write_all(&buf).ok()?;
} }
} else if is_chunked { } else if is_chunked {
transfer_chunked(&mut stream, &mut conn.stream).await?; transfer_chunked(&mut stream, conn.stream.get_mut())?;
} }
debug!("{} {} send body to server", addr, status[1]); debug!("{} {} send body to server", addr, status[1]);
@ -367,7 +364,7 @@ impl FlowgateServer {
if conn.config.support_keep_alive { if conn.config.support_keep_alive {
let mut response = Vec::new(); let mut response = Vec::new();
let raw_status = read_until(&mut conn.stream, b"\r\n").await?; let raw_status = read_until(&mut conn.stream, b"\r\n")?;
response.append(&mut raw_status.clone()); response.append(&mut raw_status.clone());
@ -375,7 +372,7 @@ impl FlowgateServer {
let mut is_chunked = false; let mut is_chunked = false;
loop { loop {
let mut header = read_until(&mut conn.stream, b"\r\n").await?; let mut header = read_until(&mut conn.stream, b"\r\n")?;
response.append(&mut header.clone()); response.append(&mut header.clone());
@ -401,14 +398,14 @@ impl FlowgateServer {
} }
} }
stream.write_all(&response).await.ok()?; stream.get_mut().write_all(&response).ok()?;
debug!("{} {} send response header to clientr", addr, status[1]); debug!("{} {} send response header to clientr", addr, status[1]);
if content_length > 0 { if content_length > 0 {
let buffer = conn.stream.buffer().to_vec(); let buffer = conn.stream.buffer().to_vec();
stream.write_all(&buffer).await.ok()?; stream.get_mut().write_all(&buffer).ok()?;
conn.stream.consume(buffer.len()); conn.stream.consume(buffer.len());
debug!("{} {} send response body part {} to clientr", addr, status[1], buffer.len()); debug!("{} {} send response body part {} to clientr", addr, status[1], buffer.len());
@ -417,7 +414,7 @@ impl FlowgateServer {
while read < content_length { while read < content_length {
let mut buf = vec![0; 4096]; let mut buf = vec![0; 4096];
let Ok(size) = conn.stream.get_mut().read(&mut buf).await else { break }; let Ok(size) = conn.stream.get_mut().read(&mut buf) else { break };
if size == 0 { break } if size == 0 { break }
@ -426,24 +423,24 @@ impl FlowgateServer {
debug!("{} {} send response body part {} to clientr", addr, status[1], size); debug!("{} {} send response body part {} to clientr", addr, status[1], size);
stream.write_all(&buf).await.ok()?; stream.get_mut().write_all(&buf).ok()?;
} }
} else if is_chunked { } else if is_chunked {
transfer_chunked(&mut conn.stream, &mut stream).await?; transfer_chunked(&mut conn.stream, stream.get_mut())?;
} }
debug!("{} {} send response body to clientr", addr, status[1]); debug!("{} {} send response body to clientr", addr, status[1]);
} else { } else {
let buffer = conn.stream.buffer(); let buffer = conn.stream.buffer();
stream.write_all(buffer).await.ok()?; stream.get_mut().write_all(buffer).ok()?;
conn.stream.consume(buffer.len()); conn.stream.consume(buffer.len());
let mut buf = vec![0;4096]; let mut buf = vec![0;4096];
while let Ok(n) = conn.stream.get_mut().read(&mut buf).await { while let Ok(n) = conn.stream.get_mut().read(&mut buf) {
if n == 0 { break } if n == 0 { break }
buf.truncate(n); buf.truncate(n);
stream.write_all(&buf).await.ok()?; stream.get_mut().write_all(&buf).ok()?;
buf = vec![0;4096]; buf = vec![0;4096];
} }
} }
@ -454,14 +451,14 @@ impl FlowgateServer {
} }
} }
async fn read_until(stream: &mut (impl AsyncBufReadExt + Unpin), delimiter: &[u8]) -> Option<Vec<u8>> { fn read_until(stream: &mut impl BufRead, delimiter: &[u8]) -> Option<Vec<u8>> {
let mut data = Vec::new(); let mut data = Vec::new();
let last_byte = *delimiter.last()?; let last_byte = *delimiter.last()?;
loop { loop {
let mut buf = Vec::new(); let mut buf = Vec::new();
stream.read_until(last_byte, &mut buf).await.ok()?; stream.read_until(last_byte, &mut buf).ok()?;
data.append(&mut buf); data.append(&mut buf);
if data.ends_with(delimiter) { if data.ends_with(delimiter) {
break; break;
@ -471,17 +468,17 @@ async fn read_until(stream: &mut (impl AsyncBufReadExt + Unpin), delimiter: &[u8
Some(data) Some(data)
} }
async fn transfer_chunked(src: &mut (impl AsyncBufReadExt + Unpin), dest: &mut (impl AsyncWriteExt + Unpin)) -> Option<()> { fn transfer_chunked(src: &mut impl BufRead, dest: &mut impl Write) -> Option<()> {
loop { loop {
let mut length = read_until(src, b"\r\n").await?; let mut length = read_until(src, b"\r\n")?;
dest.write_all(&length).await.ok()?; dest.write_all(&length).ok()?;
length.truncate(length.len()-2); length.truncate(length.len()-2);
let length = String::from_utf8(length).ok()?; let length = String::from_utf8(length).ok()?;
let length = usize::from_str_radix(length.as_str(), 16).ok()?; let length = usize::from_str_radix(length.as_str(), 16).ok()?;
let mut data = vec![0u8; length+2]; let mut data = vec![0u8; length+2];
src.read_exact(&mut data).await.ok()?; src.read_exact(&mut data).ok()?;
dest.write_all(&data).await.ok()?; dest.write_all(&data).ok()?;
if length == 0 { if length == 0 {
break; break;
@ -489,4 +486,26 @@ async fn transfer_chunked(src: &mut (impl AsyncBufReadExt + Unpin), dest: &mut (
} }
Some(()) Some(())
}
pub trait Shutdown {
fn shutdown(&self);
}
impl Shutdown for TcpStream {
fn shutdown(&self) {
TcpStream::shutdown(self, std::net::Shutdown::Both).ignore();
}
}
impl <T: Shutdown> Shutdown for BufReader<T> {
fn shutdown(&self) {
self.get_ref().shutdown();
}
}
impl <C, T: Read + Write + Shutdown> Shutdown for StreamOwned<C, T> {
fn shutdown(&self) {
self.sock.shutdown();
}
} }

View File

@ -39,7 +39,7 @@ struct ResolvesServerCertWildcard {
} }
impl ResolvesServerCertWildcard { impl ResolvesServerCertWildcard {
pub async fn new(config: Arc<Config>) -> Self { pub fn new(config: Arc<Config>) -> Self {
Self { config } Self { config }
} }
} }
@ -56,8 +56,8 @@ impl ResolvesServerCert for ResolvesServerCertWildcard {
} }
} }
pub async fn create_server_config(config: Arc<Config>) -> ServerConfig { pub fn create_server_config(config: Arc<Config>) -> ServerConfig {
ServerConfig::builder() ServerConfig::builder()
.with_no_client_auth() .with_no_client_auth()
.with_cert_resolver(Arc::new(ResolvesServerCertWildcard::new(config).await)) .with_cert_resolver(Arc::new(ResolvesServerCertWildcard::new(config)))
} }

View File

@ -3,8 +3,7 @@ use std::{fs, path::Path, sync::Arc};
use flowgate::{config::Config, server::FlowgateServer}; use flowgate::{config::Config, server::FlowgateServer};
use ignore_result::Ignore; use ignore_result::Ignore;
#[tokio::main] fn main() {
async fn main() {
colog::init(); colog::init();
if !Path::new("conf.yml").exists() { if !Path::new("conf.yml").exists() {
@ -14,5 +13,5 @@ async fn main() {
let config = Arc::new(Config::parse("conf.yml").unwrap()); let config = Arc::new(Config::parse("conf.yml").unwrap());
let server = FlowgateServer::new(config.clone()); let server = FlowgateServer::new(config.clone());
server.run().await; server.run();
} }