From 67e547a2759202928579a0ddde1900d53cd0f5b7 Mon Sep 17 00:00:00 2001 From: MeexReay Date: Fri, 2 Aug 2024 16:50:20 +0300 Subject: [PATCH] maybe fix --- Cargo.lock | 17 ++++ Cargo.toml | 1 + config.yml | 2 +- default_config.yml | 15 ++++ src/main.rs | 17 +++- src/meexprox/config.rs | 10 ++- src/meexprox/meexprox.rs | 180 ++++++++++++++++++--------------------- 7 files changed, 139 insertions(+), 103 deletions(-) create mode 100644 default_config.yml diff --git a/Cargo.lock b/Cargo.lock index 444ac3c..d5332b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -210,6 +210,7 @@ dependencies = [ "rust_mc_proto", "serde_yml", "simplelog", + "tokio", "uuid", ] @@ -262,6 +263,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + [[package]] name = "powerfmt" version = "0.2.0" @@ -507,6 +514,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "tokio" +version = "1.39.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +dependencies = [ + "backtrace", + "pin-project-lite", +] + [[package]] name = "unicode-ident" version = "1.0.12" diff --git a/Cargo.toml b/Cargo.toml index 4cbf6db..30087e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ log = "0.4.22" simplelog = "0.12.2" derivative = "2.2.0" no_deadlocks = "1.3.2" +tokio = {version = "1.39.2", features = ["rt"] } diff --git a/config.yml b/config.yml index 84d8a19..bcf2133 100644 --- a/config.yml +++ b/config.yml @@ -4,7 +4,7 @@ talk_host: 127.0.0.1:12346 # secret host to talk with meexprox (optional) talk_secret: qwerty123456 # secret token for talk with meexprox (optional) servers: # verified servers (name -> ip) - play: 127.0.0.1:12345 + play: 6.tcp.eu.ngrok.io:17753 forced_hosts: # connect to server from connected hostname (name -> hostname) (optional) play: play.localhost diff --git a/default_config.yml b/default_config.yml new file mode 100644 index 0000000..84d8a19 --- /dev/null +++ b/default_config.yml @@ -0,0 +1,15 @@ +host: 127.0.0.1:25565 # host to bind meexprox + +talk_host: 127.0.0.1:12346 # secret host to talk with meexprox (optional) +talk_secret: qwerty123456 # secret token for talk with meexprox (optional) + +servers: # verified servers (name -> ip) + play: 127.0.0.1:12345 + +forced_hosts: # connect to server from connected hostname (name -> hostname) (optional) + play: play.localhost + +default_server: play # default server to connect (optional) + +player_forwarding: disabled # how to transfer player ip to connected server (handshake / disabled) +no_pf_for_ip_connect: true # disable player forwarding for connecting with server ip diff --git a/src/main.rs b/src/main.rs index 7ccb43a..65f5249 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,12 @@ use rust_mc_proto::DataBufferReader; use simplelog::{ ColorChoice, CombinedLogger, Config, LevelFilter, TermLogger, TerminalMode, WriteLogger, }; -use std::{error::Error, fs::File, sync::atomic::Ordering}; +use std::{ + error::Error, + fs::{self, File}, + path::Path, + sync::atomic::Ordering, +}; pub struct MyEventListener {} @@ -73,7 +78,6 @@ impl EventListener for MyEventListener { cancel, } => { debug!("status request"); - *status = String::from("123123"); } } @@ -97,7 +101,14 @@ fn main() { ]) .unwrap(); - let config = ProxyConfig::load("config.yml").expect("config parse error"); + let config_path = Path::new("config.yml"); + + if !config_path.exists() { + fs::write(config_path, include_bytes!("../default_config.yml")) + .expect("config write error"); + } + + let config = ProxyConfig::load(config_path).expect("config parse error"); let mut meexprox = MeexProx::new(config); diff --git a/src/meexprox/config.rs b/src/meexprox/config.rs index 1532c7c..774dce0 100644 --- a/src/meexprox/config.rs +++ b/src/meexprox/config.rs @@ -1,6 +1,8 @@ use super::ProxyError; + use serde_yml::Value; use std::fs; +use std::path::Path; #[derive(Clone, Debug)] pub struct ProxyServer { @@ -105,8 +107,8 @@ impl ProxyConfig { self.no_pf_for_ip_connect } - pub fn load(path: &str) -> Result> { - let data = serde_yml::from_str::(&fs::read_to_string(path)?)?; + pub fn load_data(data: String) -> Result> { + let data = serde_yml::from_str::(&data)?; let data = data.as_mapping().ok_or(ProxyError::ConfigParse)?; let host = extract_string!(data, "host").ok_or(ProxyError::ConfigParse)?; @@ -165,6 +167,10 @@ impl ProxyConfig { )) } + pub fn load(path: impl AsRef) -> Result> { + Self::load_data(fs::read_to_string(path)?) + } + pub fn get_server_by_name(&self, name: &str) -> Option { for server in &self.servers { if &server.name == name { diff --git a/src/meexprox/meexprox.rs b/src/meexprox/meexprox.rs index 169601d..8f4c943 100644 --- a/src/meexprox/meexprox.rs +++ b/src/meexprox/meexprox.rs @@ -14,6 +14,7 @@ use std::{ }, thread, }; +use tokio::task::AbortHandle; use uuid::Uuid; #[derive(Derivative)] @@ -23,26 +24,26 @@ pub struct ProxyPlayer { client_conn: MinecraftConnection, #[derivative(Debug = "ignore")] server_conn: MinecraftConnection, + connection_threads: Vec, name: Option, uuid: Option, protocol_version: u16, server: Option, shared_secret: Option>, verify_token: Option>, - connection_id: Arc, } impl ProxyPlayer { pub fn new( client_conn: MinecraftConnection, server_conn: MinecraftConnection, + connection_threads: Vec, name: Option, uuid: Option, protocol_version: u16, server: Option, shared_secret: Option>, verify_token: Option>, - connection_id: Arc, ) -> ProxyPlayer { ProxyPlayer { client_conn, @@ -53,7 +54,7 @@ impl ProxyPlayer { server, shared_secret, verify_token, - connection_id, + connection_threads, } } @@ -97,8 +98,8 @@ impl ProxyPlayer { self.verify_token.as_ref() } - pub fn connection_id(&self) -> Arc { - self.connection_id.clone() + pub fn connection_threads(&mut self) -> &mut Vec { + &mut self.connection_threads } pub fn connect_to_ip( @@ -116,10 +117,9 @@ impl ProxyPlayer { return Ok(()); } - this.lock() - .unwrap() - .connection_id - .fetch_add(1, Ordering::Relaxed); + for thread in &mut this.lock().unwrap().connection_threads { + thread.abort(); + } this.lock().unwrap().server_conn.close(); this.lock().unwrap().server_conn = MinecraftConnection::connect(ip)?; @@ -157,14 +157,12 @@ impl ProxyPlayer { return Ok(()); } - this.lock() - .unwrap() - .connection_id - .fetch_add(1, Ordering::Relaxed); + for thread in &mut this.lock().unwrap().connection_threads { + thread.abort(); + } + this.lock().unwrap().server_conn.close(); this.lock().unwrap().server = Some(server.clone()); - - this.lock().unwrap().server_conn.close(); this.lock().unwrap().server_conn = MinecraftConnection::connect(server.host())?; thread::spawn({ @@ -192,29 +190,19 @@ impl ProxyPlayer { server_address: &str, server_port: u16, ) -> Result<(), Box> { - { - let mut player = this.lock().unwrap(); - player.connection_id.fetch_add(1, Ordering::Relaxed); - player.server_conn.close(); - - let server_host = player.server().unwrap().host().to_string(); - // println!("connect"); - player.server_conn = MinecraftConnection::connect(&server_host)?; - // println!("connected"); + for thread in &mut this.lock().unwrap().connection_threads { + thread.abort(); } + this.lock().unwrap().server_conn.close(); + + let server_host = this.lock().unwrap().server().unwrap().host().to_string(); + this.lock().unwrap().server_conn = MinecraftConnection::connect(&server_host)?; thread::spawn({ - // println!("connecting1"); - let player_forwarding = { - let meexprox_guard = meexprox.lock().unwrap(); - meexprox_guard.config.player_forwarding().clone() - }; - // println!("connecting2"); + let player_forwarding = meexprox.lock().unwrap().config.player_forwarding().clone(); let server_address = server_address.to_string(); - // println!("connecting3"); move || { - // println!("connecting4"); let _ = ProxyPlayer::connect( this, meexprox, @@ -306,9 +294,6 @@ impl ProxyPlayer { return Ok(()); }; - let atomic_connection_id = this.lock().unwrap().connection_id.clone(); - let connection_id = this.lock().unwrap().connection_id.load(Ordering::Relaxed); - if !logged { ProxyPlayer::send_handshake( this.clone(), @@ -373,23 +358,20 @@ impl ProxyPlayer { } } - thread::spawn({ - let mut client_conn = client_conn.try_clone().unwrap(); - let mut server_conn = server_conn.try_clone().unwrap(); + let mut handles = Vec::new(); - let this = this.clone(); - let meexprox = meexprox.clone(); - let name = name.clone(); - let atomic_connection_id = atomic_connection_id.clone(); + handles.push( + tokio::spawn({ + let mut client_conn = client_conn.try_clone().unwrap(); + let mut server_conn = server_conn.try_clone().unwrap(); - move || { - let _ = || -> Result<(), ProtocolError> { - while atomic_connection_id.load(Ordering::Relaxed) == connection_id { - let packet = match client_conn.read_packet() { - Ok(packet) => packet, - Err(_) => break, - }; + let this = this.clone(); + let meexprox = meexprox.clone(); + let name = name.clone(); + let addr = addr.clone(); + async move { + while let Ok(packet) = client_conn.read_packet() { let packet = ProxyEvent::recv_client_packet(meexprox.clone(), packet, this.clone()); @@ -397,48 +379,56 @@ impl ProxyPlayer { ProxyEvent::send_server_packet(meexprox.clone(), packet, this.clone()); if !cancel { - server_conn.write_packet(&packet)?; + match server_conn.write_packet(&packet) { + Ok(_) => {} + Err(_) => { + break; + } + }; } } - Ok(()) - }(); - - if atomic_connection_id.load(Ordering::Relaxed) == connection_id { if meexprox.lock().unwrap().remove_player(this.clone()) { info!("{} disconnected player {}", addr.to_string(), name); ProxyEvent::player_disconnected(meexprox.clone(), this.clone()); } } - } - }); + }) + .abort_handle(), + ); - let _ = || -> Result<(), ProtocolError> { - while atomic_connection_id.load(Ordering::Relaxed) == connection_id { - let packet = match server_conn.read_packet() { - Ok(packet) => packet, - Err(_) => break, - }; + handles.push( + tokio::spawn({ + let this = this.clone(); - let packet = ProxyEvent::recv_server_packet(meexprox.clone(), packet, this.clone()); + async move { + while let Ok(packet) = server_conn.read_packet() { + let packet = + ProxyEvent::recv_server_packet(meexprox.clone(), packet, this.clone()); - let (packet, cancel) = - ProxyEvent::send_client_packet(meexprox.clone(), packet, this.clone()); + let (packet, cancel) = + ProxyEvent::send_client_packet(meexprox.clone(), packet, this.clone()); - if !cancel { - client_conn.write_packet(&packet)?; + if !cancel { + match client_conn.write_packet(&packet) { + Ok(_) => {} + Err(_) => { + break; + } + }; + } + } + + if meexprox.lock().unwrap().remove_player(this.clone()) { + info!("{} disconnected player {}", addr.to_string(), name); + ProxyEvent::player_disconnected(meexprox.clone(), this.clone()); + } } - } + }) + .abort_handle(), + ); - Ok(()) - }(); - - if atomic_connection_id.load(Ordering::Relaxed) == connection_id { - if meexprox.lock().unwrap().remove_player(this.clone()) { - info!("{} disconnected player {}", addr.to_string(), name); - ProxyEvent::player_disconnected(meexprox.clone(), this.clone()); - } - } + this.lock().unwrap().connection_threads = handles; Ok(()) } @@ -573,13 +563,13 @@ impl MeexProx { let player = Arc::new(Mutex::new(ProxyPlayer::new( client_conn.try_clone().unwrap(), server_conn.try_clone().unwrap(), + Vec::new(), None, None, protocol_version, Some(server.clone()), None, None, - Arc::new(AtomicUsize::new(0)), ))); let (server, cancel) = @@ -642,27 +632,23 @@ impl MeexProx { // return Ok(()); // } - thread::spawn({ - let this = this.clone(); + let this = this.clone(); - move || { - info!( - "{} connected player {}", - addr.to_string(), - player.lock().unwrap().name.clone().unwrap() - ); - ProxyEvent::player_connected(this.clone(), player.clone()); + info!( + "{} connected player {}", + addr.to_string(), + player.lock().unwrap().name.clone().unwrap() + ); + ProxyEvent::player_connected(this.clone(), player.clone()); - let _ = ProxyPlayer::connect( - player, - this, - server_config.player_forwarding().clone(), - &server_address, - server_port, - true, - ); - } - }); + let _ = ProxyPlayer::connect( + player, + this, + server_config.player_forwarding().clone(), + &server_address, + server_port, + true, + ); } Ok(())