Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 61 additions & 33 deletions crates/game_server/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use horizon_event_system::{PlayerId, AuthenticationStatus};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tokio::sync::{mpsc, RwLock};
use tracing::info;
use futures_util::sink::SinkExt;
use futures_util::stream::SplitSink;
Expand All @@ -25,7 +25,7 @@ use tokio_tungstenite::{WebSocketStream, tungstenite::Message};
///
/// * Uses `RwLock<HashMap>` for thread-safe connection storage
/// * Implements atomic connection ID generation
/// * Provides broadcast channel for outgoing messages
/// * Per-connection mpsc channels for O(1) message delivery
/// * Maintains bidirectional player-connection mapping
#[derive(Debug)]
pub struct ConnectionManager {
Expand All @@ -36,26 +36,25 @@ pub struct ConnectionManager {
/// Atomic counter for generating unique connection IDs
next_id: Arc<std::sync::atomic::AtomicUsize>,

/// Broadcast sender for outgoing messages to specific connections
sender: broadcast::Sender<(ConnectionId, Vec<u8>)>,
/// Per-connection message senders for O(1) message delivery
message_senders: Arc<RwLock<HashMap<ConnectionId, mpsc::UnboundedSender<Vec<u8>>>>>,
}

impl ConnectionManager {
/// Creates a new connection manager.
///
/// Initializes the internal data structures and broadcast channel
/// with a reasonable buffer size for message queuing.
/// Initializes the internal data structures with per-connection
/// message channels for efficient O(1) message delivery.
///
/// # Returns
///
/// A new `ConnectionManager` instance ready to handle connections.
pub fn new() -> Self {
let (sender, _) = broadcast::channel(1000);
Self {
connections: Arc::new(RwLock::new(HashMap::new())),
ws_senders: Arc::new(RwLock::new(HashMap::new())),
next_id: Arc::new(std::sync::atomic::AtomicUsize::new(1)),
sender,
message_senders: Arc::new(RwLock::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -170,57 +169,86 @@ impl ConnectionManager {
connections.get(&connection_id).and_then(|c| c.player_id)
}

/// Registers a message channel for a connection.
///
/// Creates a new per-connection mpsc channel and returns the receiver.
/// This allows O(1) message delivery without filtering overhead.
///
/// # Arguments
///
/// * `connection_id` - The connection to register a channel for
///
/// # Returns
///
/// An unbounded receiver for messages targeted to this connection.
pub async fn register_message_channel(&self, connection_id: ConnectionId) -> mpsc::UnboundedReceiver<Vec<u8>> {
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using an unbounded channel can lead to memory exhaustion if messages are produced faster than they can be consumed. Consider using a bounded channel with an appropriate capacity and handling the backpressure scenario (e.g., dropping old messages or disconnecting slow clients).

Copilot uses AI. Check for mistakes.
let (sender, receiver) = mpsc::unbounded_channel();
let mut senders = self.message_senders.write().await;
senders.insert(connection_id, sender);
tracing::debug!("📬 Registered message channel for connection {}", connection_id);
receiver
}

/// Removes the message channel for a connection.
///
/// This should be called during connection cleanup to prevent
/// memory leaks from unused channels.
///
/// # Arguments
///
/// * `connection_id` - The connection whose channel should be removed
pub async fn remove_message_channel(&self, connection_id: ConnectionId) {
let mut senders = self.message_senders.write().await;
senders.remove(&connection_id);
tracing::debug!("📭 Removed message channel for connection {}", connection_id);
}

/// Sends a message to a specific connection.
///
/// Queues a message for delivery to the specified connection through
/// the internal broadcast channel.
/// Uses direct HashMap lookup for O(1) message delivery to the
/// connection's dedicated mpsc channel.
///
/// # Arguments
///
/// * `connection_id` - The target connection
/// * `message` - The message data to send
pub async fn send_to_connection(&self, connection_id: ConnectionId, message: Vec<u8>) {
if let Err(e) = self.sender.send((connection_id, message)) {
tracing::error!("Failed to send message to connection {}: {:?}", connection_id, e);
let senders = self.message_senders.read().await;
if let Some(sender) = senders.get(&connection_id) {
if let Err(e) = sender.send(message) {
tracing::error!("Failed to send message to connection {}: {:?}", connection_id, e);
}
} else {
tracing::warn!("Attempted to send message to non-existent connection {}", connection_id);
}
}

/// Broadcasts a message to all currently connected clients.
///
/// Sends the same message to every active connection. The message is
/// cloned for each connection to ensure proper delivery.
/// Sends the same message to every active connection using their
/// dedicated mpsc channels for O(N) delivery.
///
/// # Arguments
///
/// * `message` - The message data to broadcast to all clients
///
/// # Returns
///
/// The number of connections that the message was queued for.
/// The number of connections that the message was sent to.
pub async fn broadcast_to_all(&self, message: Vec<u8>) -> usize {
let connections = self.connections.read().await;
let connection_count = connections.len();
let senders = self.message_senders.read().await;
let mut sent_count = 0;

for &connection_id in connections.keys() {
if let Err(e) = self.sender.send((connection_id, message.clone())) {
for (connection_id, sender) in senders.iter() {
if let Err(e) = sender.send(message.clone()) {
tracing::error!("Failed to broadcast message to connection {}: {:?}", connection_id, e);
} else {
sent_count += 1;
}
}
Comment on lines +242 to 248
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broadcast_to_all method holds a read lock on message_senders while sending to all channels. If any channel's buffer is full or slow to process, this will block other operations from acquiring the lock. Consider collecting the senders first, then releasing the lock before sending, or clone the senders to avoid holding the lock during I/O operations.

Copilot uses AI. Check for mistakes.

tracing::debug!("📡 Broadcasted message to {} connections", connection_count);
connection_count
}

/// Creates a new receiver for outgoing messages.
///
/// Each connection handler should call this to get a receiver
/// for messages targeted to their specific connection.
///
/// # Returns
///
/// A broadcast receiver for connection-targeted messages.
pub fn subscribe(&self) -> broadcast::Receiver<(ConnectionId, Vec<u8>)> {
self.sender.subscribe()
tracing::debug!("📡 Broadcasted message to {} connections", sent_count);
sent_count
}

/// Finds the connection ID associated with a player.
Expand Down
45 changes: 45 additions & 0 deletions crates/game_server/src/server/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,51 @@ impl GameServer {
.await
.map_err(|e| ServerError::Internal(e.to_string()))?;

// Register handler to update player_id in connection manager
// This allows plugins to replace the temporary connection-level player_id
// with a permanent database player_id after authentication
let connection_manager_for_update = self.connection_manager.clone();
self.horizon_event_system
.on_core_async("update_player_id", move |event: serde_json::Value| {
let conn_mgr = connection_manager_for_update.clone();

info!("🔄 Received update_player_id event: {:?}", event);

// Deserialize player IDs from the event
let old_player_id = serde_json::from_value::<horizon_event_system::PlayerId>(event["old_player_id"].clone());
let new_player_id = serde_json::from_value::<horizon_event_system::PlayerId>(event["new_player_id"].clone());

if let (Ok(old_player_id), Ok(new_player_id)) = (old_player_id, new_player_id) {
// Spawn a dedicated thread with its own runtime to handle the async work
// This is necessary because on_core_async handlers don't have a guaranteed tokio runtime context
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to build runtime for update_player_id");

rt.block_on(async move {
// Get the connection_id for this player
if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {
// Update the player_id stored in the connection
conn_mgr.set_player_id(connection_id, new_player_id).await;
info!(
"🔄 Updated player_id in connection {} from {} to {}",
connection_id, old_player_id, new_player_id
);
} else {
warn!("⚠️ Failed to find connection for player {} when updating player_id", old_player_id);
}
});
Comment on lines +567 to +585
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating a new runtime for each update_player_id event is inefficient and can lead to resource exhaustion under load. Each runtime spawns OS threads, and creating them repeatedly is expensive. Consider using the existing tokio runtime handle from the ServerContext instead, or spawn the task on the existing runtime using tokio::spawn.

Suggested change
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to build runtime for update_player_id");
rt.block_on(async move {
// Get the connection_id for this player
if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {
// Update the player_id stored in the connection
conn_mgr.set_player_id(connection_id, new_player_id).await;
info!(
"🔄 Updated player_id in connection {} from {} to {}",
connection_id, old_player_id, new_player_id
);
} else {
warn!("⚠️ Failed to find connection for player {} when updating player_id", old_player_id);
}
});
tokio::spawn(async move {
// Get the connection_id for this player
if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {
// Update the player_id stored in the connection
conn_mgr.set_player_id(connection_id, new_player_id).await;
info!(
"🔄 Updated player_id in connection {} from {} to {}",
connection_id, old_player_id, new_player_id
);
} else {
warn!("⚠️ Failed to find connection for player {} when updating player_id", old_player_id);
}

Copilot uses AI. Check for mistakes.
Comment on lines +564 to +585
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copilot is right. please dont do this in prod

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

diff --git a/crates/game_server/src/server/core.rs b/crates/game_server/src/server/core.rs
index 0bf4ac1..9cc791a 100644
--- a/crates/game_server/src/server/core.rs
+++ b/crates/game_server/src/server/core.rs
@@ -564,25 +564,18 @@ impl GameServer {
                 if let (Ok(old_player_id), Ok(new_player_id)) = (old_player_id, new_player_id) {
                     // Spawn a dedicated thread with its own runtime to handle the async work
                     // This is necessary because on_core_async handlers don't have a guaranteed tokio runtime context
-                    std::thread::spawn(move || {
-                        let rt = tokio::runtime::Builder::new_current_thread()
-                            .enable_all()
-                            .build()
-                            .expect("Failed to build runtime for update_player_id");
-
-                        rt.block_on(async move {
-                            // Get the connection_id for this player
-                            if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {
-                                // Update the player_id stored in the connection
-                                conn_mgr.set_player_id(connection_id, new_player_id).await;
-                                info!(
-                                    "🔄 Updated player_id in connection {} from {} to {}",
-                                    connection_id, old_player_id, new_player_id
-                                );
-                            } else {
-                                warn!("⚠ Failed to find connection for player {} when updating player_id", old_player_id);
-                            }
-                        });
+                    tokio::runtime::Handle::current().spawn(async move {^M
+                        // Get the connection_id for this player^M
+                        if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {^M
+                            // Update the player_id stored in the connection^M
+                            conn_mgr.set_player_id(connection_id, new_player_id).await;^M
+                            info!(^M
+                                "🔄 Updated player_id in connection {} from {} to {}",^M
+                                connection_id, old_player_id, new_player_id^M
+                            );^M
+                        } else {^M
+                            warn!("⚠ Failed to find connection for player {} when updating player_id", old_player_id);^M
+                        }^M
                     });
                 } else {
                     warn!("⚠ Failed to deserialize player IDs from update_player_id event: {:?}", event);
@@ -772,4 +765,4 @@ impl GameServer {
         self.plugin_manager.clone()
     }

-}
\ No newline at end of file
+}^M

perhaps this diff might work? im not sure if this will panic because i have no idea if this closure runs in an OS thread or green thread, and there are no tests to ensure behaviour.

});
} else {
warn!("⚠️ Failed to deserialize player IDs from update_player_id event: {:?}", event);
}

Ok(())
})
.await
.map_err(|e| ServerError::Internal(e.to_string()))?;

// Register a simple ping handler for testing validity of the client connection
self.horizon_event_system
Expand Down
27 changes: 14 additions & 13 deletions crates/game_server/src/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ pub async fn handle_connection(
.await
.map_err(|e| ServerError::Internal(e.to_string()))?;

let mut message_receiver = connection_manager.subscribe();
// Register per-connection message channel for O(1) message delivery
let mut message_receiver = connection_manager.register_message_channel(connection_id).await;
let ws_sender_incoming = ws_sender.clone();
let ws_sender_outgoing = ws_sender.clone();

Expand Down Expand Up @@ -135,21 +136,19 @@ pub async fn handle_connection(
}
};

// Outgoing message task
// Outgoing message task - receives directly from dedicated channel (O(1) complexity)
let outgoing_task = {
let ws_sender = ws_sender_outgoing;
async move {
while let Ok((target_connection_id, message)) = message_receiver.recv().await {
if target_connection_id == connection_id {
let message_text = String::from_utf8_lossy(&message);
let mut ws_sender = ws_sender.lock().await;
if let Err(e) = ws_sender
.send(Message::Text(message_text.to_string().into()))
.await
{
error!("Failed to send message: {}", e);
break;
}
while let Some(message) = message_receiver.recv().await {
let message_text = String::from_utf8_lossy(&message);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from_utf8_lossy is not O(1) due to the fact that it needs to loop over message.

let mut ws_sender = ws_sender.lock().await;
if let Err(e) = ws_sender
.send(Message::Text(message_text.to_string().into()))
.await
{
error!("Failed to send message: {}", e);
break;
}
}
}
Expand Down Expand Up @@ -177,7 +176,9 @@ pub async fn handle_connection(
.map_err(|e| ServerError::Internal(e.to_string()))?;
}

// Cleanup: remove connection and message channel
connection_manager.remove_connection(connection_id).await;
connection_manager.remove_ws_sender(connection_id).await;
connection_manager.remove_message_channel(connection_id).await;
Ok(())
}
8 changes: 8 additions & 0 deletions crates/horizon_event_system/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@ flate2 = { workspace = true }
base64 = { workspace = true }
const_format = { workspace = true }
rstar = "0.12"
async-lock = "3.4"

# Proc macro dependencies for new derive macros
syn = { version = "2.0", features = ["full"] }
quote = "1.0"
proc-macro2 = "1.0"
backtrace = "0.3.75"

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }

[[bench]]
name = "distance_benchmark"
harness = false
109 changes: 109 additions & 0 deletions crates/horizon_event_system/benches/distance_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Benchmark comparing distance() vs distance_squared() performance
//
// Run with: cargo bench --bench distance_benchmark
//
// Expected results: distance_squared() should be ~50-70% faster

use criterion::{black_box, criterion_group, criterion_main, Criterion};

#[derive(Clone, Copy, Debug)]
pub struct Vec3 {
pub x: f64,
pub y: f64,
pub z: f64,
}

impl Vec3 {
pub fn new(x: f64, y: f64, z: f64) -> Self {
Self { x, y, z }
}

pub fn distance(&self, other: Vec3) -> f64 {
let dx = self.x - other.x;
let dy = self.y - other.y;
let dz = self.z - other.z;
(dx * dx + dy * dy + dz * dz).sqrt()
}

pub fn distance_squared(&self, other: Vec3) -> f64 {
let dx = self.x - other.x;
let dy = self.y - other.y;
let dz = self.z - other.z;
dx * dx + dy * dy + dz * dz
}
}

fn sphere_test_with_distance(center: Vec3, points: &[Vec3], radius: f64) -> usize {
points
.iter()
.filter(|&&point| point.distance(center) <= radius)
.count()
}

fn sphere_test_with_distance_squared(center: Vec3, points: &[Vec3], radius: f64) -> usize {
let radius_sq = radius * radius;
points
.iter()
.filter(|&&point| point.distance_squared(center) <= radius_sq)
.count()
}

fn benchmark_distance_methods(c: &mut Criterion) {
// Generate 10,000 random points (typical for a busy game server zone)
let center = Vec3::new(0.0, 0.0, 0.0);
let radius = 100.0;

let points: Vec<Vec3> = (0..10000)
.map(|i| {
let angle = (i as f64) * 0.01;
let dist = (i as f64) * 0.02;
Vec3::new(
dist * angle.cos(),
dist * angle.sin(),
(i as f64) * 0.001,
)
})
.collect();

let mut group = c.benchmark_group("sphere_containment");

group.bench_function("with_distance (sqrt)", |b| {
b.iter(|| {
sphere_test_with_distance(
black_box(center),
black_box(&points),
black_box(radius),
)
})
});

group.bench_function("with_distance_squared (optimized)", |b| {
b.iter(|| {
sphere_test_with_distance_squared(
black_box(center),
black_box(&points),
black_box(radius),
)
})
});

group.finish();

// Individual distance calculations
let mut group = c.benchmark_group("single_distance");
let point1 = Vec3::new(50.0, 30.0, 20.0);
let point2 = Vec3::new(10.0, 15.0, 5.0);

group.bench_function("distance", |b| {
b.iter(|| black_box(point1).distance(black_box(point2)))
});

group.bench_function("distance_squared", |b| {
b.iter(|| black_box(point1).distance_squared(black_box(point2)))
});

group.finish();
}

criterion_group!(benches, benchmark_distance_methods);
criterion_main!(benches);
Loading
Loading