use std::{ collections::VecDeque, hash::Hash, net::{SocketAddr, TcpStream}, sync::{ atomic::{AtomicBool, Ordering}, Arc, Mutex, RwLock }, thread, time::Duration }; use rust_mc_proto::{MinecraftConnection, Packet}; use uuid::Uuid; use crate::server::{ServerError, context::ServerContext, protocol::ConnectionState}; use super::helper::ProtocolHelper; // Клиент контекст // Должен быть обернут в Arc для передачи между потоками pub struct ClientContext { pub server: Arc, pub addr: SocketAddr, conn: RwLock>, handshake: RwLock>, client_info: RwLock>, player_info: RwLock>, state: RwLock, packet_buffer: Mutex>, read_loop: AtomicBool, is_alive: AtomicBool } // Реализуем сравнение через адрес // IPv4 не должен обманывать, иначе у нас случится коллапс impl PartialEq for ClientContext { fn eq(&self, other: &Self) -> bool { self.addr == other.addr } } impl Hash for ClientContext { fn hash(&self, state: &mut H) { self.addr.hash(state); } } impl Eq for ClientContext {} impl ClientContext { pub fn new(server: Arc, conn: MinecraftConnection) -> ClientContext { ClientContext { server, addr: conn.get_ref().peer_addr().unwrap(), conn: RwLock::new(conn), handshake: RwLock::new(None), client_info: RwLock::new(None), player_info: RwLock::new(None), state: RwLock::new(ConnectionState::Handshake), packet_buffer: Mutex::new(VecDeque::new()), read_loop: AtomicBool::new(false), is_alive: AtomicBool::new(true) } } pub fn set_handshake(self: &Arc, handshake: Handshake) { *self.handshake.write().unwrap() = Some(handshake); } pub fn set_client_info(self: &Arc, client_info: ClientInfo) { *self.client_info.write().unwrap() = Some(client_info); } pub fn set_player_info(self: &Arc, player_info: PlayerInfo) { *self.player_info.write().unwrap() = Some(player_info); } pub fn set_state(self: &Arc, state: ConnectionState) -> Result<(), ServerError> { *self.state.write().unwrap() = state.clone(); for handler in self .server .packet_handlers(|o| o.on_state_priority()) .iter() { handler.on_state(self.clone(), state.clone())?; } Ok(()) } pub fn handshake(self: &Arc) -> Option { self.handshake.read().unwrap().clone() } pub fn client_info(self: &Arc) -> Option { self.client_info.read().unwrap().clone() } pub fn player_info(self: &Arc) -> Option { self.player_info.read().unwrap().clone() } pub fn state(self: &Arc) -> ConnectionState { self.state.read().unwrap().clone() } pub fn write_packet(self: &Arc, packet: &Packet) -> Result<(), ServerError> { let state = self.state(); let mut packet = packet.clone(); let mut cancelled = false; for handler in self .server .packet_handlers(|o| o.on_outcoming_packet_priority()) .iter() { handler.on_outcoming_packet( self.clone(), &mut packet, &mut cancelled, state.clone(), )?; packet.get_mut().set_position(0); } if !cancelled { match self.conn.write().unwrap().write_packet(&packet) { Ok(_) => {}, Err(e) => { self.is_alive.store(false, Ordering::SeqCst); return Err(e.into()); } }; } Ok(()) } pub fn run_read_loop( self: &Arc ) -> Result<(), ServerError> { self.read_loop.store(true, Ordering::SeqCst); let mut conn = self.conn.read().unwrap().try_clone()?; // так можно делать т.к сокет это просто поинтер while self.is_alive() { let mut packet = match conn.read_packet() { Ok(v) => v, Err(e) => { self.is_alive.store(false, Ordering::SeqCst); return Err(e.into()); } }; let mut cancelled = false; let state = self.state(); for handler in self .server .packet_handlers(|o| o.on_incoming_packet_priority()) .iter() { handler.on_incoming_packet( self.clone(), &mut packet, &mut cancelled, state.clone(), )?; packet.get_mut().set_position(0); } if !cancelled { self.packet_buffer.lock().unwrap().push_back(packet); } } Ok(()) } pub fn read_any_packet(self: &Arc) -> Result { if self.read_loop.load(Ordering::SeqCst) { loop { if let Some(packet) = self.packet_buffer.lock().unwrap().pop_front() { return Ok(packet); } thread::sleep(Duration::from_millis(10)); } } else { let state = self.state(); loop { let mut packet = match self.conn.write().unwrap().read_packet() { Ok(v) => v, Err(e) => { self.is_alive.store(false, Ordering::SeqCst); return Err(e.into()); } }; let mut cancelled = false; for handler in self .server .packet_handlers(|o| o.on_incoming_packet_priority()) .iter() { handler.on_incoming_packet( self.clone(), &mut packet, &mut cancelled, state.clone(), )?; packet.get_mut().set_position(0); } if !cancelled { break Ok(packet); } } } } pub fn read_packet(self: &Arc, id: u8) -> Result { if self.read_loop.load(Ordering::SeqCst) { loop { { let mut locked = self.packet_buffer.lock().unwrap(); for (i, packet) in locked.clone().iter().enumerate() { if packet.id() == id { locked.remove(i); return Ok(packet.clone()); } } } thread::sleep(Duration::from_millis(10)); } } else { let packet = match self.read_any_packet() { Ok(v) => v, Err(e) => { self.is_alive.store(false, Ordering::SeqCst); return Err(e); } }; if packet.id() != id { Err(ServerError::UnexpectedPacket(packet.id())) } else { Ok(packet) } } } pub fn close(self: &Arc) { self.conn.write().unwrap().close(); } pub fn set_compression(self: &Arc, threshold: Option) { self.conn.write().unwrap().set_compression(threshold); } pub fn is_alive(self: &Arc) -> bool { self.is_alive.load(Ordering::SeqCst) } pub fn protocol_helper(self: &Arc) -> ProtocolHelper { ProtocolHelper::new(self.clone()) } } #[derive(Clone)] pub struct Handshake { pub protocol_version: i32, pub server_address: String, pub server_port: u16, } #[derive(Clone)] pub struct ClientInfo { pub brand: String, pub locale: String, pub view_distance: i8, pub chat_mode: i32, pub chat_colors: bool, pub displayed_skin_parts: u8, pub main_hand: i32, pub enable_text_filtering: bool, pub allow_server_listings: bool, pub particle_status: i32, } #[derive(Clone)] pub struct PlayerInfo { pub name: String, pub uuid: Uuid, }