diff --git a/src/server/mod.rs b/src/server/mod.rs
index 4fd5170..fed3abd 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -22,9 +22,10 @@ pub enum ServerError {
ConnectionClosed, // Соединение закрыто, единственная ошибка которая не логируется у handle_connection
SerTextComponent, // Ошибка при сериализации текст-компонента
DeTextComponent, // Ошибка при десериализации текст-компонента
- SerNbt, // Ошибка при сериализации nbt
- DeNbt, // Ошибка при десериализации nbt
+ SerNbt, // Ошибка при сериализации nbt
+ DeNbt, // Ошибка при десериализации nbt
UnexpectedState, // Указывает на то что этот пакет не может быть отправлен в данном режиме (в основном через ProtocolHelper)
+ ReadLoopMode, // Ошибка когда вызывается read_any_packet во время работы read_loop
Other(String), // Другая ошибка, либо очень специфичная, либо хз, лучше не использовать и создавать новое поле ошибки
}
diff --git a/src/server/player/context.rs b/src/server/player/context.rs
index 05bdbb1..c53175d 100644
--- a/src/server/player/context.rs
+++ b/src/server/player/context.rs
@@ -1,9 +1,8 @@
use std::{
- hash::Hash,
- net::{SocketAddr, TcpStream},
- sync::{Arc, RwLock},
+ hash::Hash, net::{SocketAddr, TcpStream}, sync::{atomic::{AtomicBool, Ordering}, mpsc::{self, Sender}, Arc, RwLock}
};
+use dashmap::DashMap;
use rust_mc_proto::{MinecraftConnection, Packet};
use uuid::Uuid;
@@ -21,6 +20,8 @@ pub struct ClientContext {
client_info: RwLock>,
player_info: RwLock >,
state: RwLock,
+ packet_waiters: DashMap)>,
+ read_loop: AtomicBool
}
// Реализуем сравнение через адрес
@@ -39,6 +40,8 @@ impl Hash for ClientContext {
impl Eq for ClientContext {}
+
+
impl ClientContext {
pub fn new(server: Arc, conn: MinecraftConnection) -> ClientContext {
ClientContext {
@@ -49,6 +52,8 @@ impl ClientContext {
client_info: RwLock::new(None),
player_info: RwLock::new(None),
state: RwLock::new(ConnectionState::Handshake),
+ packet_waiters: DashMap::new(),
+ read_loop: AtomicBool::new(false)
}
}
@@ -117,7 +122,7 @@ impl ClientContext {
Ok(())
}
- pub fn read_any_packet(self: &Arc) -> Result {
+ pub fn run_read_loop(self: &Arc, callback: impl Fn(Packet) -> Result<(), ServerError>) -> Result<(), ServerError> {
let state = self.state();
let mut conn = self.conn.read().unwrap().try_clone()?; // так можно делать т.к сокет это просто поинтер
@@ -139,24 +144,79 @@ impl ClientContext {
packet.get_mut().set_position(0);
}
if !cancelled {
- break Ok(packet);
+ let mut skip = false;
+ for (_, (id, sender)) in self.packet_waiters.clone() {
+ if id == packet.id() {
+ sender.send(packet.clone()).unwrap();
+ skip = true;
+ break;
+ }
+ }
+ if !skip {
+ callback(packet.clone())?;
+ }
+ }
+ }
+ }
+
+ pub fn read_any_packet(self: &Arc) -> Result {
+ if self.read_loop.load(Ordering::SeqCst) {
+ Err(ServerError::ReadLoopMode)
+ } else {
+ let state = self.state();
+
+ let mut conn = self.conn.read().unwrap().try_clone()?; // так можно делать т.к сокет это просто поинтер
+
+ loop {
+ let mut packet = conn.read_packet()?;
+ let mut cancelled = false;
+ for handler in self
+ .server
+ .packet_handlers(|o| o.on_incoming_packet_priority())
+ .iter()
+ {
+ handler.on_incoming_packet(
+ self.clone(),
+ &mut packet,
+ &mut cancelled,
+ state.clone(),
+ )?;
+ packet.get_mut().set_position(0);
+ }
+ if !cancelled {
+ break Ok(packet);
+ }
}
}
}
pub fn read_packet(self: &Arc, id: u8) -> Result {
- let packet = self.read_any_packet()?;
- if packet.id() != id {
- Err(ServerError::UnexpectedPacket)
+ if self.read_loop.load(Ordering::SeqCst) {
+ let (tx, rx) = mpsc::channel::();
+
+ let key: usize = (&tx as *const Sender).addr();
+ self.packet_waiters.insert(key, (id, tx));
+
+ loop {
+ if let Ok(packet) = rx.recv() {
+ self.packet_waiters.remove(&key);
+ break Ok(packet)
+ }
+ }
} else {
- Ok(packet)
+ let packet = self.read_any_packet()?;
+
+ if packet.id() != id {
+ Err(ServerError::UnexpectedPacket)
+ } else {
+ Ok(packet)
+ }
}
}
pub fn close(self: &Arc) {
self.conn.write().unwrap().close();
}
-
pub fn set_compression(self: &Arc, threshold: Option) {
self.conn.write().unwrap().set_compression(threshold);
}
diff --git a/src/server/player/helper.rs b/src/server/player/helper.rs
index 5b86c88..540290a 100644
--- a/src/server/player/helper.rs
+++ b/src/server/player/helper.rs
@@ -53,7 +53,7 @@ impl ProtocolHelper {
match self.state {
ConnectionState::Configuration => clientbound::configuration::STORE_COOKIE,
ConnectionState::Play => clientbound::play::STORE_COOKIE,
- _ => { return Err(ServerError::UnexpectedState) },
+ _ => return Err(ServerError::UnexpectedState),
},
|p| {
p.write_string(id)?;
@@ -159,7 +159,7 @@ impl ProtocolHelper {
};
Ok(data)
- },
+ }
ConnectionState::Play => {
let mut packet = Packet::empty(clientbound::play::COOKIE_REQUEST);
packet.write_string(id)?;
diff --git a/src/server/protocol/handler.rs b/src/server/protocol/handler.rs
index a724bb3..0fdbab2 100644
--- a/src/server/protocol/handler.rs
+++ b/src/server/protocol/handler.rs
@@ -8,7 +8,11 @@ use rust_mc_proto::{DataReader, DataWriter, Packet};
use crate::trigger_event;
-use super::{id::*, play::{handle_configuration_state, handle_play_state}, ConnectionState};
+use super::{
+ ConnectionState,
+ id::*,
+ play::{handle_configuration_state, handle_play_state},
+};
pub fn handle_connection(
client: Arc, // Контекст клиента
@@ -157,10 +161,13 @@ pub fn handle_connection(
particle_status,
});
- client.write_packet(&Packet::build(clientbound::configuration::PLUGIN_MESSAGE, |p| {
- p.write_string("minecraft:brand")?;
- p.write_string("rust_minecraft_server")
- })?)?;
+ client.write_packet(&Packet::build(
+ clientbound::configuration::PLUGIN_MESSAGE,
+ |p| {
+ p.write_string("minecraft:brand")?;
+ p.write_string("rust_minecraft_server")
+ },
+ )?)?;
handle_configuration_state(client.clone())?;
diff --git a/src/server/protocol/play.rs b/src/server/protocol/play.rs
index 7a8a88d..ea230f7 100644
--- a/src/server/protocol/play.rs
+++ b/src/server/protocol/play.rs
@@ -1,32 +1,28 @@
-use std::{io::Cursor, sync::Arc};
+use std::{io::Cursor, sync::Arc, thread};
-use rust_mc_proto::{read_packet, DataWriter, Packet};
+use log::debug;
+use rust_mc_proto::{DataWriter, Packet, read_packet};
-use crate::server::{
- player::context::ClientContext, ServerError
-};
+use crate::server::{ServerError, player::context::ClientContext};
use super::id::*;
-pub fn send_update_tags(
- client: Arc,
-) -> Result<(), ServerError> {
-
+pub fn send_update_tags(client: Arc) -> Result<(), ServerError> {
// rewrite this hardcode bullshit
- client.write_packet(&Packet::from_bytes(clientbound::configuration::UPDATE_TAGS, include_bytes!("update-tags.bin")))?;
+ client.write_packet(&Packet::from_bytes(
+ clientbound::configuration::UPDATE_TAGS,
+ include_bytes!("update-tags.bin"),
+ ))?;
Ok(())
}
-pub fn send_registry_data(
- client: Arc,
-) -> Result<(), ServerError> {
-
+pub fn send_registry_data(client: Arc) -> Result<(), ServerError> {
// rewrite this hardcode bullshit
let mut registry_data = Cursor::new(include_bytes!("registry-data.bin"));
-
+
while let Ok(mut packet) = read_packet(&mut registry_data, None) {
packet.set_id(clientbound::configuration::REGISTRY_DATA);
client.write_packet(&packet)?;
@@ -35,31 +31,23 @@ pub fn send_registry_data(
Ok(())
}
-pub fn process_known_packs(
- client: Arc
-) -> Result<(), ServerError> {
- let mut packet = Packet::empty(clientbound::configuration::KNOWN_PACKS);
- packet.write_varint(1)?;
- packet.write_string("minecraft")?;
- packet.write_string("core")?;
- packet.write_string("1.21.5")?;
- client.write_packet(&packet)?;
-
- client.read_packet(serverbound::configuration::KNOWN_PACKS)?;
-
- Ok(())
-}
-
pub fn handle_configuration_state(
client: Arc, // Контекст клиента
) -> Result<(), ServerError> {
-
let mut packet = Packet::empty(clientbound::configuration::FEATURE_FLAGS);
- packet.write_varint(1)?;
- packet.write_string("minecraft:vanilla")?;
- client.write_packet(&packet)?;
+ packet.write_varint(1)?;
+ packet.write_string("minecraft:vanilla")?;
+ client.write_packet(&packet)?;
+
+ let mut packet = Packet::empty(clientbound::configuration::KNOWN_PACKS);
+ packet.write_varint(1)?;
+ packet.write_string("minecraft")?;
+ packet.write_string("core")?;
+ packet.write_string("1.21.5")?;
+ client.write_packet(&packet)?;
+
+ client.read_packet(serverbound::configuration::KNOWN_PACKS)?;
- process_known_packs(client.clone())?;
send_registry_data(client.clone())?;
send_update_tags(client.clone())?;
@@ -75,43 +63,57 @@ pub fn handle_play_state(
// "server is in developement suka".to_string(),
// ))?;
- let mut packet = Packet::empty(clientbound::play::LOGIN);
- packet.write_int(0)?; // Entity ID
- packet.write_boolean(false)?; // Is hardcore
- packet.write_varint(4)?; // Dimension Names
- packet.write_string("minecraft:overworld")?;
- packet.write_string("minecraft:nether")?;
- packet.write_string("minecraft:the_end")?;
- packet.write_string("minecraft:overworld_caves")?;
- packet.write_varint(0)?; // Max Players
- packet.write_varint(8)?; // View Distance
- packet.write_varint(5)?; // Simulation Distance
- packet.write_boolean(false)?; // Reduced Debug Info
- packet.write_boolean(true)?; // Enable respawn screen
- packet.write_boolean(false)?; // Do limited crafting
+ let mut packet = Packet::empty(clientbound::play::LOGIN);
- packet.write_varint(0)?; // Dimension Type
- packet.write_string("minecraft:overworld")?; // Dimension Name
- packet.write_long(0x0f38f26ad09c3e20)?; // Hashed seed
- packet.write_byte(0)?; // Game mode
- packet.write_signed_byte(-1)?; // Previous Game mode
- packet.write_boolean(false)?; // Is Debug
- packet.write_boolean(true)?; // Is Flat
- packet.write_boolean(false)?; // Has death location
- packet.write_varint(20)?; // Portal cooldown
- packet.write_varint(60)?; // Sea level
+ packet.write_int(0)?; // Entity ID
+ packet.write_boolean(false)?; // Is hardcore
+ packet.write_varint(4)?; // Dimension Names
+ packet.write_string("minecraft:overworld")?;
+ packet.write_string("minecraft:nether")?;
+ packet.write_string("minecraft:the_end")?;
+ packet.write_string("minecraft:overworld_caves")?;
+ packet.write_varint(0)?; // Max Players
+ packet.write_varint(8)?; // View Distance
+ packet.write_varint(5)?; // Simulation Distance
+ packet.write_boolean(false)?; // Reduced Debug Info
+ packet.write_boolean(true)?; // Enable respawn screen
+ packet.write_boolean(false)?; // Do limited crafting
- packet.write_boolean(false)?; // Enforces Secure Chat
- client.write_packet(&packet)?;
+ packet.write_varint(0)?; // Dimension Type
+ packet.write_string("minecraft:overworld")?; // Dimension Name
+ packet.write_long(0x0f38f26ad09c3e20)?; // Hashed seed
+ packet.write_byte(0)?; // Game mode
+ packet.write_signed_byte(-1)?; // Previous Game mode
+ packet.write_boolean(false)?; // Is Debug
+ packet.write_boolean(true)?; // Is Flat
+ packet.write_boolean(false)?; // Has death location
+ packet.write_varint(20)?; // Portal cooldown
+ packet.write_varint(60)?; // Sea level
- loop {}
-
- // TODO: отдельный поток для чтения пакетов
+ packet.write_boolean(false)?; // Enforces Secure Chat
- // TODO: переработка функции read_packet так чтобы когда
- // делаешь read_any_packet, пакет отправлялся сначала всем другим
- // функциям read_packet которые настроены на этот айди пакета,
- // а потом если таковых не осталось пакет возвращался
+ client.write_packet(&packet)?;
+
+ thread::spawn({
+ let client = client.clone();
+
+ move || {
+ let _ = client.clone().run_read_loop({
+ let client = client.clone();
+
+ move |packet| {
+ // Сделать базовые приколы типа keep-alive и другое
+
+ Ok(())
+ }
+ });
+ client.close();
+ }
+ });
+
+ loop {}
+
+ // Сделать отправку чанков
Ok(())
}