add threadpool and HttpServerStarter

This commit is contained in:
MeexReay 2024-06-22 15:36:38 +03:00
parent efc36aa3f8
commit 0ef43dcb47
4 changed files with 249 additions and 70 deletions

View File

@ -5,12 +5,21 @@ This library is under developement, so if you found any bugs, please write them
Example:
```rust
use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer};
use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter};
use std::time::Duration;
struct EzSite {
index_page: String,
}
impl EzSite {
fn new(index_page: &str) -> Self {
EzSite {
index_page: index_page.to_string(),
}
}
}
impl HttpServer for EzSite {
async fn on_request(&mut self, req: &HttpRequest) -> Option<HttpResponse> {
println!("{} > {} {}", req.addr, req.method, req.page);
@ -22,7 +31,7 @@ impl HttpServer for EzSite {
&self.index_page, // response body
))
} else {
None // just shutdown socket
None // close connection
}
}
@ -35,19 +44,15 @@ impl HttpServer for EzSite {
}
}
impl EzSite {
fn new(index_page: &str) -> Self {
EzSite {
index_page: index_page.to_string(),
}
}
}
fn main() {
let site = EzSite::new("Hello World!");
let host = "localhost:8080";
ezhttp::start_server(site, host).unwrap();
HttpServerStarter::new(site, host)
.timeout(Some(Duration::from_secs(5))) // read & write timeout
.threads(5) // threadpool size
.start()
.expect("http server error");
}
```

View File

