From caa08c9c25b0b73ffa10fbc51aaef90d249bffcd Mon Sep 17 00:00:00 2001 From: MeexReay Date: Mon, 5 May 2025 00:17:17 +0300 Subject: [PATCH] chunk data packet!!! --- sniff-packets/src/main.rs | 19 ++++ src/main.rs | 2 +- src/server/mod.rs | 3 +- src/server/player/context.rs | 114 +++++++++++++------- src/server/protocol/handler.rs | 6 +- src/server/protocol/play.rs | 192 ++++++++++++++++++++++++++++----- 6 files changed, 260 insertions(+), 76 deletions(-) diff --git a/sniff-packets/src/main.rs b/sniff-packets/src/main.rs index 67f3194..0a84c2d 100644 --- a/sniff-packets/src/main.rs +++ b/sniff-packets/src/main.rs @@ -175,5 +175,24 @@ fn main() -> Result<(), ProtocolError> { fs::write("registry-data.bin", &data).unwrap(); + let packet = conn.read_packet()?; + conn.write_packet(&packet)?; // finish conf + + loop { + let mut packet = conn.read_packet()?; + + if packet.id() == 0x41 { + let id = packet.read_varint()?; + + conn.write_packet(&Packet::build(0x00, |packet| packet.write_varint(id))?)?; + } + + if packet.id() == 0x27 { + // here you can read "Chunk Data and Update Light" packet + + break; + } + } + Ok(()) } diff --git a/src/main.rs b/src/main.rs index 1e650f2..e34a01b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -156,7 +156,7 @@ fn main() { let mut server = ServerContext::new(config); server.add_listener(Box::new(ExampleListener)); // Добавляем пример листенера - // server.add_packet_handler(Box::new(ExamplePacketHandler)); // Добавляем пример пакет хандлера + server.add_packet_handler(Box::new(ExamplePacketHandler)); // Добавляем пример пакет хандлера // Бетонируем сервер контекст от изменений let server = Arc::new(server); diff --git a/src/server/mod.rs b/src/server/mod.rs index fed3abd..58414ec 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -17,7 +17,7 @@ pub mod protocol; // Ошибки сервера #[derive(Debug)] pub enum ServerError { - UnexpectedPacket, // Неожиданный пакет + UnexpectedPacket(u8), // Неожиданный пакет Protocol(ProtocolError), // Ошибка в протоколе при работе с rust_mc_proto ConnectionClosed, // Соединение закрыто, единственная ошибка которая не логируется у handle_connection SerTextComponent, // Ошибка при сериализации текст-компонента @@ -25,7 +25,6 @@ pub enum ServerError { SerNbt, // Ошибка при сериализации nbt DeNbt, // Ошибка при десериализации nbt UnexpectedState, // Указывает на то что этот пакет не может быть отправлен в данном режиме (в основном через ProtocolHelper) - ReadLoopMode, // Ошибка когда вызывается read_any_packet во время работы read_loop Other(String), // Другая ошибка, либо очень специфичная, либо хз, лучше не использовать и создавать новое поле ошибки } diff --git a/src/server/player/context.rs b/src/server/player/context.rs index 4b52b0c..2a1c85d 100644 --- a/src/server/player/context.rs +++ b/src/server/player/context.rs @@ -1,15 +1,10 @@ use std::{ - hash::Hash, - net::{SocketAddr, TcpStream}, - sync::{ - Arc, RwLock, - atomic::{AtomicBool, Ordering}, - mpsc::{self, Sender}, - }, + collections::VecDeque, hash::Hash, net::{SocketAddr, TcpStream}, sync::{ + atomic::{AtomicBool, Ordering}, Arc, Mutex, RwLock + }, thread, time::Duration }; -use dashmap::DashMap; -use rust_mc_proto::{MinecraftConnection, Packet}; +use rust_mc_proto::{MinecraftConnection, Packet, ProtocolError}; use uuid::Uuid; use crate::server::{ServerError, context::ServerContext, protocol::ConnectionState}; @@ -26,8 +21,9 @@ pub struct ClientContext { client_info: RwLock>, player_info: RwLock>, state: RwLock, - packet_waiters: DashMap)>, + packet_buffer: Mutex>, read_loop: AtomicBool, + is_alive: AtomicBool } // Реализуем сравнение через адрес @@ -56,8 +52,9 @@ impl ClientContext { client_info: RwLock::new(None), player_info: RwLock::new(None), state: RwLock::new(ConnectionState::Handshake), - packet_waiters: DashMap::new(), + packet_buffer: Mutex::new(VecDeque::new()), read_loop: AtomicBool::new(false), + is_alive: AtomicBool::new(true) } } @@ -121,22 +118,40 @@ impl ClientContext { packet.get_mut().set_position(0); } if !cancelled { - self.conn.write().unwrap().write_packet(&packet)?; + match self.conn.write().unwrap().write_packet(&packet) { + Ok(_) => {}, + Err(ProtocolError::ConnectionClosedError) => { + self.is_alive.store(false, Ordering::SeqCst); + return Err(ServerError::ConnectionClosed); + }, + Err(e) => { + return Err(e.into()); + } + }; } Ok(()) } pub fn run_read_loop( - self: &Arc, - callback: impl Fn(Packet) -> Result<(), ServerError>, + self: &Arc ) -> Result<(), ServerError> { - let state = self.state(); + self.read_loop.store(true, Ordering::SeqCst); let mut conn = self.conn.read().unwrap().try_clone()?; // так можно делать т.к сокет это просто поинтер loop { - let mut packet = conn.read_packet()?; + let mut packet = match conn.read_packet() { + Ok(v) => v, + Err(ProtocolError::ConnectionClosedError) => { + self.is_alive.store(false, Ordering::SeqCst); + return Err(ServerError::ConnectionClosed); + }, + Err(e) => { + return Err(e.into()); + } + }; let mut cancelled = false; + let state = self.state(); for handler in self .server .packet_handlers(|o| o.on_incoming_packet_priority()) @@ -151,31 +166,33 @@ impl ClientContext { packet.get_mut().set_position(0); } if !cancelled { - let mut skip = false; - for (_, (id, sender)) in self.packet_waiters.clone() { - if id == packet.id() { - sender.send(packet.clone()).unwrap(); - skip = true; - break; - } - } - if !skip { - callback(packet.clone())?; - } + self.packet_buffer.lock().unwrap().push_back(packet); } } } pub fn read_any_packet(self: &Arc) -> Result { if self.read_loop.load(Ordering::SeqCst) { - Err(ServerError::ReadLoopMode) + loop { + if let Some(packet) = self.packet_buffer.lock().unwrap().pop_front() { + return Ok(packet); + } + thread::sleep(Duration::from_millis(10)); + } } else { let state = self.state(); - let mut conn = self.conn.read().unwrap().try_clone()?; // так можно делать т.к сокет это просто поинтер - loop { - let mut packet = conn.read_packet()?; + let mut packet = match self.conn.write().unwrap().read_packet() { + Ok(v) => v, + Err(ProtocolError::ConnectionClosedError) => { + self.is_alive.store(false, Ordering::SeqCst); + return Err(ServerError::ConnectionClosed); + }, + Err(e) => { + return Err(e.into()); + } + }; let mut cancelled = false; for handler in self .server @@ -199,22 +216,32 @@ impl ClientContext { pub fn read_packet(self: &Arc, id: u8) -> Result { if self.read_loop.load(Ordering::SeqCst) { - let (tx, rx) = mpsc::channel::(); - - let key: usize = (&tx as *const Sender).addr(); - self.packet_waiters.insert(key, (id, tx)); - loop { - if let Ok(packet) = rx.recv() { - self.packet_waiters.remove(&key); - break Ok(packet); + { + let mut locked = self.packet_buffer.lock().unwrap(); + for (i, packet) in locked.clone().iter().enumerate() { + if packet.id() == id { + locked.remove(i); + return Ok(packet.clone()); + } + } } + thread::sleep(Duration::from_millis(10)); } } else { - let packet = self.read_any_packet()?; + let packet = match self.read_any_packet() { + Ok(v) => v, + Err(ServerError::ConnectionClosed) => { + self.is_alive.store(false, Ordering::SeqCst); + return Err(ServerError::ConnectionClosed); + }, + Err(e) => { + return Err(e); + } + }; if packet.id() != id { - Err(ServerError::UnexpectedPacket) + Err(ServerError::UnexpectedPacket(packet.id())) } else { Ok(packet) } @@ -224,10 +251,15 @@ impl ClientContext { pub fn close(self: &Arc) { self.conn.write().unwrap().close(); } + pub fn set_compression(self: &Arc, threshold: Option) { self.conn.write().unwrap().set_compression(threshold); } + pub fn is_alive(self: &Arc) -> bool { + self.is_alive.load(Ordering::SeqCst) + } + pub fn protocol_helper(self: &Arc) -> ProtocolHelper { ProtocolHelper::new(self.clone()) } diff --git a/src/server/protocol/handler.rs b/src/server/protocol/handler.rs index 0fdbab2..d532e33 100644 --- a/src/server/protocol/handler.rs +++ b/src/server/protocol/handler.rs @@ -75,8 +75,8 @@ pub fn handle_connection( packet.write_long(timestamp)?; client.write_packet(&packet)?; } - _ => { - return Err(ServerError::UnexpectedPacket); + id => { + return Err(ServerError::UnexpectedPacket(id)); } } } @@ -181,7 +181,7 @@ pub fn handle_connection( } _ => { // Тип подключения не рукопожатный - return Err(ServerError::UnexpectedPacket); + return Err(ServerError::UnexpectedState); } } diff --git a/src/server/protocol/play.rs b/src/server/protocol/play.rs index 1677063..92a719e 100644 --- a/src/server/protocol/play.rs +++ b/src/server/protocol/play.rs @@ -1,8 +1,8 @@ -use std::{io::Cursor, sync::Arc, thread}; +use std::{io::Cursor, sync::Arc, thread, time::{Duration, SystemTime, UNIX_EPOCH}}; use rust_mc_proto::{DataWriter, Packet, read_packet}; -use crate::server::{data::text_component::TextComponent, player::context::ClientContext, ServerError}; +use crate::server::{player::context::ClientContext, ServerError}; use super::id::*; @@ -12,9 +12,7 @@ pub fn send_update_tags(client: Arc) -> Result<(), ServerError> { client.write_packet(&Packet::from_bytes( clientbound::configuration::UPDATE_TAGS, include_bytes!("update-tags.bin"), - ))?; - - Ok(()) + )) } pub fn send_registry_data(client: Arc) -> Result<(), ServerError> { @@ -49,16 +47,10 @@ pub fn handle_configuration_state( client.read_packet(serverbound::configuration::KNOWN_PACKS)?; send_registry_data(client.clone())?; - send_update_tags(client.clone())?; - - Ok(()) + send_update_tags(client.clone()) } -// Отдельная функция для работы с самой игрой -pub fn handle_play_state( - client: Arc, // Контекст клиента -) -> Result<(), ServerError> { - +pub fn send_login(client: Arc) -> Result<(), ServerError> { // Отправка пакета Login let mut packet = Packet::empty(clientbound::play::LOGIN); @@ -89,33 +81,175 @@ pub fn handle_play_state( packet.write_boolean(false)?; // Enforces Secure Chat + client.write_packet(&packet) +} + +pub fn send_game_event(client: Arc, event: u8, value: f32) -> Result<(), ServerError> { + let mut packet = Packet::empty(clientbound::play::GAME_EVENT); + + packet.write_byte(event)?; + packet.write_float(value)?; + + client.write_packet(&packet) +} + +pub fn sync_player_pos( + client: Arc, + x: f64, + y: f64, + z: f64, + vel_x: f64, + vel_y: f64, + vel_z: f64, + yaw: f32, + pitch: f32, + flags: i32 +) -> Result<(), ServerError> { + let timestamp = (SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() & 0xFFFFFFFF) as i32; + + let mut packet = Packet::empty(clientbound::play::SYNCHRONIZE_PLAYER_POSITION); + + packet.write_varint(timestamp)?; + packet.write_double(x)?; + packet.write_double(y)?; + packet.write_double(z)?; + packet.write_double(vel_x)?; + packet.write_double(vel_y)?; + packet.write_double(vel_z)?; + packet.write_float(yaw)?; + packet.write_float(pitch)?; + packet.write_int(flags)?; + client.write_packet(&packet)?; + client.read_packet(serverbound::play::CONFIRM_TELEPORTATION)?; + + Ok(()) +} + +pub fn set_center_chunk(client: Arc, x: i32, z: i32) -> Result<(), ServerError> { + let mut packet = Packet::empty(clientbound::play::SET_CENTER_CHUNK); + + packet.write_varint(x)?; + packet.write_varint(z)?; + + client.write_packet(&packet) +} + +pub fn send_example_chunk(client: Arc, x: i32, z: i32) -> Result<(), ServerError> { + let mut packet = Packet::empty(clientbound::play::CHUNK_DATA_AND_UPDATE_LIGHT); + + packet.write_int(x)?; + packet.write_int(z)?; + + // heightmap + + packet.write_varint(1)?; // heightmaps count + packet.write_varint(0)?; // MOTION_BLOCKING + packet.write_varint(256)?; // Length of the following long array (16 * 16 = 256) + for _ in 0..256 { + packet.write_long(0)?; // height - 0 + } + + // sending chunk data + + let mut chunk_data = Vec::new(); + + // we want to fill the area from -64 to 0, so it will be 4 chunk sections + + for _ in 0..4 { + chunk_data.write_short(4096)?; // non-air blocks count, 16 * 16 * 16 = 4096 stone blocks + + // blocks paletted container + chunk_data.write_byte(0)?; // Bits Per Entry, use Single valued palette format + chunk_data.write_varint(1)?; // block state id in the registry (1 for stone) + + // biomes palleted container + chunk_data.write_byte(0)?; // Bits Per Entry, use Single valued palette format + chunk_data.write_varint(27)?; // biome id in the registry + } + + // air chunk sections + + for _ in 0..20 { + chunk_data.write_short(0)?; // non-air blocks count, 0 + + // blocks paletted container + chunk_data.write_byte(0)?; // Bits Per Entry, use Single valued palette format + chunk_data.write_varint(0)?; // block state id in the registry (0 for air) + + // biomes palleted container + chunk_data.write_byte(0)?; // Bits Per Entry, use Single valued palette format + chunk_data.write_varint(27)?; // biome id in the registry + } + + packet.write_usize_varint(chunk_data.len())?; + packet.write_bytes(&chunk_data)?; + + packet.write_byte(0)?; + + + // light data + + packet.write_byte(0)?; + packet.write_byte(0)?; + packet.write_byte(0)?; + packet.write_byte(0)?; + packet.write_byte(0)?; + packet.write_byte(0)?; + + + client.write_packet(&packet)?; + + Ok(()) +} + +pub fn send_keep_alive(client: Arc) -> Result<(), ServerError> { + let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() as i64; + + let mut packet = Packet::empty(clientbound::play::KEEP_ALIVE); + packet.write_long(timestamp)?; + client.write_packet(&packet)?; + + client.read_packet(serverbound::play::KEEP_ALIVE)?; + + Ok(()) +} + +// Отдельная функция для работы с самой игрой +pub fn handle_play_state( + client: Arc, // Контекст клиента +) -> Result<(), ServerError> { + thread::spawn({ let client = client.clone(); - + move || { - let _ = client.clone().run_read_loop({ - let client = client.clone(); - - move |packet| { - // TODO: Сделать базовые приколы типа keep-alive и другое - - Ok(()) - } - }); + let _ = client.run_read_loop(); client.close(); } }); - // Отключение игрока с сообщением - client.protocol_helper().disconnect(TextComponent::rainbow( - "server is in developement suka".to_string(), - ))?; + send_login(client.clone())?; + sync_player_pos(client.clone(), 8.0, 0.0, 8.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0)?; + send_game_event(client.clone(), 13, 0.0)?; // 13 - Start waiting for level chunks + set_center_chunk(client.clone(), 0, 0)?; + send_example_chunk(client.clone(), 0, 0)?; - // loop {} + let mut ticks_alive = 0u64; - // TODO: Сделать отправку чанков + while client.is_alive() { + if ticks_alive % 200 == 0 { // 10 secs timer + send_keep_alive(client.clone())?; + } + + if ticks_alive % 20 == 0 { // 1 sec timer + // do something + } + + thread::sleep(Duration::from_millis(50)); // 1 tick + ticks_alive += 1; + } Ok(()) }