diff --git a/src/flowgate/server.rs b/src/flowgate/server.rs index 299978c..3c371c9 100755 --- a/src/flowgate/server.rs +++ b/src/flowgate/server.rs @@ -1,23 +1,24 @@ use std::{ - error::Error, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, str::FromStr, sync::Arc + error::Error, str::FromStr, sync::Arc, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6} }; use ignore_result::Ignore; +use log::info; + use tokio::{ io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, net::{TcpListener, TcpStream} }; - -use log::info; use tokio_io_timeout::TimeoutStream; use tokio_rustls::TlsAcceptor; -use crate::tls::create_server_config; - -use super::config::{ - Config, - SiteConfig, - IpForwarding +use super::{ + tls::create_server_config, + config::{ + Config, + SiteConfig, + IpForwarding + } }; pub struct FlowgateServer { @@ -147,27 +148,34 @@ impl FlowgateServer { match &self.config.incoming_ip_forwarding { IpForwarding::Simple => { - let mut header = read_until(&mut stream, b"\n").await?; + let mut header = Vec::new(); + stream.read_until(b'\n', &mut header).await.ok()?; header.truncate(header.len()-1); + addr = SocketAddr::from_str(&String::from_utf8(header).ok()?).ok()?; }, IpForwarding::Modern => { - let mut ipver = [0; 1]; - stream.read(&mut ipver).await.ok()?; - addr = match ipver[0] { + let mut header = [0]; + stream.read(&mut header).await.ok()?; + + addr = match header[0] { 0x01 => { let mut octets = [0; 4]; stream.read(&mut octets).await.ok()?; + let mut port = [0; 2]; stream.read(&mut port).await.ok()?; let port = u16::from_be_bytes(port); + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(octets), port)) }, 0x02 => { let mut octets = [0; 16]; stream.read(&mut octets).await.ok()?; + let mut port = [0; 2]; stream.read(&mut port).await.ok()?; let port = u16::from_be_bytes(port); + SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(octets), port, 0, 0)) }, _ => { return None }, }; @@ -175,49 +183,58 @@ impl FlowgateServer { _ => {} } - let mut head = read_until(&mut stream, b"\r\n\r\n").await?; + let mut raw_status = read_until(&mut stream, b"\r\n").await?; + raw_status.truncate(raw_status.len() - 2); + let status = String::from_utf8(raw_status.clone()).ok()?; + let status = status.split(" ").collect::>(); - if head.is_empty() { return None; } - - head.truncate(head.len()-4); + let mut content_length = 0; + let mut host = None; + let mut is_chunked = false; + let mut keep_alive = false; - let head_str = String::from_utf8(head.clone()).ok()?; - let head_str = head_str.trim_matches(char::from(0)).to_string(); + let mut headers = Vec::new(); - let mut head_lines = head_str.split("\r\n"); + loop { + let mut header = read_until(&mut stream, b"\r\n").await?; + header.truncate(header.len() - 2); - let status = head_lines.next()?; - let status_seq: Vec<&str> = status.split(" ").collect(); + if header.is_empty() { + break; + } - let headers: Vec<(&str, &str)> = head_lines - .filter(|l| l.contains(": ")) - .map(|l| l.split_once(": ").unwrap()) - .collect(); + let header = String::from_utf8(header).ok()?; + let (key, value) = header.split_once(": ")?; - let is_chunked = headers.iter() - .find(|o| o.0.to_lowercase() == "transfer-encoding") - .map(|o| o.1.split(",").map(|x| x.trim_matches(' ').to_string()).collect::>()) - .map(|o| o.contains(&"chunked".to_string())) - .unwrap_or(false); - - if let IpForwarding::Header(header) = &self.config.incoming_ip_forwarding { - if let Some(ip) = headers.iter().find(|o| o.0 == header).map(|o| o.1) { - addr = SocketAddr::from_str(ip).ok()?; + headers.push((key.to_string(), value.to_string())); + + match key.to_lowercase().as_str() { + "transfer-encoding" => { + if value.contains("chunked") { + is_chunked = true; + } + }, + "host" => { + host = Some(value.to_string()); + }, + "connection" => { + keep_alive = value.to_lowercase().contains("keep-alive"); + }, + "content-length" => { + content_length = value.parse::().ok()?; + }, + _ => { + if let IpForwarding::Header(header) = &self.config.incoming_ip_forwarding { + if key.to_lowercase() == header.to_lowercase() { + addr = SocketAddr::from_str(value).ok()?; + } + } + }, } } let mut conn: Connection = if conn.is_none() { - let mut host = String::new(); - let mut keep_alive = false; - - for (key, value) in &headers { - match key.to_lowercase().as_str() { - "host" => host = value.to_string(), - "connection" => keep_alive = *value == "keep-alive", - _ => {} - } - } - + let host = host?; let site = self.config.get_site(&host)?.clone(); Connection { @@ -229,91 +246,76 @@ impl FlowgateServer { } else { conn? }; - - let content_length = headers - .iter() - .filter(|(k, _)| k.to_lowercase() == "content-length") - .next() - .map(|o| o.1.parse().ok()) - .flatten() - .unwrap_or(0usize); - let mut reqbuf: Vec = Vec::new(); - - if let Some(replace_host) = conn.config.replace_host.clone() { - let mut new_head = Vec::new(); - let mut is_status = true; - - for line in head_str.split("\r\n") { - if is_status { - new_head.append(&mut line.as_bytes().to_vec()); - is_status = false; - } else { - new_head.append(&mut b"\r\n".to_vec()); - let (key, _) = line.split_once(": ")?; - if key.to_lowercase() == "host" { - new_head.append(&mut key.as_bytes().to_vec()); - new_head.append(&mut b": ".to_vec()); - new_head.append(&mut replace_host.as_bytes().to_vec()); - } else { - new_head.append(&mut line.as_bytes().to_vec()); - } - } - } - - head = new_head; - } + let mut request = Vec::new(); match &conn.config.ip_forwarding { - IpForwarding::Header(header) => { - reqbuf.append(&mut status.to_string().as_bytes().to_vec()); - reqbuf.append(&mut b"\r\n".to_vec()); - for (key, value) in String::from_utf8(head.clone()).ok()? - .split("\r\n") - .skip(1) - .filter_map(|o| o.split_once(": ")) { - if *key.to_lowercase() == header.to_lowercase() { continue } - reqbuf.append(&mut key.to_string().as_bytes().to_vec()); - reqbuf.append(&mut b": ".to_vec()); - reqbuf.append(&mut value.to_string().as_bytes().to_vec()); - reqbuf.append(&mut b"\r\n".to_vec()); - } - reqbuf.append(&mut header.as_bytes().to_vec()); - reqbuf.append(&mut b": ".to_vec()); - reqbuf.append(&mut addr.to_string().as_bytes().to_vec()); - reqbuf.append(&mut b"\r\n\r\n".to_vec()); - }, IpForwarding::Simple => { - reqbuf.append(&mut addr.to_string().as_bytes().to_vec()); - reqbuf.push(b'\n'); - reqbuf.append(&mut head.clone()); - reqbuf.append(&mut b"\r\n\r\n".to_vec()); + request.append(&mut addr.to_string().as_bytes().to_vec()); + request.push(b'\n'); }, IpForwarding::Modern => { - reqbuf.push(if addr.is_ipv4() { 0x01 } else { 0x02 }); match addr.ip() { IpAddr::V4(ip) => { - reqbuf.append(&mut ip.octets().to_vec()); + request.push(0x01); + request.append(&mut ip.octets().to_vec()); }, IpAddr::V6(ip) => { - reqbuf.append(&mut ip.octets().to_vec()); + request.push(0x02); + request.append(&mut ip.octets().to_vec()); } } - reqbuf.append(&mut addr.port().to_be_bytes().to_vec()); - reqbuf.append(&mut head.clone()); - reqbuf.append(&mut b"\r\n\r\n".to_vec()); + request.append(&mut addr.port().to_be_bytes().to_vec()); }, - IpForwarding::None => { - reqbuf.append(&mut head.clone()); - reqbuf.append(&mut b"\r\n\r\n".to_vec()); - } + _ => {} } - conn.stream.write_all(&reqbuf).await.ok()?; + request.append(&mut raw_status.clone()); + + for (key, value) in headers { + let mut value = value.to_string(); + + match key.to_lowercase().as_str() { + "host" => { + if let Some(replace_host) = conn.config.replace_host.clone() { + value = replace_host; + } + }, + _ => {} + } + + if let IpForwarding::Header(header) = &conn.config.ip_forwarding { + if key.to_lowercase() == header.to_lowercase() { + continue; + } + } + + request.append(&mut key.as_bytes().to_vec()); + request.append(&mut b": ".to_vec()); + request.append(&mut value.as_bytes().to_vec()); + request.append(&mut b"\r\n".to_vec()); + } + + if let IpForwarding::Header(header) = &conn.config.ip_forwarding { + request.append(&mut header.as_bytes().to_vec()); + request.append(&mut b": ".to_vec()); + request.append(&mut addr.to_string().as_bytes().to_vec()); + request.append(&mut b"\r\n".to_vec()); + } + + request.append(&mut b"\r\n".to_vec()); + + conn.stream.write_all(&request).await.ok()?; if content_length > 0 { + let buffer = stream.buffer(); + + conn.stream.write_all(buffer).await.ok()?; + stream.consume(buffer.len()); + let mut read = 0usize; let mut buf = vec![0; 4096]; - while let Ok(size) = stream.read(&mut buf).await { + + while let Ok(size) = stream.get_mut().read(&mut buf).await { if size == 0 { break } read += size; buf.truncate(size); @@ -326,53 +328,68 @@ impl FlowgateServer { } if conn.config.support_keep_alive { - let mut head = read_until(&mut conn.stream, b"\r\n\r\n").await?; - - if head.is_empty() { return None; } - - stream.write_all(&head).await.ok()?; - - head.truncate(head.len()-4); + let raw_status = read_until(&mut conn.stream, b"\r\n").await?; - let head_str = String::from_utf8(head.clone()).ok()?; - let head_str = head_str.trim_matches(char::from(0)); + stream.write_all(&raw_status).await.ok()?; - let headers = head_str.split("\r\n") - .skip(1) - .filter(|l| l.contains(": ")) - .map(|l| l.split_once(": ").unwrap()) - .map(|(k,v)| (k.to_lowercase(),v.to_string())) - .collect::>(); + let mut content_length = 0; + let mut is_chunked = false; + + loop { + let mut header = read_until(&mut conn.stream, b"\r\n").await?; + + if header.len() == 2 { + break; + } + + stream.write_all(&header).await.ok()?; + + header.truncate(header.len() - 2); + + let header = String::from_utf8(header).ok()?; + let (key, value) = header.split_once(": ")?; + + match key.to_lowercase().as_str() { + "transfer-encoding" => { + if value.contains("chunked") { + is_chunked = true; + } + }, + "content-length" => { + content_length = value.parse::().ok()?; + }, + _ => {} + } + } - let content_length = headers.iter() - .find(|(k, _)| k == "content-length") - .map(|o| o.1.parse().ok()) - .flatten() - .unwrap_or(0usize); - - let is_chunked = headers.iter() - .find(|o| o.0.to_lowercase() == "transfer-encoding") - .map(|o| o.1.split(",").map(|x| x.trim_matches(' ').to_string()).collect::>()) - .map(|o| o.contains(&"chunked".to_string())) - .unwrap_or(false); - if content_length > 0 { + let buffer = conn.stream.buffer(); + + stream.write_all(buffer).await.ok()?; + stream.consume(buffer.len()); + let mut read = 0usize; let mut buf = vec![0; 4096]; - while let Ok(size) = conn.stream.read(&mut buf).await { + + while let Ok(size) = conn.stream.get_mut().read(&mut buf).await { if size == 0 { break } read += size; buf.truncate(size); stream.write_all(&buf).await.ok()?; buf = vec![0; 4096]; - if read == content_length { break } + if read >= content_length { break } } } else if is_chunked { transfer_chunked(&mut conn.stream, &mut stream).await?; } } else { + let buffer = conn.stream.buffer(); + + stream.write_all(buffer).await.ok()?; + stream.consume(buffer.len()); + let mut buf = vec![0;1024]; - while let Ok(n) = conn.stream.read(&mut buf).await { + while let Ok(n) = conn.stream.get_mut().read(&mut buf).await { if n == 0 { break } buf.truncate(n); stream.write_all(&buf).await.ok()?; @@ -380,7 +397,7 @@ impl FlowgateServer { } } - info!("{addr} > {} {}://{}{}", status_seq[0], if https { "https" } else { "http" }, conn.host, status_seq[1]); + info!("{addr} > {} {}://{}{}", status[0], if https { "https" } else { "http" }, conn.host, status[1]); Some(conn) }