From bb20ea6e1db3cb67cb331a1c8b4d0e8a42be6500 Mon Sep 17 00:00:00 2001 From: MeexReay Date: Sun, 4 May 2025 17:10:24 +0300 Subject: [PATCH] read_loop --- src/server/mod.rs | 5 +- src/server/player/context.rs | 80 ++++++++++++++++--- src/server/player/helper.rs | 4 +- src/server/protocol/handler.rs | 17 ++-- src/server/protocol/play.rs | 138 +++++++++++++++++---------------- 5 files changed, 157 insertions(+), 87 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index 4fd5170..fed3abd 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -22,9 +22,10 @@ pub enum ServerError { ConnectionClosed, // Соединение закрыто, единственная ошибка которая не логируется у handle_connection SerTextComponent, // Ошибка при сериализации текст-компонента DeTextComponent, // Ошибка при десериализации текст-компонента - SerNbt, // Ошибка при сериализации nbt - DeNbt, // Ошибка при десериализации nbt + SerNbt, // Ошибка при сериализации nbt + DeNbt, // Ошибка при десериализации nbt UnexpectedState, // Указывает на то что этот пакет не может быть отправлен в данном режиме (в основном через ProtocolHelper) + ReadLoopMode, // Ошибка когда вызывается read_any_packet во время работы read_loop Other(String), // Другая ошибка, либо очень специфичная, либо хз, лучше не использовать и создавать новое поле ошибки } diff --git a/src/server/player/context.rs b/src/server/player/context.rs index 05bdbb1..c53175d 100644 --- a/src/server/player/context.rs +++ b/src/server/player/context.rs @@ -1,9 +1,8 @@ use std::{ - hash::Hash, - net::{SocketAddr, TcpStream}, - sync::{Arc, RwLock}, + hash::Hash, net::{SocketAddr, TcpStream}, sync::{atomic::{AtomicBool, Ordering}, mpsc::{self, Sender}, Arc, RwLock} }; +use dashmap::DashMap; use rust_mc_proto::{MinecraftConnection, Packet}; use uuid::Uuid; @@ -21,6 +20,8 @@ pub struct ClientContext { client_info: RwLock>, player_info: RwLock>, state: RwLock, + packet_waiters: DashMap)>, + read_loop: AtomicBool } // Реализуем сравнение через адрес @@ -39,6 +40,8 @@ impl Hash for ClientContext { impl Eq for ClientContext {} + + impl ClientContext { pub fn new(server: Arc, conn: MinecraftConnection) -> ClientContext { ClientContext { @@ -49,6 +52,8 @@ impl ClientContext { client_info: RwLock::new(None), player_info: RwLock::new(None), state: RwLock::new(ConnectionState::Handshake), + packet_waiters: DashMap::new(), + read_loop: AtomicBool::new(false) } } @@ -117,7 +122,7 @@ impl ClientContext { Ok(()) } - pub fn read_any_packet(self: &Arc) -> Result { + pub fn run_read_loop(self: &Arc, callback: impl Fn(Packet) -> Result<(), ServerError>) -> Result<(), ServerError> { let state = self.state(); let mut conn = self.conn.read().unwrap().try_clone()?; // так можно делать т.к сокет это просто поинтер @@ -139,24 +144,79 @@ impl ClientContext { packet.get_mut().set_position(0); } if !cancelled { - break Ok(packet); + let mut skip = false; + for (_, (id, sender)) in self.packet_waiters.clone() { + if id == packet.id() { + sender.send(packet.clone()).unwrap(); + skip = true; + break; + } + } + if !skip { + callback(packet.clone())?; + } + } + } + } + + pub fn read_any_packet(self: &Arc) -> Result { + if self.read_loop.load(Ordering::SeqCst) { + Err(ServerError::ReadLoopMode) + } else { + let state = self.state(); + + let mut conn = self.conn.read().unwrap().try_clone()?; // так можно делать т.к сокет это просто поинтер + + loop { + let mut packet = conn.read_packet()?; + let mut cancelled = false; + for handler in self + .server + .packet_handlers(|o| o.on_incoming_packet_priority()) + .iter() + { + handler.on_incoming_packet( + self.clone(), + &mut packet, + &mut cancelled, + state.clone(), + )?; + packet.get_mut().set_position(0); + } + if !cancelled { + break Ok(packet); + } } } } pub fn read_packet(self: &Arc, id: u8) -> Result { - let packet = self.read_any_packet()?; - if packet.id() != id { - Err(ServerError::UnexpectedPacket) + if self.read_loop.load(Ordering::SeqCst) { + let (tx, rx) = mpsc::channel::(); + + let key: usize = (&tx as *const Sender).addr(); + self.packet_waiters.insert(key, (id, tx)); + + loop { + if let Ok(packet) = rx.recv() { + self.packet_waiters.remove(&key); + break Ok(packet) + } + } } else { - Ok(packet) + let packet = self.read_any_packet()?; + + if packet.id() != id { + Err(ServerError::UnexpectedPacket) + } else { + Ok(packet) + } } } pub fn close(self: &Arc) { self.conn.write().unwrap().close(); } - pub fn set_compression(self: &Arc, threshold: Option) { self.conn.write().unwrap().set_compression(threshold); } diff --git a/src/server/player/helper.rs b/src/server/player/helper.rs index 5b86c88..540290a 100644 --- a/src/server/player/helper.rs +++ b/src/server/player/helper.rs @@ -53,7 +53,7 @@ impl ProtocolHelper { match self.state { ConnectionState::Configuration => clientbound::configuration::STORE_COOKIE, ConnectionState::Play => clientbound::play::STORE_COOKIE, - _ => { return Err(ServerError::UnexpectedState) }, + _ => return Err(ServerError::UnexpectedState), }, |p| { p.write_string(id)?; @@ -159,7 +159,7 @@ impl ProtocolHelper { }; Ok(data) - }, + } ConnectionState::Play => { let mut packet = Packet::empty(clientbound::play::COOKIE_REQUEST); packet.write_string(id)?; diff --git a/src/server/protocol/handler.rs b/src/server/protocol/handler.rs index a724bb3..0fdbab2 100644 --- a/src/server/protocol/handler.rs +++ b/src/server/protocol/handler.rs @@ -8,7 +8,11 @@ use rust_mc_proto::{DataReader, DataWriter, Packet}; use crate::trigger_event; -use super::{id::*, play::{handle_configuration_state, handle_play_state}, ConnectionState}; +use super::{ + ConnectionState, + id::*, + play::{handle_configuration_state, handle_play_state}, +}; pub fn handle_connection( client: Arc, // Контекст клиента @@ -157,10 +161,13 @@ pub fn handle_connection( particle_status, }); - client.write_packet(&Packet::build(clientbound::configuration::PLUGIN_MESSAGE, |p| { - p.write_string("minecraft:brand")?; - p.write_string("rust_minecraft_server") - })?)?; + client.write_packet(&Packet::build( + clientbound::configuration::PLUGIN_MESSAGE, + |p| { + p.write_string("minecraft:brand")?; + p.write_string("rust_minecraft_server") + }, + )?)?; handle_configuration_state(client.clone())?; diff --git a/src/server/protocol/play.rs b/src/server/protocol/play.rs index 7a8a88d..ea230f7 100644 --- a/src/server/protocol/play.rs +++ b/src/server/protocol/play.rs @@ -1,32 +1,28 @@ -use std::{io::Cursor, sync::Arc}; +use std::{io::Cursor, sync::Arc, thread}; -use rust_mc_proto::{read_packet, DataWriter, Packet}; +use log::debug; +use rust_mc_proto::{DataWriter, Packet, read_packet}; -use crate::server::{ - player::context::ClientContext, ServerError -}; +use crate::server::{ServerError, player::context::ClientContext}; use super::id::*; -pub fn send_update_tags( - client: Arc, -) -> Result<(), ServerError> { - +pub fn send_update_tags(client: Arc) -> Result<(), ServerError> { // rewrite this hardcode bullshit - client.write_packet(&Packet::from_bytes(clientbound::configuration::UPDATE_TAGS, include_bytes!("update-tags.bin")))?; + client.write_packet(&Packet::from_bytes( + clientbound::configuration::UPDATE_TAGS, + include_bytes!("update-tags.bin"), + ))?; Ok(()) } -pub fn send_registry_data( - client: Arc, -) -> Result<(), ServerError> { - +pub fn send_registry_data(client: Arc) -> Result<(), ServerError> { // rewrite this hardcode bullshit let mut registry_data = Cursor::new(include_bytes!("registry-data.bin")); - + while let Ok(mut packet) = read_packet(&mut registry_data, None) { packet.set_id(clientbound::configuration::REGISTRY_DATA); client.write_packet(&packet)?; @@ -35,31 +31,23 @@ pub fn send_registry_data( Ok(()) } -pub fn process_known_packs( - client: Arc -) -> Result<(), ServerError> { - let mut packet = Packet::empty(clientbound::configuration::KNOWN_PACKS); - packet.write_varint(1)?; - packet.write_string("minecraft")?; - packet.write_string("core")?; - packet.write_string("1.21.5")?; - client.write_packet(&packet)?; - - client.read_packet(serverbound::configuration::KNOWN_PACKS)?; - - Ok(()) -} - pub fn handle_configuration_state( client: Arc, // Контекст клиента ) -> Result<(), ServerError> { - let mut packet = Packet::empty(clientbound::configuration::FEATURE_FLAGS); - packet.write_varint(1)?; - packet.write_string("minecraft:vanilla")?; - client.write_packet(&packet)?; + packet.write_varint(1)?; + packet.write_string("minecraft:vanilla")?; + client.write_packet(&packet)?; + + let mut packet = Packet::empty(clientbound::configuration::KNOWN_PACKS); + packet.write_varint(1)?; + packet.write_string("minecraft")?; + packet.write_string("core")?; + packet.write_string("1.21.5")?; + client.write_packet(&packet)?; + + client.read_packet(serverbound::configuration::KNOWN_PACKS)?; - process_known_packs(client.clone())?; send_registry_data(client.clone())?; send_update_tags(client.clone())?; @@ -75,43 +63,57 @@ pub fn handle_play_state( // "server is in developement suka".to_string(), // ))?; - let mut packet = Packet::empty(clientbound::play::LOGIN); - packet.write_int(0)?; // Entity ID - packet.write_boolean(false)?; // Is hardcore - packet.write_varint(4)?; // Dimension Names - packet.write_string("minecraft:overworld")?; - packet.write_string("minecraft:nether")?; - packet.write_string("minecraft:the_end")?; - packet.write_string("minecraft:overworld_caves")?; - packet.write_varint(0)?; // Max Players - packet.write_varint(8)?; // View Distance - packet.write_varint(5)?; // Simulation Distance - packet.write_boolean(false)?; // Reduced Debug Info - packet.write_boolean(true)?; // Enable respawn screen - packet.write_boolean(false)?; // Do limited crafting + let mut packet = Packet::empty(clientbound::play::LOGIN); - packet.write_varint(0)?; // Dimension Type - packet.write_string("minecraft:overworld")?; // Dimension Name - packet.write_long(0x0f38f26ad09c3e20)?; // Hashed seed - packet.write_byte(0)?; // Game mode - packet.write_signed_byte(-1)?; // Previous Game mode - packet.write_boolean(false)?; // Is Debug - packet.write_boolean(true)?; // Is Flat - packet.write_boolean(false)?; // Has death location - packet.write_varint(20)?; // Portal cooldown - packet.write_varint(60)?; // Sea level + packet.write_int(0)?; // Entity ID + packet.write_boolean(false)?; // Is hardcore + packet.write_varint(4)?; // Dimension Names + packet.write_string("minecraft:overworld")?; + packet.write_string("minecraft:nether")?; + packet.write_string("minecraft:the_end")?; + packet.write_string("minecraft:overworld_caves")?; + packet.write_varint(0)?; // Max Players + packet.write_varint(8)?; // View Distance + packet.write_varint(5)?; // Simulation Distance + packet.write_boolean(false)?; // Reduced Debug Info + packet.write_boolean(true)?; // Enable respawn screen + packet.write_boolean(false)?; // Do limited crafting - packet.write_boolean(false)?; // Enforces Secure Chat - client.write_packet(&packet)?; + packet.write_varint(0)?; // Dimension Type + packet.write_string("minecraft:overworld")?; // Dimension Name + packet.write_long(0x0f38f26ad09c3e20)?; // Hashed seed + packet.write_byte(0)?; // Game mode + packet.write_signed_byte(-1)?; // Previous Game mode + packet.write_boolean(false)?; // Is Debug + packet.write_boolean(true)?; // Is Flat + packet.write_boolean(false)?; // Has death location + packet.write_varint(20)?; // Portal cooldown + packet.write_varint(60)?; // Sea level - loop {} - - // TODO: отдельный поток для чтения пакетов + packet.write_boolean(false)?; // Enforces Secure Chat - // TODO: переработка функции read_packet так чтобы когда - // делаешь read_any_packet, пакет отправлялся сначала всем другим - // функциям read_packet которые настроены на этот айди пакета, - // а потом если таковых не осталось пакет возвращался + client.write_packet(&packet)?; + + thread::spawn({ + let client = client.clone(); + + move || { + let _ = client.clone().run_read_loop({ + let client = client.clone(); + + move |packet| { + // Сделать базовые приколы типа keep-alive и другое + + Ok(()) + } + }); + client.close(); + } + }); + + loop {} + + // Сделать отправку чанков Ok(()) }