From 377090869f9838f9acbe7257d98b21cb6c94668f Mon Sep 17 00:00:00 2001 From: MeexReay Date: Sat, 12 Apr 2025 23:09:01 +0300 Subject: [PATCH] more config and some fixes --- Cargo.lock | 0 src/config.rs | 16 +++++--- src/server.rs | 93 +++++++++++++++++++++++++------------------ tests/half_request.py | 32 +++++++++++++++ tests/http_client.py | 0 5 files changed, 96 insertions(+), 45 deletions(-) mode change 100644 => 100755 Cargo.lock create mode 100755 tests/half_request.py mode change 100644 => 100755 tests/http_client.py diff --git a/Cargo.lock b/Cargo.lock old mode 100644 new mode 100755 diff --git a/src/config.rs b/src/config.rs index 36db9f3..6f02352 100755 --- a/src/config.rs +++ b/src/config.rs @@ -51,10 +51,11 @@ impl IpForwarding { #[derive(Clone, Debug)] pub struct Config { pub sites: Vec, - pub http_host: String, - pub https_host: String, + pub http_host: Option, + pub https_host: Option, pub connection_timeout: Duration, - pub incoming_ip_forwarding: IpForwarding + pub incoming_ip_forwarding: IpForwarding, + pub threadpool_size: usize } impl Config { @@ -62,8 +63,10 @@ impl Config { let file_content = fs::read_to_string(filename).ok()?; let doc = serde_yml::from_str::(file_content.as_str()).ok()?; - let http_host = doc["http_host"].as_str()?.to_string(); - let https_host = doc["https_host"].as_str()?.to_string(); + let http_host = doc.get("http_host").and_then(|o| Some(o.as_str()?.to_string())); + let https_host = doc.get("https_host").and_then(|o| Some(o.as_str()?.to_string())); + + let threadpool_size = doc.get("threadpool_size").and_then(|o| Some(o.as_u64()? as usize + 2)).unwrap_or(12); let connection_timeout = Duration::from_secs(doc.get("connection_timeout") .unwrap_or(&Value::Number(Number::from(10))).as_u64()?); @@ -115,7 +118,8 @@ impl Config { http_host, https_host, connection_timeout, - incoming_ip_forwarding + incoming_ip_forwarding, + threadpool_size }.clone()) } diff --git a/src/server.rs b/src/server.rs index 3043aaa..4a67510 100755 --- a/src/server.rs +++ b/src/server.rs @@ -41,7 +41,8 @@ impl FlowgateServer { pub fn start(self) -> ThreadPool { let local_self = Arc::new(self); - let threadpool = ThreadPool::new(10); + // let threadpool = ThreadPool::new(local_self.config.threadpool_size); + let threadpool = ThreadPool::new(3); let mut handles = Vec::new(); @@ -68,25 +69,27 @@ impl FlowgateServer { } pub fn run_http(self: Arc, threadpool: &ThreadPool) -> Result<(), Box> { - let listener = TcpListener::bind(&self.config.http_host)?; + if let Some(host) = self.config.http_host.clone() { + let listener = TcpListener::bind(&host)?; - info!("HTTP server runned on {}", &self.config.http_host); + info!("HTTP server runned on {}", &host); - for stream in listener.incoming() { - if let Ok(mut stream) = stream { - let local_self = self.clone(); + for stream in listener.incoming() { + if let Ok(mut stream) = stream { + let local_self = self.clone(); - let Ok(addr) = stream.peer_addr() else { continue }; - let Ok(_) = stream.set_write_timeout(Some(local_self.config.connection_timeout)) else { continue }; - let Ok(_) = stream.set_read_timeout(Some(local_self.config.connection_timeout)) else { continue }; + let Ok(addr) = stream.peer_addr() else { continue }; + let Ok(_) = stream.set_write_timeout(Some(local_self.config.connection_timeout)) else { continue }; + let Ok(_) = stream.set_read_timeout(Some(local_self.config.connection_timeout)) else { continue }; - threadpool.execute(move || { - local_self.accept_stream( - &mut stream, - addr, - false - ); - }); + threadpool.execute(move || { + local_self.accept_stream( + &mut stream, + addr, + false + ); + }); + } } } @@ -94,37 +97,39 @@ impl FlowgateServer { } pub fn run_https(self: Arc, threadpool: &ThreadPool) -> Result<(), Box> { - let listener = TcpListener::bind(&self.config.https_host)?; + if let Some(host) = self.config.https_host.clone() { + let listener = TcpListener::bind(&host)?; - info!("HTTPS server runned on {}", &self.config.https_host); + info!("HTTPS server runned on {}", &host); - let config = Arc::new(create_server_config(self.config.clone())); + let config = Arc::new(create_server_config(self.config.clone())); - for stream in listener.incoming() { - if let Ok(stream) = stream { - let local_self = self.clone(); - let config = config.clone(); + for stream in listener.incoming() { + if let Ok(stream) = stream { + let local_self = self.clone(); + let config = config.clone(); - let Ok(addr) = stream.peer_addr() else { continue }; - let Ok(_) = stream.set_write_timeout(Some(local_self.config.connection_timeout)) else { continue }; - let Ok(_) = stream.set_read_timeout(Some(local_self.config.connection_timeout)) else { continue }; + let Ok(addr) = stream.peer_addr() else { continue }; + let Ok(_) = stream.set_write_timeout(Some(local_self.config.connection_timeout)) else { continue }; + let Ok(_) = stream.set_read_timeout(Some(local_self.config.connection_timeout)) else { continue }; - threadpool.execute(move || { - let Ok(connection) = ServerConnection::new(config) else { return }; - let mut stream = StreamOwned::new(connection, stream); + threadpool.execute(move || { + let Ok(connection) = ServerConnection::new(config) else { return }; + let mut stream = StreamOwned::new(connection, stream); - while stream.conn.is_handshaking() { - let Ok(_) = stream.conn.complete_io(&mut stream.sock) else { return }; - } + while stream.conn.is_handshaking() { + let Ok(_) = stream.conn.complete_io(&mut stream.sock) else { return }; + } - local_self.accept_stream( - &mut stream, - addr, - true - ).map(|_| { - debug!("{} close connection", addr); + local_self.accept_stream( + &mut stream, + addr, + true + ).map(|_| { + debug!("{} close connection", addr); + }); }); - }); + } } } @@ -363,6 +368,11 @@ impl FlowgateServer { } } else if is_chunked { transfer_chunked(&mut stream, conn.stream.get_mut())?; + } else { + let buffer = stream.buffer().to_vec(); + + conn.stream.get_mut().write_all(&buffer).ok()?; + stream.consume(buffer.len()); } debug!("{} {} send body to server", addr, status[1]); @@ -433,6 +443,11 @@ impl FlowgateServer { } } else if is_chunked { transfer_chunked(&mut conn.stream, stream.get_mut())?; + } else { + let buffer = conn.stream.buffer().to_vec(); + + stream.get_mut().write_all(&buffer).ok()?; + conn.stream.consume(buffer.len()); } debug!("{} {} send response body to clientr", addr, status[1]); diff --git a/tests/half_request.py b/tests/half_request.py new file mode 100755 index 0000000..74e3e61 --- /dev/null +++ b/tests/half_request.py @@ -0,0 +1,32 @@ +import socket + +HOST = '172.16.1.32' +PORT = 80 + +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((HOST, PORT)) + + s.send(b"""GET / HTTP/1.1\r +Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\r +Accept-Encoding: gzip, deflate, br, zstd\r +Accept-Language: en-US,en;q=0.9\r +Cache-Control: no-cache\r +Connection: keep-alive\r +Host: git.meex.lol\r +Pragma: no-cache\r +Sec-Fetch-Dest: document\r +Sec-Fetch-Mode: navigate\r +Sec-Fetch-Site: none\r +Sec-Fetch-User: ?1\r +Upgrade-Insecure-Requests: 1\r +User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36\r +sec-ch-ua: \"Chromium\";v=\"135\", \"Not-A.Brand\";v=\"8\"\r +sec-ch-ua-mobile: ?0\r +sec-ch-ua-platform: \"Linux\"\r +\r +""") + + content_length = 0 + while True: + data = s.recv(1024) + print(data) \ No newline at end of file diff --git a/tests/http_client.py b/tests/http_client.py old mode 100644 new mode 100755