websocket

This commit is contained in:
MeexReay 2024-11-22 16:08:06 +03:00
parent eab6d83a5f
commit db288728bd
7 changed files with 100 additions and 43 deletions

View File

@ -5,13 +5,15 @@ edition = "2021"
[dependencies]
openssl = { version = "0.10.68", optional = true }
rustls = { version = "0.23.16", optional = true }
rustls = { version = "0.23.17", optional = true }
rustls-pemfile = { version = "2.2.0", optional = true }
serde_yml = "0.0.12"
log = "0.4.22"
colog = "1.3.0"
threadpool = "1.8.1"
wildcard_ex = "0.1.2"
websocket = "0.27.1"
serde_json = "1.0.133"
[features]
default = ["use-openssl"]

View File

@ -5,6 +5,8 @@ threadpool_size: 10 # Threadpool size (count of threads that accept r
connection_timeout: 10 # Read and write timeout of connections in seconds (optional, default - 10)
incoming_ip_forwarding: none # Read IP forwarding on incoming connections (optional, default - none)
websocket_host: localhost:999 # Websocket messaging host to edit sites (optional, default - null)
sites:
- domain: localhost # Site domain (use wildcard matching)
host: localhost:8080 # Http server host

View File

@ -2,8 +2,4 @@ pub mod config;
pub mod server;
pub mod ssl_cert;
pub mod closeable;
pub use config::*;
pub use server::*;
pub use ssl_cert::*;
pub use closeable::*;
pub mod websocket;

View File

