bufreader + stream combine
This commit is contained in:
parent
2f7d558e02
commit
da3e5aade0
@ -1,23 +1,24 @@
|
|||||||
use std::{
|
use std::{
|
||||||
error::Error, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, str::FromStr, sync::Arc
|
error::Error, str::FromStr, sync::Arc, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}
|
||||||
};
|
};
|
||||||
|
|
||||||
use ignore_result::Ignore;
|
use ignore_result::Ignore;
|
||||||
|
use log::info;
|
||||||
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
|
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
|
||||||
net::{TcpListener, TcpStream}
|
net::{TcpListener, TcpStream}
|
||||||
};
|
};
|
||||||
|
|
||||||
use log::info;
|
|
||||||
use tokio_io_timeout::TimeoutStream;
|
use tokio_io_timeout::TimeoutStream;
|
||||||
use tokio_rustls::TlsAcceptor;
|
use tokio_rustls::TlsAcceptor;
|
||||||
|
|
||||||
use crate::tls::create_server_config;
|
use super::{
|
||||||
|
tls::create_server_config,
|
||||||
use super::config::{
|
config::{
|
||||||
Config,
|
Config,
|
||||||
SiteConfig,
|
SiteConfig,
|
||||||
IpForwarding
|
IpForwarding
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct FlowgateServer {
|
pub struct FlowgateServer {
|
||||||
@ -147,27 +148,34 @@ impl FlowgateServer {
|
|||||||
|
|
||||||
match &self.config.incoming_ip_forwarding {
|
match &self.config.incoming_ip_forwarding {
|
||||||
IpForwarding::Simple => {
|
IpForwarding::Simple => {
|
||||||
let mut header = read_until(&mut stream, b"\n").await?;
|
let mut header = Vec::new();
|
||||||
|
stream.read_until(b'\n', &mut header).await.ok()?;
|
||||||
header.truncate(header.len()-1);
|
header.truncate(header.len()-1);
|
||||||
|
|
||||||
addr = SocketAddr::from_str(&String::from_utf8(header).ok()?).ok()?;
|
addr = SocketAddr::from_str(&String::from_utf8(header).ok()?).ok()?;
|
||||||
},
|
},
|
||||||
IpForwarding::Modern => {
|
IpForwarding::Modern => {
|
||||||
let mut ipver = [0; 1];
|
let mut header = [0];
|
||||||
stream.read(&mut ipver).await.ok()?;
|
stream.read(&mut header).await.ok()?;
|
||||||
addr = match ipver[0] {
|
|
||||||
|
addr = match header[0] {
|
||||||
0x01 => {
|
0x01 => {
|
||||||
let mut octets = [0; 4];
|
let mut octets = [0; 4];
|
||||||
stream.read(&mut octets).await.ok()?;
|
stream.read(&mut octets).await.ok()?;
|
||||||
|
|
||||||
let mut port = [0; 2];
|
let mut port = [0; 2];
|
||||||
stream.read(&mut port).await.ok()?;
|
stream.read(&mut port).await.ok()?;
|
||||||
let port = u16::from_be_bytes(port);
|
let port = u16::from_be_bytes(port);
|
||||||
|
|
||||||
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(octets), port))
|
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(octets), port))
|
||||||
}, 0x02 => {
|
}, 0x02 => {
|
||||||
let mut octets = [0; 16];
|
let mut octets = [0; 16];
|
||||||
stream.read(&mut octets).await.ok()?;
|
stream.read(&mut octets).await.ok()?;
|
||||||
|
|
||||||
let mut port = [0; 2];
|
let mut port = [0; 2];
|
||||||
stream.read(&mut port).await.ok()?;
|
stream.read(&mut port).await.ok()?;
|
||||||
let port = u16::from_be_bytes(port);
|
let port = u16::from_be_bytes(port);
|
||||||
|
|
||||||
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(octets), port, 0, 0))
|
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(octets), port, 0, 0))
|
||||||
}, _ => { return None },
|
}, _ => { return None },
|
||||||
};
|
};
|
||||||
@ -175,49 +183,58 @@ impl FlowgateServer {
|
|||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut head = read_until(&mut stream, b"\r\n\r\n").await?;
|
let mut raw_status = read_until(&mut stream, b"\r\n").await?;
|
||||||
|
raw_status.truncate(raw_status.len() - 2);
|
||||||
|
let status = String::from_utf8(raw_status.clone()).ok()?;
|
||||||
|
let status = status.split(" ").collect::<Vec<&str>>();
|
||||||
|
|
||||||
if head.is_empty() { return None; }
|
let mut content_length = 0;
|
||||||
|
let mut host = None;
|
||||||
head.truncate(head.len()-4);
|
let mut is_chunked = false;
|
||||||
|
let mut keep_alive = false;
|
||||||
|
|
||||||
let head_str = String::from_utf8(head.clone()).ok()?;
|
let mut headers = Vec::new();
|
||||||
let head_str = head_str.trim_matches(char::from(0)).to_string();
|
|
||||||
|
|
||||||
let mut head_lines = head_str.split("\r\n");
|
loop {
|
||||||
|
let mut header = read_until(&mut stream, b"\r\n").await?;
|
||||||
|
header.truncate(header.len() - 2);
|
||||||
|
|
||||||
let status = head_lines.next()?;
|
if header.is_empty() {
|
||||||
let status_seq: Vec<&str> = status.split(" ").collect();
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
let headers: Vec<(&str, &str)> = head_lines
|
let header = String::from_utf8(header).ok()?;
|
||||||
.filter(|l| l.contains(": "))
|
let (key, value) = header.split_once(": ")?;
|
||||||
.map(|l| l.split_once(": ").unwrap())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let is_chunked = headers.iter()
|
headers.push((key.to_string(), value.to_string()));
|
||||||
.find(|o| o.0.to_lowercase() == "transfer-encoding")
|
|
||||||
.map(|o| o.1.split(",").map(|x| x.trim_matches(' ').to_string()).collect::<Vec<String>>())
|
match key.to_lowercase().as_str() {
|
||||||
.map(|o| o.contains(&"chunked".to_string()))
|
"transfer-encoding" => {
|
||||||
.unwrap_or(false);
|
if value.contains("chunked") {
|
||||||
|
is_chunked = true;
|
||||||
if let IpForwarding::Header(header) = &self.config.incoming_ip_forwarding {
|
}
|
||||||
if let Some(ip) = headers.iter().find(|o| o.0 == header).map(|o| o.1) {
|
},
|
||||||
addr = SocketAddr::from_str(ip).ok()?;
|
"host" => {
|
||||||
|
host = Some(value.to_string());
|
||||||
|
},
|
||||||
|
"connection" => {
|
||||||
|
keep_alive = value.to_lowercase().contains("keep-alive");
|
||||||
|
},
|
||||||
|
"content-length" => {
|
||||||
|
content_length = value.parse::<usize>().ok()?;
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
if let IpForwarding::Header(header) = &self.config.incoming_ip_forwarding {
|
||||||
|
if key.to_lowercase() == header.to_lowercase() {
|
||||||
|
addr = SocketAddr::from_str(value).ok()?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut conn: Connection = if conn.is_none() {
|
let mut conn: Connection = if conn.is_none() {
|
||||||
let mut host = String::new();
|
let host = host?;
|
||||||
let mut keep_alive = false;
|
|
||||||
|
|
||||||
for (key, value) in &headers {
|
|
||||||
match key.to_lowercase().as_str() {
|
|
||||||
"host" => host = value.to_string(),
|
|
||||||
"connection" => keep_alive = *value == "keep-alive",
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let site = self.config.get_site(&host)?.clone();
|
let site = self.config.get_site(&host)?.clone();
|
||||||
|
|
||||||
Connection {
|
Connection {
|
||||||
@ -229,91 +246,76 @@ impl FlowgateServer {
|
|||||||
} else {
|
} else {
|
||||||
conn?
|
conn?
|
||||||
};
|
};
|
||||||
|
|
||||||
let content_length = headers
|
|
||||||
.iter()
|
|
||||||
.filter(|(k, _)| k.to_lowercase() == "content-length")
|
|
||||||
.next()
|
|
||||||
.map(|o| o.1.parse().ok())
|
|
||||||
.flatten()
|
|
||||||
.unwrap_or(0usize);
|
|
||||||
|
|
||||||
let mut reqbuf: Vec<u8> = Vec::new();
|
let mut request = Vec::new();
|
||||||
|
|
||||||
if let Some(replace_host) = conn.config.replace_host.clone() {
|
|
||||||
let mut new_head = Vec::new();
|
|
||||||
let mut is_status = true;
|
|
||||||
|
|
||||||
for line in head_str.split("\r\n") {
|
|
||||||
if is_status {
|
|
||||||
new_head.append(&mut line.as_bytes().to_vec());
|
|
||||||
is_status = false;
|
|
||||||
} else {
|
|
||||||
new_head.append(&mut b"\r\n".to_vec());
|
|
||||||
let (key, _) = line.split_once(": ")?;
|
|
||||||
if key.to_lowercase() == "host" {
|
|
||||||
new_head.append(&mut key.as_bytes().to_vec());
|
|
||||||
new_head.append(&mut b": ".to_vec());
|
|
||||||
new_head.append(&mut replace_host.as_bytes().to_vec());
|
|
||||||
} else {
|
|
||||||
new_head.append(&mut line.as_bytes().to_vec());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
head = new_head;
|
|
||||||
}
|
|
||||||
|
|
||||||
match &conn.config.ip_forwarding {
|
match &conn.config.ip_forwarding {
|
||||||
IpForwarding::Header(header) => {
|
|
||||||
reqbuf.append(&mut status.to_string().as_bytes().to_vec());
|
|
||||||
reqbuf.append(&mut b"\r\n".to_vec());
|
|
||||||
for (key, value) in String::from_utf8(head.clone()).ok()?
|
|
||||||
.split("\r\n")
|
|
||||||
.skip(1)
|
|
||||||
.filter_map(|o| o.split_once(": ")) {
|
|
||||||
if *key.to_lowercase() == header.to_lowercase() { continue }
|
|
||||||
reqbuf.append(&mut key.to_string().as_bytes().to_vec());
|
|
||||||
reqbuf.append(&mut b": ".to_vec());
|
|
||||||
reqbuf.append(&mut value.to_string().as_bytes().to_vec());
|
|
||||||
reqbuf.append(&mut b"\r\n".to_vec());
|
|
||||||
}
|
|
||||||
reqbuf.append(&mut header.as_bytes().to_vec());
|
|
||||||
reqbuf.append(&mut b": ".to_vec());
|
|
||||||
reqbuf.append(&mut addr.to_string().as_bytes().to_vec());
|
|
||||||
reqbuf.append(&mut b"\r\n\r\n".to_vec());
|
|
||||||
},
|
|
||||||
IpForwarding::Simple => {
|
IpForwarding::Simple => {
|
||||||
reqbuf.append(&mut addr.to_string().as_bytes().to_vec());
|
request.append(&mut addr.to_string().as_bytes().to_vec());
|
||||||
reqbuf.push(b'\n');
|
request.push(b'\n');
|
||||||
reqbuf.append(&mut head.clone());
|
|
||||||
reqbuf.append(&mut b"\r\n\r\n".to_vec());
|
|
||||||
},
|
},
|
||||||
IpForwarding::Modern => {
|
IpForwarding::Modern => {
|
||||||
reqbuf.push(if addr.is_ipv4() { 0x01 } else { 0x02 });
|
|
||||||
match addr.ip() {
|
match addr.ip() {
|
||||||
IpAddr::V4(ip) => {
|
IpAddr::V4(ip) => {
|
||||||
reqbuf.append(&mut ip.octets().to_vec());
|
request.push(0x01);
|
||||||
|
request.append(&mut ip.octets().to_vec());
|
||||||
}, IpAddr::V6(ip) => {
|
}, IpAddr::V6(ip) => {
|
||||||
reqbuf.append(&mut ip.octets().to_vec());
|
request.push(0x02);
|
||||||
|
request.append(&mut ip.octets().to_vec());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
reqbuf.append(&mut addr.port().to_be_bytes().to_vec());
|
request.append(&mut addr.port().to_be_bytes().to_vec());
|
||||||
reqbuf.append(&mut head.clone());
|
|
||||||
reqbuf.append(&mut b"\r\n\r\n".to_vec());
|
|
||||||
},
|
},
|
||||||
IpForwarding::None => {
|
_ => {}
|
||||||
reqbuf.append(&mut head.clone());
|
|
||||||
reqbuf.append(&mut b"\r\n\r\n".to_vec());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.stream.write_all(&reqbuf).await.ok()?;
|
request.append(&mut raw_status.clone());
|
||||||
|
|
||||||
|
for (key, value) in headers {
|
||||||
|
let mut value = value.to_string();
|
||||||
|
|
||||||
|
match key.to_lowercase().as_str() {
|
||||||
|
"host" => {
|
||||||
|
if let Some(replace_host) = conn.config.replace_host.clone() {
|
||||||
|
value = replace_host;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let IpForwarding::Header(header) = &conn.config.ip_forwarding {
|
||||||
|
if key.to_lowercase() == header.to_lowercase() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
request.append(&mut key.as_bytes().to_vec());
|
||||||
|
request.append(&mut b": ".to_vec());
|
||||||
|
request.append(&mut value.as_bytes().to_vec());
|
||||||
|
request.append(&mut b"\r\n".to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
|
if let IpForwarding::Header(header) = &conn.config.ip_forwarding {
|
||||||
|
request.append(&mut header.as_bytes().to_vec());
|
||||||
|
request.append(&mut b": ".to_vec());
|
||||||
|
request.append(&mut addr.to_string().as_bytes().to_vec());
|
||||||
|
request.append(&mut b"\r\n".to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
|
request.append(&mut b"\r\n".to_vec());
|
||||||
|
|
||||||
|
conn.stream.write_all(&request).await.ok()?;
|
||||||
|
|
||||||
if content_length > 0 {
|
if content_length > 0 {
|
||||||
|
let buffer = stream.buffer();
|
||||||
|
|
||||||
|
conn.stream.write_all(buffer).await.ok()?;
|
||||||
|
stream.consume(buffer.len());
|
||||||
|
|
||||||
let mut read = 0usize;
|
let mut read = 0usize;
|
||||||
let mut buf = vec![0; 4096];
|
let mut buf = vec![0; 4096];
|
||||||
while let Ok(size) = stream.read(&mut buf).await {
|
|
||||||
|
while let Ok(size) = stream.get_mut().read(&mut buf).await {
|
||||||
if size == 0 { break }
|
if size == 0 { break }
|
||||||
read += size;
|
read += size;
|
||||||
buf.truncate(size);
|
buf.truncate(size);
|
||||||
@ -326,53 +328,68 @@ impl FlowgateServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if conn.config.support_keep_alive {
|
if conn.config.support_keep_alive {
|
||||||
let mut head = read_until(&mut conn.stream, b"\r\n\r\n").await?;
|
let raw_status = read_until(&mut conn.stream, b"\r\n").await?;
|
||||||
|
|
||||||
if head.is_empty() { return None; }
|
|
||||||
|
|
||||||
stream.write_all(&head).await.ok()?;
|
|
||||||
|
|
||||||
head.truncate(head.len()-4);
|
|
||||||
|
|
||||||
let head_str = String::from_utf8(head.clone()).ok()?;
|
stream.write_all(&raw_status).await.ok()?;
|
||||||
let head_str = head_str.trim_matches(char::from(0));
|
|
||||||
|
|
||||||
let headers = head_str.split("\r\n")
|
let mut content_length = 0;
|
||||||
.skip(1)
|
let mut is_chunked = false;
|
||||||
.filter(|l| l.contains(": "))
|
|
||||||
.map(|l| l.split_once(": ").unwrap())
|
loop {
|
||||||
.map(|(k,v)| (k.to_lowercase(),v.to_string()))
|
let mut header = read_until(&mut conn.stream, b"\r\n").await?;
|
||||||
.collect::<Vec<(String,String)>>();
|
|
||||||
|
if header.len() == 2 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
stream.write_all(&header).await.ok()?;
|
||||||
|
|
||||||
|
header.truncate(header.len() - 2);
|
||||||
|
|
||||||
|
let header = String::from_utf8(header).ok()?;
|
||||||
|
let (key, value) = header.split_once(": ")?;
|
||||||
|
|
||||||
|
match key.to_lowercase().as_str() {
|
||||||
|
"transfer-encoding" => {
|
||||||
|
if value.contains("chunked") {
|
||||||
|
is_chunked = true;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"content-length" => {
|
||||||
|
content_length = value.parse::<usize>().ok()?;
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let content_length = headers.iter()
|
|
||||||
.find(|(k, _)| k == "content-length")
|
|
||||||
.map(|o| o.1.parse().ok())
|
|
||||||
.flatten()
|
|
||||||
.unwrap_or(0usize);
|
|
||||||
|
|
||||||
let is_chunked = headers.iter()
|
|
||||||
.find(|o| o.0.to_lowercase() == "transfer-encoding")
|
|
||||||
.map(|o| o.1.split(",").map(|x| x.trim_matches(' ').to_string()).collect::<Vec<String>>())
|
|
||||||
.map(|o| o.contains(&"chunked".to_string()))
|
|
||||||
.unwrap_or(false);
|
|
||||||
|
|
||||||
if content_length > 0 {
|
if content_length > 0 {
|
||||||
|
let buffer = conn.stream.buffer();
|
||||||
|
|
||||||
|
stream.write_all(buffer).await.ok()?;
|
||||||
|
stream.consume(buffer.len());
|
||||||
|
|
||||||
let mut read = 0usize;
|
let mut read = 0usize;
|
||||||
let mut buf = vec![0; 4096];
|
let mut buf = vec![0; 4096];
|
||||||
while let Ok(size) = conn.stream.read(&mut buf).await {
|
|
||||||
|
while let Ok(size) = conn.stream.get_mut().read(&mut buf).await {
|
||||||
if size == 0 { break }
|
if size == 0 { break }
|
||||||
read += size;
|
read += size;
|
||||||
buf.truncate(size);
|
buf.truncate(size);
|
||||||
stream.write_all(&buf).await.ok()?;
|
stream.write_all(&buf).await.ok()?;
|
||||||
buf = vec![0; 4096];
|
buf = vec![0; 4096];
|
||||||
if read == content_length { break }
|
if read >= content_length { break }
|
||||||
}
|
}
|
||||||
} else if is_chunked {
|
} else if is_chunked {
|
||||||
transfer_chunked(&mut conn.stream, &mut stream).await?;
|
transfer_chunked(&mut conn.stream, &mut stream).await?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
let buffer = conn.stream.buffer();
|
||||||
|
|
||||||
|
stream.write_all(buffer).await.ok()?;
|
||||||
|
stream.consume(buffer.len());
|
||||||
|
|
||||||
let mut buf = vec![0;1024];
|
let mut buf = vec![0;1024];
|
||||||
while let Ok(n) = conn.stream.read(&mut buf).await {
|
while let Ok(n) = conn.stream.get_mut().read(&mut buf).await {
|
||||||
if n == 0 { break }
|
if n == 0 { break }
|
||||||
buf.truncate(n);
|
buf.truncate(n);
|
||||||
stream.write_all(&buf).await.ok()?;
|
stream.write_all(&buf).await.ok()?;
|
||||||
@ -380,7 +397,7 @@ impl FlowgateServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("{addr} > {} {}://{}{}", status_seq[0], if https { "https" } else { "http" }, conn.host, status_seq[1]);
|
info!("{addr} > {} {}://{}{}", status[0], if https { "https" } else { "http" }, conn.host, status[1]);
|
||||||
|
|
||||||
Some(conn)
|
Some(conn)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user