is alive state

This commit is contained in:
MeexReay 2024-11-10 03:31:21 +03:00
parent e346dcd89f
commit 4ccb6487c8
3 changed files with 77 additions and 25 deletions

View File

@ -16,5 +16,5 @@ bytebuffer = "2.3.0"
uuid = "1.11.0" uuid = "1.11.0"
[features] [features]
default = [] default = ["atomic_clone"]
atomic_compression = [] atomic_clone = []

View File

@ -11,7 +11,7 @@ rust_mc_proto = { git = "https://github.com/MeexReay/rust_mc_proto" } # unstable
``` ```
Features: Features:
- `atomic_compression` - `atomic_clone`
## How to use ## How to use

View File

@ -13,14 +13,10 @@ pub use crate::{
use bytebuffer::ByteBuffer; use bytebuffer::ByteBuffer;
use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression}; use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression};
use std::{ use std::{
error::Error, error::Error, fmt, io::{Read, Write}, net::{TcpStream, ToSocketAddrs}, sync::atomic::AtomicBool, usize
fmt,
io::{Read, Write},
net::{TcpStream, ToSocketAddrs},
usize,
}; };
#[cfg(feature = "atomic_compression")] #[cfg(feature = "atomic_clone")]
use std::sync::{ use std::sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
@ -39,6 +35,7 @@ pub enum ProtocolError {
ZlibError, ZlibError,
UnsignedShortError, UnsignedShortError,
CloneError, CloneError,
ConnectionClosedError
} }
impl fmt::Display for ProtocolError { impl fmt::Display for ProtocolError {
@ -52,11 +49,15 @@ impl Error for ProtocolError {}
/// Minecraft connection, wrapper for stream with compression /// Minecraft connection, wrapper for stream with compression
pub struct MinecraftConnection<T: Read + Write> { pub struct MinecraftConnection<T: Read + Write> {
stream: T, stream: T,
#[cfg(feature = "atomic_compression")] #[cfg(feature = "atomic_clone")]
compression: Arc<AtomicUsize>, compression: Arc<AtomicUsize>,
#[cfg(not(feature = "atomic_compression"))] #[cfg(not(feature = "atomic_clone"))]
compression: Option<usize>, compression: Option<usize>,
compression_type: u32, compression_type: u32,
#[cfg(feature = "atomic_clone")]
is_alive: Arc<AtomicBool>,
#[cfg(not(feature = "atomic_clone"))]
is_alive: bool,
} }
impl MinecraftConnection<TcpStream> { impl MinecraftConnection<TcpStream> {
@ -77,10 +78,14 @@ impl MinecraftConnection<TcpStream> {
Ok(MinecraftConnection { Ok(MinecraftConnection {
stream, stream,
#[cfg(feature = "atomic_compression")] #[cfg(feature = "atomic_clone")]
compression: Arc::new(AtomicUsize::new(usize::MAX)), compression: Arc::new(AtomicUsize::new(usize::MAX)),
#[cfg(not(feature = "atomic_compression"))] #[cfg(not(feature = "atomic_clone"))]
compression: None, compression: None,
#[cfg(feature = "atomic_clone")]
is_alive: Arc::new(AtomicBool::new(true)),
#[cfg(not(feature = "atomic_clone"))]
is_alive: true,
compression_type: 1, compression_type: 1,
}) })
} }
@ -90,8 +95,17 @@ impl MinecraftConnection<TcpStream> {
} }
/// Close TcpStream /// Close TcpStream
#[cfg(not(feature = "atomic_clone"))]
pub fn close(&mut self) {
let _ = self.stream.shutdown(std::net::Shutdown::Both);
self.is_alive = false;
}
/// Close TcpStream
#[cfg(feature = "atomic_clone")]
pub fn close(&self) { pub fn close(&self) {
let _ = self.stream.shutdown(std::net::Shutdown::Both); let _ = self.stream.shutdown(std::net::Shutdown::Both);
self.is_alive.store(false, Ordering::Relaxed);
} }
/// Try clone MinecraftConnection with compression and stream /// Try clone MinecraftConnection with compression and stream
@ -99,6 +113,7 @@ impl MinecraftConnection<TcpStream> {
match self.stream.try_clone() { match self.stream.try_clone() {
Ok(stream) => Ok(MinecraftConnection { Ok(stream) => Ok(MinecraftConnection {
stream, stream,
is_alive: self.is_alive.clone(),
compression: self.compression.clone(), compression: self.compression.clone(),
compression_type: self.compression_type, compression_type: self.compression_type,
}), }),
@ -131,17 +146,45 @@ impl<T: Read + Write> MinecraftConnection<T> {
pub fn new(stream: T) -> MinecraftConnection<T> { pub fn new(stream: T) -> MinecraftConnection<T> {
MinecraftConnection { MinecraftConnection {
stream, stream,
#[cfg(feature = "atomic_compression")] #[cfg(feature = "atomic_clone")]
compression: Arc::new(AtomicUsize::new(usize::MAX)), compression: Arc::new(AtomicUsize::new(usize::MAX)),
#[cfg(not(feature = "atomic_compression"))] #[cfg(not(feature = "atomic_clone"))]
compression: None, compression: None,
#[cfg(feature = "atomic_clone")]
is_alive: Arc::new(AtomicBool::new(true)),
#[cfg(not(feature = "atomic_clone"))]
is_alive: true,
compression_type: 1, compression_type: 1,
} }
} }
/// Set alive state
#[cfg(not(feature = "atomic_clone"))]
pub fn set_alive(&mut self, state: bool) {
self.is_alive = state;
}
/// Set alive state
#[cfg(feature = "atomic_clone")]
pub fn set_alive(&self, state: bool) {
self.is_alive.store(state, Ordering::Relaxed);
}
/// Is connection alive
#[cfg(not(feature = "atomic_clone"))]
pub fn set_alive(&self) -> bool {
self.is_alive
}
/// Is connection alive
#[cfg(feature = "atomic_clone")]
pub fn is_alive(&self) -> bool {
self.is_alive.load(Ordering::Relaxed)
}
/// Set compression threshold /// Set compression threshold
pub fn set_compression(&mut self, threshold: Option<usize>) { pub fn set_compression(&mut self, threshold: Option<usize>) {
#[cfg(feature = "atomic_compression")] #[cfg(feature = "atomic_clone")]
self.compression.store( self.compression.store(
match threshold { match threshold {
Some(t) => t, Some(t) => t,
@ -149,7 +192,7 @@ impl<T: Read + Write> MinecraftConnection<T> {
}, },
Ordering::Relaxed, Ordering::Relaxed,
); );
#[cfg(not(feature = "atomic_compression"))] #[cfg(not(feature = "atomic_clone"))]
{ {
self.compression = threshold; self.compression = threshold;
} }
@ -157,7 +200,7 @@ impl<T: Read + Write> MinecraftConnection<T> {
/// Get compression threshold /// Get compression threshold
pub fn compression(&self) -> Option<usize> { pub fn compression(&self) -> Option<usize> {
#[cfg(feature = "atomic_compression")] #[cfg(feature = "atomic_clone")]
{ {
let threshold = self.compression.load(Ordering::Relaxed); let threshold = self.compression.load(Ordering::Relaxed);
if threshold == usize::MAX { if threshold == usize::MAX {
@ -166,7 +209,7 @@ impl<T: Read + Write> MinecraftConnection<T> {
return Some(threshold) return Some(threshold)
} }
} }
#[cfg(not(feature = "atomic_compression"))] #[cfg(not(feature = "atomic_clone"))]
{ {
self.compression self.compression
} }
@ -202,7 +245,11 @@ impl<T: Read + Write> MinecraftConnection<T> {
/// Read [`Packet`](Packet) from connection /// Read [`Packet`](Packet) from connection
pub fn read_packet(&mut self) -> Result<Packet, ProtocolError> { pub fn read_packet(&mut self) -> Result<Packet, ProtocolError> {
#[cfg(feature = "atomic_compression")] if !self.is_alive() {
return Err(ProtocolError::ConnectionClosedError);
}
#[cfg(feature = "atomic_clone")]
{ {
return read_packet_atomic( return read_packet_atomic(
&mut self.stream, &mut self.stream,
@ -211,13 +258,17 @@ impl<T: Read + Write> MinecraftConnection<T> {
) )
} }
#[cfg(not(feature = "atomic_compression"))] #[cfg(not(feature = "atomic_clone"))]
read_packet(&mut self.stream, self.compression) read_packet(&mut self.stream, self.compression)
} }
/// Write [`Packet`](Packet) to connection /// Write [`Packet`](Packet) to connection
pub fn write_packet(&mut self, packet: &Packet) -> Result<(), ProtocolError> { pub fn write_packet(&mut self, packet: &Packet) -> Result<(), ProtocolError> {
#[cfg(feature = "atomic_compression")] if !self.is_alive() {
return Err(ProtocolError::ConnectionClosedError);
}
#[cfg(feature = "atomic_clone")]
{ {
return write_packet_atomic( return write_packet_atomic(
&mut self.stream, &mut self.stream,
@ -228,7 +279,7 @@ impl<T: Read + Write> MinecraftConnection<T> {
) )
} }
#[cfg(not(feature = "atomic_compression"))] #[cfg(not(feature = "atomic_clone"))]
{ {
write_packet(&mut self.stream, self.compression, self.compression_type, packet) write_packet(&mut self.stream, self.compression, self.compression_type, packet)
} }
@ -241,6 +292,7 @@ impl<T: Read + Write + Clone> MinecraftConnection<T> {
MinecraftConnection { MinecraftConnection {
stream: self.stream.clone(), stream: self.stream.clone(),
compression: self.compression.clone(), compression: self.compression.clone(),
is_alive: self.is_alive.clone(),
compression_type: self.compression_type, compression_type: self.compression_type,
} }
} }
@ -351,7 +403,7 @@ pub fn write_packet<T: Write>(
/// usize::MAX means that compression is disabled /// usize::MAX means that compression is disabled
/// ///
/// `ordering` is order how to load atomic /// `ordering` is order how to load atomic
#[cfg(feature = "atomic_compression")] #[cfg(feature = "atomic_clone")]
pub fn read_packet_atomic<T: Read>( pub fn read_packet_atomic<T: Read>(
stream: &mut T, stream: &mut T,
compression: Arc<AtomicUsize>, compression: Arc<AtomicUsize>,
@ -373,7 +425,7 @@ pub fn read_packet_atomic<T: Read>(
/// `compression_type` is integer from 0 (none) to 9 (longest) /// `compression_type` is integer from 0 (none) to 9 (longest)
/// 1 is fast compression /// 1 is fast compression
/// 6 is normal compression /// 6 is normal compression
#[cfg(feature = "atomic_compression")] #[cfg(feature = "atomic_clone")]
pub fn write_packet_atomic<T: Write>( pub fn write_packet_atomic<T: Write>(
stream: &mut T, stream: &mut T,
compression: Arc<AtomicUsize>, compression: Arc<AtomicUsize>,