@ -1,9 +1,9 @@
use std::{fs, net::TcpStream, sync::Arc, time::Duration};
use std::{fs, net::TcpStream, time::Duration};
use serde_yml::{Number, Value};
use wildcard_ex::is_match_simple;
use super::SslCert;
use super::ssl_cert::SslCert;
#[derive(Clone)]
pub struct SiteConfig {
@ -47,12 +47,13 @@ impl IpForwarding {
#[derive(Clone)]
pub struct Config {
pub sites: Arc<Vec<SiteConfig>>,
pub sites: Vec<SiteConfig>,
pub http_host: String,
pub https_host: String,
pub threadpool_size: usize,
pub connection_timeout: Duration,
pub incoming_ip_forwarding: IpForwarding
pub incoming_ip_forwarding: IpForwarding,
pub websocket_host: Option<String>
}
impl Config {
@ -71,6 +72,7 @@ impl Config {
.map(|o| o.as_str()).flatten()
.map(|o| IpForwarding::from_name(o)).flatten()
.unwrap_or(IpForwarding::None);
let websocket_host = doc.get("websocket_host").map(|o| o.as_str()).flatten().map(|o| o.to_string());
let mut sites: Vec<SiteConfig> = Vec::new();
@ -108,20 +110,19 @@ impl Config {
sites.push(site);
}
let sites = Arc::new(sites);
Some(Config {
sites,
http_host,
https_host,
threadpool_size,
connection_timeout,
incoming_ip_forwarding
incoming_ip_forwarding,
websocket_host
}.clone())
}
pub fn get_site(&self, domain: &str) -> Option<&SiteConfig> {
for i in self.sites.as_ref() {
for i in &self.sites {
if is_match_simple(&i.domain, domain) {
return Some(i);
}

View File

@ -1,21 +1,19 @@
use std::{
io::{Read, Write}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, TcpListener, TcpStream}, str::FromStr, sync::Arc, thread, time::Duration
io::{Read, Write}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, TcpListener, TcpStream}, str::FromStr, sync::{Arc, RwLock}, thread, time::Duration
};
use log::info;
use threadpool::ThreadPool;
use crate::IpForwarding;
use super::{Closeable, Config, SiteConfig};
use super::{closeable::Closeable, config::{Config,SiteConfig,IpForwarding}};
pub struct FlowgateServer {
config: Arc<Config>,
config: Arc<RwLock<Config>>,
}
impl FlowgateServer {
pub fn new(config: Config) -> Self {
FlowgateServer { config: Arc::new(config) }
pub fn new(config: Arc<RwLock<Config>>) -> Self {
FlowgateServer { config }
}
pub fn start(&self) {
@ -37,13 +35,13 @@ impl FlowgateServer {
}
pub fn run_http(
config: Arc<Config>
config: Arc<RwLock<Config>>
) -> Option<()> {
let listener = TcpListener::bind(&config.http_host).ok()?;
let listener = TcpListener::bind(&config.read().ok()?.http_host).ok()?;
let pool = ThreadPool::new(10);
info!("HTTP server runned on {}", &config.http_host);
info!("HTTP server runned on {}", &config.read().ok()?.http_host);
for stream in listener.incoming() {
pool.execute({
@ -72,11 +70,11 @@ impl FlowgateServer {
#[cfg(feature = "use-openssl")]
pub fn run_https(
config: Arc<Config>
config: Arc<RwLock<Config>>
) -> Option<()> {
use openssl::ssl::{NameType, SniError, SslAcceptor, SslAlert, SslMethod, SslRef};
let listener = TcpListener::bind(&config.https_host).ok()?;
let listener = TcpListener::bind(&config.read().ok()?.https_host).ok()?;
let mut cert = SslAcceptor::mozilla_intermediate(SslMethod::tls()).ok()?;
@ -85,7 +83,8 @@ impl FlowgateServer {
move |ssl: &mut SslRef, _: &mut SslAlert| -> Result<(), SniError> {
let servname = ssl.servername(NameType::HOST_NAME).ok_or(SniError::NOACK)?;
let cert = config.get_site(servname).ok_or(SniError::NOACK)?;
let c = config.read().unwrap();
let cert = c.get_site(servname).ok_or(SniError::NOACK)?;
ssl.set_ssl_context(&cert.ssl.as_ref().ok_or(SniError::NOACK)?.get_context()).ok().ok_or(SniError::NOACK)
}
}
@ -93,9 +92,9 @@ impl FlowgateServer {
let cert = cert.build();
let pool = ThreadPool::new(config.threadpool_size);
let pool = ThreadPool::new(config.read().ok()?.threadpool_size);
info!("HTTPS server runned on {}", &config.https_host);
info!("HTTPS server runned on {}", &config.read().ok()?.https_host);
for stream in listener.incoming() {
pool.execute({
@ -105,8 +104,8 @@ impl FlowgateServer {
move || {
let Ok(stream) = stream else { return };
let Ok(_) = stream.set_write_timeout(Some(config.connection_timeout)) else { return };
let Ok(_) = stream.set_read_timeout(Some(config.connection_timeout)) else { return };
let Ok(_) = stream.set_write_timeout(Some(config.read().unwrap().connection_timeout)) else { return };
let Ok(_) = stream.set_read_timeout(Some(config.read().unwrap().connection_timeout)) else { return };
let Ok(addr) = stream.peer_addr() else { return };
@ -127,7 +126,7 @@ impl FlowgateServer {
#[cfg(feature = "use-rustls")]
pub fn run_https(
config: Arc<Config>
config: Arc<RwLock<Config>>
) -> Option<()> {
use std::sync::Arc;
use rustls::{server::ResolvesServerCertUsingSni, ServerConfig};
@ -182,7 +181,7 @@ impl FlowgateServer {
}
pub fn accept_stream(
config: Arc<Config>,
config: Arc<RwLock<Config>>,
stream: &mut (impl Read + Write + Closeable),
addr: SocketAddr,
https: bool
@ -206,7 +205,7 @@ impl FlowgateServer {
}
fn read_request<'a>(
config: Arc<Config>,
config: Arc<RwLock<Config>>,
stream: &'a mut (impl Read + Write + Closeable),
addr: SocketAddr,
https: bool,
@ -214,7 +213,7 @@ impl FlowgateServer {
) -> Option<(TcpStream, SiteConfig, bool, String)> {
let mut addr = addr;
match &config.incoming_ip_forwarding {
match &config.read().ok()?.incoming_ip_forwarding {
IpForwarding::Simple => {
let mut header = Vec::new();
@ -293,7 +292,7 @@ impl FlowgateServer {
.map(|l| l.split_once(": ").unwrap())
.collect();
if let IpForwarding::Header(header) = &config.incoming_ip_forwarding {
if let IpForwarding::Header(header) = &config.read().ok()?.incoming_ip_forwarding {
if let Some(ip) = headers.iter().find(|o| o.0 == header).map(|o| o.1) {
addr = SocketAddr::from_str(ip).ok()?;
}
@ -311,9 +310,9 @@ impl FlowgateServer {
}
}
let site = config.get_site(&host)?;
let site = config.read().ok()?.get_site(&host)?.clone();
(site.connect()?, site.clone(), keep_alive, host)
(site.connect()?, site, keep_alive, host)
} else {
connected?
};

53
src/flowgate/websocket.rs Normal file
View File

@ -0,0 +1,53 @@
use std::sync::{Arc, RwLock};
use serde_json::Value;
use websocket::{sync::Server, OwnedMessage};
use super::config::{Config, IpForwarding, SiteConfig};
fn on_message(config: Arc<RwLock<Config>>, data: Value) -> Option<()> {
let data = data.as_object()?;
if data.get("type")?.as_str()? == "set_site" {
let mut conf = config.write().ok()?;
let domain = data.get("domain")?.as_str()?;
if let Some(site) = conf.sites.iter_mut().filter(|o| o.domain == domain).next() {
site.host = data.get("host")?.as_str()?.to_string();
site.enable_keep_alive = data.get("enable_keep_alive")?.as_bool()?;
site.support_keep_alive = data.get("support_keep_alive")?.as_bool()?;
site.ip_forwarding = IpForwarding::from_name(data.get("ip_forwarding")?.as_str()?)?;
} else {
conf.sites.push(SiteConfig {
domain: domain.to_string(),
host: data.get("host")?.as_str()?.to_string(),
enable_keep_alive: data.get("enable_keep_alive")?.as_bool()?,
support_keep_alive: data.get("support_keep_alive")?.as_bool()?,
ip_forwarding: IpForwarding::from_name(data.get("ip_forwarding")?.as_str()?)?,
ssl: None
});
}
}
Some(())
}
pub fn start_server(config: Arc<RwLock<Config>>) -> Option<()> {
let mut server = Server::bind(config.read().ok()?.websocket_host.clone()?).ok()?;
while let Ok(res) = server.accept() {
let mut res = res.accept().ok()?;
for msg in res.incoming_messages() {
if let Ok(OwnedMessage::Text(msg)) = msg {
if let Ok(data) = serde_json::from_str(&msg) {
if let None = on_message(config.clone(), data) {
break
}
}
} else {
break
}
}
}
Some(())
}

View File

@ -1,6 +1,6 @@
use std::{fs, path::Path};
use std::{fs, path::Path, sync::{Arc, RwLock}};
use flowgate::{Config, FlowgateServer};
use flowgate::{config::Config, server::FlowgateServer, websocket};
fn main() {
colog::init();
@ -9,10 +9,14 @@ fn main() {
let _ = fs::write("conf.yml", include_bytes!("../conf.yml"));
}
let config = Config::parse("conf.yml").unwrap();
let server = FlowgateServer::new(config);
let config = Arc::new(RwLock::new(Config::parse("conf.yml").unwrap()));
let server = FlowgateServer::new(config.clone());
server.start();
if config.read().unwrap().websocket_host.is_some() {
websocket::start_server(config);
} else {
loop {}
}
}