fix read by content-length

This commit is contained in:
MeexReay 2025-04-10 01:38:28 +03:00
parent da3e5aade0
commit e135292817
2 changed files with 101 additions and 29 deletions

View File

@ -3,7 +3,7 @@ use std::{
}; };
use ignore_result::Ignore; use ignore_result::Ignore;
use log::info; use log::{debug, info};
use tokio::{ use tokio::{
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
@ -104,7 +104,9 @@ impl FlowgateServer {
&mut stream, &mut stream,
addr, addr,
true true
).await; ).await.map(|_| {
debug!("{} close connection", addr);
});
}); });
} }
@ -130,7 +132,7 @@ impl FlowgateServer {
} }
conn.stream.shutdown().await.ignore(); conn.stream.shutdown().await.ignore();
stream.shutdown().await.ok()?; stream.shutdown().await.ignore();
Some(()) Some(())
} }
@ -188,6 +190,8 @@ impl FlowgateServer {
let status = String::from_utf8(raw_status.clone()).ok()?; let status = String::from_utf8(raw_status.clone()).ok()?;
let status = status.split(" ").collect::<Vec<&str>>(); let status = status.split(" ").collect::<Vec<&str>>();
debug!("{} {} read status", addr, status[1]);
let mut content_length = 0; let mut content_length = 0;
let mut host = None; let mut host = None;
let mut is_chunked = false; let mut is_chunked = false;
@ -233,6 +237,8 @@ impl FlowgateServer {
} }
} }
debug!("{} {} read headers", addr, status[1]);
let mut conn: Connection = if conn.is_none() { let mut conn: Connection = if conn.is_none() {
let host = host?; let host = host?;
let site = self.config.get_site(&host)?.clone(); let site = self.config.get_site(&host)?.clone();
@ -247,6 +253,8 @@ impl FlowgateServer {
conn? conn?
}; };
debug!("{} {} got connection", addr, status[1]);
let mut request = Vec::new(); let mut request = Vec::new();
match &conn.config.ip_forwarding { match &conn.config.ip_forwarding {
@ -306,31 +314,43 @@ impl FlowgateServer {
conn.stream.write_all(&request).await.ok()?; conn.stream.write_all(&request).await.ok()?;
if content_length > 0 { debug!("{} {} sent request to server", addr, status[1]);
let buffer = stream.buffer();
conn.stream.write_all(buffer).await.ok()?; if content_length > 0 {
let buffer = stream.buffer().to_vec();
conn.stream.write_all(&buffer).await.ok()?;
stream.consume(buffer.len()); stream.consume(buffer.len());
let mut read = 0usize; let mut read = buffer.len();
let mut buf = vec![0; 4096];
debug!("{} {} send part of body to server", addr, status[1]);
while read < content_length {
let mut buf = vec![0; 4096];
let Ok(size) = conn.stream.get_mut().read(&mut buf).await else { break };
while let Ok(size) = stream.get_mut().read(&mut buf).await {
if size == 0 { break } if size == 0 { break }
read += size;
buf.truncate(size); buf.truncate(size);
conn.stream.write_all(&buf).await.ok()?; read += size;
buf = vec![0; 4096];
if read >= content_length { break } debug!("{} {} send response body part {} to clientr", addr, status[1], size);
stream.write_all(&buf).await.ok()?;
} }
} else if is_chunked { } else if is_chunked {
transfer_chunked(&mut stream, &mut conn.stream).await?; transfer_chunked(&mut stream, &mut conn.stream).await?;
} }
debug!("{} {} send body to server", addr, status[1]);
if conn.config.support_keep_alive { if conn.config.support_keep_alive {
let mut response = Vec::new();
let raw_status = read_until(&mut conn.stream, b"\r\n").await?; let raw_status = read_until(&mut conn.stream, b"\r\n").await?;
stream.write_all(&raw_status).await.ok()?; response.append(&mut raw_status.clone());
let mut content_length = 0; let mut content_length = 0;
let mut is_chunked = false; let mut is_chunked = false;
@ -338,12 +358,12 @@ impl FlowgateServer {
loop { loop {
let mut header = read_until(&mut conn.stream, b"\r\n").await?; let mut header = read_until(&mut conn.stream, b"\r\n").await?;
response.append(&mut header.clone());
if header.len() == 2 { if header.len() == 2 {
break; break;
} }
stream.write_all(&header).await.ok()?;
header.truncate(header.len() - 2); header.truncate(header.len() - 2);
let header = String::from_utf8(header).ok()?; let header = String::from_utf8(header).ok()?;
@ -362,38 +382,50 @@ impl FlowgateServer {
} }
} }
stream.write_all(&response).await.ok()?;
debug!("{} {} send response header to clientr", addr, status[1]);
if content_length > 0 { if content_length > 0 {
let buffer = conn.stream.buffer(); let buffer = conn.stream.buffer().to_vec();
stream.write_all(buffer).await.ok()?; stream.write_all(&buffer).await.ok()?;
stream.consume(buffer.len()); conn.stream.consume(buffer.len());
let mut read = 0usize; debug!("{} {} send response body part {} to clientr", addr, status[1], buffer.len());
let mut read = buffer.len();
while read < content_length {
let mut buf = vec![0; 4096]; let mut buf = vec![0; 4096];
let Ok(size) = conn.stream.get_mut().read(&mut buf).await else { break };
while let Ok(size) = conn.stream.get_mut().read(&mut buf).await {
if size == 0 { break } if size == 0 { break }
read += size;
buf.truncate(size); buf.truncate(size);
read += size;
debug!("{} {} send response body part {} to clientr", addr, status[1], size);
stream.write_all(&buf).await.ok()?; stream.write_all(&buf).await.ok()?;
buf = vec![0; 4096];
if read >= content_length { break }
} }
} else if is_chunked { } else if is_chunked {
transfer_chunked(&mut conn.stream, &mut stream).await?; transfer_chunked(&mut conn.stream, &mut stream).await?;
} }
debug!("{} {} send response body to clientr", addr, status[1]);
} else { } else {
let buffer = conn.stream.buffer(); let buffer = conn.stream.buffer();
stream.write_all(buffer).await.ok()?; stream.write_all(buffer).await.ok()?;
stream.consume(buffer.len()); conn.stream.consume(buffer.len());
let mut buf = vec![0;1024]; let mut buf = vec![0;4096];
while let Ok(n) = conn.stream.get_mut().read(&mut buf).await { while let Ok(n) = conn.stream.get_mut().read(&mut buf).await {
if n == 0 { break } if n == 0 { break }
buf.truncate(n); buf.truncate(n);
stream.write_all(&buf).await.ok()?; stream.write_all(&buf).await.ok()?;
buf = vec![0;1024]; buf = vec![0;4096];
} }
} }

40
tests/http_client.py Normal file
View File

@ -0,0 +1,40 @@
import socket
HOST = '172.16.1.32'
PORT = 80
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
i = 0
while True:
s.send(b"""GET / HTTP/1.1\r
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\r
Accept-Encoding: gzip, deflate, br, zstd\r
Accept-Language: en-US,en;q=0.9\r
Cache-Control: no-cache\r
Connection: keep-alive\r
Host: meex.lol\r
Pragma: no-cache\r
Sec-Fetch-Dest: document\r
Sec-Fetch-Mode: navigate\r
Sec-Fetch-Site: none\r
Sec-Fetch-User: ?1\r
Upgrade-Insecure-Requests: 1\r
User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36\r
sec-ch-ua: \"Chromium\";v=\"135\", \"Not-A.Brand\";v=\"8\"\r
sec-ch-ua-mobile: ?0\r
sec-ch-ua-platform: \"Linux\"\r
\r
""")
content_length = 0
while content_length < 14564:
data = s.recv(1024)
if data:
content_length += len(data)
# print(data.decode("utf8", errors="ignore"), end="")
else:
print("sdfsdf")
i += 1
print(i)