server rewrite

This commit is contained in:
MeexReay 2024-11-27 20:30:05 +03:00
parent 1ae196d322
commit 24db811c57
18 changed files with 947 additions and 786 deletions

115
Cargo.lock generated
View File

@ -45,10 +45,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
name = "bytes"
version = "1.7.1"
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
[[package]]
name = "cc"
@ -85,6 +91,8 @@ name = "ezhttp"
version = "0.1.6"
dependencies = [
"lazy_static",
"mime_guess",
"rand",
"rusty_pool",
"serde_json",
"threadpool",
@ -182,6 +190,17 @@ dependencies = [
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "gimli"
version = "0.29.0"
@ -228,6 +247,22 @@ version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "mime"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "miniz_oxide"
version = "0.7.4"
@ -303,6 +338,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ppv-lite86"
version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04"
dependencies = [
"zerocopy",
]
[[package]]
name = "proc-macro2"
version = "1.0.85"
@ -321,6 +365,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "redox_syscall"
version = "0.5.3"
@ -383,9 +457,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.128"
version = "1.0.133"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8"
checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377"
dependencies = [
"itoa",
"memchr",
@ -455,9 +529,9 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.40.0"
version = "1.41.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33"
dependencies = [
"backtrace",
"bytes",
@ -492,6 +566,12 @@ dependencies = [
"syn",
]
[[package]]
name = "unicase"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df"
[[package]]
name = "unicode-ident"
version = "1.0.12"
@ -582,3 +662,24 @@ name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View File

@ -11,12 +11,11 @@ keywords = ["http", "server", "site", "async"]
[dependencies]
urlencoding = "2.1.3"
serde_json = "1.0.128"
tokio = { version = "1.40.0", features = ["full"] }
serde_json = "1.0.133"
tokio = { version = "1.41.1", features = ["full"] }
rusty_pool = "0.7.0"
tokio-io-timeout = "1.2.0"
threadpool = "1.8.1"
lazy_static = "1.5.0"
[features]
flowgate = []
rand = "0.8.5"
mime_guess = "2.0.5"

View File

@ -10,65 +10,43 @@ ezhttp = "0.1.6" # stable
ezhttp = { git = "https://github.com/MeexReay/ezhttp" } # unstable
```
Features:
- http_rrs (adds handler_http_rrs)
## Examples
Hello world example:
```rust
use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter};
use std::time::Duration;
use ezhttp::prelude::*;
struct EzSite {
index_page: String,
}
impl EzSite {
fn new(index_page: &str) -> Self {
EzSite {
index_page: index_page.to_string(),
}
}
}
struct EzSite(String);
impl HttpServer for EzSite {
async fn on_request(&mut self, req: &HttpRequest) -> Option<HttpResponse> {
println!("{} > {} {}", req.addr, req.method, req.page);
async fn on_request(&self, req: &HttpRequest) -> Option<HttpResponse> {
println!("{} > {} {}", req.addr, req.method, req.url.to_path_string());
if req.page == "/" {
Some(HttpResponse::from_string(
Headers::from(vec![("Content-Type", "text/html")]), // response headers
"200 OK", // response status code
self.index_page.clone(), // response body
if req.url.path == "/" {
Some(HttpResponse::new(
OK, // response status code
Headers::from(vec![ // response headers
("Content-Type", "text/html"), // - content type
("Content-Length", self.0.len().to_string().as_str()) // - content length
]), Body::from_text(&self.0.clone()), // response body
))
} else {
None // close connection
}
}
async fn on_start(&mut self, host: &str) {
async fn on_start(&self, host: &str) {
println!("Http server started on {}", host);
}
async fn on_close(&mut self) {
async fn on_close(&self) {
println!("Http server closed");
}
}
#[tokio::main]
async fn main() {
let site = EzSite::new("Hello World!");
let host = "localhost:8080";
HttpServerStarter::new(site, host)
.timeout(Some(Duration::from_secs(5))) // read & write timeout
.threads(5) // threadpool size
.start_forever()
.await
.expect("http server error");
// ezhttp::start_server(site, host);
start_server(EzSite("Hello World!".to_string()), "localhost:8080").await.expect("http server error");
}
```

View File

@ -1,69 +0,0 @@
use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter};
use std::{
io::{stdin, stdout, Error, Write},
time::Duration,
};
struct EzSite {
index_page: String,
}
impl EzSite {
fn new(index_page: &str) -> Self {
EzSite {
index_page: index_page.to_string(),
}
}
}
impl HttpServer for EzSite {
async fn on_request(&self, req: &HttpRequest) -> Option<HttpResponse> {
// println!("{} > {} {}", req.addr, req.method, req.page);
if req.page == "/" {
Some(HttpResponse::from_string(
Headers::from(vec![("Content-Type", "text/html")]), // response headers
"200 OK", // response status code
&self.index_page, // response body
))
} else {
None // close connection
}
}
async fn on_start(&self, _: &str) {
// println!("Http server started on {}", host);
}
async fn on_close(&self) {
// println!("Http server closed");
}
}
fn input(prompt: &str) -> Result<String, Error> {
stdout().write_all(prompt.as_bytes())?;
stdout().flush()?;
let mut buf = String::new();
stdin().read_line(&mut buf)?;
Ok(buf)
}
fn main() {
let site_1 = HttpServerStarter::new(EzSite::new("Hello World! site_1"), "localhost:8080")
.timeout(Some(Duration::from_secs(5))) // read & write timeout
.threads(5) // threadpool size
.start();
let site_2 = HttpServerStarter::new(EzSite::new("Hello World! site_2"), "localhost:8081")
.timeout(Some(Duration::from_secs(5))) // read & write timeout
.threads(5) // threadpool size
.start();
input("enter to close site_1").unwrap();
site_1.close();
input("enter to close site_2").unwrap();
site_2.close();
}

View File

@ -1,6 +1,19 @@
use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter};
use std::time::Duration;
use ezhttp::{
body::Body,
headers::Headers,
request::HttpRequest,
response::{
status_code::{NOT_FOUND, OK},
HttpResponse
},
server::{
starter::HttpServerStarter,
HttpServer
}
};
struct EzSite {
main_page: String,
}
@ -13,23 +26,31 @@ impl EzSite {
}
fn ok_response(&self, content: String) -> HttpResponse {
HttpResponse::from_string(
Headers::from(vec![("Content-Type", "text/html")]),
"200 OK".to_string(),
content,
HttpResponse::new(
OK,
Headers::from(vec![
("Content-Length", content.len().to_string().as_str()),
("Content-Type", "text/html"),
("Connection", "keep-alive"),
]),
Body::from_text(&content),
)
}
fn not_found_response(&self, content: String) -> HttpResponse {
HttpResponse::from_string(
Headers::from(vec![("Content-Type", "text/html")]),
"404 Not Found".to_string(),
content,
HttpResponse::new(
NOT_FOUND,
Headers::from(vec![
("Content-Length", content.len().to_string().as_str()),
("Content-Type", "text/html"),
("Connection", "keep-alive"),
]),
Body::from_text(&content),
)
}
async fn get_main_page(&self, req: &HttpRequest) -> Option<HttpResponse> {
if req.page == "/" {
if req.url.path == "/" {
Some(self.ok_response(self.main_page.clone()))
} else {
None
@ -37,20 +58,20 @@ impl EzSite {
}
async fn get_unknown_page(&self, req: &HttpRequest) -> Option<HttpResponse> {
Some(self.not_found_response(format!("<h1>404 Error</h1>Not Found {}", &req.page)))
Some(self.not_found_response(format!("<h1>404 Error</h1>Not Found {}", &req.url.path)))
}
}
impl HttpServer for EzSite {
async fn on_request(&self, req: &HttpRequest) -> Option<HttpResponse> {
println!("{} > {} {}", req.addr, req.method, req.page);
println!("{} > {} {}", req.addr, req.method, req.url.to_path_string());
if let Some(resp) = self.get_main_page(req).await {
Some(resp)
} else if let Some(resp) = self.get_unknown_page(req).await {
Some(resp)
} else {
None // shutdown socket
None // shutdown connection
}
}
@ -66,7 +87,7 @@ impl HttpServer for EzSite {
#[tokio::main]
async fn main() {
let site = EzSite::new("<h1>Hello World!</h1>");
let host = "localhost:8080";
let host = "localhost:8000";
HttpServerStarter::new(site, host)
.timeout(Some(Duration::from_secs(5)))

34
examples/small_site.rs Normal file
View File

@ -0,0 +1,34 @@
use ezhttp::prelude::*;
struct EzSite(String);
impl HttpServer for EzSite {
async fn on_request(&self, req: &HttpRequest) -> Option<HttpResponse> {
println!("{} > {} {}", req.addr, req.method, req.url.to_path_string());
if req.url.path == "/" {
Some(HttpResponse::new(
OK, // response status code
Headers::from(vec![ // response headers
("Content-Type", "text/html"), // - content type
("Content-Length", self.0.len().to_string().as_str()) // - content length
]), Body::from_text(&self.0.clone()), // response body
))
} else {
None // close connection
}
}
async fn on_start(&self, host: &str) {
println!("Http server started on {}", host);
}
async fn on_close(&self) {
println!("Http server closed");
}
}
#[tokio::main]
async fn main() {
start_server(EzSite("Hello World!".to_string()), "localhost:8080").await.expect("http server error");
}

203
src/ezhttp/body.rs Normal file
View File

@ -0,0 +1,203 @@
use std::{collections::HashMap, path::PathBuf};
use serde_json::Value;
use tokio::{fs, io::{AsyncReadExt, AsyncWriteExt}};
use crate::ezhttp::{split_bytes, split_bytes_once};
use super::{read_line_crlf, headers::Headers, error::HttpError};
#[derive(Debug, Clone)]
pub struct Body {
pub data: Vec<u8>
}
impl Body {
pub fn new(data: Vec<u8>) -> Body {
Body {
data
}
}
pub fn as_bytes(&self) -> Vec<u8> {
self.data.clone()
}
pub fn as_text(&self) -> Option<String> {
String::from_utf8(self.data.clone()).ok()
}
pub fn as_query(&self) -> Option<HashMap<String, String>> {
let mut text = self.as_text()?;
if text.starts_with("?") {
text = text[1..].to_string();
}
Some(HashMap::from_iter(text.split("&").filter_map(|entry| {
let (key, value) = entry.split_once("=").unwrap_or((entry, ""));
Some((urlencoding::decode(key).ok()?.to_string(), urlencoding::decode(value).ok()?.to_string()))
})))
}
pub fn as_json(&self) -> Option<Value> {
serde_json::to_value(self.as_text()?).ok()
}
pub fn from_bytes(bytes: &[u8]) -> Body {
Self::new(bytes.to_vec())
}
pub fn from_text(text: &str) -> Body {
Self::from_bytes(text.as_bytes())
}
pub fn from_query(params: HashMap<String, String>) -> Body {
Self::from_text(&params.iter()
.map(|o|
format!("{}={}",
urlencoding::encode(o.0),
urlencoding::encode(o.1))
)
.collect::<Vec<String>>()
.join("&")
)
}
pub fn from_json(value: Value) -> Body {
Self::from_text(&value.to_string())
}
pub fn from_multipart(parts: Vec<Part>, boundary: String) -> Body {
let mut data: Vec<u8> = Vec::new();
for part in parts {
data.append(&mut b"--".to_vec());
data.append(&mut boundary.as_bytes().to_vec());
data.append(&mut b"\r\nContent-Disposition: form-data; name=\"".to_vec());
data.append(&mut part.name.as_bytes().to_vec());
data.append(&mut b"\"".to_vec());
if let Some(filename) = &part.filename {
data.append(&mut b"; filename=\"".to_vec());
data.append(&mut filename.as_bytes().to_vec());
data.append(&mut b"\"".to_vec());
}
data.append(&mut b"\r\n".to_vec());
if let Some(content_type) = &part.content_type {
data.append(&mut b"Content-Type: ".to_vec());
data.append(&mut content_type.as_bytes().to_vec());
data.append(&mut b"\r\n".to_vec());
}
data.append(&mut b"\r\n".to_vec());
data.append(&mut part.body.as_bytes());
}
data.append(&mut b"--".to_vec());
data.append(&mut boundary.as_bytes().to_vec());
data.append(&mut b"--\r\n".to_vec());
Self::from_bytes(&data)
}
pub fn as_multipart(&self, boundary: String) -> Vec<Part> {
let data = self.as_bytes();
split_bytes(&data, format!("--{boundary}").as_bytes()).iter()
.filter(|o| o != &b"--\r\n\r\n")
.filter_map(|o| {
let (head,body) = split_bytes_once(o, b"\r\n\r\n");
let head = String::from_utf8(head).ok()?;
let head = head.split("\r\n").filter_map(|h| {
let (name, value) = h.split_once(": ")?;
Some((name.to_lowercase(), value.to_string()))
}).collect::<Vec<(String, String)>>();
let content_type = head.iter()
.find(|o| o.0 == "content-type")
.map(|o| o.1.clone());
let (name, filename) = head.iter()
.find(|o| o.0 == "content-disposition")
.map(|o| o.1.split(";").filter(|o| o == &"form-data").map(|s| s.trim().to_string()).collect::<Vec<String>>())
.map(|o| (
o.iter().find(|k| k.starts_with("name=\"")).map(|k| k[6..k.len()-1].to_string()),
o.iter().find(|k| k.starts_with("filename=\"")).map(|k| k[10..k.len()-1].to_string())
))?;
let name = name?;
Some(Part::new(name, Body::from_bytes(&body), filename, content_type))
}).collect::<Vec<Part>>()
}
pub async fn recv(stream: &mut (impl AsyncReadExt + Unpin), headers: &Headers) -> Result<Body, HttpError> {
let mut reqdata: Vec<u8> = Vec::new();
if let Some(content_size) = headers.clone().get("content-length".to_string()) {
let content_size: usize = content_size.parse().map_err(|_| HttpError::InvalidContentSize)?;
reqdata.resize(content_size, 0);
stream.read_exact(&mut reqdata).await.map_err(|_| HttpError::InvalidContent)?;
} else if let Some(transfer_encoding) = headers.clone().get("transfer_encoding".to_string()) {
if transfer_encoding.split(",").map(|o| o.trim()).find(|o| o == &"chunked").is_some() {
loop {
let length = usize::from_str_radix(&read_line_crlf(stream).await?, 16).map_err(|_| HttpError::InvalidContent)?;
if length == 0 { break }
let mut data = vec![0u8; length+2];
stream.read_exact(&mut data).await.map_err(|_| HttpError::InvalidContent)?;
data.truncate(length);
reqdata.append(&mut data);
}
}
}
Ok(Body::from_bytes(&reqdata))
}
pub async fn send(&self, stream: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> {
stream.write_all(&self.as_bytes()).await.map_err(|_| HttpError::WriteHeadError)
}
}
impl Default for Body {
fn default() -> Self {
Body {
data: Vec::new()
}
}
}
pub struct Part {
pub name: String,
pub body: Body,
pub filename: Option<String>,
pub content_type: Option<String>
}
impl Part {
pub fn new(
name: String,
body: Body,
filename: Option<String>,
content_type: Option<String>
) -> Part {
Part {
name,
body,
filename,
content_type
}
}
pub fn body(name: String, body: Body) -> Part {
Part {
name,
body,
filename: None,
content_type: None
}
}
pub async fn file(name: String, file: PathBuf) -> Option<Part> {
Some(Part {
name,
body: Body::from_text(&fs::read_to_string(&file).await.ok()?),
filename: Some(file.file_name()?.to_str()?.to_string()),
content_type: mime_guess::from_path(file).first().map(|o| o.to_string())
})
}
}

0
src/ezhttp/client/mod.rs Normal file
View File

View File

@ -13,7 +13,8 @@ pub enum HttpError {
WriteHeadError,
WriteBodyError,
InvalidStatus,
RequstError
RequestError,
UrlError
}
impl std::fmt::Display for HttpError {

View File

@ -1,94 +0,0 @@
use super::{HttpRequest, HttpServer, Stream};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutStream;
#[cfg(feature = "flowgate")]
use {super::read_line_lf, std::net::{ToSocketAddrs, SocketAddr}};
pub type Handler<T> = Box<dyn Fn(Arc<T>, TimeoutStream<TcpStream>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Default connection handler
/// Turns input to request and response to output
pub async fn handler_connection<S: HttpServer + Send + 'static + Sync>(
server: Arc<S>,
mut sock: Stream
) {
let Ok(addr) = sock.get_ref().peer_addr() else { return; };
loop {
let req = match HttpRequest::read(sock.get_mut(), &addr).await {
Ok(i) => i,
Err(e) => {
server.on_error(e).await;
return;
}
};
let resp = match server.on_request(&req).await {
Some(i) => i,
None => {
return;
}
};
match resp.write(sock.get_mut()).await {
Ok(_) => {},
Err(e) => {
server.on_error(e).await;
return;
},
}
}
}
#[macro_export]
macro_rules! pin_handler {
($handler: expr) => {
Box::new(move |a, b| Box::pin($handler(a, b)))
};
}
#[cfg(feature = "flowgate")]
/// Flowgate handler
pub async fn handler_flowgate<S: HttpServer + Send + 'static + Sync>(
server: Arc<S>,
mut sock: Stream,
) {
loop {
let addr = match read_line_lf(sock.get_mut()).await {
Ok(i) => i,
Err(e) => {
server.on_error(e).await;
return;
}
}
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()[0];
let req = match HttpRequest::read(sock.get_mut(), &addr).await {
Ok(i) => i,
Err(e) => {
server.on_error(e).await;
return;
}
};
let resp = match server.on_request(&req).await {
Some(i) => i,
None => {
return;
}
};
match resp.write(sock.get_mut()).await {
Ok(_) => {},
Err(e) => {
server.on_error(e).await;
return;
},
}
}
}

View File

@ -3,6 +3,10 @@ use std::{
fmt::{Debug, Display},
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::{read_line_crlf, error::HttpError};
/// Http headers
#[derive(Clone, Debug)]
pub struct Headers {
@ -93,6 +97,31 @@ impl Headers {
pub fn clear(&mut self) {
self.entries.clear();
}
pub async fn recv(stream: &mut (impl AsyncReadExt + Unpin)) -> Result<Headers, HttpError> {
let mut headers = Headers::new();
loop {
let text = read_line_crlf(stream).await.map_err(|_| HttpError::InvalidHeaders)?;
if text.len() == 0 { break }
let (key, value) = text.split_once(": ").ok_or(HttpError::InvalidHeaders)?;
headers.put(key.to_lowercase(), value.to_string());
}
Ok(headers)
}
pub async fn send(&self, stream: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> {
let mut head = String::new();
for (k, v) in self.entries() {
head.push_str(&k);
head.push_str(": ");
head.push_str(&v);
head.push_str("\r\n");
}
stream.write_all(head.as_bytes()).await.map_err(|_| HttpError::WriteHeadError)
}
}
impl Display for Headers {

View File

@ -1,34 +1,90 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
boxed::Box,
error::Error,
future::Future,
sync::Arc,
time::Duration,
};
use tokio::io::AsyncReadExt;
use threadpool::ThreadPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Runtime;
use tokio_io_timeout::TimeoutStream;
pub mod error;
pub mod headers;
pub mod request;
pub mod response;
pub mod starter;
pub mod handler;
pub mod body;
pub mod server;
pub mod client;
pub use error::*;
pub use headers::*;
pub use request::*;
pub use response::*;
pub use starter::*;
pub use handler::*;
pub mod prelude {
pub use super::*;
pub use super::error::*;
pub use super::headers::*;
pub use super::request::*;
pub use super::response::*;
pub use super::response::status_code::*;
pub use super::body::*;
pub use super::server::*;
pub use super::server::handler::*;
pub use super::server::starter::*;
pub use super::client::*;
}
use crate::pin_handler;
use error::HttpError;
use rand::Rng;
use tokio::{io::AsyncReadExt, net::TcpStream};
use tokio_io_timeout::TimeoutStream;
const CHARS: &str = "qwertyuiopasdfghjklzxcvbnm0123456789QWERTYUIOPASDFGHJKLZXCVBNM'()+_,-./:=?";
pub fn gen_multipart_boundary() -> String {
let range = 20..40;
let length: usize = rand::thread_rng().gen_range(range);
[0..length].iter().map(|_|
String::from(CHARS.chars()
.collect::<Vec<char>>()
.get(rand::thread_rng()
.gen_range(0..CHARS.len())
).unwrap().clone()
)
).collect::<Vec<String>>().join("")
}
fn split_bytes_once(bytes: &[u8], sep: &[u8]) -> (Vec<u8>, Vec<u8>) {
if let Some(index) = bytes.windows(sep.len())
.enumerate()
.filter(|o| o.1 == sep)
.map(|o| o.0)
.next() {
let t = bytes.split_at(index);
(t.0.to_vec(), t.1.split_at(sep.len()).1.to_vec())
} else {
(Vec::from(bytes), Vec::new())
}
}
fn split_bytes(bytes: &[u8], sep: &[u8]) -> Vec<Vec<u8>> {
if bytes.len() >= sep.len() {
let indexes: Vec<usize> = bytes.windows(sep.len())
.enumerate()
.filter(|o| o.1 == sep)
.map(|o| o.0)
.collect();
let mut parts: Vec<Vec<u8>> = Vec::new();
let mut now_part: Vec<u8> = Vec::new();
let mut i = 0usize;
loop {
if i >= bytes.len() {
break;
}
if indexes.contains(&i) {
parts.push(now_part.clone());
now_part.clear();
i += sep.len();
continue;
}
now_part.push(bytes[i]);
i += 1;
}
parts.push(now_part);
parts
} else {
vec![Vec::from(bytes)]
}
}
async fn read_line(data: &mut (impl AsyncReadExt + Unpin)) -> Result<String, HttpError> {
let mut line = Vec::new();
@ -51,157 +107,4 @@ async fn read_line_crlf(data: &mut (impl AsyncReadExt + Unpin)) -> Result<String
}
}
async fn read_line_lf(data: &mut (impl AsyncReadExt + Unpin)) -> Result<String, HttpError> {
match read_line(data).await {
Ok(i) => Ok(i[..i.len() - 1].to_string()),
Err(e) => Err(e),
}
}
pub type Stream = TimeoutStream<TcpStream>;
/// Async http server trait
pub trait HttpServer {
fn on_start(&self, host: &str) -> impl Future<Output = ()> + Send;
fn on_close(&self) -> impl Future<Output = ()> + Send;
fn on_request(
&self,
req: &HttpRequest,
) -> impl Future<Output = Option<HttpResponse>> + Send;
fn on_error(
&self,
_: HttpError
) -> impl Future<Output = ()> + Send {
async {}
}
}
async fn start_server_with_threadpool<T>(
server: T,
host: &str,
timeout: Option<Duration>,
threads: usize,
handler: Handler<T>,
running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>>
where
T: HttpServer + Send + 'static + Sync,
{
let threadpool = ThreadPool::new(threads);
let server = Arc::new(server);
let listener = TcpListener::bind(host).await?;
let old_handler = handler;
let handler = Arc::new(move |now_server, sock| {
Runtime::new().unwrap().block_on(old_handler(now_server, sock));
});
let host_clone = String::from(host).clone();
let server_clone = server.clone();
server_clone.on_start(&host_clone).await;
while running.load(Ordering::Acquire) {
let Ok((sock, _)) = listener.accept().await else { continue; };
let mut sock = TimeoutStream::new(sock);
sock.set_read_timeout(timeout);
sock.set_write_timeout(timeout);
let now_server = Arc::clone(&server);
let now_handler = Arc::clone(&handler);
threadpool.execute(move || {
(now_handler)(now_server, sock);
});
}
threadpool.join();
server.on_close().await;
Ok(())
}
async fn start_server_new_thread<T>(
server: T,
host: &str,
timeout: Option<Duration>,
handler: Handler<T>,
running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>>
where
T: HttpServer + Send + 'static,
{
let server = Arc::new(server);
let listener = TcpListener::bind(host).await?;
let host_clone = String::from(host).clone();
let server_clone = server.clone();
server_clone.on_start(&host_clone).await;
while running.load(Ordering::Acquire) {
let Ok((sock, _)) = listener.accept().await else { continue; };
let mut sock = TimeoutStream::new(sock);
sock.set_read_timeout(timeout);
sock.set_write_timeout(timeout);
let now_server = Arc::clone(&server);
tokio::spawn((&handler)(now_server, sock));
}
server.on_close().await;
Ok(())
}
async fn start_server_sync<T>(
server: T,
host: &str,
timeout: Option<Duration>,
handler: Handler<T>,
running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>>
where
T: HttpServer + Send + 'static,
{
let server = Arc::new(server);
let listener = TcpListener::bind(host).await?;
let host_clone = String::from(host).clone();
let server_clone = server.clone();
server_clone.on_start(&host_clone).await;
while running.load(Ordering::Acquire) {
let Ok((sock, _)) = listener.accept().await else { continue; };
let mut sock = TimeoutStream::new(sock);
sock.set_read_timeout(timeout);
sock.set_write_timeout(timeout);
let now_server = Arc::clone(&server);
handler(now_server, sock).await;
}
server.on_close().await;
Ok(())
}
/// Start [`HttpServer`](HttpServer) on some host
///
/// Use [`HttpServerStarter`](HttpServerStarter) to set more options
pub async fn start_server<T: HttpServer + Send + 'static + Sync>(
server: T,
host: &str
) -> Result<(), Box<dyn Error>> {
start_server_new_thread(
server,
host,
None,
pin_handler!(handler_connection),
Arc::new(AtomicBool::new(true)),
).await
}

View File

@ -1,21 +1,120 @@
use super::{read_line_crlf, Headers, HttpError};
use super::{body::{Body, Part}, gen_multipart_boundary, read_line_crlf, headers::Headers, HttpError};
use serde_json::Value;
use std::{
fmt::{Debug, Display},
net::SocketAddr,
collections::HashMap, fmt::{Debug, Display}, net::SocketAddr, str::FromStr
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(Clone, Debug)]
pub struct URL {
pub path: String,
pub domain: String,
pub anchor: Option<String>,
pub query: HashMap<String, String>,
pub scheme: String,
pub port: u16
}
impl URL {
pub fn new(
domain: String,
port: u16,
path: String,
anchor: Option<String>,
query: HashMap<String, String>,
scheme: String
) -> URL {
URL {
path,
domain,
anchor,
query,
scheme,
port
}
}
pub fn to_path_string(&self) -> String {
format!("{}{}{}", self.path, if self.query.is_empty() {
String::new()
} else {
"?".to_string()+&self.query.iter().map(|o| {
format!("{}={}", urlencoding::encode(o.0), urlencoding::encode(o.1))
}).collect::<Vec<String>>().join("&")
}, if let Some(anchor) = &self.anchor {
"#".to_string()+anchor
} else {
String::new()
})
}
pub fn from_path_string(s: &str, scheme: String, domain: String, port: u16) -> Option<Self> {
let (s, anchor) = s.split_once("#").unwrap_or((s, ""));
let (path, query) = s.split_once("?").unwrap_or((s, ""));
let anchor = if anchor.is_empty() { None } else { Some(anchor.to_string()) };
let query = if query.is_empty() { HashMap::new() } else { {
HashMap::from_iter(query.split("&").filter_map(|entry| {
let (key, value) = entry.split_once("=").unwrap_or((entry, ""));
Some((urlencoding::decode(key).ok()?.to_string(), urlencoding::decode(value).ok()?.to_string()))
}))
} };
let path = path.to_string();
let scheme = scheme.to_string();
Some(URL { path, domain, anchor, query, scheme, port })
}
}
impl FromStr for URL {
type Err = HttpError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (scheme, s) = s.split_once("://").ok_or(HttpError::UrlError)?;
let (host, s) = s.split_once("/").unwrap_or((s, ""));
let (domain, port) = host.split_once(":").unwrap_or((host,
if scheme == "http" { "80" }
else if scheme == "https" { "443" }
else { return Err(HttpError::UrlError) }
));
let port = port.parse::<u16>().map_err(|_| HttpError::UrlError)?;
let (s, anchor) = s.split_once("#").unwrap_or((s, ""));
let (path, query) = s.split_once("?").unwrap_or((s, ""));
let anchor = if anchor.is_empty() { None } else { Some(anchor.to_string()) };
let query = if query.is_empty() { HashMap::new() } else { {
HashMap::from_iter(query.split("&").filter_map(|entry| {
let (key, value) = entry.split_once("=").unwrap_or((entry, ""));
Some((urlencoding::decode(key).ok()?.to_string(), urlencoding::decode(value).ok()?.to_string()))
}))
} };
let domain = domain.to_string();
let path = format!("/{path}");
let scheme = scheme.to_string();
Ok(URL { path, domain, anchor, query, scheme, port })
}
}
impl ToString for URL {
fn to_string(&self) -> String {
format!("{}://{}{}", self.scheme, {
if (self.scheme == "http" && self.port != 80) || (self.scheme == "https" && self.port != 443) {
format!("{}:{}", self.domain, self.port)
} else {
self.domain.clone()
}
}, self.to_path_string())
}
}
/// Http request
#[derive(Debug, Clone)]
pub struct HttpRequest {
pub page: String,
pub url: URL,
pub method: String,
pub addr: String,
pub addr: SocketAddr,
pub headers: Headers,
pub params: Value,
pub data: Vec<u8>,
pub body: Body
}
impl Display for HttpRequest {
@ -26,22 +125,25 @@ impl Display for HttpRequest {
impl HttpRequest {
/// Create new http request
pub fn new(page: &str, method: &str, params: Value, headers: Headers, data: Vec<u8>) -> Self {
pub fn new(
url: URL,
method: String,
addr: SocketAddr,
headers: Headers,
body: Body
) -> Self {
HttpRequest {
page: page.to_string(),
method: method.to_string(),
addr: String::new(),
params,
url,
method,
addr,
headers,
data,
body
}
}
/// Read http request from stream
pub async fn read(data: &mut (impl AsyncReadExt + Unpin), addr: &SocketAddr) -> Result<HttpRequest, HttpError> {
let ip_str = addr.to_string();
let status: Vec<String> = match read_line_crlf(data).await {
pub async fn recv(stream: &mut (impl AsyncReadExt + Unpin), addr: &SocketAddr) -> Result<HttpRequest, HttpError> {
let status: Vec<String> = match read_line_crlf(stream).await {
Ok(i) => {
i.splitn(3, " ")
.map(|s| s.to_string())
@ -51,205 +153,59 @@ impl HttpRequest {
};
let method = status[0].clone();
let (page, query) = match status[1].split_once("?") {
Some(i) => (i.0.to_string(), Some(i.1)),
None => (status[1].to_string(), None),
};
let page = status[1].clone();
let mut headers = Headers::new();
let headers = Headers::recv(stream).await?;
let body = Body::recv(stream, &headers).await?;
loop {
let text = match read_line_crlf(data).await {
Ok(i) => i,
Err(_) => return Err(HttpError::InvalidHeaders),
};
if text.len() == 0 {
break;
}
let (key, value) = match text.split_once(": ") {
Some(i) => i,
None => return Err(HttpError::InvalidHeaders),
};
headers.put(key.to_lowercase(), value.to_string());
}
let mut params = serde_json::Map::new();
if let Some(i) = query {
for ele in i.split("&") {
let (k, v) = match ele.split_once("=") {
Some(i) => i,
None => return Err(HttpError::InvalidQuery),
};
params.insert(
match urlencoding::decode(k) {
Ok(i) => i.to_string(),
Err(_) => return Err(HttpError::InvalidQuery),
},
match urlencoding::decode(v) {
Ok(i) => Value::String(i.to_string()),
Err(_) => return Err(HttpError::InvalidQuery),
},
);
}
}
let mut reqdata: Vec<u8> = Vec::new();
if let Some(content_size) = headers.clone().get("content-length".to_string()) {
let content_size: usize = match content_size.parse() {
Ok(i) => i,
Err(_) => return Err(HttpError::InvalidContentSize),
};
if content_size > reqdata.len() {
let mut buf: Vec<u8> = Vec::new();
buf.resize(content_size - reqdata.len(), 0);
match data.read_exact(&mut buf).await {
Ok(i) => i,
Err(_) => return Err(HttpError::InvalidContent),
};
reqdata.append(&mut buf);
}
}
if let Some(content_type) = headers.clone().get("content-type".to_string()) {
let mut body = match String::from_utf8(reqdata.clone()) {
Ok(i) => i,
Err(_) => return Err(HttpError::InvalidContent),
};
match content_type.as_str() {
"application/json" => {
let val: Value = match serde_json::from_str(&body) {
Ok(i) => i,
Err(_) => return Err(HttpError::JsonParseError),
};
if let Value::Object(mut dict) = val {
params.append(&mut dict);
}
}
"multipart/form-data" => {
let boundary = "--".to_string()
+ &content_type[(content_type.find("boundary=").unwrap() + 9)..]
+ "\r\n";
for part in body.split(boundary.as_str()) {
let lines: Vec<&str> = part.split("\r\n").collect();
if lines.len() >= 3 {
if lines[0].starts_with("Content-Disposition: form-data; name=\"") {
let name: &str =
&lines[0]["Content-Disposition: form-data; name=\"".len()..];
let name: &str = &name[..name.len() - 1];
params
.insert(name.to_string(), Value::String(lines[2].to_string()));
}
}
}
}
"application/x-www-form-urlencoded" => {
if body.starts_with("?") {
body = body.as_str()[1..].to_string()
}
for ele in body.split("&") {
let (k, v) = match ele.split_once("=") {
Some(i) => i,
None => return Err(HttpError::InvalidQuery),
};
params.insert(
match urlencoding::decode(k) {
Ok(i) => i.to_string(),
Err(_) => return Err(HttpError::InvalidQuery),
},
match urlencoding::decode(v) {
Ok(i) => Value::String(i.to_string()),
Err(_) => return Err(HttpError::InvalidQuery),
},
);
}
}
_ => {}
}
}
Ok(HttpRequest {
page,
Ok(HttpRequest::new(
URL::from_path_string(
&page,
"http".to_string(),
"localhost".to_string(),
80
).ok_or(HttpError::UrlError)?,
method,
addr: ip_str.to_string(),
params: Value::Object(params),
addr.clone(),
headers,
data: reqdata.clone(),
})
}
/// Set params to query in url
pub fn params_to_page(&mut self) {
let mut query = String::new();
let mut i: bool = !self.page.contains("?");
if let Value::Object(obj) = self.params.clone() {
for (k, v) in obj {
query.push_str(if i { "?" } else { "&" });
query.push_str(urlencoding::encode(k.as_str()).to_string().as_str());
query.push_str("=");
query.push_str(
urlencoding::encode(v.as_str().unwrap())
.to_string()
.as_str(),
);
i = false;
}
}
self.page += query.as_str();
}
/// Set params to json data
pub fn params_to_json(&mut self) {
self.data = Vec::from(self.params.to_string().as_bytes());
body
))
}
/// Write http request to stream
///
/// [`params`](Self::params) is not written to the stream, you need to use [`params_to_json`](Self::params_to_json) or [`params_to_page`](Self::params_to_page)
pub async fn write(self, data: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> {
pub async fn send(&self, stream: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> {
let mut head: String = String::new();
head.push_str(&self.method);
head.push_str(" ");
head.push_str(&self.page);
head.push_str(&self.url.to_path_string());
head.push_str(" HTTP/1.1");
head.push_str("\r\n");
stream.write_all(head.as_bytes()).await.map_err(|_| HttpError::WriteHeadError)?;
for (k, v) in self.headers.entries() {
head.push_str(&k);
head.push_str(": ");
head.push_str(&v);
head.push_str("\r\n");
}
self.headers.send(stream).await?;
head.push_str("\r\n");
stream.write_all(b"\r\n").await.map_err(|_| HttpError::WriteHeadError)?;
match data.write_all(head.as_bytes()).await {
Ok(i) => i,
Err(_) => return Err(HttpError::WriteHeadError),
};
if !self.data.is_empty() {
match data.write_all(&self.data).await {
Ok(i) => i,
Err(_) => return Err(HttpError::WriteBodyError),
};
}
self.body.send(stream).await?;
Ok(())
}
pub fn get_multipart(&self) -> Option<Vec<Part>> {
let boundary = self.headers.get("content-type")?
.split(";")
.map(|o| o.trim())
.find(|o| o.starts_with("boundary="))
.map(|o| o[9..].to_string())?;
Some(self.body.as_multipart(boundary))
}
pub fn set_multipart(&mut self, parts: Vec<Part>) -> Option<()> {
let boundary = gen_multipart_boundary();
self.headers.put("Content-Type", format!("multipart/form-data; boundary={}", boundary.clone()));
self.body = Body::from_multipart(parts, boundary);
Some(())
}
}

View File

@ -1,15 +1,19 @@
use super::{read_line_crlf, Headers, HttpError};
use super::{body::{Body, Part}, gen_multipart_boundary, read_line_crlf, headers::Headers, HttpError};
use serde_json::Value;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::fmt::{Debug, Display};
pub mod status_code {
pub const OK: &str = "200 OK";
pub const NOT_FOUND: &str = "404 Not Found";
}
/// Http response
#[derive(Debug, Clone)]
pub struct HttpResponse {
pub headers: Headers,
pub status_code: String,
pub data: Vec<u8>,
pub headers: Headers,
pub body: Body,
}
impl Display for HttpResponse {
@ -19,148 +23,68 @@ impl Display for HttpResponse {
}
impl HttpResponse {
/// Create new http response with empty headers and data and a 200 OK status code
pub fn new() -> Self {
Self::from_bytes(Headers::new(), "200 OK", Vec::new())
}
/// Create new http response from headers, bytes data, and status code
pub fn from_bytes(headers: Headers, status_code: impl ToString, data: Vec<u8>) -> Self {
pub fn new(
status_code: &str,
headers: Headers,
body: Body
) -> Self {
HttpResponse {
headers,
data,
status_code: status_code.to_string(),
}
}
/// Create new http response from headers, string data, and status code
pub fn from_string(headers: Headers, status_code: impl ToString, data: impl ToString) -> Self {
HttpResponse {
headers,
data: data.to_string().into_bytes(),
status_code: status_code.to_string(),
}
}
/// Get data in UTF-8
pub fn get_text(self) -> String {
match String::from_utf8(self.data) {
Ok(i) => i,
Err(_) => String::new(),
}
}
/// Get json [`Value`](Value) from data
pub fn get_json(self) -> Value {
match serde_json::from_str(self.get_text().as_str()) {
Ok(i) => i,
Err(_) => Value::Null,
body
}
}
/// Read http response from stream
pub async fn read(data: &mut (impl AsyncReadExt + Unpin)) -> Result<HttpResponse, HttpError> {
let status = match read_line_crlf(data).await {
Ok(i) => i,
Err(e) => {
return Err(e);
}
};
pub async fn recv(stream: &mut (impl AsyncReadExt + Unpin)) -> Result<HttpResponse, HttpError> {
let status = read_line_crlf(stream).await?;
let (_, status_code) = match status.split_once(" ") {
Some(i) => i,
None => return Err(HttpError::InvalidStatus),
};
let (_, status_code) = status.split_once(" ").ok_or(HttpError::InvalidStatus)?;
let mut headers = Headers::new();
let headers = Headers::recv(stream).await?;
let body = Body::recv(stream, &headers).await?;
loop {
let text = match read_line_crlf(data).await {
Ok(i) => i,
Err(_) => return Err(HttpError::InvalidHeaders),
};
if text.len() == 0 {
break;
}
let (key, value) = match text.split_once(": ") {
Some(i) => i,
None => return Err(HttpError::InvalidHeaders),
};
headers.put(key.to_lowercase(), value.to_string());
}
let mut reqdata: Vec<u8> = Vec::new();
if let Some(content_size) = headers.clone().get("content-length".to_string()) {
let content_size: usize = match content_size.parse() {
Ok(i) => i,
Err(_) => return Err(HttpError::InvalidContentSize),
};
if content_size > reqdata.len() {
let mut buf: Vec<u8> = Vec::new();
buf.resize(content_size - reqdata.len(), 0);
match data.read_exact(&mut buf).await {
Ok(i) => i,
Err(_) => return Err(HttpError::InvalidContent),
};
reqdata.append(&mut buf);
}
} else {
loop {
let mut buf: Vec<u8> = vec![0; 1024 * 32];
let buf_len = match data.read(&mut buf).await {
Ok(i) => i,
Err(_) => {
break;
}
};
if buf_len == 0 {
break;
}
buf.truncate(buf_len);
reqdata.append(&mut buf);
}
}
Ok(HttpResponse::from_bytes(headers, status_code, reqdata))
Ok(HttpResponse::new(status_code, headers, body))
}
/// Write http response to stream
pub async fn write(self, data: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> {
pub async fn send(self, stream: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> {
let mut head: String = String::new();
head.push_str("HTTP/1.1 ");
head.push_str(&self.status_code);
head.push_str("\r\n");
stream.write_all(head.as_bytes()).await.map_err(|_| HttpError::WriteHeadError)?;
for (k, v) in self.headers.entries() {
head.push_str(&k);
head.push_str(": ");
head.push_str(&v);
head.push_str("\r\n");
}
self.headers.send(stream).await?;
head.push_str("\r\n");
stream.write_all(b"\r\n").await.map_err(|_| HttpError::WriteHeadError)?;
match data.write_all(head.as_bytes()).await {
Ok(i) => i,
Err(_) => return Err(HttpError::WriteHeadError),
};
match data.write_all(&self.data).await {
Ok(i) => i,
Err(_) => return Err(HttpError::WriteHeadError),
};
self.body.send(stream).await?;
Ok(())
}
pub fn get_multipart(&self) -> Option<Vec<Part>> {
let boundary = self.headers.get("content-type")?
.split(";")
.map(|o| o.trim())
.find(|o| o.starts_with("boundary="))
.map(|o| o[9..].to_string())?;
Some(self.body.as_multipart(boundary))
}
pub fn set_multipart(&mut self, parts: Vec<Part>) -> Option<()> {
let boundary = gen_multipart_boundary();
self.headers.put("Content-Type", format!("multipart/form-data; boundary={}", boundary.clone()));
self.body = Body::from_multipart(parts, boundary);
Some(())
}
}
impl Default for HttpResponse {
/// Create new http response with empty headers and data and a 200 OK status code
fn default() -> Self {
Self::new("200 OK", Headers::new(), Body::default())
}
}

View File

@ -0,0 +1,54 @@
use super::{
HttpServer,
super::{
Stream,
request::HttpRequest
}
};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutStream;
pub type Handler<T> = Box<dyn Fn(Arc<T>, TimeoutStream<TcpStream>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Default connection handler
/// Turns input to request and response to output
pub async fn handler_connection<S: HttpServer + Send + 'static + Sync>(
server: Arc<S>,
mut sock: Stream
) {
let Ok(addr) = sock.get_ref().peer_addr() else { return; };
loop {
let req = match HttpRequest::recv(sock.get_mut(), &addr).await {
Ok(i) => i,
Err(e) => {
server.on_error(e).await;
return;
}
};
let resp = match server.on_request(&req).await {
Some(i) => i,
None => {
return;
}
};
match resp.send(sock.get_mut()).await {
Ok(_) => {},
Err(e) => {
server.on_error(e).await;
return;
},
}
}
}
#[macro_export]
macro_rules! pin_handler {
($handler: expr) => {
Box::new(move |a, b| Box::pin($handler(a, b)))
};
}

170
src/ezhttp/server/mod.rs Normal file
View File

@ -0,0 +1,170 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
boxed::Box,
error::Error,
future::Future,
sync::Arc,
time::Duration,
};
use threadpool::ThreadPool;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio_io_timeout::TimeoutStream;
use crate::pin_handler;
use super::error::HttpError;
use super::request::HttpRequest;
use super::response::HttpResponse;
pub mod handler;
pub mod starter;
use handler::{handler_connection, Handler};
/// Async http server trait
pub trait HttpServer {
fn on_start(&self, host: &str) -> impl Future<Output = ()> + Send;
fn on_close(&self) -> impl Future<Output = ()> + Send;
fn on_request(
&self,
req: &HttpRequest,
) -> impl Future<Output = Option<HttpResponse>> + Send;
fn on_error(
&self,
_: HttpError
) -> impl Future<Output = ()> + Send {
async {}
}
}
async fn start_server_with_threadpool<T>(
server: T,
host: &str,
timeout: Option<Duration>,
threads: usize,
handler: Handler<T>,
running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>>
where
T: HttpServer + Send + 'static + Sync,
{
let threadpool = ThreadPool::new(threads);
let server = Arc::new(server);
let listener = TcpListener::bind(host).await?;
let old_handler = handler;
let handler = Arc::new(move |now_server, sock| {
Runtime::new().unwrap().block_on(old_handler(now_server, sock));
});
let host_clone = String::from(host).clone();
let server_clone = server.clone();
server_clone.on_start(&host_clone).await;
while running.load(Ordering::Acquire) {
let Ok((sock, _)) = listener.accept().await else { continue; };
let mut sock = TimeoutStream::new(sock);
sock.set_read_timeout(timeout);
sock.set_write_timeout(timeout);
let now_server = Arc::clone(&server);
let now_handler = Arc::clone(&handler);
threadpool.execute(move || {
(now_handler)(now_server, sock);
});
}
threadpool.join();
server.on_close().await;
Ok(())
}
async fn start_server_new_thread<T>(
server: T,
host: &str,
timeout: Option<Duration>,
handler: Handler<T>,
running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>>
where
T: HttpServer + Send + 'static,
{
let server = Arc::new(server);
let listener = TcpListener::bind(host).await?;
let host_clone = String::from(host).clone();
let server_clone = server.clone();
server_clone.on_start(&host_clone).await;
while running.load(Ordering::Acquire) {
let Ok((sock, _)) = listener.accept().await else { continue; };
let mut sock = TimeoutStream::new(sock);
sock.set_read_timeout(timeout);
sock.set_write_timeout(timeout);
let now_server = Arc::clone(&server);
tokio::spawn((&handler)(now_server, sock));
}
server.on_close().await;
Ok(())
}
async fn start_server_sync<T>(
server: T,
host: &str,
timeout: Option<Duration>,
handler: Handler<T>,
running: Arc<AtomicBool>,
) -> Result<(), Box<dyn Error>>
where
T: HttpServer + Send + 'static,
{
let server = Arc::new(server);
let listener = TcpListener::bind(host).await?;
let host_clone = String::from(host).clone();
let server_clone = server.clone();
server_clone.on_start(&host_clone).await;
while running.load(Ordering::Acquire) {
let Ok((sock, _)) = listener.accept().await else { continue; };
let mut sock = TimeoutStream::new(sock);
sock.set_read_timeout(timeout);
sock.set_write_timeout(timeout);
let now_server = Arc::clone(&server);
handler(now_server, sock).await;
}
server.on_close().await;
Ok(())
}
/// Start [`HttpServer`](HttpServer) on some host
///
/// Use [`HttpServerStarter`](HttpServerStarter) to set more options
pub async fn start_server<T: HttpServer + Send + 'static + Sync>(
server: T,
host: &str
) -> Result<(), Box<dyn Error>> {
start_server_new_thread(
server,
host,
None,
pin_handler!(handler_connection),
Arc::new(AtomicBool::new(true)),
).await
}

View File

@ -1,7 +1,11 @@
use tokio::{runtime::Runtime, task::JoinHandle};
use super::{
handler_connection, start_server_new_thread, start_server_sync, start_server_with_threadpool, Handler, HttpServer
start_server_new_thread,
start_server_sync,
start_server_with_threadpool,
handler::{handler_connection, Handler},
HttpServer
};
use crate::pin_handler;

View File

@ -1,53 +0,0 @@
use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter};
use std::time::Duration;
struct EzSite {
index_page: String,
}
impl EzSite {
fn new(index_page: &str) -> Self {
EzSite {
index_page: index_page.to_string(),
}
}
}
impl HttpServer for EzSite {
async fn on_request(&self, req: &HttpRequest) -> Option<HttpResponse> {
println!("{} > {} {}", req.addr, req.method, req.page);
if req.page == "/" {
Some(HttpResponse::from_string(
Headers::from(vec![("Content-Type", "text/html")]), // response headers
"200 OK", // response status code
self.index_page.clone(), // response body
))
} else {
None // close connection
}
}
async fn on_start(&self, host: &str) {
println!("Http server started on {}", host);
}
async fn on_close(&self) {
println!("Http server closed");
}
}
#[tokio::main]
async fn main() {
let site = EzSite::new("Hello World!");
let host = "localhost:8080";
HttpServerStarter::new(site, host)
.timeout(Some(Duration::from_secs(5))) // read & write timeout
.threads(5) // threadpool size
.start_forever()
.await
.expect("http server error");
// ezhttp::start_server(site, host);
}