From 57140449f794a76fd44c1369aae0e4bb48d7156f Mon Sep 17 00:00:00 2001 From: MeexReay Date: Sun, 24 Nov 2024 01:23:16 +0300 Subject: [PATCH] chunked --- conf.yml | 1 - src/flowgate/server.rs | 67 ++++++++++++++++++++++++++++-------------- 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/conf.yml b/conf.yml index 8cf6f65..f713e41 100644 --- a/conf.yml +++ b/conf.yml @@ -4,7 +4,6 @@ https_host: localhost:443 # Https server host threadpool_size: 10 # Threadpool size (count of threads that accept requests) (optional, default - 10) connection_timeout: 10 # Read and write timeout of connections in seconds (optional, default - 10) incoming_ip_forwarding: none # Read IP forwarding on incoming connections (optional, default - none) - websocket_host: localhost:999 # Websocket messaging host to edit sites (optional, default - null) sites: diff --git a/src/flowgate/server.rs b/src/flowgate/server.rs index 2fa8b86..a84c59e 100644 --- a/src/flowgate/server.rs +++ b/src/flowgate/server.rs @@ -11,6 +11,13 @@ pub struct FlowgateServer { config: Arc>, } +struct Connection { + stream: TcpStream, + config: SiteConfig, + keep_alive: bool, + host: String, +} + impl FlowgateServer { pub fn new(config: Arc>) -> Self { FlowgateServer { config } @@ -188,17 +195,17 @@ impl FlowgateServer { ) -> Option<()> { let mut conn = Self::read_request(config.clone(), stream, addr, https, None)?; - if conn.2 && conn.1.enable_keep_alive { + if conn.keep_alive && conn.config.enable_keep_alive { loop { - if !conn.1.support_keep_alive { - conn.0.close(); - conn.0 = conn.1.connect()?; + if !conn.config.support_keep_alive { + conn.stream.close(); + conn.stream = conn.config.connect()?; } conn = Self::read_request(config.clone(), stream, addr, https, Some(conn))?; } } - conn.0.close(); + conn.stream.close(); stream.close(); Some(()) @@ -209,8 +216,8 @@ impl FlowgateServer { stream: &'a mut (impl Read + Write + Closeable), addr: SocketAddr, https: bool, - conn: Option<(TcpStream, SiteConfig, bool, String)> - ) -> Option<(TcpStream, SiteConfig, bool, String)> { + conn: Option + ) -> Option { let mut addr = addr; match &config.read().ok()?.incoming_ip_forwarding { @@ -278,7 +285,7 @@ impl FlowgateServer { if head.is_empty() { return None; } let head_str = String::from_utf8(head.clone()).ok()?; - let head_str = head_str.trim_matches(char::from(0)); + let head_str = head_str.trim_matches(char::from(0)).to_string(); let mut head_lines = head_str.split("\r\n"); @@ -289,6 +296,12 @@ impl FlowgateServer { .filter(|l| l.contains(": ")) .map(|l| l.split_once(": ").unwrap()) .collect(); + + let is_chunked = headers.iter() + .find(|o| o.0.to_lowercase() == "transfer-encoding") + .map(|o| o.1.split(",").map(|x| x.trim_matches(' ').to_string()).collect::>()) + .map(|o| o.contains(&"chunked".to_string())) + .unwrap_or(false); if let IpForwarding::Header(header) = &config.read().ok()?.incoming_ip_forwarding { if let Some(ip) = headers.iter().find(|o| o.0 == header).map(|o| o.1) { @@ -296,7 +309,7 @@ impl FlowgateServer { } } - let mut conn: (TcpStream, SiteConfig, bool, String) = if conn.is_none() { + let mut conn: Connection = if conn.is_none() { let mut host = String::new(); let mut keep_alive = false; @@ -310,7 +323,12 @@ impl FlowgateServer { let site = config.read().ok()?.get_site(&host)?.clone(); - (site.connect()?, site, keep_alive, host) + Connection { + stream: site.connect()?, + config: site, + keep_alive, + host + } } else { conn? }; @@ -325,7 +343,7 @@ impl FlowgateServer { let mut reqbuf: Vec = Vec::new(); - if let Some(replace_host) = conn.1.replace_host.clone() { + if let Some(replace_host) = conn.config.replace_host.clone() { let mut new_head = Vec::new(); let mut is_status = true; @@ -349,12 +367,15 @@ impl FlowgateServer { head = new_head; } - match &conn.1.ip_forwarding { + match &conn.config.ip_forwarding { IpForwarding::Header(header) => { reqbuf.append(&mut status.to_string().as_bytes().to_vec()); reqbuf.append(&mut b"\r\n".to_vec()); - for (key, value) in &headers { - if *key == header { continue } + for (key, value) in String::from_utf8(head.clone()).ok()? + .split("\r\n") + .skip(1) + .filter_map(|o| o.split_once(": ")) { + if *key.to_lowercase() == header.to_lowercase() { continue } reqbuf.append(&mut key.to_string().as_bytes().to_vec()); reqbuf.append(&mut b": ".to_vec()); reqbuf.append(&mut value.to_string().as_bytes().to_vec()); @@ -390,7 +411,7 @@ impl FlowgateServer { } } - conn.0.write_all(&reqbuf).ok()?; + conn.stream.write_all(&reqbuf).ok()?; if content_length > 0 { let mut read = 0usize; @@ -399,20 +420,22 @@ impl FlowgateServer { if size == 0 { break } read += size; buf.truncate(size); - conn.0.write_all(&buf).ok()?; + conn.stream.write_all(&buf).ok()?; buf = vec![0; 4096]; - if read == content_length { break } + if read >= content_length { break } } + } else if is_chunked { + // write chunked logic } - if conn.1.support_keep_alive { + if conn.config.support_keep_alive { let mut head = Vec::new(); { let mut buf = [0; 1]; let mut counter = 0; - while let Ok(1) = conn.0.read(&mut buf) { + while let Ok(1) = conn.stream.read(&mut buf) { let byte = buf[0]; head.push(byte); @@ -448,7 +471,7 @@ impl FlowgateServer { if content_length > 0 { let mut read = 0usize; let mut buf = vec![0; 4096]; - while let Ok(size) = conn.0.read(&mut buf) { + while let Ok(size) = conn.stream.read(&mut buf) { if size == 0 { break } read += size; buf.truncate(size); @@ -459,11 +482,11 @@ impl FlowgateServer { } } else { let mut buf = Vec::new(); - conn.0.read_to_end(&mut buf).ok()?; + conn.stream.read_to_end(&mut buf).ok()?; stream.write_all(&buf).ok()?; } - info!("{addr} > {} {}://{}{}", status_seq[0], if https { "https" } else { "http" }, conn.3, status_seq[1]); + info!("{addr} > {} {}://{}{}", status_seq[0], if https { "https" } else { "http" }, conn.host, status_seq[1]); Some(conn) }