read_loop

This commit is contained in:
MeexReay 2025-05-04 17:10:24 +03:00
parent c19d3f1947
commit bb20ea6e1d
5 changed files with 157 additions and 87 deletions

View File

@ -22,9 +22,10 @@ pub enum ServerError {
ConnectionClosed, // Соединение закрыто, единственная ошибка которая не логируется у handle_connection ConnectionClosed, // Соединение закрыто, единственная ошибка которая не логируется у handle_connection
SerTextComponent, // Ошибка при сериализации текст-компонента SerTextComponent, // Ошибка при сериализации текст-компонента
DeTextComponent, // Ошибка при десериализации текст-компонента DeTextComponent, // Ошибка при десериализации текст-компонента
SerNbt, // Ошибка при сериализации nbt SerNbt, // Ошибка при сериализации nbt
DeNbt, // Ошибка при десериализации nbt DeNbt, // Ошибка при десериализации nbt
UnexpectedState, // Указывает на то что этот пакет не может быть отправлен в данном режиме (в основном через ProtocolHelper) UnexpectedState, // Указывает на то что этот пакет не может быть отправлен в данном режиме (в основном через ProtocolHelper)
ReadLoopMode, // Ошибка когда вызывается read_any_packet во время работы read_loop
Other(String), // Другая ошибка, либо очень специфичная, либо хз, лучше не использовать и создавать новое поле ошибки Other(String), // Другая ошибка, либо очень специфичная, либо хз, лучше не использовать и создавать новое поле ошибки
} }

View File

