From 24db811c5711f5dcc4f77f0931b2cb38015f62f4 Mon Sep 17 00:00:00 2001 From: MeexReay Date: Wed, 27 Nov 2024 20:30:05 +0300 Subject: [PATCH] server rewrite --- Cargo.lock | 115 ++++++++- Cargo.toml | 9 +- README.md | 50 ++-- examples/parallel_sites.rs | 69 ------ examples/simple_site.rs | 49 ++-- examples/small_site.rs | 34 +++ src/ezhttp/body.rs | 203 ++++++++++++++++ src/ezhttp/client/mod.rs | 0 src/ezhttp/error.rs | 3 +- src/ezhttp/handler.rs | 94 -------- src/ezhttp/headers.rs | 29 +++ src/ezhttp/mod.rs | 259 +++++++-------------- src/ezhttp/request.rs | 362 +++++++++++++---------------- src/ezhttp/response.rs | 174 ++++---------- src/ezhttp/server/handler.rs | 54 +++++ src/ezhttp/server/mod.rs | 170 ++++++++++++++ src/ezhttp/{ => server}/starter.rs | 6 +- src/main.rs | 53 ----- 18 files changed, 947 insertions(+), 786 deletions(-) delete mode 100644 examples/parallel_sites.rs create mode 100644 examples/small_site.rs create mode 100644 src/ezhttp/body.rs create mode 100644 src/ezhttp/client/mod.rs delete mode 100644 src/ezhttp/handler.rs create mode 100644 src/ezhttp/server/handler.rs create mode 100644 src/ezhttp/server/mod.rs rename src/ezhttp/{ => server}/starter.rs (97%) delete mode 100644 src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 775ae59..275b98a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,10 +45,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] -name = "bytes" -version = "1.7.1" +name = "byteorder" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" [[package]] name = "cc" @@ -85,6 +91,8 @@ name = "ezhttp" version = "0.1.6" dependencies = [ "lazy_static", + "mime_guess", + "rand", "rusty_pool", "serde_json", "threadpool", @@ -182,6 +190,17 @@ dependencies = [ "slab", ] +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gimli" version = "0.29.0" @@ -228,6 +247,22 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.7.4" @@ -303,6 +338,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + [[package]] name = "proc-macro2" version = "1.0.85" @@ -321,6 +365,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.5.3" @@ -383,9 +457,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.128" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", @@ -455,9 +529,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.40.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", @@ -492,6 +566,12 @@ dependencies = [ "syn", ] +[[package]] +name = "unicase" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df" + [[package]] name = "unicode-ident" version = "1.0.12" @@ -582,3 +662,24 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index 3449a4b..8bf3815 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,12 +11,11 @@ keywords = ["http", "server", "site", "async"] [dependencies] urlencoding = "2.1.3" -serde_json = "1.0.128" -tokio = { version = "1.40.0", features = ["full"] } +serde_json = "1.0.133" +tokio = { version = "1.41.1", features = ["full"] } rusty_pool = "0.7.0" tokio-io-timeout = "1.2.0" threadpool = "1.8.1" lazy_static = "1.5.0" - -[features] -flowgate = [] +rand = "0.8.5" +mime_guess = "2.0.5" diff --git a/README.md b/README.md index 75e24b3..db39b8d 100644 --- a/README.md +++ b/README.md @@ -10,65 +10,43 @@ ezhttp = "0.1.6" # stable ezhttp = { git = "https://github.com/MeexReay/ezhttp" } # unstable ``` -Features: -- http_rrs (adds handler_http_rrs) - ## Examples Hello world example: ```rust -use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter}; -use std::time::Duration; +use ezhttp::prelude::*; -struct EzSite { - index_page: String, -} - -impl EzSite { - fn new(index_page: &str) -> Self { - EzSite { - index_page: index_page.to_string(), - } - } -} +struct EzSite(String); impl HttpServer for EzSite { - async fn on_request(&mut self, req: &HttpRequest) -> Option { - println!("{} > {} {}", req.addr, req.method, req.page); + async fn on_request(&self, req: &HttpRequest) -> Option { + println!("{} > {} {}", req.addr, req.method, req.url.to_path_string()); - if req.page == "/" { - Some(HttpResponse::from_string( - Headers::from(vec![("Content-Type", "text/html")]), // response headers - "200 OK", // response status code - self.index_page.clone(), // response body + if req.url.path == "/" { + Some(HttpResponse::new( + OK, // response status code + Headers::from(vec![ // response headers + ("Content-Type", "text/html"), // - content type + ("Content-Length", self.0.len().to_string().as_str()) // - content length + ]), Body::from_text(&self.0.clone()), // response body )) } else { None // close connection } } - async fn on_start(&mut self, host: &str) { + async fn on_start(&self, host: &str) { println!("Http server started on {}", host); } - async fn on_close(&mut self) { + async fn on_close(&self) { println!("Http server closed"); } } #[tokio::main] async fn main() { - let site = EzSite::new("Hello World!"); - let host = "localhost:8080"; - - HttpServerStarter::new(site, host) - .timeout(Some(Duration::from_secs(5))) // read & write timeout - .threads(5) // threadpool size - .start_forever() - .await - .expect("http server error"); - - // ezhttp::start_server(site, host); + start_server(EzSite("Hello World!".to_string()), "localhost:8080").await.expect("http server error"); } ``` diff --git a/examples/parallel_sites.rs b/examples/parallel_sites.rs deleted file mode 100644 index ea39568..0000000 --- a/examples/parallel_sites.rs +++ /dev/null @@ -1,69 +0,0 @@ -use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter}; -use std::{ - io::{stdin, stdout, Error, Write}, - time::Duration, -}; - -struct EzSite { - index_page: String, -} - -impl EzSite { - fn new(index_page: &str) -> Self { - EzSite { - index_page: index_page.to_string(), - } - } -} - -impl HttpServer for EzSite { - async fn on_request(&self, req: &HttpRequest) -> Option { - // println!("{} > {} {}", req.addr, req.method, req.page); - - if req.page == "/" { - Some(HttpResponse::from_string( - Headers::from(vec![("Content-Type", "text/html")]), // response headers - "200 OK", // response status code - &self.index_page, // response body - )) - } else { - None // close connection - } - } - - async fn on_start(&self, _: &str) { - // println!("Http server started on {}", host); - } - - async fn on_close(&self) { - // println!("Http server closed"); - } -} - -fn input(prompt: &str) -> Result { - stdout().write_all(prompt.as_bytes())?; - stdout().flush()?; - let mut buf = String::new(); - stdin().read_line(&mut buf)?; - Ok(buf) -} - -fn main() { - let site_1 = HttpServerStarter::new(EzSite::new("Hello World! site_1"), "localhost:8080") - .timeout(Some(Duration::from_secs(5))) // read & write timeout - .threads(5) // threadpool size - .start(); - - let site_2 = HttpServerStarter::new(EzSite::new("Hello World! site_2"), "localhost:8081") - .timeout(Some(Duration::from_secs(5))) // read & write timeout - .threads(5) // threadpool size - .start(); - - input("enter to close site_1").unwrap(); - - site_1.close(); - - input("enter to close site_2").unwrap(); - - site_2.close(); -} diff --git a/examples/simple_site.rs b/examples/simple_site.rs index a3531b8..f0f9f00 100644 --- a/examples/simple_site.rs +++ b/examples/simple_site.rs @@ -1,6 +1,19 @@ -use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter}; use std::time::Duration; +use ezhttp::{ + body::Body, + headers::Headers, + request::HttpRequest, + response::{ + status_code::{NOT_FOUND, OK}, + HttpResponse + }, + server::{ + starter::HttpServerStarter, + HttpServer + } +}; + struct EzSite { main_page: String, } @@ -13,23 +26,31 @@ impl EzSite { } fn ok_response(&self, content: String) -> HttpResponse { - HttpResponse::from_string( - Headers::from(vec![("Content-Type", "text/html")]), - "200 OK".to_string(), - content, + HttpResponse::new( + OK, + Headers::from(vec![ + ("Content-Length", content.len().to_string().as_str()), + ("Content-Type", "text/html"), + ("Connection", "keep-alive"), + ]), + Body::from_text(&content), ) } fn not_found_response(&self, content: String) -> HttpResponse { - HttpResponse::from_string( - Headers::from(vec![("Content-Type", "text/html")]), - "404 Not Found".to_string(), - content, + HttpResponse::new( + NOT_FOUND, + Headers::from(vec![ + ("Content-Length", content.len().to_string().as_str()), + ("Content-Type", "text/html"), + ("Connection", "keep-alive"), + ]), + Body::from_text(&content), ) } async fn get_main_page(&self, req: &HttpRequest) -> Option { - if req.page == "/" { + if req.url.path == "/" { Some(self.ok_response(self.main_page.clone())) } else { None @@ -37,20 +58,20 @@ impl EzSite { } async fn get_unknown_page(&self, req: &HttpRequest) -> Option { - Some(self.not_found_response(format!("

