From e1352928171a29b25e6b28c5315de4a30e7e01cc Mon Sep 17 00:00:00 2001 From: MeexReay Date: Thu, 10 Apr 2025 01:38:28 +0300 Subject: [PATCH] fix read by content-length --- src/flowgate/server.rs | 90 ++++++++++++++++++++++++++++-------------- tests/http_client.py | 40 +++++++++++++++++++ 2 files changed, 101 insertions(+), 29 deletions(-) create mode 100644 tests/http_client.py diff --git a/src/flowgate/server.rs b/src/flowgate/server.rs index 3c371c9..0033e30 100755 --- a/src/flowgate/server.rs +++ b/src/flowgate/server.rs @@ -3,7 +3,7 @@ use std::{ }; use ignore_result::Ignore; -use log::info; +use log::{debug, info}; use tokio::{ io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, @@ -104,7 +104,9 @@ impl FlowgateServer { &mut stream, addr, true - ).await; + ).await.map(|_| { + debug!("{} close connection", addr); + }); }); } @@ -130,7 +132,7 @@ impl FlowgateServer { } conn.stream.shutdown().await.ignore(); - stream.shutdown().await.ok()?; + stream.shutdown().await.ignore(); Some(()) } @@ -188,6 +190,8 @@ impl FlowgateServer { let status = String::from_utf8(raw_status.clone()).ok()?; let status = status.split(" ").collect::>(); + debug!("{} {} read status", addr, status[1]); + let mut content_length = 0; let mut host = None; let mut is_chunked = false; @@ -233,6 +237,8 @@ impl FlowgateServer { } } + debug!("{} {} read headers", addr, status[1]); + let mut conn: Connection = if conn.is_none() { let host = host?; let site = self.config.get_site(&host)?.clone(); @@ -247,6 +253,8 @@ impl FlowgateServer { conn? }; + debug!("{} {} got connection", addr, status[1]); + let mut request = Vec::new(); match &conn.config.ip_forwarding { @@ -306,31 +314,43 @@ impl FlowgateServer { conn.stream.write_all(&request).await.ok()?; - if content_length > 0 { - let buffer = stream.buffer(); + debug!("{} {} sent request to server", addr, status[1]); - conn.stream.write_all(buffer).await.ok()?; + if content_length > 0 { + let buffer = stream.buffer().to_vec(); + + conn.stream.write_all(&buffer).await.ok()?; stream.consume(buffer.len()); - let mut read = 0usize; - let mut buf = vec![0; 4096]; + let mut read = buffer.len(); + + debug!("{} {} send part of body to server", addr, status[1]); + + while read < content_length { + let mut buf = vec![0; 4096]; + let Ok(size) = conn.stream.get_mut().read(&mut buf).await else { break }; - while let Ok(size) = stream.get_mut().read(&mut buf).await { if size == 0 { break } - read += size; + buf.truncate(size); - conn.stream.write_all(&buf).await.ok()?; - buf = vec![0; 4096]; - if read >= content_length { break } + read += size; + + debug!("{} {} send response body part {} to clientr", addr, status[1], size); + + stream.write_all(&buf).await.ok()?; } } else if is_chunked { transfer_chunked(&mut stream, &mut conn.stream).await?; } + debug!("{} {} send body to server", addr, status[1]); + if conn.config.support_keep_alive { + let mut response = Vec::new(); + let raw_status = read_until(&mut conn.stream, b"\r\n").await?; - - stream.write_all(&raw_status).await.ok()?; + + response.append(&mut raw_status.clone()); let mut content_length = 0; let mut is_chunked = false; @@ -338,12 +358,12 @@ impl FlowgateServer { loop { let mut header = read_until(&mut conn.stream, b"\r\n").await?; + response.append(&mut header.clone()); + if header.len() == 2 { break; } - stream.write_all(&header).await.ok()?; - header.truncate(header.len() - 2); let header = String::from_utf8(header).ok()?; @@ -361,39 +381,51 @@ impl FlowgateServer { _ => {} } } + + stream.write_all(&response).await.ok()?; + + debug!("{} {} send response header to clientr", addr, status[1]); if content_length > 0 { - let buffer = conn.stream.buffer(); + let buffer = conn.stream.buffer().to_vec(); + + stream.write_all(&buffer).await.ok()?; + conn.stream.consume(buffer.len()); - stream.write_all(buffer).await.ok()?; - stream.consume(buffer.len()); + debug!("{} {} send response body part {} to clientr", addr, status[1], buffer.len()); - let mut read = 0usize; - let mut buf = vec![0; 4096]; + let mut read = buffer.len(); + + while read < content_length { + let mut buf = vec![0; 4096]; + let Ok(size) = conn.stream.get_mut().read(&mut buf).await else { break }; - while let Ok(size) = conn.stream.get_mut().read(&mut buf).await { if size == 0 { break } - read += size; + buf.truncate(size); + read += size; + + debug!("{} {} send response body part {} to clientr", addr, status[1], size); + stream.write_all(&buf).await.ok()?; - buf = vec![0; 4096]; - if read >= content_length { break } } } else if is_chunked { transfer_chunked(&mut conn.stream, &mut stream).await?; } + + debug!("{} {} send response body to clientr", addr, status[1]); } else { let buffer = conn.stream.buffer(); stream.write_all(buffer).await.ok()?; - stream.consume(buffer.len()); + conn.stream.consume(buffer.len()); - let mut buf = vec![0;1024]; + let mut buf = vec![0;4096]; while let Ok(n) = conn.stream.get_mut().read(&mut buf).await { if n == 0 { break } buf.truncate(n); stream.write_all(&buf).await.ok()?; - buf = vec![0;1024]; + buf = vec![0;4096]; } } diff --git a/tests/http_client.py b/tests/http_client.py new file mode 100644 index 0000000..a329a3a --- /dev/null +++ b/tests/http_client.py @@ -0,0 +1,40 @@ +import socket + +HOST = '172.16.1.32' +PORT = 80 + +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((HOST, PORT)) + i = 0 + while True: + s.send(b"""GET / HTTP/1.1\r +Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\r +Accept-Encoding: gzip, deflate, br, zstd\r +Accept-Language: en-US,en;q=0.9\r +Cache-Control: no-cache\r +Connection: keep-alive\r +Host: meex.lol\r +Pragma: no-cache\r +Sec-Fetch-Dest: document\r +Sec-Fetch-Mode: navigate\r +Sec-Fetch-Site: none\r +Sec-Fetch-User: ?1\r +Upgrade-Insecure-Requests: 1\r +User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36\r +sec-ch-ua: \"Chromium\";v=\"135\", \"Not-A.Brand\";v=\"8\"\r +sec-ch-ua-mobile: ?0\r +sec-ch-ua-platform: \"Linux\"\r +\r +""") + + content_length = 0 + while content_length < 14564: + data = s.recv(1024) + if data: + content_length += len(data) + # print(data.decode("utf8", errors="ignore"), end="") + else: + print("sdfsdf") + i += 1 + + print(i) \ No newline at end of file