@ -1,9 +1,8 @@
use std::{ use std::{
hash::Hash, hash::Hash, net::{SocketAddr, TcpStream}, sync::{atomic::{AtomicBool, Ordering}, mpsc::{self, Sender}, Arc, RwLock}
net::{SocketAddr, TcpStream},
sync::{Arc, RwLock},
}; };
use dashmap::DashMap;
use rust_mc_proto::{MinecraftConnection, Packet}; use rust_mc_proto::{MinecraftConnection, Packet};
use uuid::Uuid; use uuid::Uuid;
@ -21,6 +20,8 @@ pub struct ClientContext {
client_info: RwLock<Option<ClientInfo>>, client_info: RwLock<Option<ClientInfo>>,
player_info: RwLock<Option<PlayerInfo>>, player_info: RwLock<Option<PlayerInfo>>,
state: RwLock<ConnectionState>, state: RwLock<ConnectionState>,
packet_waiters: DashMap<usize, (u8, Sender<Packet>)>,
read_loop: AtomicBool
} }
// Реализуем сравнение через адрес // Реализуем сравнение через адрес
@ -39,6 +40,8 @@ impl Hash for ClientContext {
impl Eq for ClientContext {} impl Eq for ClientContext {}
impl ClientContext { impl ClientContext {
pub fn new(server: Arc<ServerContext>, conn: MinecraftConnection<TcpStream>) -> ClientContext { pub fn new(server: Arc<ServerContext>, conn: MinecraftConnection<TcpStream>) -> ClientContext {
ClientContext { ClientContext {
@ -49,6 +52,8 @@ impl ClientContext {
client_info: RwLock::new(None), client_info: RwLock::new(None),
player_info: RwLock::new(None), player_info: RwLock::new(None),
state: RwLock::new(ConnectionState::Handshake), state: RwLock::new(ConnectionState::Handshake),
packet_waiters: DashMap::new(),
read_loop: AtomicBool::new(false)
} }
} }
@ -117,7 +122,7 @@ impl ClientContext {
Ok(()) Ok(())
} }
pub fn read_any_packet(self: &Arc<Self>) -> Result<Packet, ServerError> { pub fn run_read_loop(self: &Arc<Self>, callback: impl Fn(Packet) -> Result<(), ServerError>) -> Result<(), ServerError> {
let state = self.state(); let state = self.state();
let mut conn = self.conn.read().unwrap().try_clone()?; // так можно делать т.к сокет это просто поинтер let mut conn = self.conn.read().unwrap().try_clone()?; // так можно делать т.к сокет это просто поинтер
@ -139,24 +144,79 @@ impl ClientContext {
packet.get_mut().set_position(0); packet.get_mut().set_position(0);
} }
if !cancelled { if !cancelled {
break Ok(packet); let mut skip = false;
for (_, (id, sender)) in self.packet_waiters.clone() {
if id == packet.id() {
sender.send(packet.clone()).unwrap();
skip = true;
break;
}
}
if !skip {
callback(packet.clone())?;
}
}
}
}
pub fn read_any_packet(self: &Arc<Self>) -> Result<Packet, ServerError> {
if self.read_loop.load(Ordering::SeqCst) {
Err(ServerError::ReadLoopMode)
} else {
let state = self.state();
let mut conn = self.conn.read().unwrap().try_clone()?; // так можно делать т.к сокет это просто поинтер
loop {
let mut packet = conn.read_packet()?;
let mut cancelled = false;
for handler in self
.server
.packet_handlers(|o| o.on_incoming_packet_priority())
.iter()
{
handler.on_incoming_packet(
self.clone(),
&mut packet,
&mut cancelled,
state.clone(),
)?;
packet.get_mut().set_position(0);
}
if !cancelled {
break Ok(packet);
}
} }
} }
} }
pub fn read_packet(self: &Arc<Self>, id: u8) -> Result<Packet, ServerError> { pub fn read_packet(self: &Arc<Self>, id: u8) -> Result<Packet, ServerError> {
let packet = self.read_any_packet()?; if self.read_loop.load(Ordering::SeqCst) {
if packet.id() != id { let (tx, rx) = mpsc::channel::<Packet>();
Err(ServerError::UnexpectedPacket)
let key: usize = (&tx as *const Sender<Packet>).addr();
self.packet_waiters.insert(key, (id, tx));
loop {
if let Ok(packet) = rx.recv() {
self.packet_waiters.remove(&key);
break Ok(packet)
}
}
} else { } else {
Ok(packet) let packet = self.read_any_packet()?;
if packet.id() != id {
Err(ServerError::UnexpectedPacket)
} else {
Ok(packet)
}
} }
} }
pub fn close(self: &Arc<Self>) { pub fn close(self: &Arc<Self>) {
self.conn.write().unwrap().close(); self.conn.write().unwrap().close();
} }
pub fn set_compression(self: &Arc<Self>, threshold: Option<usize>) { pub fn set_compression(self: &Arc<Self>, threshold: Option<usize>) {
self.conn.write().unwrap().set_compression(threshold); self.conn.write().unwrap().set_compression(threshold);
} }

View File

