From 0ef43dcb47b57aa5b7a5a399b757fd77a4c6650f Mon Sep 17 00:00:00 2001 From: MeexReay Date: Sat, 22 Jun 2024 15:36:38 +0300 Subject: [PATCH] add threadpool and HttpServerStarter --- README.md | 27 +++-- examples/simple_site.rs | 9 +- src/lib.rs | 256 ++++++++++++++++++++++++++++++++-------- src/main.rs | 27 +++-- 4 files changed, 249 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 5db52e7..7df43eb 100644 --- a/README.md +++ b/README.md @@ -5,12 +5,21 @@ This library is under developement, so if you found any bugs, please write them Example: ```rust -use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer}; +use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter}; +use std::time::Duration; struct EzSite { index_page: String, } +impl EzSite { + fn new(index_page: &str) -> Self { + EzSite { + index_page: index_page.to_string(), + } + } +} + impl HttpServer for EzSite { async fn on_request(&mut self, req: &HttpRequest) -> Option { println!("{} > {} {}", req.addr, req.method, req.page); @@ -22,7 +31,7 @@ impl HttpServer for EzSite { &self.index_page, // response body )) } else { - None // just shutdown socket + None // close connection } } @@ -35,19 +44,15 @@ impl HttpServer for EzSite { } } -impl EzSite { - fn new(index_page: &str) -> Self { - EzSite { - index_page: index_page.to_string(), - } - } -} - fn main() { let site = EzSite::new("Hello World!"); let host = "localhost:8080"; - ezhttp::start_server(site, host).unwrap(); + HttpServerStarter::new(site, host) + .timeout(Some(Duration::from_secs(5))) // read & write timeout + .threads(5) // threadpool size + .start() + .expect("http server error"); } ``` diff --git a/examples/simple_site.rs b/examples/simple_site.rs index 40bc644..bf05583 100644 --- a/examples/simple_site.rs +++ b/examples/simple_site.rs @@ -1,4 +1,5 @@ -use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer}; +use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter}; +use std::time::Duration; struct EzSite { main_page: String, @@ -66,5 +67,9 @@ fn main() { let site = EzSite::new("

Hello World!

"); let host = "localhost:8080"; - ezhttp::start_server(site, host).unwrap(); + HttpServerStarter::new(site, host) + .timeout(Some(Duration::from_secs(5))) + .threads(5) + .start() + .expect("http server error"); } diff --git a/src/lib.rs b/src/lib.rs index bb816a7..bf35967 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ use std::{ }; use std::{ net::{TcpListener, TcpStream}, - sync::Mutex, + sync::{mpsc, Mutex}, }; #[derive(Clone, Debug)] @@ -628,7 +628,7 @@ impl HttpResponse { } } -pub trait HttpServer: Sync { +pub trait HttpServer { fn on_start(&mut self, host: &str) -> impl Future + Send; fn on_close(&mut self) -> impl Future + Send; fn on_request( @@ -637,16 +637,166 @@ pub trait HttpServer: Sync { ) -> impl Future> + Send; } -fn start_server_with_handler( +pub struct HttpServerStarter { + http_server: T, + support_http_rrs: bool, + timeout: Option, + host: String, + threads: usize, +} + +impl HttpServerStarter { + pub fn new(http_server: T, host: &str) -> Self { + HttpServerStarter { + http_server, + support_http_rrs: false, + timeout: None, + host: host.to_string(), + threads: 0, + } + } + + pub fn http_server(mut self, http_server: T) -> Self { + self.http_server = http_server; + return self; + } + + pub fn support_http_rrs(mut self, support_http_rrs: bool) -> Self { + self.support_http_rrs = support_http_rrs; + return self; + } + + pub fn timeout(mut self, timeout: Option) -> Self { + self.timeout = timeout; + return self; + } + + pub fn host(mut self, host: String) -> Self { + self.host = host; + return self; + } + + pub fn threads(mut self, threads: usize) -> Self { + self.threads = threads; + return self; + } + + pub fn start(self) -> Result<(), Box> { + let handler = if self.support_http_rrs { + move |server, sock| { + handle_connection_rrs(server, sock); + } + } else { + move |server, sock| { + handle_connection(server, sock); + } + }; + + if self.threads == 0 { + start_server(self.http_server, &self.host, self.timeout, handler) + } else if self.threads == 1 { + start_server_sync(self.http_server, &self.host, self.timeout, handler) + } else { + start_server_with_threadpool( + self.http_server, + &self.host, + self.timeout, + self.threads, + handler, + ) + } + } +} + +fn start_server_with_threadpool( + server: S, + host: &str, + timeout: Option, + threads: usize, + handler: F, +) -> Result<(), Box> +where + F: (Fn(Arc>, TcpStream) -> ()) + Send + 'static + Copy, + S: HttpServer + Send + 'static, +{ + let threadpool = ThreadPool::new(threads); + let server = Arc::new(Mutex::new(server)); + let listener = TcpListener::bind(host)?; + + let host_clone = String::from(host).clone(); + let server_clone = server.clone(); + block_on(server_clone.lock().unwrap().on_start(&host_clone)); + + loop { + let (sock, _) = match listener.accept() { + Ok(i) => i, + Err(_) => { + break; + } + }; + + sock.set_read_timeout(timeout).unwrap(); + sock.set_write_timeout(timeout).unwrap(); + + let now_server = Arc::clone(&server); + threadpool.execute(move || { + handler(now_server, sock); + }); + } + + block_on(server.lock().unwrap().on_close()); + + Ok(()) +} + +fn start_server( server: S, host: &str, timeout: Option, handler: F, ) -> Result<(), Box> where - F: Fn(Arc>, TcpStream) -> (), + F: (Fn(Arc>, TcpStream) -> ()) + Send + 'static + Copy, + S: HttpServer + Send + 'static, +{ + let server = Arc::new(Mutex::new(server)); + let listener = TcpListener::bind(host)?; + + let host_clone = String::from(host).clone(); + let server_clone = server.clone(); + block_on(server_clone.lock().unwrap().on_start(&host_clone)); + + loop { + let (sock, _) = match listener.accept() { + Ok(i) => i, + Err(_) => { + break; + } + }; + + sock.set_read_timeout(timeout).unwrap(); + sock.set_write_timeout(timeout).unwrap(); + + let now_server = Arc::clone(&server); + thread::spawn(move || { + handler(now_server, sock); + }); + } + + block_on(server.lock().unwrap().on_close()); + + Ok(()) +} + +fn start_server_sync( + server: S, + host: &str, + timeout: Option, + handler: F, +) -> Result<(), Box> +where + F: (Fn(Arc>, TcpStream) -> ()) + Send + 'static + Copy, S: HttpServer + Send + 'static, - F: Send + 'static, { let server = Arc::new(Mutex::new(server)); let listener = TcpListener::bind(host)?; @@ -675,46 +825,6 @@ where Ok(()) } -pub fn start_server( - server: impl HttpServer + Send + 'static, - host: &str, -) -> Result<(), Box> { - start_server_with_handler(server, host, None, move |server, sock| { - thread::spawn(move || handle_connection(server, sock)); - }) -} - -// http rrs support -pub fn start_server_rrs( - server: impl HttpServer + Send + 'static, - host: &str, -) -> Result<(), Box> { - start_server_with_handler(server, host, None, move |server, sock| { - thread::spawn(move || handle_connection_rrs(server, sock)); - }) -} - -pub fn start_server_timeout( - server: impl HttpServer + Send + 'static, - host: &str, - timeout: Duration, -) -> Result<(), Box> { - start_server_with_handler(server, host, Some(timeout), move |server, sock| { - thread::spawn(move || handle_connection(server, sock)); - }) -} - -// http rrs support -pub fn start_server_rrs_timeout( - server: impl HttpServer + Send + 'static, - host: &str, - timeout: Duration, -) -> Result<(), Box> { - start_server_with_handler(server, host, Some(timeout), move |server, sock| { - thread::spawn(move || handle_connection_rrs(server, sock)); - }) -} - fn handle_connection(server: Arc>, mut sock: TcpStream) { let addr = sock.peer_addr().unwrap(); @@ -733,7 +843,6 @@ fn handle_connection(server: Arc>, mut resp.write(&mut sock).unwrap(); } -// http rrs support fn handle_connection_rrs( server: Arc>, mut sock: TcpStream, @@ -752,3 +861,58 @@ fn handle_connection_rrs( }; resp.write(&mut sock).unwrap(); } + +type Job = Box; + +struct ThreadPool { + workers: Vec, + sender: mpsc::Sender, +} + +impl ThreadPool { + fn new(size: usize) -> ThreadPool { + assert!(size > 0); + + let (sender, receiver) = mpsc::channel(); + let receiver = Arc::new(Mutex::new(receiver)); + + let mut workers = Vec::with_capacity(size); + + for _ in 0..size { + workers.push(Worker::new(Arc::clone(&receiver))); + } + + ThreadPool { workers, sender } + } + + fn join(self) { + for ele in self.workers.into_iter() { + ele.thread.join().unwrap(); + } + } + + fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + + self.sender.send(job).unwrap(); + } +} + +struct Worker { + thread: thread::JoinHandle<()>, +} + +impl Worker { + fn new(receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || { + while let Ok(job) = receiver.lock().unwrap().recv() { + job(); + } + }); + + Worker { thread } + } +} diff --git a/src/main.rs b/src/main.rs index b84cbdb..586a4eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,18 @@ -use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer}; +use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter}; +use std::time::Duration; struct EzSite { index_page: String, } +impl EzSite { + fn new(index_page: &str) -> Self { + EzSite { + index_page: index_page.to_string(), + } + } +} + impl HttpServer for EzSite { async fn on_request(&mut self, req: &HttpRequest) -> Option { println!("{} > {} {}", req.addr, req.method, req.page); @@ -15,7 +24,7 @@ impl HttpServer for EzSite { &self.index_page, // response body )) } else { - None // just shutdown socket + None // close connection } } @@ -28,17 +37,13 @@ impl HttpServer for EzSite { } } -impl EzSite { - fn new(index_page: &str) -> Self { - EzSite { - index_page: index_page.to_string(), - } - } -} - fn main() { let site = EzSite::new("Hello World!"); let host = "localhost:8080"; - ezhttp::start_server(site, host).unwrap(); + HttpServerStarter::new(site, host) + .timeout(Some(Duration::from_secs(5))) // read & write timeout + .threads(5) // threadpool size + .start() + .expect("http server error"); }