fix bufreader and chunked transfer

This commit is contained in:
MeexReay 2025-04-07 18:57:06 +03:00
parent ce0a1443dc
commit 2f7d558e02

View File

@ -25,7 +25,7 @@ pub struct FlowgateServer {
} }
struct Connection { struct Connection {
stream: TcpStream, stream: BufReader<TcpStream>,
config: SiteConfig, config: SiteConfig,
keep_alive: bool, keep_alive: bool,
host: String, host: String,
@ -122,7 +122,7 @@ impl FlowgateServer {
loop { loop {
if !conn.config.support_keep_alive { if !conn.config.support_keep_alive {
conn.stream.shutdown().await.ignore(); conn.stream.shutdown().await.ignore();
conn.stream = conn.config.connect().await?; conn.stream = BufReader::new(conn.config.connect().await?);
} }
conn = self.clone().read_request(stream, addr, https, Some(conn)).await?; conn = self.clone().read_request(stream, addr, https, Some(conn)).await?;
} }
@ -147,7 +147,8 @@ impl FlowgateServer {
match &self.config.incoming_ip_forwarding { match &self.config.incoming_ip_forwarding {
IpForwarding::Simple => { IpForwarding::Simple => {
let header = read_until(&mut stream, b"\n").await?; let mut header = read_until(&mut stream, b"\n").await?;
header.truncate(header.len()-1);
addr = SocketAddr::from_str(&String::from_utf8(header).ok()?).ok()?; addr = SocketAddr::from_str(&String::from_utf8(header).ok()?).ok()?;
}, },
IpForwarding::Modern => { IpForwarding::Modern => {
@ -178,6 +179,8 @@ impl FlowgateServer {
if head.is_empty() { return None; } if head.is_empty() { return None; }
head.truncate(head.len()-4);
let head_str = String::from_utf8(head.clone()).ok()?; let head_str = String::from_utf8(head.clone()).ok()?;
let head_str = head_str.trim_matches(char::from(0)).to_string(); let head_str = head_str.trim_matches(char::from(0)).to_string();
@ -218,7 +221,7 @@ impl FlowgateServer {
let site = self.config.get_site(&host)?.clone(); let site = self.config.get_site(&host)?.clone();
Connection { Connection {
stream: site.connect().await?, stream: BufReader::new(site.connect().await?),
config: site, config: site,
keep_alive, keep_alive,
host host
@ -319,67 +322,18 @@ impl FlowgateServer {
if read >= content_length { break } if read >= content_length { break }
} }
} else if is_chunked { } else if is_chunked {
loop { transfer_chunked(&mut stream, &mut conn.stream).await?;
let mut length = Vec::new();
{
let mut buf = [0; 1];
let mut counter = 0;
while let Ok(1) = stream.read(&mut buf).await {
let byte = buf[0];
length.push(byte);
counter = match (counter, byte) {
(0, b'\r') => 1,
(1, b'\n') => break,
_ => 0,
};
}
conn.stream.write_all(&length).await.ok()?;
length.truncate(length.len() - 2);
}
let length = String::from_utf8(length).ok()?;
let length = usize::from_str_radix(length.as_str(), 16).ok()?;
let mut data = vec![0u8; length+2];
stream.read_exact(&mut data).await.ok()?;
conn.stream.write_all(&data).await.ok()?;
if length == 0 {
break;
}
}
} }
if conn.config.support_keep_alive { if conn.config.support_keep_alive {
let mut head = Vec::new(); let mut head = read_until(&mut conn.stream, b"\r\n\r\n").await?;
{
let mut buf = [0; 1];
let mut counter = 0;
while let Ok(1) = conn.stream.read(&mut buf).await {
let byte = buf[0];
head.push(byte);
stream.write_all(&buf).await.ok()?;
counter = match (counter, byte) {
(0, b'\r') => 1,
(1, b'\n') => 2,
(2, b'\r') => 3,
(3, b'\n') => break,
_ => 0,
};
}
head.truncate(head.len() - 4);
}
if head.is_empty() { return None; } if head.is_empty() { return None; }
stream.write_all(&head).await.ok()?;
head.truncate(head.len()-4);
let head_str = String::from_utf8(head.clone()).ok()?; let head_str = String::from_utf8(head.clone()).ok()?;
let head_str = head_str.trim_matches(char::from(0)); let head_str = head_str.trim_matches(char::from(0));
@ -414,36 +368,7 @@ impl FlowgateServer {
if read == content_length { break } if read == content_length { break }
} }
} else if is_chunked { } else if is_chunked {
loop { transfer_chunked(&mut conn.stream, &mut stream).await?;
let mut length = Vec::new();
{
let mut buf = [0; 1];
let mut counter = 0;
while let Ok(1) = conn.stream.read(&mut buf).await {
let byte = buf[0];
length.push(byte);
counter = match (counter, byte) {
(0, b'\r') => 1,
(1, b'\n') => break,
_ => 0,
};
}
stream.write_all(&length).await.ok()?;
length.truncate(length.len() - 2);
}
let length = String::from_utf8(length).ok()?;
let length = usize::from_str_radix(length.as_str(), 16).ok()?;
let mut data = vec![0u8; length+2];
conn.stream.read_exact(&mut data).await.ok()?;
stream.write_all(&data).await.ok()?;
if length == 0 {
break;
}
}
} }
} else { } else {
let mut buf = vec![0;1024]; let mut buf = vec![0;1024];
@ -463,25 +388,37 @@ impl FlowgateServer {
async fn read_until(stream: &mut (impl AsyncBufReadExt + Unpin), delimiter: &[u8]) -> Option<Vec<u8>> { async fn read_until(stream: &mut (impl AsyncBufReadExt + Unpin), delimiter: &[u8]) -> Option<Vec<u8>> {
let mut data = Vec::new(); let mut data = Vec::new();
let mut counter = 0usize;
let mut buf = vec![0]; let last_byte = *delimiter.last()?;
while let Ok(1) = stream.read(&mut buf).await { loop {
let char = buf[0]; let mut buf = Vec::new();
data.push(char); stream.read_until(last_byte, &mut buf).await.ok()?;
data.append(&mut buf);
if char == delimiter[counter] { if data.ends_with(delimiter) {
counter += 1; break;
if counter == delimiter.len() {
break
}
} else {
counter = 0;
} }
} }
data.truncate(data.len()-delimiter.len());
Some(data) Some(data)
} }
async fn transfer_chunked(src: &mut (impl AsyncBufReadExt + Unpin), dest: &mut (impl AsyncWriteExt + Unpin)) -> Option<()> {
loop {
let mut length = read_until(src, b"\r\n").await?;
dest.write_all(&length).await.ok()?;
length.truncate(length.len()-2);
let length = String::from_utf8(length).ok()?;
let length = usize::from_str_radix(length.as_str(), 16).ok()?;
let mut data = vec![0u8; length+2];
src.read_exact(&mut data).await.ok()?;
dest.write_all(&data).await.ok()?;
if length == 0 {
break;
}
}
Some(())
}