@ -53,7 +53,7 @@ impl ProtocolHelper {
match self.state { match self.state {
ConnectionState::Configuration => clientbound::configuration::STORE_COOKIE, ConnectionState::Configuration => clientbound::configuration::STORE_COOKIE,
ConnectionState::Play => clientbound::play::STORE_COOKIE, ConnectionState::Play => clientbound::play::STORE_COOKIE,
_ => { return Err(ServerError::UnexpectedState) }, _ => return Err(ServerError::UnexpectedState),
}, },
|p| { |p| {
p.write_string(id)?; p.write_string(id)?;
@ -159,7 +159,7 @@ impl ProtocolHelper {
}; };
Ok(data) Ok(data)
}, }
ConnectionState::Play => { ConnectionState::Play => {
let mut packet = Packet::empty(clientbound::play::COOKIE_REQUEST); let mut packet = Packet::empty(clientbound::play::COOKIE_REQUEST);
packet.write_string(id)?; packet.write_string(id)?;

View File

@ -8,7 +8,11 @@ use rust_mc_proto::{DataReader, DataWriter, Packet};
use crate::trigger_event; use crate::trigger_event;
use super::{id::*, play::{handle_configuration_state, handle_play_state}, ConnectionState}; use super::{
ConnectionState,
id::*,
play::{handle_configuration_state, handle_play_state},
};
pub fn handle_connection( pub fn handle_connection(
client: Arc<ClientContext>, // Контекст клиента client: Arc<ClientContext>, // Контекст клиента
@ -157,10 +161,13 @@ pub fn handle_connection(
particle_status, particle_status,
}); });
client.write_packet(&Packet::build(clientbound::configuration::PLUGIN_MESSAGE, |p| { client.write_packet(&Packet::build(
p.write_string("minecraft:brand")?; clientbound::configuration::PLUGIN_MESSAGE,
p.write_string("rust_minecraft_server") |p| {
})?)?; p.write_string("minecraft:brand")?;
p.write_string("rust_minecraft_server")
},
)?)?;
handle_configuration_state(client.clone())?; handle_configuration_state(client.clone())?;

View File

@ -1,28 +1,24 @@
use std::{io::Cursor, sync::Arc}; use std::{io::Cursor, sync::Arc, thread};
use rust_mc_proto::{read_packet, DataWriter, Packet}; use log::debug;
use rust_mc_proto::{DataWriter, Packet, read_packet};
use crate::server::{ use crate::server::{ServerError, player::context::ClientContext};
player::context::ClientContext, ServerError
};
use super::id::*; use super::id::*;
pub fn send_update_tags( pub fn send_update_tags(client: Arc<ClientContext>) -> Result<(), ServerError> {
client: Arc<ClientContext>,
) -> Result<(), ServerError> {
// rewrite this hardcode bullshit // rewrite this hardcode bullshit
client.write_packet(&Packet::from_bytes(clientbound::configuration::UPDATE_TAGS, include_bytes!("update-tags.bin")))?; client.write_packet(&Packet::from_bytes(
clientbound::configuration::UPDATE_TAGS,
include_bytes!("update-tags.bin"),
))?;
Ok(()) Ok(())
} }
pub fn send_registry_data( pub fn send_registry_data(client: Arc<ClientContext>) -> Result<(), ServerError> {
client: Arc<ClientContext>,
) -> Result<(), ServerError> {
// rewrite this hardcode bullshit // rewrite this hardcode bullshit
let mut registry_data = Cursor::new(include_bytes!("registry-data.bin")); let mut registry_data = Cursor::new(include_bytes!("registry-data.bin"));
@ -35,31 +31,23 @@ pub fn send_registry_data(
Ok(()) Ok(())
} }
pub fn process_known_packs(
client: Arc<ClientContext>
) -> Result<(), ServerError> {
let mut packet = Packet::empty(clientbound::configuration::KNOWN_PACKS);
packet.write_varint(1)?;
packet.write_string("minecraft")?;
packet.write_string("core")?;
packet.write_string("1.21.5")?;
client.write_packet(&packet)?;
client.read_packet(serverbound::configuration::KNOWN_PACKS)?;
Ok(())
}
pub fn handle_configuration_state( pub fn handle_configuration_state(
client: Arc<ClientContext>, // Контекст клиента client: Arc<ClientContext>, // Контекст клиента
) -> Result<(), ServerError> { ) -> Result<(), ServerError> {
let mut packet = Packet::empty(clientbound::configuration::FEATURE_FLAGS); let mut packet = Packet::empty(clientbound::configuration::FEATURE_FLAGS);
packet.write_varint(1)?; packet.write_varint(1)?;
packet.write_string("minecraft:vanilla")?; packet.write_string("minecraft:vanilla")?;
client.write_packet(&packet)?; client.write_packet(&packet)?;
let mut packet = Packet::empty(clientbound::configuration::KNOWN_PACKS);
packet.write_varint(1)?;
packet.write_string("minecraft")?;
packet.write_string("core")?;
packet.write_string("1.21.5")?;
client.write_packet(&packet)?;
client.read_packet(serverbound::configuration::KNOWN_PACKS)?;
process_known_packs(client.clone())?;
send_registry_data(client.clone())?; send_registry_data(client.clone())?;
send_update_tags(client.clone())?; send_update_tags(client.clone())?;
@ -75,43 +63,57 @@ pub fn handle_play_state(
// "server is in developement suka".to_string(), // "server is in developement suka".to_string(),
// ))?; // ))?;
let mut packet = Packet::empty(clientbound::play::LOGIN); let mut packet = Packet::empty(clientbound::play::LOGIN);
packet.write_int(0)?; // Entity ID
packet.write_boolean(false)?; // Is hardcore
packet.write_varint(4)?; // Dimension Names
packet.write_string("minecraft:overworld")?;
packet.write_string("minecraft:nether")?;
packet.write_string("minecraft:the_end")?;
packet.write_string("minecraft:overworld_caves")?;
packet.write_varint(0)?; // Max Players
packet.write_varint(8)?; // View Distance
packet.write_varint(5)?; // Simulation Distance
packet.write_boolean(false)?; // Reduced Debug Info
packet.write_boolean(true)?; // Enable respawn screen
packet.write_boolean(false)?; // Do limited crafting
packet.write_varint(0)?; // Dimension Type packet.write_int(0)?; // Entity ID
packet.write_string("minecraft:overworld")?; // Dimension Name packet.write_boolean(false)?; // Is hardcore
packet.write_long(0x0f38f26ad09c3e20)?; // Hashed seed packet.write_varint(4)?; // Dimension Names
packet.write_byte(0)?; // Game mode packet.write_string("minecraft:overworld")?;
packet.write_signed_byte(-1)?; // Previous Game mode packet.write_string("minecraft:nether")?;
packet.write_boolean(false)?; // Is Debug packet.write_string("minecraft:the_end")?;
packet.write_boolean(true)?; // Is Flat packet.write_string("minecraft:overworld_caves")?;
packet.write_boolean(false)?; // Has death location packet.write_varint(0)?; // Max Players
packet.write_varint(20)?; // Portal cooldown packet.write_varint(8)?; // View Distance
packet.write_varint(60)?; // Sea level packet.write_varint(5)?; // Simulation Distance
packet.write_boolean(false)?; // Reduced Debug Info
packet.write_boolean(true)?; // Enable respawn screen
packet.write_boolean(false)?; // Do limited crafting
packet.write_boolean(false)?; // Enforces Secure Chat packet.write_varint(0)?; // Dimension Type
client.write_packet(&packet)?; packet.write_string("minecraft:overworld")?; // Dimension Name
packet.write_long(0x0f38f26ad09c3e20)?; // Hashed seed
packet.write_byte(0)?; // Game mode
packet.write_signed_byte(-1)?; // Previous Game mode
packet.write_boolean(false)?; // Is Debug
packet.write_boolean(true)?; // Is Flat
packet.write_boolean(false)?; // Has death location
packet.write_varint(20)?; // Portal cooldown
packet.write_varint(60)?; // Sea level
packet.write_boolean(false)?; // Enforces Secure Chat
client.write_packet(&packet)?;
thread::spawn({
let client = client.clone();
move || {
let _ = client.clone().run_read_loop({
let client = client.clone();
move |packet| {
// Сделать базовые приколы типа keep-alive и другое
Ok(())
}
});
client.close();
}
});
loop {} loop {}
// TODO: отдельный поток для чтения пакетов // Сделать отправку чанков
// TODO: переработка функции read_packet так чтобы когда
// делаешь read_any_packet, пакет отправлялся сначала всем другим
// функциям read_packet которые настроены на этот айди пакета,
// а потом если таковых не осталось пакет возвращался
Ok(()) Ok(())
} }