fix cpu leak

This commit is contained in:
MeexReay 2024-07-11 11:33:31 +03:00
parent 3e51e91630
commit a6127b6f60
4 changed files with 45 additions and 77 deletions

34
Cargo.lock generated
View File

@ -10,10 +10,11 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]] [[package]]
name = "ezhttp" name = "ezhttp"
version = "0.1.2" version = "0.1.3"
dependencies = [ dependencies = [
"futures", "futures",
"serde_json", "serde_json",
"threadpool",
"urlencoding", "urlencoding",
] ]
@ -106,18 +107,40 @@ dependencies = [
"slab", "slab",
] ]
[[package]]
name = "hermit-abi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "1.0.11" version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "libc"
version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.4" version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.14" version = "0.2.14"
@ -205,6 +228,15 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.12" version = "1.0.12"

View File

@ -1,6 +1,6 @@
[package] [package]
name = "ezhttp" name = "ezhttp"
version = "0.1.2" version = "0.1.3"
edition = "2021" edition = "2021"
repository = "https://github.com/MeexReay/ezhttp" repository = "https://github.com/MeexReay/ezhttp"
@ -13,3 +13,4 @@ keywords = ["http", "server", "site"]
urlencoding = "2.1.3" urlencoding = "2.1.3"
serde_json = "1.0" serde_json = "1.0"
futures = "0.3.30" futures = "0.3.30"
threadpool = "1.8.1"

View File

@ -1,22 +1,18 @@
use futures::executor::block_on; use futures::executor::block_on;
use serde_json::Value; use serde_json::Value;
use std::collections::HashMap;
use std::io::{Read, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::{ use std::{
boxed::Box, boxed::Box,
error::Error, error::Error,
fmt::{Debug, Display}, fmt::{Debug, Display},
future::Future, future::Future,
net::{IpAddr, SocketAddr, ToSocketAddrs}, net::{IpAddr, SocketAddr, ToSocketAddrs, TcpListener, TcpStream},
sync::Arc, sync::{Arc, atomic::{AtomicBool, Ordering}, Mutex},
thread, thread,
io::{Read, Write},
time::Duration,
collections::HashMap
}; };
use std::{ use threadpool::ThreadPool;
net::{TcpListener, TcpStream},
sync::{mpsc, Mutex},
};
/// Http headers /// Http headers
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -711,7 +707,7 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
/// Set threads in threadpool and return builder /// Set threads in threadpool and return builder
/// ///
/// 0 threads means that a new thread is created for each connection /// 0 threads means that a new thread is created for each connection \
/// 1 thread means that all connections are processed in the main thread /// 1 thread means that all connections are processed in the main thread
pub fn threads(mut self, threads: usize) -> Self { pub fn threads(mut self, threads: usize) -> Self {
self.threads = threads; self.threads = threads;
@ -740,7 +736,7 @@ impl<T: HttpServer + Send + 'static> HttpServerStarter<T> {
/// Get threads in threadpool /// Get threads in threadpool
/// ///
/// 0 threads means that a new thread is created for each connection /// 0 threads means that a new thread is created for each connection \
/// 1 thread means that all connections are processed in the main thread /// 1 thread means that all connections are processed in the main thread
pub fn get_threads(&self) -> usize { pub fn get_threads(&self) -> usize {
self.threads self.threads
@ -851,8 +847,6 @@ where
let server_clone = server.clone(); let server_clone = server.clone();
block_on(server_clone.lock().unwrap().on_start(&host_clone)); block_on(server_clone.lock().unwrap().on_start(&host_clone));
listener.set_nonblocking(true)?;
while running.load(Ordering::Acquire) { while running.load(Ordering::Acquire) {
let (sock, _) = match listener.accept() { let (sock, _) = match listener.accept() {
Ok(i) => i, Ok(i) => i,
@ -992,65 +986,6 @@ fn handle_connection_rrs<S: HttpServer + Send + 'static>(
resp.write(&mut sock).unwrap(); resp.write(&mut sock).unwrap();
} }
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Option<Job>>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
workers.push(Worker::new(Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn join(self) {
for _ in 0..self.workers.len() {
self.sender.send(None).unwrap();
}
for ele in self.workers.into_iter() {
ele.thread.join().unwrap();
}
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(Some(job)).unwrap();
}
}
struct Worker {
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(receiver: Arc<Mutex<mpsc::Receiver<Option<Job>>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(Some(job)) = receiver.lock().unwrap().recv() {
job();
}
});
Worker { thread }
}
}
/// Start [`HttpServer`](HttpServer) on some host /// Start [`HttpServer`](HttpServer) on some host
/// ///
/// Use [`HttpServerStarter`](HttpServerStarter) to set more options /// Use [`HttpServerStarter`](HttpServerStarter) to set more options

View File

@ -20,8 +20,8 @@ impl HttpServer for EzSite {
if req.page == "/" { if req.page == "/" {
Some(HttpResponse::from_string( Some(HttpResponse::from_string(
Headers::from(vec![("Content-Type", "text/html")]), // response headers Headers::from(vec![("Content-Type", "text/html")]), // response headers
"200 OK", // response status code "200 OK", // response status code
self.index_page.clone(), // response body self.index_page.clone(), // response body
)) ))
} else { } else {
None // close connection None // close connection