no mutable refs in http server

This commit is contained in:
MeexReay 2024-08-29 06:00:53 +03:00
parent 1ae9a50c8b
commit 8be5a692a7
5 changed files with 31 additions and 48 deletions

View File

@ -31,11 +31,11 @@ impl HttpServer for EzSite {
} }
} }
async fn on_start(&mut self, _: &str) { async fn on_start(&self, _: &str) {
// println!("Http server started on {}", host); // println!("Http server started on {}", host);
} }
async fn on_close(&mut self) { async fn on_close(&self) {
// println!("Http server closed"); // println!("Http server closed");
} }
} }

View File

@ -54,11 +54,11 @@ impl HttpServer for EzSite {
} }
} }
async fn on_start(&mut self, host: &str) { async fn on_start(&self, host: &str) {
println!("Http server started on {}", host); println!("Http server started on {}", host);
} }
async fn on_close(&mut self) { async fn on_close(&self) {
println!("Http server closed"); println!("Http server closed");
} }
} }

View File

@ -1,18 +1,18 @@
use super::{HttpRequest, HttpServer, Stream}; use super::{HttpRequest, HttpServer, Stream};
use std::{future::Future, pin::Pin, sync::Arc}; use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{net::TcpStream, sync::RwLock}; use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutStream; use tokio_io_timeout::TimeoutStream;
#[cfg(feature = "http_rrs")] #[cfg(feature = "http_rrs")]
use {super::read_line_lf, std::net::{ToSocketAddrs, SocketAddr}}; use {super::read_line_lf, std::net::{ToSocketAddrs, SocketAddr}};
pub type Handler<T> = Box<dyn Fn(Arc<RwLock<T>>, TimeoutStream<TcpStream>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>; pub type Handler<T> = Box<dyn Fn(Arc<T>, TimeoutStream<TcpStream>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Default connection handler /// Default connection handler
/// Turns input to request and response to output /// Turns input to request and response to output
pub async fn handler_connection<S: HttpServer + Send + 'static + Sync>( pub async fn handler_connection<S: HttpServer + Send + 'static + Sync>(
server: Arc<RwLock<S>>, server: Arc<S>,
mut sock: Stream mut sock: Stream
) { ) {
let Ok(addr) = sock.get_ref().peer_addr() else { return; }; let Ok(addr) = sock.get_ref().peer_addr() else { return; };
@ -20,27 +20,22 @@ pub async fn handler_connection<S: HttpServer + Send + 'static + Sync>(
let req = match HttpRequest::read(sock.get_mut(), &addr).await { let req = match HttpRequest::read(sock.get_mut(), &addr).await {
Ok(i) => i, Ok(i) => i,
Err(e) => { Err(e) => {
server.write().await.on_error(e).await; server.on_error(e).await;
return; return;
} }
}; };
let resp = match server.read().await.on_request(&req).await { let resp = match server.on_request(&req).await {
Some(i) => i, Some(i) => i,
None => { None => {
match server.write().await.on_request_mut(&req).await { return;
Some(i) => i,
None => {
return;
}
}
} }
}; };
match resp.write(sock.get_mut()).await { match resp.write(sock.get_mut()).await {
Ok(_) => {}, Ok(_) => {},
Err(e) => { Err(e) => {
server.write().await.on_error(e).await; server.on_error(e).await;
return; return;
}, },
} }
@ -56,13 +51,13 @@ macro_rules! pin_handler {
#[cfg(feature = "http_rrs")] #[cfg(feature = "http_rrs")]
/// HTTP_RRS handler /// HTTP_RRS handler
pub async fn handler_http_rrs<S: HttpServer + Send + 'static + Sync>( pub async fn handler_http_rrs<S: HttpServer + Send + 'static + Sync>(
server: Arc<RwLock<S>>, server: Arc<S>,
mut sock: Stream, mut sock: Stream,
) { ) {
let addr = match read_line_lf(sock.get_mut()).await { let addr = match read_line_lf(sock.get_mut()).await {
Ok(i) => i, Ok(i) => i,
Err(e) => { Err(e) => {
server.write().await.on_error(e).await; server.on_error(e).await;
return; return;
} }
} }
@ -73,27 +68,22 @@ pub async fn handler_http_rrs<S: HttpServer + Send + 'static + Sync>(
let req = match HttpRequest::read(sock.get_mut(), &addr).await { let req = match HttpRequest::read(sock.get_mut(), &addr).await {
Ok(i) => i, Ok(i) => i,
Err(e) => { Err(e) => {
server.write().await.on_error(e).await; server.on_error(e).await;
return; return;
} }
}; };
let resp = match server.read().await.on_request(&req).await { let resp = match server.on_request(&req).await {
Some(i) => i, Some(i) => i,
None => { None => {
match server.write().await.on_request_mut(&req).await { return;
Some(i) => i,
None => {
return;
}
}
} }
}; };
match resp.write(sock.get_mut()).await { match resp.write(sock.get_mut()).await {
Ok(_) => {}, Ok(_) => {},
Err(e) => { Err(e) => {
server.write().await.on_error(e).await; server.on_error(e).await;
return; return;
}, },
} }

View File

@ -10,7 +10,6 @@ use std::{
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use rusty_pool::ThreadPool; use rusty_pool::ThreadPool;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::RwLock;
use tokio_io_timeout::TimeoutStream; use tokio_io_timeout::TimeoutStream;
pub mod error; pub mod error;
@ -63,20 +62,14 @@ pub type Stream = TimeoutStream<TcpStream>;
/// Async http server trait /// Async http server trait
pub trait HttpServer { pub trait HttpServer {
fn on_start(&mut self, host: &str) -> impl Future<Output = ()> + Send; fn on_start(&self, host: &str) -> impl Future<Output = ()> + Send;
fn on_close(&mut self) -> impl Future<Output = ()> + Send; fn on_close(&self) -> impl Future<Output = ()> + Send;
fn on_request( fn on_request(
&self, &self,
req: &HttpRequest, req: &HttpRequest,
) -> impl Future<Output = Option<HttpResponse>> + Send; ) -> impl Future<Output = Option<HttpResponse>> + Send;
fn on_request_mut(
&mut self,
_: &HttpRequest,
) -> impl Future<Output = Option<HttpResponse>> + Send {
async { None }
}
fn on_error( fn on_error(
&mut self, &self,
_: HttpError _: HttpError
) -> impl Future<Output = ()> + Send { ) -> impl Future<Output = ()> + Send {
async {} async {}
@ -95,12 +88,12 @@ where
T: HttpServer + Send + 'static, T: HttpServer + Send + 'static,
{ {
let threadpool = ThreadPool::new(threads, threads * 10, Duration::from_secs(60)); let threadpool = ThreadPool::new(threads, threads * 10, Duration::from_secs(60));
let server = Arc::new(RwLock::new(server)); let server = Arc::new(server);
let listener = TcpListener::bind(host).await?; let listener = TcpListener::bind(host).await?;
let host_clone = String::from(host).clone(); let host_clone = String::from(host).clone();
let server_clone = server.clone(); let server_clone = server.clone();
server_clone.write().await.on_start(&host_clone).await; server_clone.on_start(&host_clone).await;
while running.load(Ordering::Acquire) { while running.load(Ordering::Acquire) {
let Ok((sock, _)) = listener.accept().await else { continue; }; let Ok((sock, _)) = listener.accept().await else { continue; };
@ -116,7 +109,7 @@ where
threadpool.join(); threadpool.join();
server.write().await.on_close().await; server.on_close().await;
Ok(()) Ok(())
} }
@ -131,12 +124,12 @@ async fn start_server_new_thread<T>(
where where
T: HttpServer + Send + 'static, T: HttpServer + Send + 'static,
{ {
let server = Arc::new(RwLock::new(server)); let server = Arc::new(server);
let listener = TcpListener::bind(host).await?; let listener = TcpListener::bind(host).await?;
let host_clone = String::from(host).clone(); let host_clone = String::from(host).clone();
let server_clone = server.clone(); let server_clone = server.clone();
server_clone.write().await.on_start(&host_clone).await; server_clone.on_start(&host_clone).await;
while running.load(Ordering::Acquire) { while running.load(Ordering::Acquire) {
let Ok((sock, _)) = listener.accept().await else { continue; }; let Ok((sock, _)) = listener.accept().await else { continue; };
@ -150,7 +143,7 @@ where
tokio::spawn((&handler)(now_server, sock)); tokio::spawn((&handler)(now_server, sock));
} }
server.write().await.on_close().await; server.on_close().await;
Ok(()) Ok(())
} }
@ -165,12 +158,12 @@ async fn start_server_sync<T>(
where where
T: HttpServer + Send + 'static, T: HttpServer + Send + 'static,
{ {
let server = Arc::new(RwLock::new(server)); let server = Arc::new(server);
let listener = TcpListener::bind(host).await?; let listener = TcpListener::bind(host).await?;
let host_clone = String::from(host).clone(); let host_clone = String::from(host).clone();
let server_clone = server.clone(); let server_clone = server.clone();
server_clone.write().await.on_start(&host_clone).await; server_clone.on_start(&host_clone).await;
while running.load(Ordering::Acquire) { while running.load(Ordering::Acquire) {
let Ok((sock, _)) = listener.accept().await else { continue; }; let Ok((sock, _)) = listener.accept().await else { continue; };
@ -184,7 +177,7 @@ where
handler(now_server, sock).await; handler(now_server, sock).await;
} }
server.write().await.on_close().await; server.on_close().await;
Ok(()) Ok(())
} }

View File

@ -28,11 +28,11 @@ impl HttpServer for EzSite {
} }
} }
async fn on_start(&mut self, host: &str) { async fn on_start(&self, host: &str) {
println!("Http server started on {}", host); println!("Http server started on {}", host);
} }
async fn on_close(&mut self) { async fn on_close(&self) {
println!("Http server closed"); println!("Http server closed");
} }
} }