@ -1,4 +1,5 @@
use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer};
use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter};
use std::time::Duration;
struct EzSite {
main_page: String,
@ -66,5 +67,9 @@ fn main() {
let site = EzSite::new("<h1>Hello World!</h1>");
let host = "localhost:8080";
ezhttp::start_server(site, host).unwrap();
HttpServerStarter::new(site, host)
.timeout(Some(Duration::from_secs(5)))
.threads(5)
.start()
.expect("http server error");
}

View File

@ -12,7 +12,7 @@ use std::{
};
use std::{
net::{TcpListener, TcpStream},
sync::Mutex,
sync::{mpsc, Mutex},
};
#[derive(Clone, Debug)]
@ -628,7 +628,7 @@ impl HttpResponse {
}
}
pub trait HttpServer: Sync {
pub trait HttpServer {
fn on_start(&mut self, host: &str) -> impl Future<Output = ()> + Send;
fn on_close(&mut self) -> impl Future<Output = ()> + Send;
fn on_request(
@ -637,16 +637,166 @@ pub trait HttpServer: Sync {
) -> impl Future<Output = Option<HttpResponse>> + Send;
}
fn start_server_with_handler<F, S>(
pub struct HttpServerStarter<T: HttpServer + Send + 'static> {
http_server: T,
support_http_rrs: bool,
timeout: Option<Duration>,
host: String,
threads: usize,
}
impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
pub fn new(http_server: T, host: &str) -> Self {
HttpServerStarter {
http_server,
support_http_rrs: false,
timeout: None,
host: host.to_string(),
threads: 0,
}
}
pub fn http_server(mut self, http_server: T) -> Self {
self.http_server = http_server;
return self;
}
pub fn support_http_rrs(mut self, support_http_rrs: bool) -> Self {
self.support_http_rrs = support_http_rrs;
return self;
}
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
return self;
}
pub fn host(mut self, host: String) -> Self {
self.host = host;
return self;
}
pub fn threads(mut self, threads: usize) -> Self {
self.threads = threads;
return self;
}
pub fn start(self) -> Result<(), Box<dyn Error>> {
let handler = if self.support_http_rrs {
move |server, sock| {
handle_connection_rrs(server, sock);
}
} else {
move |server, sock| {
handle_connection(server, sock);
}
};
if self.threads == 0 {
start_server(self.http_server, &self.host, self.timeout, handler)
} else if self.threads == 1 {
start_server_sync(self.http_server, &self.host, self.timeout, handler)
} else {
start_server_with_threadpool(
self.http_server,
&self.host,
self.timeout,
self.threads,
handler,
)
}
}
}
fn start_server_with_threadpool<F, S>(
server: S,
host: &str,
timeout: Option<Duration>,
threads: usize,
handler: F,
) -> Result<(), Box<dyn Error>>
where
F: (Fn(Arc<Mutex<S>>, TcpStream) -> ()) + Send + 'static + Copy,
S: HttpServer + Send + 'static,
{
let threadpool = ThreadPool::new(threads);
let server = Arc::new(Mutex::new(server));
let listener = TcpListener::bind(host)?;
let host_clone = String::from(host).clone();
let server_clone = server.clone();
block_on(server_clone.lock().unwrap().on_start(&host_clone));
loop {
let (sock, _) = match listener.accept() {
Ok(i) => i,
Err(_) => {
break;
}
};
sock.set_read_timeout(timeout).unwrap();
sock.set_write_timeout(timeout).unwrap();
let now_server = Arc::clone(&server);
threadpool.execute(move || {
handler(now_server, sock);
});
}
block_on(server.lock().unwrap().on_close());
Ok(())
}
fn start_server<F, S>(
server: S,
host: &str,
timeout: Option<Duration>,
handler: F,
) -> Result<(), Box<dyn Error>>
where
F: Fn(Arc<Mutex<S>>, TcpStream) -> (),
F: (Fn(Arc<Mutex<S>>, TcpStream) -> ()) + Send + 'static + Copy,
S: HttpServer + Send + 'static,
{
let server = Arc::new(Mutex::new(server));
let listener = TcpListener::bind(host)?;
let host_clone = String::from(host).clone();
let server_clone = server.clone();
block_on(server_clone.lock().unwrap().on_start(&host_clone));
loop {
let (sock, _) = match listener.accept() {
Ok(i) => i,
Err(_) => {
break;
}
};
sock.set_read_timeout(timeout).unwrap();
sock.set_write_timeout(timeout).unwrap();
let now_server = Arc::clone(&server);
thread::spawn(move || {
handler(now_server, sock);
});
}
block_on(server.lock().unwrap().on_close());
Ok(())
}
fn start_server_sync<F, S>(
server: S,
host: &str,
timeout: Option<Duration>,
handler: F,
) -> Result<(), Box<dyn Error>>
where
F: (Fn(Arc<Mutex<S>>, TcpStream) -> ()) + Send + 'static + Copy,
S: HttpServer + Send + 'static,
F: Send + 'static,
{
let server = Arc::new(Mutex::new(server));
let listener = TcpListener::bind(host)?;
@ -675,46 +825,6 @@ where
Ok(())
}
pub fn start_server(
server: impl HttpServer + Send + 'static,
host: &str,
) -> Result<(), Box<dyn Error>> {
start_server_with_handler(server, host, None, move |server, sock| {
thread::spawn(move || handle_connection(server, sock));
})
}
// http rrs support
pub fn start_server_rrs(
server: impl HttpServer + Send + 'static,
host: &str,
) -> Result<(), Box<dyn Error>> {
start_server_with_handler(server, host, None, move |server, sock| {
thread::spawn(move || handle_connection_rrs(server, sock));
})
}
pub fn start_server_timeout(
server: impl HttpServer + Send + 'static,
host: &str,
timeout: Duration,
) -> Result<(), Box<dyn Error>> {
start_server_with_handler(server, host, Some(timeout), move |server, sock| {
thread::spawn(move || handle_connection(server, sock));
})
}
// http rrs support
pub fn start_server_rrs_timeout(
server: impl HttpServer + Send + 'static,
host: &str,
timeout: Duration,
) -> Result<(), Box<dyn Error>> {
start_server_with_handler(server, host, Some(timeout), move |server, sock| {
thread::spawn(move || handle_connection_rrs(server, sock));
})
}
fn handle_connection<S: HttpServer + Send + 'static>(server: Arc<Mutex<S>>, mut sock: TcpStream) {
let addr = sock.peer_addr().unwrap();
@ -733,7 +843,6 @@ fn handle_connection<S: HttpServer + Send + 'static>(server: Arc<Mutex<S>>, mut
resp.write(&mut sock).unwrap();
}
// http rrs support
fn handle_connection_rrs<S: HttpServer + Send + 'static>(
server: Arc<Mutex<S>>,
mut sock: TcpStream,
@ -752,3 +861,58 @@ fn handle_connection_rrs<S: HttpServer + Send + 'static>(
};
resp.write(&mut sock).unwrap();
}
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
workers.push(Worker::new(Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn join(self) {
for ele in self.workers.into_iter() {
ele.thread.join().unwrap();
}
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
job();
}
});
Worker { thread }
}
}

View File

@ -1,9 +1,18 @@
use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer};
use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter};
use std::time::Duration;
struct EzSite {
index_page: String,
}
impl EzSite {
fn new(index_page: &str) -> Self {
EzSite {
index_page: index_page.to_string(),
}
}
}
impl HttpServer for EzSite {
async fn on_request(&mut self, req: &HttpRequest) -> Option<HttpResponse> {
println!("{} > {} {}", req.addr, req.method, req.page);
@ -15,7 +24,7 @@ impl HttpServer for EzSite {
&self.index_page, // response body
))
} else {
None // just shutdown socket
None // close connection
}
}
@ -28,17 +37,13 @@ impl HttpServer for EzSite {
}
}
impl EzSite {
fn new(index_page: &str) -> Self {
EzSite {
index_page: index_page.to_string(),
}
}
}
fn main() {
let site = EzSite::new("Hello World!");
let host = "localhost:8080";
ezhttp::start_server(site, host).unwrap();
HttpServerStarter::new(site, host)
.timeout(Some(Duration::from_secs(5))) // read & write timeout
.threads(5) // threadpool size
.start()
.expect("http server error");
}