add RunningHttpServer

This commit is contained in:
MeexReay 2024-06-22 16:08:13 +03:00
parent 0ef43dcb47
commit 7c5e6dc9c7
3 changed files with 86 additions and 8 deletions

View File

@ -70,6 +70,6 @@ fn main() {
HttpServerStarter::new(site, host)
.timeout(Some(Duration::from_secs(5)))
.threads(5)
.start()
.start_forever()
.expect("http server error");
}

View File

@ -1,6 +1,7 @@
use futures::executor::block_on;
use serde_json::Value;
use std::io::{Read, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::{
boxed::Box,
@ -645,6 +646,22 @@ pub struct HttpServerStarter<T: HttpServer + Send + 'static> {
threads: usize,
}
pub struct RunningHttpServer {
thread: thread::JoinHandle<()>,
running: Arc<AtomicBool>,
}
impl RunningHttpServer {
fn new(thread: thread::JoinHandle<()>, running: Arc<AtomicBool>) -> Self {
RunningHttpServer { thread, running }
}
pub fn close(self) {
self.running.store(false, Ordering::Release);
self.thread.join().unwrap();
}
}
impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
pub fn new(http_server: T, host: &str) -> Self {
HttpServerStarter {
@ -681,7 +698,7 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
return self;
}
pub fn start(self) -> Result<(), Box<dyn Error>> {
pub fn start_forever(self) -> Result<(), Box<dyn Error>> {
let handler = if self.support_http_rrs {
move |server, sock| {
handle_connection_rrs(server, sock);
@ -692,10 +709,12 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
}
};
let running = Arc::new(AtomicBool::new(true));
if self.threads == 0 {
start_server(self.http_server, &self.host, self.timeout, handler)
start_server(self.http_server, &self.host, self.timeout, handler, running)
} else if self.threads == 1 {
start_server_sync(self.http_server, &self.host, self.timeout, handler)
start_server_sync(self.http_server, &self.host, self.timeout, handler, running)
} else {
start_server_with_threadpool(
self.http_server,
@ -703,9 +722,63 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
self.timeout,
self.threads,
handler,
running,
)
}
}
pub fn start(self) -> RunningHttpServer {
let handler = if self.support_http_rrs {
move |server, sock| {
handle_connection_rrs(server, sock);
}
} else {
move |server, sock| {
handle_connection(server, sock);
}
};
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
let thread = if self.threads == 0 {
thread::spawn(move || {
start_server(
self.http_server,
&self.host,
self.timeout,
handler,
running_clone,
)
.expect("http server error");
})
} else if self.threads == 1 {
thread::spawn(move || {
start_server_sync(
self.http_server,
&self.host,
self.timeout,
handler,
running_clone,
)
.expect("http server error");
})
} else {
thread::spawn(move || {
start_server_with_threadpool(
self.http_server,
&self.host,
self.timeout,
self.threads,
handler,
running_clone,
)
.expect("http server error")
})
};
RunningHttpServer::new(thread, running.clone())
}
}
fn start_server_with_threadpool<F, S>(
@ -714,6 +787,7 @@ fn start_server_with_threadpool<F, S>(
timeout: Option<Duration>,
threads: usize,
handler: F,
running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>>
where
F: (Fn(Arc<Mutex<S>>, TcpStream) -> ()) + Send + 'static + Copy,
@ -727,7 +801,7 @@ where
let server_clone = server.clone();
block_on(server_clone.lock().unwrap().on_start(&host_clone));
loop {
while running.load(Ordering::Acquire) {
let (sock, _) = match listener.accept() {
Ok(i) => i,
Err(_) => {
@ -744,6 +818,8 @@ where
});
}
threadpool.join();
block_on(server.lock().unwrap().on_close());
Ok(())
@ -754,6 +830,7 @@ fn start_server<F, S>(
host: &str,
timeout: Option<Duration>,
handler: F,
running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>>
where
F: (Fn(Arc<Mutex<S>>, TcpStream) -> ()) + Send + 'static + Copy,
@ -766,7 +843,7 @@ where
let server_clone = server.clone();
block_on(server_clone.lock().unwrap().on_start(&host_clone));
loop {
while running.load(Ordering::Acquire) {
let (sock, _) = match listener.accept() {
Ok(i) => i,
Err(_) => {
@ -793,6 +870,7 @@ fn start_server_sync<F, S>(
host: &str,
timeout: Option<Duration>,
handler: F,
running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>>
where
F: (Fn(Arc<Mutex<S>>, TcpStream) -> ()) + Send + 'static + Copy,
@ -805,7 +883,7 @@ where
let server_clone = server.clone();
block_on(server_clone.lock().unwrap().on_start(&host_clone));
loop {
while running.load(Ordering::Acquire) {
let (sock, _) = match listener.accept() {
Ok(i) => i,
Err(_) => {

View File

@ -44,6 +44,6 @@ fn main() {
HttpServerStarter::new(site, host)
.timeout(Some(Duration::from_secs(5))) // read & write timeout
.threads(5) // threadpool size
.start()
.start_forever()
.expect("http server error");
}