404 Error

Not Found {}", &req.page))) + Some(self.not_found_response(format!("

404 Error

Not Found {}", &req.url.path))) } } impl HttpServer for EzSite { async fn on_request(&self, req: &HttpRequest) -> Option { - println!("{} > {} {}", req.addr, req.method, req.page); + println!("{} > {} {}", req.addr, req.method, req.url.to_path_string()); if let Some(resp) = self.get_main_page(req).await { Some(resp) } else if let Some(resp) = self.get_unknown_page(req).await { Some(resp) } else { - None // shutdown socket + None // shutdown connection } } @@ -66,7 +87,7 @@ impl HttpServer for EzSite { #[tokio::main] async fn main() { let site = EzSite::new("

Hello World!

"); - let host = "localhost:8080"; + let host = "localhost:8000"; HttpServerStarter::new(site, host) .timeout(Some(Duration::from_secs(5))) diff --git a/examples/small_site.rs b/examples/small_site.rs new file mode 100644 index 0000000..3512152 --- /dev/null +++ b/examples/small_site.rs @@ -0,0 +1,34 @@ +use ezhttp::prelude::*; + +struct EzSite(String); + +impl HttpServer for EzSite { + async fn on_request(&self, req: &HttpRequest) -> Option { + println!("{} > {} {}", req.addr, req.method, req.url.to_path_string()); + + if req.url.path == "/" { + Some(HttpResponse::new( + OK, // response status code + Headers::from(vec![ // response headers + ("Content-Type", "text/html"), // - content type + ("Content-Length", self.0.len().to_string().as_str()) // - content length + ]), Body::from_text(&self.0.clone()), // response body + )) + } else { + None // close connection + } + } + + async fn on_start(&self, host: &str) { + println!("Http server started on {}", host); + } + + async fn on_close(&self) { + println!("Http server closed"); + } +} + +#[tokio::main] +async fn main() { + start_server(EzSite("Hello World!".to_string()), "localhost:8080").await.expect("http server error"); +} diff --git a/src/ezhttp/body.rs b/src/ezhttp/body.rs new file mode 100644 index 0000000..7892758 --- /dev/null +++ b/src/ezhttp/body.rs @@ -0,0 +1,203 @@ +use std::{collections::HashMap, path::PathBuf}; + +use serde_json::Value; +use tokio::{fs, io::{AsyncReadExt, AsyncWriteExt}}; + +use crate::ezhttp::{split_bytes, split_bytes_once}; + +use super::{read_line_crlf, headers::Headers, error::HttpError}; + +#[derive(Debug, Clone)] +pub struct Body { + pub data: Vec +} + +impl Body { + pub fn new(data: Vec) -> Body { + Body { + data + } + } + + pub fn as_bytes(&self) -> Vec { + self.data.clone() + } + + pub fn as_text(&self) -> Option { + String::from_utf8(self.data.clone()).ok() + } + + pub fn as_query(&self) -> Option> { + let mut text = self.as_text()?; + if text.starts_with("?") { + text = text[1..].to_string(); + } + Some(HashMap::from_iter(text.split("&").filter_map(|entry| { + let (key, value) = entry.split_once("=").unwrap_or((entry, "")); + Some((urlencoding::decode(key).ok()?.to_string(), urlencoding::decode(value).ok()?.to_string())) + }))) + } + + pub fn as_json(&self) -> Option { + serde_json::to_value(self.as_text()?).ok() + } + + pub fn from_bytes(bytes: &[u8]) -> Body { + Self::new(bytes.to_vec()) + } + + pub fn from_text(text: &str) -> Body { + Self::from_bytes(text.as_bytes()) + } + + pub fn from_query(params: HashMap) -> Body { + Self::from_text(¶ms.iter() + .map(|o| + format!("{}={}", + urlencoding::encode(o.0), + urlencoding::encode(o.1)) + ) + .collect::>() + .join("&") + ) + } + + pub fn from_json(value: Value) -> Body { + Self::from_text(&value.to_string()) + } + + pub fn from_multipart(parts: Vec, boundary: String) -> Body { + let mut data: Vec = Vec::new(); + + for part in parts { + data.append(&mut b"--".to_vec()); + data.append(&mut boundary.as_bytes().to_vec()); + data.append(&mut b"\r\nContent-Disposition: form-data; name=\"".to_vec()); + data.append(&mut part.name.as_bytes().to_vec()); + data.append(&mut b"\"".to_vec()); + if let Some(filename) = &part.filename { + data.append(&mut b"; filename=\"".to_vec()); + data.append(&mut filename.as_bytes().to_vec()); + data.append(&mut b"\"".to_vec()); + } + data.append(&mut b"\r\n".to_vec()); + if let Some(content_type) = &part.content_type { + data.append(&mut b"Content-Type: ".to_vec()); + data.append(&mut content_type.as_bytes().to_vec()); + data.append(&mut b"\r\n".to_vec()); + } + data.append(&mut b"\r\n".to_vec()); + data.append(&mut part.body.as_bytes()); + } + + data.append(&mut b"--".to_vec()); + data.append(&mut boundary.as_bytes().to_vec()); + data.append(&mut b"--\r\n".to_vec()); + + Self::from_bytes(&data) + } + + pub fn as_multipart(&self, boundary: String) -> Vec { + let data = self.as_bytes(); + split_bytes(&data, format!("--{boundary}").as_bytes()).iter() + .filter(|o| o != &b"--\r\n\r\n") + .filter_map(|o| { + let (head,body) = split_bytes_once(o, b"\r\n\r\n"); + let head = String::from_utf8(head).ok()?; + let head = head.split("\r\n").filter_map(|h| { + let (name, value) = h.split_once(": ")?; + Some((name.to_lowercase(), value.to_string())) + }).collect::>(); + let content_type = head.iter() + .find(|o| o.0 == "content-type") + .map(|o| o.1.clone()); + let (name, filename) = head.iter() + .find(|o| o.0 == "content-disposition") + .map(|o| o.1.split(";").filter(|o| o == &"form-data").map(|s| s.trim().to_string()).collect::>()) + .map(|o| ( + o.iter().find(|k| k.starts_with("name=\"")).map(|k| k[6..k.len()-1].to_string()), + o.iter().find(|k| k.starts_with("filename=\"")).map(|k| k[10..k.len()-1].to_string()) + ))?; + let name = name?; + + Some(Part::new(name, Body::from_bytes(&body), filename, content_type)) + }).collect::>() + } + + pub async fn recv(stream: &mut (impl AsyncReadExt + Unpin), headers: &Headers) -> Result { + let mut reqdata: Vec = Vec::new(); + + if let Some(content_size) = headers.clone().get("content-length".to_string()) { + let content_size: usize = content_size.parse().map_err(|_| HttpError::InvalidContentSize)?; + reqdata.resize(content_size, 0); + stream.read_exact(&mut reqdata).await.map_err(|_| HttpError::InvalidContent)?; + } else if let Some(transfer_encoding) = headers.clone().get("transfer_encoding".to_string()) { + if transfer_encoding.split(",").map(|o| o.trim()).find(|o| o == &"chunked").is_some() { + loop { + let length = usize::from_str_radix(&read_line_crlf(stream).await?, 16).map_err(|_| HttpError::InvalidContent)?; + if length == 0 { break } + let mut data = vec![0u8; length+2]; + stream.read_exact(&mut data).await.map_err(|_| HttpError::InvalidContent)?; + data.truncate(length); + reqdata.append(&mut data); + } + } + } + + Ok(Body::from_bytes(&reqdata)) + } + + pub async fn send(&self, stream: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> { + stream.write_all(&self.as_bytes()).await.map_err(|_| HttpError::WriteHeadError) + } +} + +impl Default for Body { + fn default() -> Self { + Body { + data: Vec::new() + } + } +} + +pub struct Part { + pub name: String, + pub body: Body, + pub filename: Option, + pub content_type: Option +} + +impl Part { + pub fn new( + name: String, + body: Body, + filename: Option, + content_type: Option + ) -> Part { + Part { + name, + body, + filename, + content_type + } + } + + pub fn body(name: String, body: Body) -> Part { + Part { + name, + body, + filename: None, + content_type: None + } + } + + pub async fn file(name: String, file: PathBuf) -> Option { + Some(Part { + name, + body: Body::from_text(&fs::read_to_string(&file).await.ok()?), + filename: Some(file.file_name()?.to_str()?.to_string()), + content_type: mime_guess::from_path(file).first().map(|o| o.to_string()) + }) + } +} + diff --git a/src/ezhttp/client/mod.rs b/src/ezhttp/client/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/ezhttp/error.rs b/src/ezhttp/error.rs index 9392f45..98cd2d0 100644 --- a/src/ezhttp/error.rs +++ b/src/ezhttp/error.rs @@ -13,7 +13,8 @@ pub enum HttpError { WriteHeadError, WriteBodyError, InvalidStatus, - RequstError + RequestError, + UrlError } impl std::fmt::Display for HttpError { diff --git a/src/ezhttp/handler.rs b/src/ezhttp/handler.rs deleted file mode 100644 index b479098..0000000 --- a/src/ezhttp/handler.rs +++ /dev/null @@ -1,94 +0,0 @@ -use super::{HttpRequest, HttpServer, Stream}; - -use std::{future::Future, pin::Pin, sync::Arc}; -use tokio::net::TcpStream; -use tokio_io_timeout::TimeoutStream; - -#[cfg(feature = "flowgate")] -use {super::read_line_lf, std::net::{ToSocketAddrs, SocketAddr}}; - -pub type Handler = Box, TimeoutStream) -> Pin + Send>> + Send + Sync>; - -/// Default connection handler -/// Turns input to request and response to output -pub async fn handler_connection( - server: Arc, - mut sock: Stream -) { - let Ok(addr) = sock.get_ref().peer_addr() else { return; }; - - loop { - let req = match HttpRequest::read(sock.get_mut(), &addr).await { - Ok(i) => i, - Err(e) => { - server.on_error(e).await; - return; - } - }; - - let resp = match server.on_request(&req).await { - Some(i) => i, - None => { - return; - } - }; - - match resp.write(sock.get_mut()).await { - Ok(_) => {}, - Err(e) => { - server.on_error(e).await; - return; - }, - } - } -} - -#[macro_export] -macro_rules! pin_handler { - ($handler: expr) => { - Box::new(move |a, b| Box::pin($handler(a, b))) - }; -} - -#[cfg(feature = "flowgate")] -/// Flowgate handler -pub async fn handler_flowgate( - server: Arc, - mut sock: Stream, -) { - loop { - let addr = match read_line_lf(sock.get_mut()).await { - Ok(i) => i, - Err(e) => { - server.on_error(e).await; - return; - } - } - .to_socket_addrs() - .unwrap() - .collect::>()[0]; - - let req = match HttpRequest::read(sock.get_mut(), &addr).await { - Ok(i) => i, - Err(e) => { - server.on_error(e).await; - return; - } - }; - - let resp = match server.on_request(&req).await { - Some(i) => i, - None => { - return; - } - }; - - match resp.write(sock.get_mut()).await { - Ok(_) => {}, - Err(e) => { - server.on_error(e).await; - return; - }, - } - } -} \ No newline at end of file diff --git a/src/ezhttp/headers.rs b/src/ezhttp/headers.rs index 405c4fc..80d2935 100644 --- a/src/ezhttp/headers.rs +++ b/src/ezhttp/headers.rs @@ -3,6 +3,10 @@ use std::{ fmt::{Debug, Display}, }; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use super::{read_line_crlf, error::HttpError}; + /// Http headers #[derive(Clone, Debug)] pub struct Headers { @@ -93,6 +97,31 @@ impl Headers { pub fn clear(&mut self) { self.entries.clear(); } + + pub async fn recv(stream: &mut (impl AsyncReadExt + Unpin)) -> Result { + let mut headers = Headers::new(); + + loop { + let text = read_line_crlf(stream).await.map_err(|_| HttpError::InvalidHeaders)?; + if text.len() == 0 { break } + + let (key, value) = text.split_once(": ").ok_or(HttpError::InvalidHeaders)?; + headers.put(key.to_lowercase(), value.to_string()); + } + + Ok(headers) + } + + pub async fn send(&self, stream: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> { + let mut head = String::new(); + for (k, v) in self.entries() { + head.push_str(&k); + head.push_str(": "); + head.push_str(&v); + head.push_str("\r\n"); + } + stream.write_all(head.as_bytes()).await.map_err(|_| HttpError::WriteHeadError) + } } impl Display for Headers { diff --git a/src/ezhttp/mod.rs b/src/ezhttp/mod.rs index 3d55a17..70e9398 100644 --- a/src/ezhttp/mod.rs +++ b/src/ezhttp/mod.rs @@ -1,34 +1,90 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::{ - boxed::Box, - error::Error, - future::Future, - sync::Arc, - time::Duration, -}; - -use tokio::io::AsyncReadExt; -use threadpool::ThreadPool; -use tokio::net::{TcpListener, TcpStream}; -use tokio::runtime::Runtime; -use tokio_io_timeout::TimeoutStream; - pub mod error; pub mod headers; pub mod request; pub mod response; -pub mod starter; -pub mod handler; +pub mod body; +pub mod server; +pub mod client; -pub use error::*; -pub use headers::*; -pub use request::*; -pub use response::*; -pub use starter::*; -pub use handler::*; +pub mod prelude { + pub use super::*; + pub use super::error::*; + pub use super::headers::*; + pub use super::request::*; + pub use super::response::*; + pub use super::response::status_code::*; + pub use super::body::*; + pub use super::server::*; + pub use super::server::handler::*; + pub use super::server::starter::*; + pub use super::client::*; +} -use crate::pin_handler; +use error::HttpError; +use rand::Rng; +use tokio::{io::AsyncReadExt, net::TcpStream}; +use tokio_io_timeout::TimeoutStream; +const CHARS: &str = "qwertyuiopasdfghjklzxcvbnm0123456789QWERTYUIOPASDFGHJKLZXCVBNM'()+_,-./:=?"; + +pub fn gen_multipart_boundary() -> String { + let range = 20..40; + let length: usize = rand::thread_rng().gen_range(range); + [0..length].iter().map(|_| + String::from(CHARS.chars() + .collect::>() + .get(rand::thread_rng() + .gen_range(0..CHARS.len()) + ).unwrap().clone() + ) + ).collect::>().join("") +} + +fn split_bytes_once(bytes: &[u8], sep: &[u8]) -> (Vec, Vec) { + if let Some(index) = bytes.windows(sep.len()) + .enumerate() + .filter(|o| o.1 == sep) + .map(|o| o.0) + .next() { + let t = bytes.split_at(index); + (t.0.to_vec(), t.1.split_at(sep.len()).1.to_vec()) + } else { + (Vec::from(bytes), Vec::new()) + } +} + +fn split_bytes(bytes: &[u8], sep: &[u8]) -> Vec> { + if bytes.len() >= sep.len() { + let indexes: Vec = bytes.windows(sep.len()) + .enumerate() + .filter(|o| o.1 == sep) + .map(|o| o.0) + .collect(); + let mut parts: Vec> = Vec::new(); + let mut now_part: Vec = Vec::new(); + let mut i = 0usize; + loop { + if i >= bytes.len() { + break; + } + + if indexes.contains(&i) { + parts.push(now_part.clone()); + now_part.clear(); + i += sep.len(); + continue; + } + + now_part.push(bytes[i]); + + i += 1; + } + parts.push(now_part); + parts + } else { + vec![Vec::from(bytes)] + } +} async fn read_line(data: &mut (impl AsyncReadExt + Unpin)) -> Result { let mut line = Vec::new(); @@ -51,157 +107,4 @@ async fn read_line_crlf(data: &mut (impl AsyncReadExt + Unpin)) -> Result Result { - match read_line(data).await { - Ok(i) => Ok(i[..i.len() - 1].to_string()), - Err(e) => Err(e), - } -} - -pub type Stream = TimeoutStream; - -/// Async http server trait -pub trait HttpServer { - fn on_start(&self, host: &str) -> impl Future + Send; - fn on_close(&self) -> impl Future + Send; - fn on_request( - &self, - req: &HttpRequest, - ) -> impl Future> + Send; - fn on_error( - &self, - _: HttpError - ) -> impl Future + Send { - async {} - } -} - -async fn start_server_with_threadpool( - server: T, - host: &str, - timeout: Option, - threads: usize, - handler: Handler, - running: Arc, -) -> Result<(), Box> -where - T: HttpServer + Send + 'static + Sync, -{ - let threadpool = ThreadPool::new(threads); - - let server = Arc::new(server); - let listener = TcpListener::bind(host).await?; - let old_handler = handler; - let handler = Arc::new(move |now_server, sock| { - Runtime::new().unwrap().block_on(old_handler(now_server, sock)); - }); - - let host_clone = String::from(host).clone(); - let server_clone = server.clone(); - server_clone.on_start(&host_clone).await; - - while running.load(Ordering::Acquire) { - let Ok((sock, _)) = listener.accept().await else { continue; }; - let mut sock = TimeoutStream::new(sock); - - sock.set_read_timeout(timeout); - sock.set_write_timeout(timeout); - - let now_server = Arc::clone(&server); - let now_handler = Arc::clone(&handler); - - threadpool.execute(move || { - (now_handler)(now_server, sock); - }); - } - - threadpool.join(); - - server.on_close().await; - - Ok(()) -} - -async fn start_server_new_thread( - server: T, - host: &str, - timeout: Option, - handler: Handler, - running: Arc, -) -> Result<(), Box> -where - T: HttpServer + Send + 'static, -{ - let server = Arc::new(server); - let listener = TcpListener::bind(host).await?; - - let host_clone = String::from(host).clone(); - let server_clone = server.clone(); - server_clone.on_start(&host_clone).await; - - while running.load(Ordering::Acquire) { - let Ok((sock, _)) = listener.accept().await else { continue; }; - let mut sock = TimeoutStream::new(sock); - - sock.set_read_timeout(timeout); - sock.set_write_timeout(timeout); - - let now_server = Arc::clone(&server); - - tokio::spawn((&handler)(now_server, sock)); - } - - server.on_close().await; - - Ok(()) -} - -async fn start_server_sync( - server: T, - host: &str, - timeout: Option, - handler: Handler, - running: Arc, -) -> Result<(), Box> -where - T: HttpServer + Send + 'static, -{ - let server = Arc::new(server); - let listener = TcpListener::bind(host).await?; - - let host_clone = String::from(host).clone(); - let server_clone = server.clone(); - server_clone.on_start(&host_clone).await; - - while running.load(Ordering::Acquire) { - let Ok((sock, _)) = listener.accept().await else { continue; }; - let mut sock = TimeoutStream::new(sock); - - sock.set_read_timeout(timeout); - sock.set_write_timeout(timeout); - - let now_server = Arc::clone(&server); - - handler(now_server, sock).await; - } - - server.on_close().await; - - Ok(()) -} - -/// Start [`HttpServer`](HttpServer) on some host -/// -/// Use [`HttpServerStarter`](HttpServerStarter) to set more options -pub async fn start_server( - server: T, - host: &str -) -> Result<(), Box> { - start_server_new_thread( - server, - host, - None, - pin_handler!(handler_connection), - Arc::new(AtomicBool::new(true)), - ).await -} +pub type Stream = TimeoutStream; \ No newline at end of file diff --git a/src/ezhttp/request.rs b/src/ezhttp/request.rs index 97ae2f4..ce85ba5 100644 --- a/src/ezhttp/request.rs +++ b/src/ezhttp/request.rs @@ -1,21 +1,120 @@ -use super::{read_line_crlf, Headers, HttpError}; +use super::{body::{Body, Part}, gen_multipart_boundary, read_line_crlf, headers::Headers, HttpError}; -use serde_json::Value; use std::{ - fmt::{Debug, Display}, - net::SocketAddr, + collections::HashMap, fmt::{Debug, Display}, net::SocketAddr, str::FromStr }; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +#[derive(Clone, Debug)] +pub struct URL { + pub path: String, + pub domain: String, + pub anchor: Option, + pub query: HashMap, + pub scheme: String, + pub port: u16 +} + +impl URL { + pub fn new( + domain: String, + port: u16, + path: String, + anchor: Option, + query: HashMap, + scheme: String + ) -> URL { + URL { + path, + domain, + anchor, + query, + scheme, + port + } + } + + pub fn to_path_string(&self) -> String { + format!("{}{}{}", self.path, if self.query.is_empty() { + String::new() + } else { + "?".to_string()+&self.query.iter().map(|o| { + format!("{}={}", urlencoding::encode(o.0), urlencoding::encode(o.1)) + }).collect::>().join("&") + }, if let Some(anchor) = &self.anchor { + "#".to_string()+anchor + } else { + String::new() + }) + } + + pub fn from_path_string(s: &str, scheme: String, domain: String, port: u16) -> Option { + let (s, anchor) = s.split_once("#").unwrap_or((s, "")); + let (path, query) = s.split_once("?").unwrap_or((s, "")); + + let anchor = if anchor.is_empty() { None } else { Some(anchor.to_string()) }; + let query = if query.is_empty() { HashMap::new() } else { { + HashMap::from_iter(query.split("&").filter_map(|entry| { + let (key, value) = entry.split_once("=").unwrap_or((entry, "")); + Some((urlencoding::decode(key).ok()?.to_string(), urlencoding::decode(value).ok()?.to_string())) + })) + } }; + let path = path.to_string(); + let scheme = scheme.to_string(); + Some(URL { path, domain, anchor, query, scheme, port }) + } +} + +impl FromStr for URL { + type Err = HttpError; + + fn from_str(s: &str) -> Result { + let (scheme, s) = s.split_once("://").ok_or(HttpError::UrlError)?; + let (host, s) = s.split_once("/").unwrap_or((s, "")); + let (domain, port) = host.split_once(":").unwrap_or((host, + if scheme == "http" { "80" } + else if scheme == "https" { "443" } + else { return Err(HttpError::UrlError) } + )); + let port = port.parse::().map_err(|_| HttpError::UrlError)?; + let (s, anchor) = s.split_once("#").unwrap_or((s, "")); + let (path, query) = s.split_once("?").unwrap_or((s, "")); + + let anchor = if anchor.is_empty() { None } else { Some(anchor.to_string()) }; + let query = if query.is_empty() { HashMap::new() } else { { + HashMap::from_iter(query.split("&").filter_map(|entry| { + let (key, value) = entry.split_once("=").unwrap_or((entry, "")); + Some((urlencoding::decode(key).ok()?.to_string(), urlencoding::decode(value).ok()?.to_string())) + })) + } }; + let domain = domain.to_string(); + let path = format!("/{path}"); + let scheme = scheme.to_string(); + Ok(URL { path, domain, anchor, query, scheme, port }) + } +} + +impl ToString for URL { + fn to_string(&self) -> String { + format!("{}://{}{}", self.scheme, { + if (self.scheme == "http" && self.port != 80) || (self.scheme == "https" && self.port != 443) { + format!("{}:{}", self.domain, self.port) + } else { + self.domain.clone() + } + }, self.to_path_string()) + } +} + + /// Http request #[derive(Debug, Clone)] pub struct HttpRequest { - pub page: String, + pub url: URL, pub method: String, - pub addr: String, + pub addr: SocketAddr, pub headers: Headers, - pub params: Value, - pub data: Vec, + pub body: Body } impl Display for HttpRequest { @@ -26,22 +125,25 @@ impl Display for HttpRequest { impl HttpRequest { /// Create new http request - pub fn new(page: &str, method: &str, params: Value, headers: Headers, data: Vec) -> Self { + pub fn new( + url: URL, + method: String, + addr: SocketAddr, + headers: Headers, + body: Body + ) -> Self { HttpRequest { - page: page.to_string(), - method: method.to_string(), - addr: String::new(), - params, + url, + method, + addr, headers, - data, + body } } /// Read http request from stream - pub async fn read(data: &mut (impl AsyncReadExt + Unpin), addr: &SocketAddr) -> Result { - let ip_str = addr.to_string(); - - let status: Vec = match read_line_crlf(data).await { + pub async fn recv(stream: &mut (impl AsyncReadExt + Unpin), addr: &SocketAddr) -> Result { + let status: Vec = match read_line_crlf(stream).await { Ok(i) => { i.splitn(3, " ") .map(|s| s.to_string()) @@ -51,205 +153,59 @@ impl HttpRequest { }; let method = status[0].clone(); - let (page, query) = match status[1].split_once("?") { - Some(i) => (i.0.to_string(), Some(i.1)), - None => (status[1].to_string(), None), - }; + let page = status[1].clone(); - let mut headers = Headers::new(); + let headers = Headers::recv(stream).await?; + let body = Body::recv(stream, &headers).await?; - loop { - let text = match read_line_crlf(data).await { - Ok(i) => i, - Err(_) => return Err(HttpError::InvalidHeaders), - }; - - if text.len() == 0 { - break; - } - - let (key, value) = match text.split_once(": ") { - Some(i) => i, - None => return Err(HttpError::InvalidHeaders), - }; - - headers.put(key.to_lowercase(), value.to_string()); - } - - let mut params = serde_json::Map::new(); - - if let Some(i) = query { - for ele in i.split("&") { - let (k, v) = match ele.split_once("=") { - Some(i) => i, - None => return Err(HttpError::InvalidQuery), - }; - - params.insert( - match urlencoding::decode(k) { - Ok(i) => i.to_string(), - Err(_) => return Err(HttpError::InvalidQuery), - }, - match urlencoding::decode(v) { - Ok(i) => Value::String(i.to_string()), - Err(_) => return Err(HttpError::InvalidQuery), - }, - ); - } - } - - let mut reqdata: Vec = Vec::new(); - - if let Some(content_size) = headers.clone().get("content-length".to_string()) { - let content_size: usize = match content_size.parse() { - Ok(i) => i, - Err(_) => return Err(HttpError::InvalidContentSize), - }; - - if content_size > reqdata.len() { - let mut buf: Vec = Vec::new(); - buf.resize(content_size - reqdata.len(), 0); - - match data.read_exact(&mut buf).await { - Ok(i) => i, - Err(_) => return Err(HttpError::InvalidContent), - }; - - reqdata.append(&mut buf); - } - } - - if let Some(content_type) = headers.clone().get("content-type".to_string()) { - let mut body = match String::from_utf8(reqdata.clone()) { - Ok(i) => i, - Err(_) => return Err(HttpError::InvalidContent), - }; - - match content_type.as_str() { - "application/json" => { - let val: Value = match serde_json::from_str(&body) { - Ok(i) => i, - Err(_) => return Err(HttpError::JsonParseError), - }; - - if let Value::Object(mut dict) = val { - params.append(&mut dict); - } - } - "multipart/form-data" => { - let boundary = "--".to_string() - + &content_type[(content_type.find("boundary=").unwrap() + 9)..] - + "\r\n"; - for part in body.split(boundary.as_str()) { - let lines: Vec<&str> = part.split("\r\n").collect(); - if lines.len() >= 3 { - if lines[0].starts_with("Content-Disposition: form-data; name=\"") { - let name: &str = - &lines[0]["Content-Disposition: form-data; name=\"".len()..]; - let name: &str = &name[..name.len() - 1]; - params - .insert(name.to_string(), Value::String(lines[2].to_string())); - } - } - } - } - "application/x-www-form-urlencoded" => { - if body.starts_with("?") { - body = body.as_str()[1..].to_string() - } - - for ele in body.split("&") { - let (k, v) = match ele.split_once("=") { - Some(i) => i, - None => return Err(HttpError::InvalidQuery), - }; - - params.insert( - match urlencoding::decode(k) { - Ok(i) => i.to_string(), - Err(_) => return Err(HttpError::InvalidQuery), - }, - match urlencoding::decode(v) { - Ok(i) => Value::String(i.to_string()), - Err(_) => return Err(HttpError::InvalidQuery), - }, - ); - } - } - _ => {} - } - } - - Ok(HttpRequest { - page, - method, - addr: ip_str.to_string(), - params: Value::Object(params), - headers, - data: reqdata.clone(), - }) - } - - /// Set params to query in url - pub fn params_to_page(&mut self) { - let mut query = String::new(); - - let mut i: bool = !self.page.contains("?"); - - if let Value::Object(obj) = self.params.clone() { - for (k, v) in obj { - query.push_str(if i { "?" } else { "&" }); - query.push_str(urlencoding::encode(k.as_str()).to_string().as_str()); - query.push_str("="); - query.push_str( - urlencoding::encode(v.as_str().unwrap()) - .to_string() - .as_str(), - ); - i = false; - } - } - - self.page += query.as_str(); - } - - /// Set params to json data - pub fn params_to_json(&mut self) { - self.data = Vec::from(self.params.to_string().as_bytes()); + Ok(HttpRequest::new( + URL::from_path_string( + &page, + "http".to_string(), + "localhost".to_string(), + 80 + ).ok_or(HttpError::UrlError)?, + method, + addr.clone(), + headers, + body + )) } /// Write http request to stream - /// - /// [`params`](Self::params) is not written to the stream, you need to use [`params_to_json`](Self::params_to_json) or [`params_to_page`](Self::params_to_page) - pub async fn write(self, data: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> { + pub async fn send(&self, stream: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> { let mut head: String = String::new(); head.push_str(&self.method); head.push_str(" "); - head.push_str(&self.page); + head.push_str(&self.url.to_path_string()); head.push_str(" HTTP/1.1"); head.push_str("\r\n"); + stream.write_all(head.as_bytes()).await.map_err(|_| HttpError::WriteHeadError)?; - for (k, v) in self.headers.entries() { - head.push_str(&k); - head.push_str(": "); - head.push_str(&v); - head.push_str("\r\n"); - } + self.headers.send(stream).await?; - head.push_str("\r\n"); + stream.write_all(b"\r\n").await.map_err(|_| HttpError::WriteHeadError)?; - match data.write_all(head.as_bytes()).await { - Ok(i) => i, - Err(_) => return Err(HttpError::WriteHeadError), - }; - - if !self.data.is_empty() { - match data.write_all(&self.data).await { - Ok(i) => i, - Err(_) => return Err(HttpError::WriteBodyError), - }; - } + self.body.send(stream).await?; Ok(()) } + + pub fn get_multipart(&self) -> Option> { + let boundary = self.headers.get("content-type")? + .split(";") + .map(|o| o.trim()) + .find(|o| o.starts_with("boundary=")) + .map(|o| o[9..].to_string())?; + Some(self.body.as_multipart(boundary)) + } + + pub fn set_multipart(&mut self, parts: Vec) -> Option<()> { + let boundary = gen_multipart_boundary(); + self.headers.put("Content-Type", format!("multipart/form-data; boundary={}", boundary.clone())); + self.body = Body::from_multipart(parts, boundary); + Some(()) + } } + + diff --git a/src/ezhttp/response.rs b/src/ezhttp/response.rs index 6beb5ef..98ac138 100644 --- a/src/ezhttp/response.rs +++ b/src/ezhttp/response.rs @@ -1,15 +1,19 @@ -use super::{read_line_crlf, Headers, HttpError}; +use super::{body::{Body, Part}, gen_multipart_boundary, read_line_crlf, headers::Headers, HttpError}; -use serde_json::Value; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::fmt::{Debug, Display}; +pub mod status_code { + pub const OK: &str = "200 OK"; + pub const NOT_FOUND: &str = "404 Not Found"; +} + /// Http response #[derive(Debug, Clone)] pub struct HttpResponse { - pub headers: Headers, pub status_code: String, - pub data: Vec, + pub headers: Headers, + pub body: Body, } impl Display for HttpResponse { @@ -19,148 +23,68 @@ impl Display for HttpResponse { } impl HttpResponse { - /// Create new http response with empty headers and data and a 200 OK status code - pub fn new() -> Self { - Self::from_bytes(Headers::new(), "200 OK", Vec::new()) - } - - /// Create new http response from headers, bytes data, and status code - pub fn from_bytes(headers: Headers, status_code: impl ToString, data: Vec) -> Self { + pub fn new( + status_code: &str, + headers: Headers, + body: Body + ) -> Self { HttpResponse { - headers, - data, status_code: status_code.to_string(), - } - } - - /// Create new http response from headers, string data, and status code - pub fn from_string(headers: Headers, status_code: impl ToString, data: impl ToString) -> Self { - HttpResponse { headers, - data: data.to_string().into_bytes(), - status_code: status_code.to_string(), - } - } - - /// Get data in UTF-8 - pub fn get_text(self) -> String { - match String::from_utf8(self.data) { - Ok(i) => i, - Err(_) => String::new(), - } - } - - /// Get json [`Value`](Value) from data - pub fn get_json(self) -> Value { - match serde_json::from_str(self.get_text().as_str()) { - Ok(i) => i, - Err(_) => Value::Null, + body } } /// Read http response from stream - pub async fn read(data: &mut (impl AsyncReadExt + Unpin)) -> Result { - let status = match read_line_crlf(data).await { - Ok(i) => i, - Err(e) => { - return Err(e); - } - }; + pub async fn recv(stream: &mut (impl AsyncReadExt + Unpin)) -> Result { + let status = read_line_crlf(stream).await?; - let (_, status_code) = match status.split_once(" ") { - Some(i) => i, - None => return Err(HttpError::InvalidStatus), - }; + let (_, status_code) = status.split_once(" ").ok_or(HttpError::InvalidStatus)?; - let mut headers = Headers::new(); + let headers = Headers::recv(stream).await?; + let body = Body::recv(stream, &headers).await?; - loop { - let text = match read_line_crlf(data).await { - Ok(i) => i, - Err(_) => return Err(HttpError::InvalidHeaders), - }; - - if text.len() == 0 { - break; - } - - let (key, value) = match text.split_once(": ") { - Some(i) => i, - None => return Err(HttpError::InvalidHeaders), - }; - - headers.put(key.to_lowercase(), value.to_string()); - } - - let mut reqdata: Vec = Vec::new(); - - if let Some(content_size) = headers.clone().get("content-length".to_string()) { - let content_size: usize = match content_size.parse() { - Ok(i) => i, - Err(_) => return Err(HttpError::InvalidContentSize), - }; - - if content_size > reqdata.len() { - let mut buf: Vec = Vec::new(); - buf.resize(content_size - reqdata.len(), 0); - - match data.read_exact(&mut buf).await { - Ok(i) => i, - Err(_) => return Err(HttpError::InvalidContent), - }; - - reqdata.append(&mut buf); - } - } else { - loop { - let mut buf: Vec = vec![0; 1024 * 32]; - - let buf_len = match data.read(&mut buf).await { - Ok(i) => i, - Err(_) => { - break; - } - }; - - if buf_len == 0 { - break; - } - - buf.truncate(buf_len); - - reqdata.append(&mut buf); - } - } - - Ok(HttpResponse::from_bytes(headers, status_code, reqdata)) + Ok(HttpResponse::new(status_code, headers, body)) } /// Write http response to stream - pub async fn write(self, data: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> { + pub async fn send(self, stream: &mut (impl AsyncWriteExt + Unpin)) -> Result<(), HttpError> { let mut head: String = String::new(); head.push_str("HTTP/1.1 "); head.push_str(&self.status_code); head.push_str("\r\n"); + stream.write_all(head.as_bytes()).await.map_err(|_| HttpError::WriteHeadError)?; - for (k, v) in self.headers.entries() { - head.push_str(&k); - head.push_str(": "); - head.push_str(&v); - head.push_str("\r\n"); - } + self.headers.send(stream).await?; - head.push_str("\r\n"); + stream.write_all(b"\r\n").await.map_err(|_| HttpError::WriteHeadError)?; - match data.write_all(head.as_bytes()).await { - Ok(i) => i, - Err(_) => return Err(HttpError::WriteHeadError), - }; - - match data.write_all(&self.data).await { - Ok(i) => i, - Err(_) => return Err(HttpError::WriteHeadError), - }; + self.body.send(stream).await?; Ok(()) } + + pub fn get_multipart(&self) -> Option> { + let boundary = self.headers.get("content-type")? + .split(";") + .map(|o| o.trim()) + .find(|o| o.starts_with("boundary=")) + .map(|o| o[9..].to_string())?; + Some(self.body.as_multipart(boundary)) + } + + pub fn set_multipart(&mut self, parts: Vec) -> Option<()> { + let boundary = gen_multipart_boundary(); + self.headers.put("Content-Type", format!("multipart/form-data; boundary={}", boundary.clone())); + self.body = Body::from_multipart(parts, boundary); + Some(()) + } +} + +impl Default for HttpResponse { + + /// Create new http response with empty headers and data and a 200 OK status code + fn default() -> Self { + Self::new("200 OK", Headers::new(), Body::default()) + } } diff --git a/src/ezhttp/server/handler.rs b/src/ezhttp/server/handler.rs new file mode 100644 index 0000000..71672a0 --- /dev/null +++ b/src/ezhttp/server/handler.rs @@ -0,0 +1,54 @@ +use super::{ + HttpServer, + super::{ + Stream, + request::HttpRequest + } +}; + +use std::{future::Future, pin::Pin, sync::Arc}; +use tokio::net::TcpStream; +use tokio_io_timeout::TimeoutStream; + +pub type Handler = Box, TimeoutStream) -> Pin + Send>> + Send + Sync>; + +/// Default connection handler +/// Turns input to request and response to output +pub async fn handler_connection( + server: Arc, + mut sock: Stream +) { + let Ok(addr) = sock.get_ref().peer_addr() else { return; }; + + loop { + let req = match HttpRequest::recv(sock.get_mut(), &addr).await { + Ok(i) => i, + Err(e) => { + server.on_error(e).await; + return; + } + }; + + let resp = match server.on_request(&req).await { + Some(i) => i, + None => { + return; + } + }; + + match resp.send(sock.get_mut()).await { + Ok(_) => {}, + Err(e) => { + server.on_error(e).await; + return; + }, + } + } +} + +#[macro_export] +macro_rules! pin_handler { + ($handler: expr) => { + Box::new(move |a, b| Box::pin($handler(a, b))) + }; +} \ No newline at end of file diff --git a/src/ezhttp/server/mod.rs b/src/ezhttp/server/mod.rs new file mode 100644 index 0000000..59e840d --- /dev/null +++ b/src/ezhttp/server/mod.rs @@ -0,0 +1,170 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::{ + boxed::Box, + error::Error, + future::Future, + sync::Arc, + time::Duration, +}; + +use threadpool::ThreadPool; +use tokio::net::TcpListener; +use tokio::runtime::Runtime; +use tokio_io_timeout::TimeoutStream; + +use crate::pin_handler; + +use super::error::HttpError; +use super::request::HttpRequest; +use super::response::HttpResponse; + +pub mod handler; +pub mod starter; + +use handler::{handler_connection, Handler}; + +/// Async http server trait +pub trait HttpServer { + fn on_start(&self, host: &str) -> impl Future + Send; + fn on_close(&self) -> impl Future + Send; + fn on_request( + &self, + req: &HttpRequest, + ) -> impl Future> + Send; + fn on_error( + &self, + _: HttpError + ) -> impl Future + Send { + async {} + } +} + +async fn start_server_with_threadpool( + server: T, + host: &str, + timeout: Option, + threads: usize, + handler: Handler, + running: Arc, +) -> Result<(), Box> +where + T: HttpServer + Send + 'static + Sync, +{ + let threadpool = ThreadPool::new(threads); + + let server = Arc::new(server); + let listener = TcpListener::bind(host).await?; + let old_handler = handler; + let handler = Arc::new(move |now_server, sock| { + Runtime::new().unwrap().block_on(old_handler(now_server, sock)); + }); + + let host_clone = String::from(host).clone(); + let server_clone = server.clone(); + server_clone.on_start(&host_clone).await; + + while running.load(Ordering::Acquire) { + let Ok((sock, _)) = listener.accept().await else { continue; }; + let mut sock = TimeoutStream::new(sock); + + sock.set_read_timeout(timeout); + sock.set_write_timeout(timeout); + + let now_server = Arc::clone(&server); + let now_handler = Arc::clone(&handler); + + threadpool.execute(move || { + (now_handler)(now_server, sock); + }); + } + + threadpool.join(); + + server.on_close().await; + + Ok(()) +} + +async fn start_server_new_thread( + server: T, + host: &str, + timeout: Option, + handler: Handler, + running: Arc, +) -> Result<(), Box> +where + T: HttpServer + Send + 'static, +{ + let server = Arc::new(server); + let listener = TcpListener::bind(host).await?; + + let host_clone = String::from(host).clone(); + let server_clone = server.clone(); + server_clone.on_start(&host_clone).await; + + while running.load(Ordering::Acquire) { + let Ok((sock, _)) = listener.accept().await else { continue; }; + let mut sock = TimeoutStream::new(sock); + + sock.set_read_timeout(timeout); + sock.set_write_timeout(timeout); + + let now_server = Arc::clone(&server); + + tokio::spawn((&handler)(now_server, sock)); + } + + server.on_close().await; + + Ok(()) +} + +async fn start_server_sync( + server: T, + host: &str, + timeout: Option, + handler: Handler, + running: Arc, +) -> Result<(), Box> +where + T: HttpServer + Send + 'static, +{ + let server = Arc::new(server); + let listener = TcpListener::bind(host).await?; + + let host_clone = String::from(host).clone(); + let server_clone = server.clone(); + server_clone.on_start(&host_clone).await; + + while running.load(Ordering::Acquire) { + let Ok((sock, _)) = listener.accept().await else { continue; }; + let mut sock = TimeoutStream::new(sock); + + sock.set_read_timeout(timeout); + sock.set_write_timeout(timeout); + + let now_server = Arc::clone(&server); + + handler(now_server, sock).await; + } + + server.on_close().await; + + Ok(()) +} + +/// Start [`HttpServer`](HttpServer) on some host +/// +/// Use [`HttpServerStarter`](HttpServerStarter) to set more options +pub async fn start_server( + server: T, + host: &str +) -> Result<(), Box> { + start_server_new_thread( + server, + host, + None, + pin_handler!(handler_connection), + Arc::new(AtomicBool::new(true)), + ).await +} \ No newline at end of file diff --git a/src/ezhttp/starter.rs b/src/ezhttp/server/starter.rs similarity index 97% rename from src/ezhttp/starter.rs rename to src/ezhttp/server/starter.rs index 04703aa..16e4925 100644 --- a/src/ezhttp/starter.rs +++ b/src/ezhttp/server/starter.rs @@ -1,7 +1,11 @@ use tokio::{runtime::Runtime, task::JoinHandle}; use super::{ - handler_connection, start_server_new_thread, start_server_sync, start_server_with_threadpool, Handler, HttpServer + start_server_new_thread, + start_server_sync, + start_server_with_threadpool, + handler::{handler_connection, Handler}, + HttpServer }; use crate::pin_handler; diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 00c416c..0000000 --- a/src/main.rs +++ /dev/null @@ -1,53 +0,0 @@ -use ezhttp::{Headers, HttpRequest, HttpResponse, HttpServer, HttpServerStarter}; -use std::time::Duration; - -struct EzSite { - index_page: String, -} - -impl EzSite { - fn new(index_page: &str) -> Self { - EzSite { - index_page: index_page.to_string(), - } - } -} - -impl HttpServer for EzSite { - async fn on_request(&self, req: &HttpRequest) -> Option { - println!("{} > {} {}", req.addr, req.method, req.page); - - if req.page == "/" { - Some(HttpResponse::from_string( - Headers::from(vec![("Content-Type", "text/html")]), // response headers - "200 OK", // response status code - self.index_page.clone(), // response body - )) - } else { - None // close connection - } - } - - async fn on_start(&self, host: &str) { - println!("Http server started on {}", host); - } - - async fn on_close(&self) { - println!("Http server closed"); - } -} - -#[tokio::main] -async fn main() { - let site = EzSite::new("Hello World!"); - let host = "localhost:8080"; - - HttpServerStarter::new(site, host) - .timeout(Some(Duration::from_secs(5))) // read & write timeout - .threads(5) // threadpool size - .start_forever() - .await - .expect("http server error"); - - // ezhttp::start_server(site, host); -}