async rewrite x2

This commit is contained in:
MeexReay 2024-08-29 02:15:36 +03:00
parent 0d5dd9281b
commit 5060e811a1
8 changed files with 241 additions and 250 deletions

15
Cargo.lock generated
View File

@ -87,6 +87,7 @@ dependencies = [
"rusty_pool", "rusty_pool",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-io-timeout",
"urlencoding", "urlencoding",
] ]
@ -374,9 +375,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.125" version = "1.0.127"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad"
dependencies = [ dependencies = [
"itoa", "itoa",
"memchr", "memchr",
@ -453,6 +454,16 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.4.0" version = "2.4.0"

View File

@ -11,6 +11,10 @@ keywords = ["http", "server", "site", "async"]
[dependencies] [dependencies]
urlencoding = "2.1.3" urlencoding = "2.1.3"
serde_json = "1.0.125" serde_json = "1.0.127"
tokio = { version = "1.39.3", features = ["full"] } tokio = { version = "1.39.3", features = ["full"] }
rusty_pool = "0.7.0" rusty_pool = "0.7.0"
tokio-io-timeout = "1.2.0"
[features]
http_rrs = []

View File

@ -13,6 +13,7 @@ pub enum HttpError {
WriteHeadError, WriteHeadError,
WriteBodyError, WriteBodyError,
InvalidStatus, InvalidStatus,
RequstError
} }
impl std::fmt::Display for HttpError { impl std::fmt::Display for HttpError {

86
src/ezhttp/handler.rs Normal file
View File

@ -0,0 +1,86 @@
use super::{HttpError, HttpRequest, HttpServer, Stream};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{net::TcpStream, sync::Mutex};
use tokio_io_timeout::TimeoutStream;
#[cfg(feature = "http_rrs")]
use {super::read_line_lf, std::net::{ToSocketAddrs, SocketAddr}};
pub type Handler<T> = Box<dyn Fn(Arc<Mutex<T>>, TimeoutStream<TcpStream>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Default connection handler
/// Turns input to request and response to output
pub async fn handler_connection<S: HttpServer + Send + 'static>(
server: Arc<Mutex<S>>,
mut sock: Stream
) {
let Ok(addr) = sock.get_ref().peer_addr() else { return; };
let req = match HttpRequest::read(sock.get_mut(), &addr).await {
Ok(i) => i,
Err(e) => {
server.lock().await.on_error(e).await;
return;
}
};
let resp = match server.lock().await.on_request(&req).await {
Some(i) => i,
None => {
server.lock().await.on_error(HttpError::RequstError).await;
return;
}
};
match resp.write(sock.get_mut()).await {
Ok(_) => {},
Err(e) => {
server.lock().await.on_error(e).await;
return;
},
}
}
#[cfg(feature = "http_rrs")]
/// HTTP_RRS handler
pub async fn handler_http_rrs<S: HttpServer + Send + 'static>(
server: Arc<Mutex<S>>,
mut sock: Stream,
) {
let addr = match read_line_lf(sock.get_mut()).await {
Ok(i) => i,
Err(e) => {
server.lock().await.on_error(e).await;
return;
}
}
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()[0];
let req = match HttpRequest::read(sock.get_mut(), &addr).await {
Ok(i) => i,
Err(e) => {
server.lock().await.on_error(e).await;
return;
}
};
let resp = match server.lock().await.on_request(&req).await {
Some(i) => i,
None => {
server.lock().await.on_error(HttpError::RequstError).await;
return;
}
};
match resp.write(sock.get_mut()).await {
Ok(_) => {},
Err(e) => {
server.lock().await.on_error(e).await;
return;
},
}
}

View File

@ -1,94 +1,64 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::{ use std::{
boxed::Box, boxed::Box,
error::Error, error::Error,
future::Future, future::Future,
io::Read, sync::Arc,
net::{TcpListener, TcpStream},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration, time::Duration,
}; };
use tokio::io::AsyncReadExt;
use rusty_pool::ThreadPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tokio_io_timeout::TimeoutStream;
pub mod error; pub mod error;
pub mod headers; pub mod headers;
pub mod request; pub mod request;
pub mod response; pub mod response;
pub mod starter; pub mod starter;
pub mod handler;
pub use error::*; pub use error::*;
pub use headers::*; pub use headers::*;
pub use request::*; pub use request::*;
pub use response::*; pub use response::*;
use rusty_pool::ThreadPool;
pub use starter::*; pub use starter::*;
use tokio::sync::Mutex; pub use handler::*;
fn read_line(data: &mut impl Read) -> Result<String, HttpError> {
let mut bytes = Vec::new();
for byte in data.bytes() {
let byte = match byte {
Ok(i) => i,
Err(_) => return Err(HttpError::ReadLineEof),
};
bytes.push(byte); async fn read_line(data: &mut (impl AsyncReadExt + Unpin)) -> Result<String, HttpError> {
let mut line = Vec::new();
if byte == 0x0A { loop {
let mut buffer = vec![0;1];
data.read_exact(&mut buffer).await.or(Err(HttpError::ReadLineEof))?;
let char = buffer[0];
line.push(char);
if char == 0x0a {
break; break;
} }
} }
String::from_utf8(line).or(Err(HttpError::ReadLineUnknown))
match String::from_utf8(bytes) {
Ok(i) => Ok(i),
Err(_) => Err(HttpError::ReadLineUnknown),
}
} }
fn read_line_crlf(data: &mut impl Read) -> Result<String, HttpError> { async fn read_line_crlf(data: &mut (impl AsyncReadExt + Unpin)) -> Result<String, HttpError> {
match read_line(data) { match read_line(data).await {
Ok(i) => Ok(i[..i.len() - 2].to_string()), Ok(i) => Ok(i[..i.len() - 2].to_string()),
Err(e) => Err(e), Err(e) => Err(e),
} }
} }
fn read_line_lf(data: &mut impl Read) -> Result<String, HttpError> { #[cfg(feature = "http_rrs")]
match read_line(data) { async fn read_line_lf(data: &mut (impl AsyncReadExt + Unpin)) -> Result<String, HttpError> {
match read_line(data).await {
Ok(i) => Ok(i[..i.len() - 1].to_string()), Ok(i) => Ok(i[..i.len() - 1].to_string()),
Err(e) => Err(e), Err(e) => Err(e),
} }
} }
fn rem_first(value: &str) -> &str { pub type Stream = TimeoutStream<TcpStream>;
let mut chars = value.chars();
chars.next();
chars.as_str()
}
fn split(text: String, delimiter: &str, times: usize) -> Vec<String> {
match times {
0 => text.split(delimiter).map(|v| v.to_string()).collect(),
1 => {
let mut v: Vec<String> = Vec::new();
match text.split_once(delimiter) {
Some(i) => {
v.push(i.0.to_string());
v.push(i.1.to_string());
}
None => {
v.push(text);
}
}
v
}
_ => text
.splitn(times, delimiter)
.map(|v| v.to_string())
.collect(),
}
}
/// Async http server trait /// Async http server trait
pub trait HttpServer { pub trait HttpServer {
@ -98,45 +68,43 @@ pub trait HttpServer {
&mut self, &mut self,
req: &HttpRequest, req: &HttpRequest,
) -> impl Future<Output = Option<HttpResponse>> + Send; ) -> impl Future<Output = Option<HttpResponse>> + Send;
fn on_error(
&mut self,
_: HttpError
) -> impl Future<Output = ()> + Send {
async {}
}
} }
async fn start_server_with_threadpool<S>( async fn start_server_with_threadpool<T>(
server: S, server: T,
host: &str, host: &str,
timeout: Option<Duration>, timeout: Option<Duration>,
threads: usize, threads: usize,
rrs: bool, handler: Handler<T>,
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>> ) -> Result<(), Box<dyn Error>>
where where
S: HttpServer + Send + 'static, T: HttpServer + Send + 'static,
{ {
let threadpool = ThreadPool::new(threads, threads * 10, Duration::from_secs(60)); let threadpool = ThreadPool::new(threads, threads * 10, Duration::from_secs(60));
let server = Arc::new(Mutex::new(server)); let server = Arc::new(Mutex::new(server));
let listener = TcpListener::bind(host)?; let listener = TcpListener::bind(host).await?;
let host_clone = String::from(host).clone(); let host_clone = String::from(host).clone();
let server_clone = server.clone(); let server_clone = server.clone();
server_clone.lock().await.on_start(&host_clone).await; server_clone.lock().await.on_start(&host_clone).await;
while running.load(Ordering::Acquire) { while running.load(Ordering::Acquire) {
let (sock, _) = match listener.accept() { let Ok((sock, _)) = listener.accept().await else { continue; };
Ok(i) => i, let mut sock = TimeoutStream::new(sock);
Err(_) => {
continue;
}
};
sock.set_read_timeout(timeout).unwrap(); sock.set_read_timeout(timeout);
sock.set_write_timeout(timeout).unwrap(); sock.set_write_timeout(timeout);
let now_server = Arc::clone(&server); let now_server = Arc::clone(&server);
if !rrs { threadpool.spawn((&handler)(now_server, sock));
threadpool.spawn(handle_connection(now_server, sock));
} else {
threadpool.spawn(handle_connection_rrs(now_server, sock));
}
} }
threadpool.join(); threadpool.join();
@ -146,41 +114,33 @@ where
Ok(()) Ok(())
} }
async fn start_server_new_thread<S>( async fn start_server_new_thread<T>(
server: S, server: T,
host: &str, host: &str,
timeout: Option<Duration>, timeout: Option<Duration>,
rrs: bool, handler: Handler<T>,
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>> ) -> Result<(), Box<dyn Error>>
where where
S: HttpServer + Send + 'static, T: HttpServer + Send + 'static,
{ {
let server = Arc::new(Mutex::new(server)); let server = Arc::new(Mutex::new(server));
let listener = TcpListener::bind(host)?; let listener = TcpListener::bind(host).await?;
let host_clone = String::from(host).clone(); let host_clone = String::from(host).clone();
let server_clone = server.clone(); let server_clone = server.clone();
server_clone.lock().await.on_start(&host_clone).await; server_clone.lock().await.on_start(&host_clone).await;
while running.load(Ordering::Acquire) { while running.load(Ordering::Acquire) {
let (sock, _) = match listener.accept() { let Ok((sock, _)) = listener.accept().await else { continue; };
Ok(i) => i, let mut sock = TimeoutStream::new(sock);
Err(_) => {
continue;
}
};
sock.set_read_timeout(timeout).unwrap(); sock.set_read_timeout(timeout);
sock.set_write_timeout(timeout).unwrap(); sock.set_write_timeout(timeout);
let now_server = Arc::clone(&server); let now_server = Arc::clone(&server);
if !rrs { tokio::spawn((&handler)(now_server, sock));
tokio::spawn(handle_connection(now_server, sock));
} else {
tokio::spawn(handle_connection_rrs(now_server, sock));
}
} }
server.lock().await.on_close().await; server.lock().await.on_close().await;
@ -188,41 +148,33 @@ where
Ok(()) Ok(())
} }
async fn start_server_sync<S>( async fn start_server_sync<T>(
server: S, server: T,
host: &str, host: &str,
timeout: Option<Duration>, timeout: Option<Duration>,
rrs: bool, handler: Handler<T>,
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>> ) -> Result<(), Box<dyn Error>>
where where
S: HttpServer + Send + 'static, T: HttpServer + Send + 'static,
{ {
let server = Arc::new(Mutex::new(server)); let server = Arc::new(Mutex::new(server));
let listener = TcpListener::bind(host)?; let listener = TcpListener::bind(host).await?;
let host_clone = String::from(host).clone(); let host_clone = String::from(host).clone();
let server_clone = server.clone(); let server_clone = server.clone();
server_clone.lock().await.on_start(&host_clone).await; server_clone.lock().await.on_start(&host_clone).await;
while running.load(Ordering::Acquire) { while running.load(Ordering::Acquire) {
let (sock, _) = match listener.accept() { let Ok((sock, _)) = listener.accept().await else { continue; };
Ok(i) => i, let mut sock = TimeoutStream::new(sock);
Err(_) => {
continue;
}
};
sock.set_read_timeout(timeout).unwrap(); sock.set_read_timeout(timeout);
sock.set_write_timeout(timeout).unwrap(); sock.set_write_timeout(timeout);
let now_server = Arc::clone(&server); let now_server = Arc::clone(&server);
if !rrs { handler(now_server, sock).await;
handle_connection(now_server, sock).await;
} else {
handle_connection_rrs(now_server, sock).await;
}
} }
server.lock().await.on_close().await; server.lock().await.on_close().await;
@ -230,60 +182,18 @@ where
Ok(()) Ok(())
} }
async fn handle_connection<S: HttpServer + Send + 'static>(
server: Arc<Mutex<S>>,
mut sock: TcpStream
) {
let Ok(addr) = sock.peer_addr() else { return; };
let req = match HttpRequest::read(&mut sock, &addr) {
Ok(i) => i,
Err(_) => {
return;
}
};
let resp = match server.lock().await.on_request(&req).await {
Some(i) => i,
None => {
return;
}
};
let _ = resp.write(&mut sock);
}
async fn handle_connection_rrs<S: HttpServer + Send + 'static>(
server: Arc<Mutex<S>>,
mut sock: TcpStream,
) {
let req = match HttpRequest::read_with_rrs(&mut sock) {
Ok(i) => i,
Err(_) => {
return;
}
};
let resp = match server.lock().await.on_request(&req).await {
Some(i) => i,
None => {
return;
}
};
let _ = resp.write(&mut sock);
}
/// Start [`HttpServer`](HttpServer) on some host /// Start [`HttpServer`](HttpServer) on some host
/// ///
/// Use [`HttpServerStarter`](HttpServerStarter) to set more options /// Use [`HttpServerStarter`](HttpServerStarter) to set more options
pub async fn start_server<S: HttpServer + Send + 'static>( pub async fn start_server<T: HttpServer + Send + 'static>(
server: S, server: T,
host: &str host: &str
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
start_server_new_thread( start_server_new_thread(
server, server,
host, host,
None, None,
false, Box::new(move |a, b| Box::pin(handler_connection(a, b))),
Arc::new(AtomicBool::new(true)), Arc::new(AtomicBool::new(true)),
).await ).await
} }

View File

@ -1,11 +1,11 @@
use super::{read_line_crlf, read_line_lf, rem_first, split, Headers, HttpError}; use super::{read_line_crlf, Headers, HttpError};
use serde_json::Value; use serde_json::Value;
use std::{ use std::{
fmt::{Debug, Display}, fmt::{Debug, Display},
io::{Read, Write}, net::SocketAddr,
net::{IpAddr, SocketAddr, ToSocketAddrs},
}; };
use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// Http request /// Http request
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -38,39 +38,28 @@ impl HttpRequest {
} }
/// Read http request from stream /// Read http request from stream
pub fn read(data: &mut impl Read, addr: &SocketAddr) -> Result<HttpRequest, HttpError> { pub async fn read(data: &mut (impl AsyncReadExt + Unpin), addr: &SocketAddr) -> Result<HttpRequest, HttpError> {
let octets = match addr.ip() { let ip_str = addr.to_string();
IpAddr::V4(ip) => ip.octets(),
_ => [127, 0, 0, 1],
};
let ip_str = octets[0].to_string() let status: Vec<String> = match read_line_crlf(data).await {
+ "." Ok(i) => {
+ &octets[1].to_string() i.splitn(3, " ")
+ "." .map(|s| s.to_string())
+ &octets[2].to_string() .collect()
+ "."
+ &octets[3].to_string();
let status = split(
match read_line_crlf(data) {
Ok(i) => i,
Err(e) => return Err(e),
}, },
" ", Err(e) => return Err(e),
3, };
);
let method = status[0].clone(); let method = status[0].clone();
let (page, query) = match status[1].split_once("?") { let (page, query) = match status[1].split_once("?") {
Some(i) => (i.0.to_string(), Some(i.1)), Some(i) => (i.0.to_string(), Some(i.1)),
None => (status[1].clone(), None), None => (status[1].to_string(), None),
}; };
let mut headers = Headers::new(); let mut headers = Headers::new();
loop { loop {
let text = match read_line_crlf(data) { let text = match read_line_crlf(data).await {
Ok(i) => i, Ok(i) => i,
Err(_) => return Err(HttpError::InvalidHeaders), Err(_) => return Err(HttpError::InvalidHeaders),
}; };
@ -121,7 +110,7 @@ impl HttpRequest {
let mut buf: Vec<u8> = Vec::new(); let mut buf: Vec<u8> = Vec::new();
buf.resize(content_size - reqdata.len(), 0); buf.resize(content_size - reqdata.len(), 0);
match data.read_exact(&mut buf) { match data.read_exact(&mut buf).await {
Ok(i) => i, Ok(i) => i,
Err(_) => return Err(HttpError::InvalidContent), Err(_) => return Err(HttpError::InvalidContent),
}; };
@ -166,7 +155,7 @@ impl HttpRequest {
} }
"application/x-www-form-urlencoded" => { "application/x-www-form-urlencoded" => {
if body.starts_with("?") { if body.starts_with("?") {
body = rem_first(body.as_str()).to_string() body = body.as_str()[1..].to_string()
} }
for ele in body.split("&") { for ele in body.split("&") {
@ -201,20 +190,6 @@ impl HttpRequest {
}) })
} }
/// Read http request with http_rrs support
pub fn read_with_rrs(data: &mut impl Read) -> Result<HttpRequest, HttpError> {
let addr = match read_line_lf(data) {
Ok(i) => i,
Err(e) => {
return Err(e);
}
}
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()[0];
HttpRequest::read(data, &addr)
}
/// Set params to query in url /// Set params to query in url
pub fn params_to_page(&mut self) { pub fn params_to_page(&mut self) {
let mut query = String::new(); let mut query = String::new();
@ -246,7 +221,7 @@ impl HttpRequest {
/// Write http request to stream /// Write http request to stream
/// ///
/// [`params`](Self::params) is not written to the stream, you need to use [`params_to_json`](Self::params_to_json) or [`params_to_page`](Self::params_to_page) /// [`params`](Self::params) is not written to the stream, you need to use [`params_to_json`](Self::params_to_json) or [`params_to_page`](Self::params_to_page)
pub fn write(self, data: &mut impl Write) -> Result<(), HttpError> { pub async fn write(self, data: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> {
let mut head: String = String::new(); let mut head: String = String::new();
head.push_str(&self.method); head.push_str(&self.method);
head.push_str(" "); head.push_str(" ");
@ -263,13 +238,13 @@ impl HttpRequest {
head.push_str("\r\n"); head.push_str("\r\n");
match data.write_all(head.as_bytes()) { match data.write_all(head.as_bytes()).await {
Ok(i) => i, Ok(i) => i,
Err(_) => return Err(HttpError::WriteHeadError), Err(_) => return Err(HttpError::WriteHeadError),
}; };
if !self.data.is_empty() { if !self.data.is_empty() {
match data.write_all(&self.data) { match data.write_all(&self.data).await {
Ok(i) => i, Ok(i) => i,
Err(_) => return Err(HttpError::WriteBodyError), Err(_) => return Err(HttpError::WriteBodyError),
}; };

View File

@ -1,10 +1,8 @@
use super::{read_line_crlf, Headers, HttpError}; use super::{read_line_crlf, Headers, HttpError};
use serde_json::Value; use serde_json::Value;
use std::{ use tokio::io::{AsyncReadExt, AsyncWriteExt};
fmt::{Debug, Display}, use std::fmt::{Debug, Display};
io::{Read, Write},
};
/// Http response /// Http response
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -61,8 +59,8 @@ impl HttpResponse {
} }
/// Read http response from stream /// Read http response from stream
pub fn read(data: &mut impl Read) -> Result<HttpResponse, HttpError> { pub async fn read(data: &mut (impl AsyncReadExt + Unpin)) -> Result<HttpResponse, HttpError> {
let status = match read_line_crlf(data) { let status = match read_line_crlf(data).await {
Ok(i) => i, Ok(i) => i,
Err(e) => { Err(e) => {
return Err(e); return Err(e);
@ -77,7 +75,7 @@ impl HttpResponse {
let mut headers = Headers::new(); let mut headers = Headers::new();
loop { loop {
let text = match read_line_crlf(data) { let text = match read_line_crlf(data).await {
Ok(i) => i, Ok(i) => i,
Err(_) => return Err(HttpError::InvalidHeaders), Err(_) => return Err(HttpError::InvalidHeaders),
}; };
@ -106,7 +104,7 @@ impl HttpResponse {
let mut buf: Vec<u8> = Vec::new(); let mut buf: Vec<u8> = Vec::new();
buf.resize(content_size - reqdata.len(), 0); buf.resize(content_size - reqdata.len(), 0);
match data.read_exact(&mut buf) { match data.read_exact(&mut buf).await {
Ok(i) => i, Ok(i) => i,
Err(_) => return Err(HttpError::InvalidContent), Err(_) => return Err(HttpError::InvalidContent),
}; };
@ -117,7 +115,7 @@ impl HttpResponse {
loop { loop {
let mut buf: Vec<u8> = vec![0; 1024 * 32]; let mut buf: Vec<u8> = vec![0; 1024 * 32];
let buf_len = match data.read(&mut buf) { let buf_len = match data.read(&mut buf).await {
Ok(i) => i, Ok(i) => i,
Err(_) => { Err(_) => {
break; break;
@ -138,7 +136,7 @@ impl HttpResponse {
} }
/// Write http response to stream /// Write http response to stream
pub fn write(self, data: &mut impl Write) -> Result<(), &str> { pub async fn write(self, data: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> {
let mut head: String = String::new(); let mut head: String = String::new();
head.push_str("HTTP/1.1 "); head.push_str("HTTP/1.1 ");
head.push_str(&self.status_code); head.push_str(&self.status_code);
@ -153,14 +151,14 @@ impl HttpResponse {
head.push_str("\r\n"); head.push_str("\r\n");
match data.write_all(head.as_bytes()) { match data.write_all(head.as_bytes()).await {
Ok(i) => i, Ok(i) => i,
Err(_) => return Err("write head error"), Err(_) => return Err(HttpError::WriteHeadError),
}; };
match data.write_all(&self.data) { match data.write_all(&self.data).await {
Ok(i) => i, Ok(i) => i,
Err(_) => return Err("write body error"), Err(_) => return Err(HttpError::WriteHeadError),
}; };
Ok(()) Ok(())

View File

@ -1,28 +1,25 @@
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::sync::Mutex;
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutStream;
use super::{ use super::{
start_server_new_thread, start_server_sync, start_server_new_thread,
start_server_with_threadpool, HttpServer, start_server_sync,
start_server_with_threadpool,
handler_connection,
Handler,
HttpServer,
}; };
use std::pin::Pin;
use std::{ use std::{
error::Error, error::Error, future::Future, sync::{
sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc,
}, }, time::Duration
time::Duration,
}; };
/// Http server start builder
pub struct HttpServerStarter<T: HttpServer + Send + 'static> {
http_server: T,
support_http_rrs: bool,
timeout: Option<Duration>,
host: String,
threads: usize,
}
/// Running http server /// Running http server
pub struct RunningHttpServer { pub struct RunningHttpServer {
thread: JoinHandle<()>, thread: JoinHandle<()>,
@ -41,12 +38,21 @@ impl RunningHttpServer {
} }
} }
/// Http server start builder
pub struct HttpServerStarter<T: HttpServer + Send + 'static> {
http_server: T,
handler: Handler<T>,
timeout: Option<Duration>,
host: String,
threads: usize,
}
impl<T: HttpServer + Send + 'static> HttpServerStarter<T> { impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
/// Create new HttpServerStarter /// Create new HttpServerStarter
pub fn new(http_server: T, host: &str) -> Self { pub fn new(http_server: T, host: &str) -> Self {
HttpServerStarter { HttpServerStarter {
http_server, http_server,
support_http_rrs: false, handler: Box::new(move |a, b| Box::pin(handler_connection(a, b))),
timeout: None, timeout: None,
host: host.to_string(), host: host.to_string(),
threads: 0, threads: 0,
@ -56,25 +62,25 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
/// Set http server /// Set http server
pub fn http_server(mut self, http_server: T) -> Self { pub fn http_server(mut self, http_server: T) -> Self {
self.http_server = http_server; self.http_server = http_server;
return self; self
} }
/// Set if http_rrs is supported /// Set if http_rrs is supported
pub fn support_http_rrs(mut self, support_http_rrs: bool) -> Self { pub fn handler(mut self, handler: impl Fn(Arc<Mutex<T>>, TimeoutStream<TcpStream>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static) -> Self {
self.support_http_rrs = support_http_rrs; self.handler = Box::new(move |a, b| Box::pin(handler(a, b)));
return self; self
} }
/// Set timeout for read & write /// Set timeout for read & write
pub fn timeout(mut self, timeout: Option<Duration>) -> Self { pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout; self.timeout = timeout;
return self; self
} }
/// Set host /// Set host
pub fn host(mut self, host: String) -> Self { pub fn host(mut self, host: String) -> Self {
self.host = host; self.host = host;
return self; self
} }
/// Set threads in threadpool and return builder /// Set threads in threadpool and return builder
@ -83,17 +89,17 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
/// 1 thread means that all connections are processed in the main thread /// 1 thread means that all connections are processed in the main thread
pub fn threads(mut self, threads: usize) -> Self { pub fn threads(mut self, threads: usize) -> Self {
self.threads = threads; self.threads = threads;
return self; self
} }
/// Get http server /// Get http server
pub fn get_http_server(self) -> T { pub fn get_http_server(&self) -> &T {
self.http_server &self.http_server
} }
/// Get if http_rrs is supported /// Get if http_rrs is supported
pub fn get_support_http_rrs(&self) -> bool { pub fn get_handler(&self) -> &Handler<T> {
self.support_http_rrs &self.handler
} }
/// Get timeout for read & write /// Get timeout for read & write
@ -109,7 +115,7 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
/// Get threads in threadpool /// Get threads in threadpool
/// ///
/// 0 threads means that a new thread is created for each connection \ /// 0 threads means that a new thread is created for each connection \
/// 1 thread means that all connections are processed in the main thread /// 1 thread means that all connections are processed in the one thread
pub fn get_threads(&self) -> usize { pub fn get_threads(&self) -> usize {
self.threads self.threads
} }
@ -119,16 +125,16 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
if self.threads == 0 { if self.threads == 0 {
start_server_new_thread(self.http_server, &self.host, self.timeout, self.support_http_rrs, running).await start_server_new_thread(self.http_server, &self.host, self.timeout, self.handler, running).await
} else if self.threads == 1 { } else if self.threads == 1 {
start_server_sync(self.http_server, &self.host, self.timeout, self.support_http_rrs, running).await start_server_sync(self.http_server, &self.host, self.timeout, self.handler, running).await
} else { } else {
start_server_with_threadpool( start_server_with_threadpool(
self.http_server, self.http_server,
&self.host, &self.host,
self.timeout, self.timeout,
self.threads, self.threads,
self.support_http_rrs, self.handler,
running, running,
).await ).await
} }
@ -145,7 +151,7 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
self.http_server, self.http_server,
&self.host, &self.host,
self.timeout, self.timeout,
self.support_http_rrs, self.handler,
running_clone, running_clone,
).await ).await
.expect("http server error"); .expect("http server error");
@ -156,7 +162,7 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
self.http_server, self.http_server,
&self.host, &self.host,
self.timeout, self.timeout,
self.support_http_rrs, self.handler,
running_clone, running_clone,
).await ).await
.expect("http server error"); .expect("http server error");
@ -168,7 +174,7 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
&self.host, &self.host,
self.timeout, self.timeout,
self.threads, self.threads,
self.support_http_rrs, self.handler,
running_clone, running_clone,
).await ).await
.expect("http server error") .expect("http server error")