From b3e51654f6c653b0603e4ebfc023a856d88e7ebd Mon Sep 17 00:00:00 2001 From: MeexReay Date: Fri, 13 Sep 2024 21:43:48 +0300 Subject: [PATCH] rewrite --- Cargo.toml | 19 +- README.md | 20 +- src/flowgate.rs | 7 + src/flowgate/config.rs | 81 ++++++ src/flowgate/server.rs | 197 +++++++++++++ src/flowgate/ssl_cert.rs | 43 +++ src/http_server.rs | 590 --------------------------------------- src/lib.rs | 71 ----- src/main.rs | 116 +------- 9 files changed, 359 insertions(+), 785 deletions(-) create mode 100644 src/flowgate.rs create mode 100644 src/flowgate/config.rs create mode 100644 src/flowgate/server.rs create mode 100644 src/flowgate/ssl_cert.rs delete mode 100644 src/http_server.rs delete mode 100644 src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index cebaa88..19cc936 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,12 +1,17 @@ [package] -name = "http_rrs" +name = "flowgate" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] -openssl = { version = "0.10.64", features = ["vendored"] } -yaml-rust = "0.4.5" -log = "0.4.21" -log4rs = "1.3.0" \ No newline at end of file +openssl = { version = "0.10.66", optional = true } +rustls = { version = "0.23.13", optional = true } +serde_yml = "0.0.12" +log = "0.4.22" +pretty_env_logger = "0.5.0" +threadpool = "1.8.1" + +[features] +default = ["use-openssl"] +use-openssl = ["dep:openssl"] +use-rustls = ["dep:rustls"] \ No newline at end of file diff --git a/README.md b/README.md index f3af232..768096b 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,13 @@ -# HttpRRS -HTTP request redirection system +# Flowgate +HTTP requests redirection system + +Features: +- Request redirection +- SSL/TLS support +- Rustls support (not yet) +- Keep-alive streams (not yet) + +## Config Default `conf.yml`: ```yml @@ -19,8 +27,6 @@ sites: host: localhost:8080 ``` -## How it works - -This works as a proxy that redirects based on the Host header - -![explaination.png](explaination.png) +Rust features: +- use-openssl +- use-rustls ([rustls](https://github.com/rustls/rustls) - openssl alternative) \ No newline at end of file diff --git a/src/flowgate.rs b/src/flowgate.rs new file mode 100644 index 0000000..d4fd4c1 --- /dev/null +++ b/src/flowgate.rs @@ -0,0 +1,7 @@ +pub mod config; +pub mod server; +pub mod ssl_cert; + +pub use config::*; +pub use server::*; +pub use ssl_cert::*; \ No newline at end of file diff --git a/src/flowgate/config.rs b/src/flowgate/config.rs new file mode 100644 index 0000000..ead9846 --- /dev/null +++ b/src/flowgate/config.rs @@ -0,0 +1,81 @@ +use std::{fs, net::TcpStream, sync::Arc}; + +use serde_yml::Value; + +use super::SslCert; + +#[derive(Clone)] +pub struct SiteConfig { + pub domain: String, + pub host: String, + pub ssl: Option, +} + +impl SiteConfig { + pub fn connect(self) -> Option { + TcpStream::connect(self.host).ok() + } +} + +#[derive(Clone)] +pub struct Config { + pub sites: Arc>, + pub http_host: String, + pub https_host: String, +} + +impl Config { + pub fn parse(filename: &str) -> Option { + let Ok(file_content) = fs::read_to_string(filename) else { + return None; + }; + let Ok(docs) = serde_yml::from_str::(file_content.as_str()) else { + return None; + }; + let doc = docs.get(0)?; + + let http_host = doc["http_host"].as_str()?.to_string(); + let https_host = doc["https_host"].as_str()?.to_string(); + + let mut sites: Vec = Vec::new(); + + let sites_yaml = doc["sites"].as_sequence()?; + + for s in sites_yaml { + let mut cert: Option = None; + let s = s.as_mapping()?; + + if s.contains_key("ssl_cert") && !s.get("ssl_cert")?.is_null() { + cert = Some( + SslCert::new( + s.get("ssl_cert")?.as_str()?, + s.get("ssl_key")?.as_str()?, + )?, + ); + } + + let site = SiteConfig { + domain: s.get("domain")?.as_str()?.to_string(), + host: s.get("host")?.as_str()?.to_string(), + ssl: cert, + }; + + sites.push(site); + } + + Some(Config { + sites: Arc::new(sites), + http_host, + https_host, + }) + } + + pub fn get_site(&self, domain: &str) -> Option<&SiteConfig> { + for i in self.sites.as_ref() { + if i.domain == domain { + return Some(i); + } + } + return None; + } +} \ No newline at end of file diff --git a/src/flowgate/server.rs b/src/flowgate/server.rs new file mode 100644 index 0000000..d746cb4 --- /dev/null +++ b/src/flowgate/server.rs @@ -0,0 +1,197 @@ +use std::{io::{Read, Write}, net::{Shutdown, SocketAddr, TcpListener}, sync::Arc, thread, time::Duration}; + +use log::info; +use openssl::ssl::{NameType, SniError, SslAcceptor, SslAlert, SslMethod, SslRef}; +use threadpool::ThreadPool; + +use crate::Config; + +pub struct FlowgateServer { + config: Arc, +} + +impl FlowgateServer { + pub fn new(config: Config) -> Self { + FlowgateServer { config: Arc::new(config) } + } + + pub fn start(self) { + thread::spawn({ + let config = Arc::clone(&self.config); + + move || { + Self::run_http(config) + } + }); + + thread::spawn({ + let config = Arc::clone(&self.config); + + move || { + Self::run_https(config) + } + }); + } + + pub fn run_http( + config: Arc + ) -> Option<()> { + let listener = TcpListener::bind(&config.http_host).ok()?; + + let pool = ThreadPool::new(10); + + info!("HTTP server runned on {}", &config.http_host); + + for stream in listener.incoming() { + pool.execute({ + let config = config.clone(); + + move || { + let Ok(mut stream) = stream else { return }; + + let Ok(_) = stream.set_write_timeout(Some(Duration::from_secs(10))) else { return }; + let Ok(_) = stream.set_read_timeout(Some(Duration::from_secs(10))) else { return }; + + let Ok(addr) = stream.peer_addr() else { return }; + + Self::accept_stream( + config, + &mut stream, + addr, + true + ); + } + }); + } + + Some(()) + } + + pub fn run_https( + config: Arc + ) -> Option<()> { + let listener = TcpListener::bind(&config.https_host).ok()?; + + let mut cert = SslAcceptor::mozilla_intermediate(SslMethod::tls()).ok()?; + + cert.set_servername_callback(Box::new({ + let config = config.clone(); + + move |ssl: &mut SslRef, _: &mut SslAlert| -> Result<(), SniError> { + let servname = ssl.servername(NameType::HOST_NAME).ok_or(SniError::NOACK)?; + let cert = config.get_site(servname).ok_or(SniError::NOACK)?; + ssl.set_ssl_context(&cert.ssl.as_ref().ok_or(SniError::NOACK)?.get_context()).ok().ok_or(SniError::NOACK) + } + } + )); + + let cert = cert.build(); + + let pool = ThreadPool::new(10); + + info!("HTTPS server runned on {}", &config.https_host); + + for stream in listener.incoming() { + pool.execute({ + let config = config.clone(); + let cert = cert.clone(); + + move || { + let Ok(stream) = stream else { return }; + + let Ok(_) = stream.set_write_timeout(Some(Duration::from_secs(10))) else { return }; + let Ok(_) = stream.set_read_timeout(Some(Duration::from_secs(10))) else { return }; + + let Ok(addr) = stream.peer_addr() else { return }; + + let Ok(mut stream) = cert.accept(stream) else { return }; + + Self::accept_stream( + config, + &mut stream, + addr, + true + ); + } + }); + } + + Some(()) + } + + pub fn accept_stream( + config: Arc, + stream: &mut (impl Read + Write), + addr: SocketAddr, + https: bool + ) -> Option<()> { + let mut reqst_data: Vec = vec![0; 4096]; + + stream.read(&mut reqst_data).ok()?; + + let reqst = String::from_utf8(reqst_data).ok()?; + let reqst = reqst.trim_matches(char::from(0)); + + let (head, body) = reqst.split_once("\r\n\r\n")?; + + let mut head_lines = head.split("\r\n"); + + let status = head_lines.next()?; + let status: Vec<&str> = status.split(" ").collect(); + + let mut host: &str = "honk"; + let mut content_length: usize = 0; + + for l in head_lines { + let (key, value) = l.split_once(": ")?; + let key = key.to_lowercase().replace("-", "_"); + + if key == "host" { + host = &value; + } + if key == "content_length" { + content_length = value.parse().ok()?; + } + } + + let site = config.get_site(host); + + if site.is_none() { + return None; + } + + let site = site?.clone(); + let mut site_stream = site.connect()?; + + site_stream.write((addr.to_string() + "\n" + reqst).as_bytes()).ok()?; + + let body_len = body.len(); + if body_len < content_length { + let mut body_data: Vec = vec![0; content_length - body_len]; + stream.read_exact(&mut body_data).ok()?; + site_stream.write_all(&body_data).ok()?; + } + + loop { + let mut buf: Vec = Vec::new(); + site_stream.read_to_end(&mut buf).ok()?; + if buf.is_empty() { + break; + } + stream.write_all(&buf).ok()?; + } + + let method = status[0]; + let page = status[1]; + + site_stream.shutdown(Shutdown::Both).ok()?; + + if https { + info!("{} > {} https://{}{}", addr.to_string(), method, host, page); + } else { + info!("{} > {} http://{}{}", addr.to_string(), method, host, page); + } + + Some(()) + } +} \ No newline at end of file diff --git a/src/flowgate/ssl_cert.rs b/src/flowgate/ssl_cert.rs new file mode 100644 index 0000000..118e000 --- /dev/null +++ b/src/flowgate/ssl_cert.rs @@ -0,0 +1,43 @@ +use openssl::ssl::{SslContext, SslFiletype, SslMethod}; + +#[derive(Clone)] +pub struct SslCert { + context: Option, +} + +fn generate_ctx(cert_file: &str, key_file: &str) -> Option { + let mut ctx = match SslContext::builder(SslMethod::tls()) { + Ok(i) => i, + Err(_) => return None, + }; + match ctx.set_private_key_file(&key_file, SslFiletype::PEM) { + Ok(i) => i, + Err(_) => return None, + }; + match ctx.set_certificate_file(&cert_file, SslFiletype::PEM) { + Ok(i) => i, + Err(_) => return None, + }; + match ctx.check_private_key() { + Ok(i) => i, + Err(_) => return None, + }; + Some(ctx.build()) +} + +impl SslCert { + pub fn new(cert_file: &str, key_file: &str) -> Option { + Some(SslCert { + context: match generate_ctx(cert_file, key_file) { + Some(i) => Some(i), + None => { + return None; + } + } + }) + } + + pub fn get_context(&self) -> SslContext { + self.context.as_ref().unwrap().clone() + } +} \ No newline at end of file diff --git a/src/http_server.rs b/src/http_server.rs deleted file mode 100644 index 3ea23c6..0000000 --- a/src/http_server.rs +++ /dev/null @@ -1,590 +0,0 @@ -use http_rrs::ThreadPool; -use log::{debug, info, warn}; -use openssl::ssl::{ - NameType, SniError, SslAcceptor, SslAlert, SslContext, SslFiletype, SslMethod, SslRef, - SslStream, -}; -use std::time::Duration; -use std::{ - io::{Read, Write}, - net::{IpAddr, Shutdown, TcpListener, TcpStream}, - sync::Arc, - thread, -}; - -#[derive(Clone)] -pub struct SslCert { - pub cert_file: String, - pub key_file: String, - pub ctx_index: u8, - pub ctx: Option, -} - -impl SslCert { - pub fn generate_ctx(cert_file: &str, key_file: &str) -> Option { - let mut ctx = match SslContext::builder(SslMethod::tls()) { - Ok(i) => i, - Err(_) => return None, - }; - match ctx.set_private_key_file(&key_file, SslFiletype::PEM) { - Ok(i) => i, - Err(_) => return None, - }; - match ctx.set_certificate_file(&cert_file, SslFiletype::PEM) { - Ok(i) => i, - Err(_) => return None, - }; - match ctx.check_private_key() { - Ok(i) => i, - Err(_) => return None, - }; - Some(ctx.build()) - } - - pub fn new(cert_file: &str, key_file: &str) -> Option { - let ctx = match Self::generate_ctx(cert_file, key_file) { - Some(i) => Some(i), - None => { - return None; - } - }; - - Some(SslCert { - ctx: ctx, - cert_file: cert_file.to_string(), - key_file: key_file.to_string(), - ctx_index: 0, - }) - } - - pub fn get_ctx(&mut self) -> Option<&SslContext> { - // self.ctx_index += 1; - // if self.ctx_index > 5 { - // self.ctx_index = 0; - // self.ctx = Self::generate_ctx(&self.cert_file, &self.key_file); - // } - self.ctx.as_ref() - } -} - -#[derive(Clone)] -pub struct Site { - pub domain: String, - pub host: String, - pub ssl: Option, -} - -impl Site { - fn connect(self) -> Result { - match TcpStream::connect(self.host) { - Ok(i) => Ok(i), - Err(_) => Err("server not canacting".to_string()), - } - } -} - -#[derive(Clone)] -pub struct SiteServer { - host: String, - sites: Arc>, -} - -fn split_once<'a>(in_string: &'a str, separator: &str) -> Result<(&'a str, &'a str), String> { - let mut splitter = in_string.splitn(2, &separator); - let first = match splitter.next() { - Some(i) => i, - None => return Err("first split nat foined".to_string()), - }; - let second = match splitter.next() { - Some(i) => i, - None => return Err("escond split nat foined".to_string()), - }; - Ok((first, second)) -} - -fn get_site(sites: &Vec, domain: &str) -> Option { - for i in sites.iter() { - if i.domain == domain { - return Some(i.clone()); - } - } - return None; -} - -impl SiteServer { - pub fn new<'a>(host: String, sites: Arc>) -> Self { - SiteServer { host, sites } - } - - fn get_site(self, domain: &str) -> Option { - return get_site(&self.sites, domain); - } -} - -impl SiteServer { - pub fn start_http(self) { - thread::spawn(move || { - self.run_http(); - }); - } - - pub fn run_http(self) { - let listener: TcpListener = match TcpListener::bind(&self.host) { - Ok(i) => i, - Err(_) => { - info!("Http server listener bind error"); - return; - } - }; - let pool = ThreadPool::new(10); - - info!("HTTP server runned on {}", &self.host); - - for stream in listener.incoming() { - let local_self = self.clone(); - - pool.execute(move || { - let mut stream = match stream { - Ok(i) => i, - Err(e) => { - warn!("{}", e); - return; - } - }; - - match stream.set_write_timeout(Some(Duration::from_secs(10))) { - Ok(i) => i, - Err(_) => { - return; - } - }; - match stream.set_read_timeout(Some(Duration::from_secs(10))) { - Ok(i) => i, - Err(_) => { - return; - } - }; - - let addr = stream.peer_addr(); - - match local_self.accept_http( - &mut stream, - match addr { - Ok(v) => v, - Err(_) => { - return; - } - }, - ) { - Ok(v) => { - let (addr, method, host, page) = v; - info!("{} > {} http://{}{}", addr, method, host, page); - } - Err(_) => {} - } - }); - } - } - - pub fn run_ssl(self) { - let listener: TcpListener = match TcpListener::bind(&self.host) { - Ok(i) => i, - Err(_) => { - info!("Ssl server listener bind error"); - return; - } - }; - - let mut cert = match SslAcceptor::mozilla_intermediate(SslMethod::tls()) { - Ok(v) => v, - Err(_) => { - info!("Ssl acceptor create error"); - return; - } - }; - - let sites = self.clone().sites.clone(); - - cert.set_servername_callback(Box::new( - move |_ssl: &mut SslRef, _alert: &mut SslAlert| -> Result<(), SniError> { - debug!("hangs"); - let servname = match _ssl.servername(NameType::HOST_NAME) { - Some(i) => i, - None => return Err(SniError::NOACK), - }; - let cert = match get_site(&sites, servname) { - Some(i) => i, - None => return Err(SniError::NOACK), - }; - match _ssl.set_ssl_context( - match match cert.ssl { - Some(i) => i, - None => return Err(SniError::NOACK), - } - .get_ctx() - { - Some(k) => k, - None => return Err(SniError::NOACK), - }, - ) { - Ok(i) => i, - Err(_) => return Err(SniError::NOACK), - }; - return Ok(()); - }, - )); - - let cert = cert.build(); - - let pool = ThreadPool::new(10); - - info!("HTTPS server runned on {}", &self.host); - - for stream in listener.incoming() { - let local_self = self.clone(); - let local_cert = cert.clone(); - - pool.execute(move || { - let stream = match stream { - Ok(i) => { - debug!("norm esy"); - i - } - Err(_) => { - return; - } - }; - - match stream.set_write_timeout(Some(Duration::from_secs(10))) { - Ok(i) => i, - Err(_) => { - return; - } - }; - match stream.set_read_timeout(Some(Duration::from_secs(10))) { - Ok(i) => i, - Err(_) => { - return; - } - }; - - let addr = stream.peer_addr(); - - let mut stream = match local_cert.accept(stream) { - Ok(st) => { - debug!("ssl esy"); - st - } - Err(_) => { - return; - } - }; - - match local_self.accept_ssl( - &mut stream, - match addr { - Ok(v) => v, - Err(_) => { - return; - } - }, - ) { - Ok(v) => { - let (addr, method, host, page) = v; - info!("{} > {} https://{}{}", addr, method, host, page); - } - Err(_) => {} - } - }); - } - } - - pub fn accept_http( - self, - stream: &mut TcpStream, - peer_addr: std::net::SocketAddr, - ) -> Result<(String, String, String, String), ()> { - let octets = match peer_addr.ip() { - IpAddr::V4(ip) => ip.octets(), - _ => [127, 0, 0, 1], - }; - - let dot: String = String::from("."); - let ip_str = String::from( - octets[0].to_string() - + &dot - + &octets[1].to_string() - + &dot - + &octets[2].to_string() - + &dot - + &octets[3].to_string(), - ); - - println!("{}", &ip_str); - - let addition: String = ip_str.clone() + ":" + peer_addr.port().to_string().as_str() + "\n"; - - let mut reqst_data: Vec = vec![0; 4096]; - - match stream.read(&mut reqst_data) { - Ok(i) => i, - Err(_) => return Err(()), - }; - - let reqst = match String::from_utf8(reqst_data) { - Ok(v) => v, - Err(_) => { - return Err(()); - } - }; - let reqst = reqst.trim_matches(char::from(0)); - - let (head, body) = match split_once(&reqst, "\r\n\r\n") { - Ok(i) => i, - Err(_) => return Err(()), - }; - - let mut head_lines = head.split("\r\n"); - - let status = match head_lines.next() { - Some(i) => i, - None => return Err(()), - }; - let status: Vec<&str> = status.split(" ").collect(); - - let mut host: &str = "honk"; - let mut content_length: usize = 0; - - for l in head_lines { - let (key, value) = match split_once(&l, ": ") { - Ok(i) => i, - Err(_) => return Err(()), - }; - let key = key.to_lowercase().replace("-", "_"); - - if key == "host" { - host = &value; - } - if key == "content_length" { - content_length = match value.parse() { - Ok(i) => i, - Err(_) => { - return Err(()); - } - }; - } - } - - let site = self.get_site(host); - - if site.is_none() { - return Err(()); - } - - let site = match site { - Some(i) => i, - None => return Err(()), - }; - let mut site_stream = match site.connect() { - Ok(i) => i, - Err(_) => return Err(()), - }; - - match site_stream.write((addition + reqst).as_bytes()) { - Ok(i) => i, - Err(_) => { - return Err(()); - } - }; - - let body_len = body.len(); - if body_len < content_length { - let mut body_data: Vec = vec![0; content_length - body_len]; - match stream.read_exact(&mut body_data) { - Ok(i) => i, - Err(_) => return Err(()), - }; - match site_stream.write_all(&body_data) { - Ok(i) => i, - Err(_) => return Err(()), - }; - } - - loop { - let mut buf: Vec = Vec::new(); - match site_stream.read_to_end(&mut buf) { - Ok(i) => i, - Err(_) => return Err(()), - }; - if buf.is_empty() { - break; - } - match stream.write_all(&buf) { - Ok(i) => i, - Err(_) => return Err(()), - }; - } - - let method = status[0]; - let page = status[1]; - - match site_stream.shutdown(Shutdown::Both) { - Ok(i) => i, - Err(_) => { - return Err(()); - } - }; - - Ok(( - ip_str.clone(), - method.to_string().clone(), - host.to_string().clone(), - page.to_string().clone(), - )) - } - - pub fn accept_ssl( - self, - stream: &mut SslStream, - peer_addr: std::net::SocketAddr, - ) -> Result<(String, String, String, String), ()> { - let octets = match peer_addr.ip() { - IpAddr::V4(ip) => ip.octets(), - _ => [127, 0, 0, 1], - }; - - let dot: String = String::from("."); - let ip_str = String::from( - octets[0].to_string() - + &dot - + &octets[1].to_string() - + &dot - + &octets[2].to_string() - + &dot - + &octets[3].to_string(), - ); - - let addition: String = ip_str.clone() + ":" + peer_addr.port().to_string().as_str() + "\n"; - - let mut reqst_data: Vec = vec![0; 4096]; - - match stream.read(&mut reqst_data) { - Ok(i) => i, - Err(_) => return Err(()), - }; - - let reqst = match String::from_utf8(reqst_data) { - Ok(v) => v, - Err(_) => { - return Err(()); - } - }; - let reqst = reqst.trim_matches(char::from(0)); - - let (head, body) = match split_once(&reqst, "\r\n\r\n") { - Ok(i) => i, - Err(_) => return Err(()), - }; - - let mut head_lines = head.split("\r\n"); - - let status = match head_lines.next() { - Some(i) => i, - None => return Err(()), - }; - let status: Vec<&str> = status.split(" ").collect(); - - let mut host: &str = "honk"; - let mut content_length: usize = 0; - - for l in head_lines { - let (key, value) = match split_once(&l, ": ") { - Ok(i) => i, - Err(_) => return Err(()), - }; - let key = key.to_lowercase().replace("-", "_"); - - if key == "host" { - host = &value; - } - if key == "content_length" { - content_length = match value.parse() { - Ok(i) => i, - Err(_) => { - return Err(()); - } - }; - } - } - - let site = self.get_site(host); - - if site.is_none() { - return Err(()); - } - - let site = match site { - Some(i) => i, - None => return Err(()), - }; - let mut site_stream = match site.connect() { - Ok(i) => i, - Err(_) => return Err(()), - }; - - match site_stream.write((addition + reqst).as_bytes()) { - Ok(i) => i, - Err(_) => { - return Err(()); - } - }; - - let body_len = body.len(); - if body_len < content_length { - let mut body_data: Vec = vec![0; content_length - body_len]; - match stream.read_exact(&mut body_data) { - Ok(i) => i, - Err(_) => return Err(()), - }; - match site_stream.write_all(&body_data) { - Ok(i) => i, - Err(_) => return Err(()), - }; - } - - loop { - let mut buf: Vec = Vec::new(); - match site_stream.read_to_end(&mut buf) { - Ok(i) => i, - Err(_) => return Err(()), - }; - if buf.is_empty() { - break; - } - match stream.write_all(&buf) { - Ok(i) => i, - Err(e) => { - info!("{}", e); - return Err(()); - } - }; - } - - let method = status[0]; - let page = status[1]; - - match site_stream.shutdown(Shutdown::Both) { - Ok(i) => i, - Err(_) => { - return Err(()); - } - }; - - Ok(( - ip_str.clone(), - method.to_string().clone(), - host.to_string().clone(), - page.to_string().clone(), - )) - } -} diff --git a/src/lib.rs b/src/lib.rs deleted file mode 100644 index 0c06c61..0000000 --- a/src/lib.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::{ - sync::{mpsc, Arc, Mutex}, - thread, -}; - -type Job = Box; - -pub struct ThreadPool { - workers: Vec, - sender: mpsc::Sender, -} - -pub enum PoolCreationError { - InvalidSize, -} - -impl ThreadPool { - pub fn new(size: usize) -> ThreadPool { - assert!(size > 0); - - let (sender, receiver) = mpsc::channel(); - let receiver = Arc::new(Mutex::new(receiver)); - - let mut workers = Vec::with_capacity(size); - - for _ in 0..size { - workers.push(Worker::new(Arc::clone(&receiver))); - } - - ThreadPool { workers, sender } - } - - pub fn build(size: usize) -> Result { - if size <= 0 { - Err(PoolCreationError::InvalidSize) - } else { - Ok(Self::new(size)) - } - } - - pub fn join(self) { - for ele in self.workers.into_iter() { - ele.thread.join().unwrap(); - } - } - - pub fn execute(&self, f: F) - where - F: FnOnce() + Send + 'static, - { - let job = Box::new(f); - - self.sender.send(job).unwrap(); - } -} - -struct Worker { - thread: thread::JoinHandle<()>, -} - -impl Worker { - fn new(receiver: Arc>>) -> Worker { - let thread = thread::spawn(move || { - while let Ok(job) = receiver.lock().unwrap().recv() { - job(); - } - }); - - Worker { thread } - } -} diff --git a/src/main.rs b/src/main.rs index 810b793..2671c85 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,116 +1,12 @@ -pub mod http_server; -extern crate yaml_rust; +mod flowgate; -use http_server::*; -use std::sync::Arc; -use std::{fs, thread}; -use yaml_rust::YamlLoader; - -use log::LevelFilter; -use log4rs::append::console::ConsoleAppender; -use log4rs::append::file::FileAppender; -use log4rs::config::{Appender, Config, Root}; -use log4rs::encode::pattern::PatternEncoder; - -struct AppConfig { - sites: Vec, - http_host: String, - https_host: String, -} - -impl AppConfig { - fn parse(filename: &str) -> Option { - let Ok(file_content) = fs::read_to_string(filename) else { - return None; - }; - let Ok(docs) = YamlLoader::load_from_str(file_content.as_str()) else { - return None; - }; - let doc = docs.get(0)?; - - let http_host = doc["http_host"].as_str()?.to_string(); - let https_host = doc["https_host"].as_str()?.to_string(); - - let mut sites: Vec = Vec::new(); - - let sites_yaml = doc["sites"].as_vec()?; - - for s in sites_yaml { - let mut cert: Option = None; - - if !s["ssl_cert"].is_badvalue() && !s["ssl_cert"].is_null() { - cert = Some( - SslCert::new( - s["ssl_cert"].as_str().unwrap(), - s["ssl_key"].as_str().unwrap(), - ) - .unwrap(), - ); - } - - let site = Site { - domain: s["domain"].as_str().unwrap().to_string(), - host: s["host"].as_str().unwrap().to_string(), - ssl: cert, - }; - - sites.push(site); - } - - Some(AppConfig { - sites, - http_host, - https_host, - }) - } -} +use flowgate::{Config, FlowgateServer}; fn main() { - log4rs::init_config( - Config::builder() - .appender( - Appender::builder().build( - "logfile", - Box::new( - FileAppender::builder() - .encoder(Box::new(PatternEncoder::new( - "{d(%Y-%m-%d %H:%M:%S)} | {l} - {m}\n", - ))) - .build("latest.log") - .unwrap(), - ), - ), - ) - .appender( - Appender::builder().build( - "stdout", - Box::new( - ConsoleAppender::builder() - .encoder(Box::new(PatternEncoder::new( - "{d(%Y-%m-%d %H:%M:%S)} | {l} - {m}\n", - ))) - .build(), - ), - ), - ) - .build( - Root::builder() - .appender("logfile") - .appender("stdout") - .build(LevelFilter::Debug), - ) - .unwrap(), - ) - .unwrap(); + pretty_env_logger::init(); - let config = AppConfig::parse("conf.yml").unwrap(); - let sites_arc = Arc::new(config.sites); + let config = Config::parse("conf.yml").unwrap(); + let server = FlowgateServer::new(config); - let sites = sites_arc.clone(); - thread::spawn(move || { - SiteServer::new(config.http_host.to_string(), sites).run_http(); - }); - - let sites = sites_arc.clone(); - SiteServer::new(config.https_host.to_string(), sites).run_ssl(); + server.start(); }