diff --git a/src/flowgate/server.rs b/src/flowgate/server.rs index 2938e63..299978c 100755 --- a/src/flowgate/server.rs +++ b/src/flowgate/server.rs @@ -25,7 +25,7 @@ pub struct FlowgateServer { } struct Connection { - stream: TcpStream, + stream: BufReader, config: SiteConfig, keep_alive: bool, host: String, @@ -122,7 +122,7 @@ impl FlowgateServer { loop { if !conn.config.support_keep_alive { conn.stream.shutdown().await.ignore(); - conn.stream = conn.config.connect().await?; + conn.stream = BufReader::new(conn.config.connect().await?); } conn = self.clone().read_request(stream, addr, https, Some(conn)).await?; } @@ -147,7 +147,8 @@ impl FlowgateServer { match &self.config.incoming_ip_forwarding { IpForwarding::Simple => { - let header = read_until(&mut stream, b"\n").await?; + let mut header = read_until(&mut stream, b"\n").await?; + header.truncate(header.len()-1); addr = SocketAddr::from_str(&String::from_utf8(header).ok()?).ok()?; }, IpForwarding::Modern => { @@ -178,6 +179,8 @@ impl FlowgateServer { if head.is_empty() { return None; } + head.truncate(head.len()-4); + let head_str = String::from_utf8(head.clone()).ok()?; let head_str = head_str.trim_matches(char::from(0)).to_string(); @@ -218,7 +221,7 @@ impl FlowgateServer { let site = self.config.get_site(&host)?.clone(); Connection { - stream: site.connect().await?, + stream: BufReader::new(site.connect().await?), config: site, keep_alive, host @@ -319,66 +322,17 @@ impl FlowgateServer { if read >= content_length { break } } } else if is_chunked { - loop { - let mut length = Vec::new(); - - { - let mut buf = [0; 1]; - let mut counter = 0; - - while let Ok(1) = stream.read(&mut buf).await { - let byte = buf[0]; - length.push(byte); - - counter = match (counter, byte) { - (0, b'\r') => 1, - (1, b'\n') => break, - _ => 0, - }; - } - conn.stream.write_all(&length).await.ok()?; - - length.truncate(length.len() - 2); - } - - let length = String::from_utf8(length).ok()?; - let length = usize::from_str_radix(length.as_str(), 16).ok()?; - let mut data = vec![0u8; length+2]; - stream.read_exact(&mut data).await.ok()?; - - conn.stream.write_all(&data).await.ok()?; - if length == 0 { - break; - } - } + transfer_chunked(&mut stream, &mut conn.stream).await?; } if conn.config.support_keep_alive { - let mut head = Vec::new(); - - { - let mut buf = [0; 1]; - let mut counter = 0; - - while let Ok(1) = conn.stream.read(&mut buf).await { - let byte = buf[0]; - head.push(byte); - - stream.write_all(&buf).await.ok()?; - - counter = match (counter, byte) { - (0, b'\r') => 1, - (1, b'\n') => 2, - (2, b'\r') => 3, - (3, b'\n') => break, - _ => 0, - }; - } - - head.truncate(head.len() - 4); - } + let mut head = read_until(&mut conn.stream, b"\r\n\r\n").await?; if head.is_empty() { return None; } + + stream.write_all(&head).await.ok()?; + + head.truncate(head.len()-4); let head_str = String::from_utf8(head.clone()).ok()?; let head_str = head_str.trim_matches(char::from(0)); @@ -414,36 +368,7 @@ impl FlowgateServer { if read == content_length { break } } } else if is_chunked { - loop { - let mut length = Vec::new(); - { - let mut buf = [0; 1]; - let mut counter = 0; - - while let Ok(1) = conn.stream.read(&mut buf).await { - let byte = buf[0]; - length.push(byte); - - counter = match (counter, byte) { - (0, b'\r') => 1, - (1, b'\n') => break, - _ => 0, - }; - } - stream.write_all(&length).await.ok()?; - - length.truncate(length.len() - 2); - } - let length = String::from_utf8(length).ok()?; - let length = usize::from_str_radix(length.as_str(), 16).ok()?; - let mut data = vec![0u8; length+2]; - conn.stream.read_exact(&mut data).await.ok()?; - - stream.write_all(&data).await.ok()?; - if length == 0 { - break; - } - } + transfer_chunked(&mut conn.stream, &mut stream).await?; } } else { let mut buf = vec![0;1024]; @@ -463,25 +388,37 @@ impl FlowgateServer { async fn read_until(stream: &mut (impl AsyncBufReadExt + Unpin), delimiter: &[u8]) -> Option> { let mut data = Vec::new(); - let mut counter = 0usize; - let mut buf = vec![0]; + let last_byte = *delimiter.last()?; - while let Ok(1) = stream.read(&mut buf).await { - let char = buf[0]; - data.push(char); - - if char == delimiter[counter] { - counter += 1; - if counter == delimiter.len() { - break - } - } else { - counter = 0; + loop { + let mut buf = Vec::new(); + stream.read_until(last_byte, &mut buf).await.ok()?; + data.append(&mut buf); + if data.ends_with(delimiter) { + break; } } - data.truncate(data.len()-delimiter.len()); - Some(data) } + +async fn transfer_chunked(src: &mut (impl AsyncBufReadExt + Unpin), dest: &mut (impl AsyncWriteExt + Unpin)) -> Option<()> { + loop { + let mut length = read_until(src, b"\r\n").await?; + dest.write_all(&length).await.ok()?; + length.truncate(length.len()-2); + let length = String::from_utf8(length).ok()?; + let length = usize::from_str_radix(length.as_str(), 16).ok()?; + + let mut data = vec![0u8; length+2]; + src.read_exact(&mut data).await.ok()?; + dest.write_all(&data).await.ok()?; + + if length == 0 { + break; + } + } + + Some(()) +} \ No newline at end of file