diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs
index 87765b9aeaa..e37c4562a18 100644
--- a/cmd/ethrex/initializers.rs
+++ b/cmd/ethrex/initializers.rs
@@ -9,6 +9,7 @@
 use ethrex_blockchain::{Blockchain, BlockchainOptions, BlockchainType};
 use ethrex_common::fd_limit::raise_fd_limit;
 use ethrex_common::types::Genesis;
 use ethrex_config::networks::Network;
+use ethrex_rpc::WebSocketConfig;
 use ethrex_metrics::profiling::{FunctionProfilingLayer, initialize_block_processing_profile};
 use ethrex_metrics::rpc::initialize_rpc_metrics;
@@ -208,15 +209,18 @@ pub async fn init_rpc_api(
     )
     .await;
 
-    let ws_socket_opts = if opts.ws_enabled {
-        Some(get_ws_socket_addr(opts))
+    let ws_config = if opts.ws_enabled {
+        Some(WebSocketConfig {
+            addr: get_ws_socket_addr(opts),
+            subscription_manager: ethrex_rpc::SubscriptionManager::spawn(),
+        })
     } else {
         None
     };
 
     let rpc_api = ethrex_rpc::start_api(
         get_http_socket_addr(opts),
-        ws_socket_opts,
+        ws_config,
         get_authrpc_socket_addr(opts),
         store,
         blockchain,
diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs
index 3672b4b46c7..16fcc9603f3 100644
--- a/cmd/ethrex/l2/initializers.rs
+++ b/cmd/ethrex/l2/initializers.rs
@@ -1,7 +1,7 @@
 use crate::cli::Options as L1Options;
 use crate::initializers::{
     self, get_authrpc_socket_addr, get_http_socket_addr, get_local_node_record, get_local_p2p_node,
-    get_network, get_signer, init_blockchain, init_network, init_store,
+    get_network, get_signer, get_ws_socket_addr, init_blockchain, init_network, init_store,
 };
 use crate::l2::{L2Options, SequencerOptions};
 use crate::utils::{
@@ -22,10 +22,12 @@ use ethrex_p2p::{
     sync_manager::SyncManager,
     types::{Node, NodeRecord},
 };
+use ethrex_rpc::{SubscriptionManager, WebSocketConfig};
 use ethrex_storage::Store;
 use ethrex_storage_rollup::{EngineTypeRollup, StoreRollup};
 use eyre::OptionExt;
 use secp256k1::SecretKey;
+
 use spawned_concurrency::tasks::ActorRef;
 use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration};
 use tokio::task::JoinSet;
@@ -49,11 +51,13 @@ fn init_rpc_api(
     rollup_store: StoreRollup,
     log_filter_handler: Option<Handle<EnvFilter, Registry>>,
     l2_gas_limit: u64,
+    ws: Option<WebSocketConfig>,
 ) {
     init_datadir(&opts.datadir);
 
     let rpc_api = ethrex_l2_rpc::start_api(
         get_http_socket_addr(opts),
+        ws,
         get_authrpc_socket_addr(opts),
         store,
         blockchain,
@@ -318,6 +322,16 @@ pub async fn init_l2(
     )
     .await?;
 
+    // Create WebSocket config when WS is enabled.
+    let ws_config = if opts.node_opts.ws_enabled {
+        Some(WebSocketConfig {
+            addr: get_ws_socket_addr(&opts.node_opts),
+            subscription_manager: SubscriptionManager::spawn(),
+        })
+    } else {
+        None
+    };
+
     init_rpc_api(
         &opts.node_opts,
         &opts,
@@ -331,6 +345,7 @@ pub async fn init_l2(
         rollup_store.clone(),
         log_filter_handler,
         l2_gas_limit,
+        ws_config.clone(),
     );
 
     // Initialize metrics if enabled
@@ -354,6 +369,7 @@ pub async fn init_l2(
         genesis,
         checkpoints_dir,
         l2_gas_limit,
+        ws_config.as_ref().map(|ws| ws.subscription_manager.clone()),
     )
     .await?;
     join_set.spawn(l2_sequencer);
diff --git a/crates/l2/networking/rpc/Cargo.toml b/crates/l2/networking/rpc/Cargo.toml
index a21ef2ae967..e4c16bdc996 100644
--- a/crates/l2/networking/rpc/Cargo.toml
+++ b/crates/l2/networking/rpc/Cargo.toml
@@ -19,7 +19,7 @@ ethrex-l2-common.workspace = true
 ethrex-rpc.workspace = true
 ethrex-rlp.workspace = true
 
-axum.workspace = true
+axum = { features = ["ws"], workspace = true }
 tower-http.workspace = true
 serde.workspace = true
 serde_json = "1.0.117"
diff --git a/crates/l2/networking/rpc/rpc.rs b/crates/l2/networking/rpc/rpc.rs
index c7eb88ff6d3..26b336adc57 100644
--- a/crates/l2/networking/rpc/rpc.rs
+++ b/crates/l2/networking/rpc/rpc.rs
@@ -9,6 +9,7 @@ use crate::l2::fees::{
 use crate::l2::messages::GetL1MessageProof;
 use crate::utils::{RpcErr, RpcNamespace, resolve_namespace};
 use axum::extract::State;
+use axum::extract::ws::WebSocketUpgrade;
 use axum::{Json, Router, http::StatusCode, routing::post};
 use bytes::Bytes;
 use ethrex_blockchain::Blockchain;
@@ -20,7 +21,7 @@ use ethrex_p2p::types::NodeRecord;
 use ethrex_rpc::RpcHandler as L1RpcHandler;
 use ethrex_rpc::debug::execution_witness::ExecutionWitnessRequest;
 use ethrex_rpc::{
-    ClientVersion, GasTipEstimator, NodeData, RpcRequestWrapper,
+    ClientVersion, GasTipEstimator, NodeData, RpcRequestWrapper, WebSocketConfig,
     types::transaction::SendRawTransactionRequest,
     utils::{RpcRequest, RpcRequestId},
 };
@@ -74,6 +75,7 @@ pub const FILTER_DURATION: Duration = {
 #[expect(clippy::too_many_arguments)]
 pub async fn start_api(
     http_addr: SocketAddr,
+    ws: Option<WebSocketConfig>,
     authrpc_addr: SocketAddr,
     storage: Store,
     blockchain: Arc<Blockchain>,
@@ -118,6 +120,7 @@ pub async fn start_api(
             log_filter_handler,
             gas_ceil: l2_gas_limit,
             block_worker_channel,
+            ws: ws.clone(),
         },
         valid_delegation_addresses,
         sponsor_pk,
@@ -145,7 +148,7 @@ pub async fn start_api(
     let http_router = Router::new()
         .route("/", post(handle_http_request))
-        .layer(cors)
+        .layer(cors.clone())
         .with_state(service_context.clone());
     let http_listener = TcpListener::bind(http_addr)
         .await
         .map_err(|error| RpcErr::Internal(error.to_string()))?;
@@ -157,8 +160,34 @@ pub async fn start_api(
     info!("Not starting Auth-RPC server. The address passed as argument is {authrpc_addr}");
 
-    let _ =
-        tokio::try_join!(http_server).inspect_err(|e| info!("Error shutting down servers: {e:?}"));
+    if let Some(ref ws_config) = ws {
+        let ws_handler = |ws: WebSocketUpgrade, State(ctx): State<RpcApiContext>| async move {
+            ws.on_upgrade(|mut socket| async move {
+                ethrex_rpc::handle_websocket(&mut socket, &ctx.l1_ctx, |req| {
+                    let c = ctx.clone();
+                    async move { map_http_requests(&req, c).await }
+                })
+                .await;
+            })
+        };
+        let ws_router = Router::new()
+            .route("/", axum::routing::any(ws_handler))
+            .layer(cors)
+            .with_state(service_context);
+        let ws_listener = TcpListener::bind(ws_config.addr)
+            .await
+            .map_err(|error| RpcErr::Internal(error.to_string()))?;
+        let ws_server = axum::serve(ws_listener, ws_router)
+            .with_graceful_shutdown(ethrex_rpc::shutdown_signal())
+            .into_future();
+        info!("Starting WS server at {}", ws_config.addr);
+
+        let _ = tokio::try_join!(http_server, ws_server)
+            .inspect_err(|e| info!("Error shutting down servers: {e:?}"));
+    } else {
+        let _ = tokio::try_join!(http_server)
+            .inspect_err(|e| info!("Error shutting down servers: {e:?}"));
+    }
 
     Ok(())
 }
diff --git a/crates/l2/sequencer/block_producer.rs b/crates/l2/sequencer/block_producer.rs
index 4c4f66c5780..686596b6375 100644
--- a/crates/l2/sequencer/block_producer.rs
+++ b/crates/l2/sequencer/block_producer.rs
@@ -16,7 +16,7 @@ use ethrex_common::H256;
 use ethrex_common::{Address, U256};
 use ethrex_l2_sdk::calldata::encode_calldata;
 use ethrex_rpc::{
-    EthClient,
+    EthClient, SubscriptionManager, SubscriptionManagerProtocol,
     clients::{EthClientError, Overrides},
 };
 use ethrex_storage::Store;
@@ -35,6 +35,7 @@ use tracing::{debug, error, info, warn};
 
 use crate::{BlockProducerConfig, SequencerConfig};
 use ethrex_l2_common::sequencer_state::{SequencerState, SequencerStatus};
+
 use std::str::FromStr;
 
 use super::errors::BlockProducerError;
@@ -63,6 +64,8 @@ pub struct BlockProducer {
     block_gas_limit: u64,
     eth_client: EthClient,
     router_address: Address,
+    /// Actor handle for sending new block headers to WS subscribers.
+    subscription_manager: Option<ActorRef<SubscriptionManager>>,
 }
 
 #[derive(Clone, Serialize)]
@@ -84,6 +87,7 @@ impl BlockProducer {
         sequencer_state: SequencerState,
         router_address: Address,
         l2_gas_limit: u64,
+        subscription_manager: Option<ActorRef<SubscriptionManager>>,
     ) -> Result<Self, BlockProducerError> {
         let BlockProducerConfig {
             block_time_ms,
@@ -122,6 +126,7 @@ impl BlockProducer {
             block_gas_limit: l2_gas_limit,
             eth_client,
             router_address,
+            subscription_manager,
         })
     }
 
@@ -201,6 +206,8 @@ impl BlockProducer {
         let transactions_count = block.body.transactions.len();
         let block_number = block.header.number;
         let block_hash = block.hash();
+        // Save the header for newHeads notifications before block is moved into store_block.
+        let block_header = block.header.clone();
         self.store_fee_config_by_block(block.header.number).await?;
         self.blockchain
             .store_block(block, account_updates_list, execution_result)?;
@@ -217,6 +224,11 @@ impl BlockProducer {
         // Make the new head be part of the canonical chain
         apply_fork_choice(&self.store, block_hash, block_hash, block_hash).await?;
 
+        // Notify all eth_subscribe("newHeads") subscribers.
+        if let Some(ref manager) = self.subscription_manager {
+            let _ = manager.new_head(block_header);
+        }
+
         metrics!(
             METRICS_BLOCKS.set_block_number(block_number);
             #[allow(clippy::as_conversions)]
@@ -291,6 +303,7 @@ impl BlockProducer {
 
 #[actor(protocol = BlockProducerProtocol)]
 impl BlockProducer {
+    #[expect(clippy::too_many_arguments)]
     pub async fn spawn(
         store: Store,
         rollup_store: StoreRollup,
@@ -299,6 +312,7 @@ impl BlockProducer {
         sequencer_state: SequencerState,
         router_address: Address,
         l2_gas_limit: u64,
+        subscription_manager: Option<ActorRef<SubscriptionManager>>,
     ) -> Result<ActorRef<BlockProducer>, BlockProducerError> {
         let block_producer = Self::new(
             &cfg.block_producer,
@@ -309,6 +323,7 @@ impl BlockProducer {
             sequencer_state,
             router_address,
             l2_gas_limit,
+            subscription_manager,
         )?;
         let actor_ref = block_producer.start_with_backend(Backend::Blocking);
         Ok(actor_ref)
diff --git a/crates/l2/sequencer/mod.rs b/crates/l2/sequencer/mod.rs
index bc735556e6d..e6d1fc238a2 100644
--- a/crates/l2/sequencer/mod.rs
+++ b/crates/l2/sequencer/mod.rs
@@ -10,6 +10,7 @@ use ethrex_blockchain::Blockchain;
 use ethrex_common::types::Genesis;
 use ethrex_l2_common::prover::ProverType;
 use ethrex_monitor::{EthrexMonitor, MonitorConfig as ExternalMonitorConfig};
+use ethrex_rpc::SubscriptionManager;
 use ethrex_storage::Store;
 use ethrex_storage_rollup::StoreRollup;
 use l1_committer::L1Committer;
@@ -53,6 +54,7 @@ pub async fn start_l2(
     genesis: Genesis,
     checkpoints_dir: PathBuf,
     l2_gas_limit: u64,
+    subscription_manager: Option<ActorRef<SubscriptionManager>>,
 ) -> Result<
     (
         Option>,
@@ -160,6 +162,7 @@ pub async fn start_l2(
         shared_state.clone(),
         cfg.l1_watcher.router_address,
         l2_gas_limit,
+        subscription_manager,
     )
     .await
     .inspect_err(|err| {
diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs
index 65f3889ee9d..c59e9b72f75 100644
--- a/crates/networking/rpc/engine/fork_choice.rs
+++ b/crates/networking/rpc/engine/fork_choice.rs
@@ -10,6 +10,7 @@ use tracing::{debug, info, warn};
 
 use crate::{
     rpc::{RpcApiContext, RpcHandler},
+    subscription_manager::SubscriptionManagerProtocol,
     types::{
         fork_choice::{
             ForkChoiceResponse, ForkChoiceState, PayloadAttributesV3, PayloadAttributesV4,
@@ -303,6 +304,11 @@ async fn handle_forkchoice(
         }
     };
 
+    // Notify all eth_subscribe("newHeads") subscribers.
+    if let Some(ws) = &context.ws {
+        let _ = ws.subscription_manager.new_head(head.clone());
+    }
+
     Ok((
         Some(head),
         ForkChoiceResponse::from(PayloadStatus::valid_with_hash(
diff --git a/crates/networking/rpc/lib.rs b/crates/networking/rpc/lib.rs
index 290eed4b3fe..98cc10d0dd8 100644
--- a/crates/networking/rpc/lib.rs
+++ b/crates/networking/rpc/lib.rs
@@ -67,6 +67,7 @@ mod eth;
 mod mempool;
 mod net;
 pub mod rpc;
+pub mod subscription_manager;
 mod tracing;
 
 pub mod clients;
@@ -87,7 +88,9 @@ pub use eth::{
     transaction::EstimateGasRequest,
 };
 pub use rpc::{
-    ClientVersion, NodeData, RpcApiContext, RpcHandler, RpcRequestWrapper, map_debug_requests,
+    ClientVersion, NodeData, RpcApiContext, RpcHandler, RpcRequestWrapper, WebSocketConfig,
+    handle_eth_subscribe, handle_eth_unsubscribe, handle_websocket, map_debug_requests,
     map_eth_requests, map_http_requests, rpc_response, shutdown_signal,
 };
+pub use subscription_manager::{SubscriptionManager, SubscriptionManagerProtocol};
 pub use utils::{RpcErr, RpcErrorMetadata, RpcNamespace};
diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs
index 21e38073508..c66718f1bb0 100644
--- a/crates/networking/rpc/rpc.rs
+++ b/crates/networking/rpc/rpc.rs
@@ -43,6 +43,7 @@ use crate::eth::{
         GetTransactionByHashRequest, GetTransactionReceiptRequest,
     },
 };
+use crate::subscription_manager::{SubscriptionManager, SubscriptionManagerProtocol};
 use crate::tracing::{TraceBlockByNumberRequest, TraceTransactionRequest};
 use crate::types::transaction::SendRawTransactionRequest;
 use crate::utils::{
@@ -51,7 +52,7 @@ use crate::utils::{
 };
 use crate::{admin, net};
 use crate::{eth, mempool};
-use axum::extract::ws::WebSocket;
+use axum::extract::ws::{Message, WebSocket};
 use axum::extract::{DefaultBodyLimit, State, WebSocketUpgrade};
 use axum::{Json, Router, http::StatusCode, routing::post};
 use axum_extra::{
@@ -71,6 +72,7 @@ use ethrex_p2p::types::NodeRecord;
 use ethrex_storage::Store;
 use serde::Deserialize;
 use serde_json::Value;
+use spawned_concurrency::tasks::ActorRef;
 use std::{
     collections::HashMap,
     future::IntoFuture,
@@ -209,6 +211,17 @@ pub struct RpcApiContext {
     pub gas_ceil: u64,
     /// Channel for sending blocks to the block executor worker thread.
     pub block_worker_channel: UnboundedSender,
+    /// WebSocket configuration. `None` when the WS server is disabled.
+    pub ws: Option<WebSocketConfig>,
+}
+
+/// Configuration for the WebSocket RPC server.
+#[derive(Clone)]
+pub struct WebSocketConfig {
+    /// Socket address the WS server listens on.
+    pub addr: SocketAddr,
+    /// Actor handle for managing `eth_subscribe` / `eth_unsubscribe` connections.
+    pub subscription_manager: ActorRef<SubscriptionManager>,
 }
 
 impl std::fmt::Debug for RpcApiContext {
@@ -437,13 +450,15 @@ pub fn start_block_executor(blockchain: Arc<Blockchain>) -> UnboundedSender<Blo
 pub async fn start_api(
     http_addr: SocketAddr,
-    ws_addr: Option<SocketAddr>,
+    ws: Option<WebSocketConfig>,
     authrpc_addr: SocketAddr,
     storage: Store,
     blockchain: Arc<Blockchain>,
@@ -510,6 +526,7 @@ pub async fn start_api(
         log_filter_handler,
         gas_ceil,
         block_worker_channel,
+        ws: ws.clone(),
     };
 
     // Periodically clean up the active filters for the filters endpoints.
@@ -578,21 +595,27 @@
         .into_future();
     info!("Starting Auth-RPC server at {authrpc_addr}");
 
-    if let Some(address) = ws_addr {
-        let ws_handler = |ws: WebSocketUpgrade, ctx| async {
-            ws.on_upgrade(|socket| handle_websocket(socket, ctx))
+    if let Some(ref ws_config) = ws {
+        let ws_handler = |ws: WebSocketUpgrade, State(ctx): State<RpcApiContext>| async move {
+            ws.on_upgrade(|mut socket| async move {
+                handle_websocket(&mut socket, &ctx, |req| {
+                    let c = ctx.clone();
+                    async move { map_http_requests(&req, c).await }
+                })
+                .await;
+            })
         };
         let ws_router = Router::new()
             .route("/", axum::routing::any(ws_handler))
             .layer(cors)
             .with_state(service_context);
-        let ws_listener = TcpListener::bind(address)
+        let ws_listener = TcpListener::bind(ws_config.addr)
             .await
             .map_err(|error| RpcErr::Internal(error.to_string()))?;
         let ws_server = axum::serve(ws_listener, ws_router)
             .with_graceful_shutdown(shutdown_signal())
             .into_future();
-        info!("Starting WS server at {address}");
+        info!("Starting WS server at {}", ws_config.addr);
 
         let _ = tokio::try_join!(authrpc_server, http_server, ws_server)
             .inspect_err(|e| error!("Error shutting down servers: {e:?}"));
@@ -670,27 +693,247 @@ pub async fn handle_authrpc_request(
     }
 }
 
-async fn handle_websocket(mut socket: WebSocket, state: State<RpcApiContext>) {
-    while let Some(message) = socket.recv().await {
-        let Ok(body) = message
-            .and_then(|msg| msg.into_text())
-            .map(|msg| msg.to_string())
-        else {
-            return;
-        };
+/// Handle a WebSocket connection.
+///
+/// Supports eth_subscribe / eth_unsubscribe for "newHeads" in addition to
+/// regular JSON-RPC request-response calls that work the same as over HTTP.
+///
+/// The `route_request` closure handles non-subscription JSON-RPC methods.
+/// L1 passes its own `map_http_requests`; L2 passes its variant so that
+/// L2-specific methods (e.g. `ethrexL2_*`) are reachable over WebSocket.
+pub async fn handle_websocket<F, Fut, E>(
+    socket: &mut WebSocket,
+    context: &RpcApiContext,
+    route_request: F,
+) where
+    F: Fn(RpcRequest) -> Fut,
+    Fut: std::future::Future<Output = Result<Value, E>>,
+    E: Into<RpcErrorMetadata>,
+{
+    let (out_tx, mut out_rx) = tokio::sync::mpsc::channel::<String>(
+        crate::subscription_manager::SUBSCRIBER_CHANNEL_CAPACITY,
+    );
+    // Currently only "newHeads" subscriptions are supported. When additional
+    // subscription types (e.g., "logs", "newPendingTransactions") are added,
+    // the subscription tracking below will need per-type handling.
+    let mut subscription_ids: Vec<String> = Vec::new();
+
+    loop {
+        tokio::select! {
+            msg = socket.recv() => {
+                let Some(msg) = msg else { break };
+                let body = match msg {
+                    Ok(Message::Text(text)) => text.to_string(),
+                    Ok(Message::Close(_)) => break,
+                    Ok(_) => continue,
+                    Err(_) => break,
+                };
+
+                let response = handle_ws_request(
+                    &body, context, &out_tx, &mut subscription_ids, &route_request,
+                ).await;
+                if let Some(resp) = response
+                    && socket.send(Message::Text(resp.into())).await.is_err()
+                {
+                    break;
+                }
+            }
 
-        // ok-clone: increase arc reference count
-        let Ok(response) = handle_http_request(state.clone(), body)
-            .await
-            .map(|res| res.to_string())
-        else {
-            return;
-        };
+            Some(msg) = out_rx.recv() => {
+                if socket.send(Message::Text(msg.into())).await.is_err() {
+                    break;
+                }
+            }
+        }
+    }
+
+    if let Some(ws) = &context.ws {
+        for id in subscription_ids {
+            let _ = ws.subscription_manager.unsubscribe(id).await;
+        }
+    }
+}
+
+async fn handle_ws_request<F, Fut, E>(
+    body: &str,
+    context: &RpcApiContext,
+    out_tx: &tokio::sync::mpsc::Sender<String>,
+    subscription_ids: &mut Vec<String>,
+    route_request: &F,
+) -> Option<String>
+where
+    F: Fn(RpcRequest) -> Fut,
+    Fut: std::future::Future<Output = Result<Value, E>>,
+    E: Into<RpcErrorMetadata>,
+{
+    // Parse as raw JSON first so we can distinguish between:
+    //   -32700 Parse error     (malformed JSON)
+    //   -32600 Invalid Request (valid JSON, but not a valid JSON-RPC request object)
+    let parsed: Value = match serde_json::from_str(body) {
+        Ok(v) => v,
+        Err(_) => return Some(ws_error_response(None, -32700, "Parse error")),
+    };
+
+    // Accept both a single request and a batch (array), matching HTTP behavior.
+    let wrapper: RpcRequestWrapper = match serde_json::from_value(parsed) {
+        Ok(w) => w,
+        Err(_) => return Some(ws_error_response(None, -32600, "Invalid Request")),
+    };
+
+    match wrapper {
+        RpcRequestWrapper::Single(req) => {
+            let resp =
+                process_ws_request(req, context, out_tx, subscription_ids, route_request).await?;
+            Some(resp.to_string())
+        }
+        RpcRequestWrapper::Multiple(reqs) => {
+            // Per JSON-RPC 2.0 spec, an empty batch is an invalid request.
+            if reqs.is_empty() {
+                return Some(ws_error_response(None, -32600, "Invalid Request"));
+            }
+            let mut responses = Vec::with_capacity(reqs.len());
+            for req in reqs {
+                if let Some(resp) =
+                    process_ws_request(req, context, out_tx, subscription_ids, route_request).await
+                {
+                    responses.push(resp);
+                }
+            }
+            if responses.is_empty() {
+                None
+            } else {
+                serde_json::to_string(&responses).ok()
+            }
+        }
+    }
+}
+
+async fn process_ws_request<F, Fut, E>(
+    req: RpcRequest,
+    context: &RpcApiContext,
+    out_tx: &tokio::sync::mpsc::Sender<String>,
+    subscription_ids: &mut Vec<String>,
+    route_request: &F,
+) -> Option<Value>
+where
+    F: Fn(RpcRequest) -> Fut,
+    Fut: std::future::Future<Output = Result<Value, E>>,
+    E: Into<RpcErrorMetadata>,
+{
+    match req.method.as_str() {
+        "eth_subscribe" => {
+            let result = handle_eth_subscribe(&req, context, out_tx, subscription_ids).await;
+            rpc_response(req.id, result).ok()
+        }
+        "eth_unsubscribe" => {
+            let result = handle_eth_unsubscribe(&req, context, subscription_ids).await;
+            rpc_response(req.id, result).ok()
+        }
+        _ => {
+            let id = req.id.clone();
+            let res = route_request(req).await;
+            rpc_response(id, res).ok()
+        }
+    }
+}
+
+/// Build a JSON-RPC 2.0 error response. Used for transport-level errors
+/// (parse error, invalid request) where the request ID is unknown.
+fn ws_error_response(id: Option<RpcRequestId>, code: i32, message: &str) -> String {
+    let id = match id {
+        Some(id) => serde_json::to_value(id).unwrap_or(Value::Null),
+        None => Value::Null,
+    };
+    serde_json::json!({
+        "jsonrpc": "2.0",
+        "id": id,
+        "error": { "code": code, "message": message },
+    })
+    .to_string()
+}
+
+/// Handle `eth_subscribe`.
+///
+/// Only `"newHeads"` is supported. Registers this connection with the
+/// `SubscriptionManager` actor and returns the subscription ID.
+pub async fn handle_eth_subscribe(
+    req: &crate::utils::RpcRequest,
+    context: &RpcApiContext,
+    out_tx: &tokio::sync::mpsc::Sender<String>,
+    subscription_ids: &mut Vec<String>,
+) -> Result<Value, RpcErr> {
+    use crate::subscription_manager::MAX_SUBSCRIPTIONS_PER_CONNECTION;
+
+    let params = req.params.as_deref().unwrap_or(&[]);
+    let sub_type = params.first().and_then(|v| v.as_str()).ok_or_else(|| {
+        RpcErr::BadParams("eth_subscribe requires a subscription type parameter".to_string())
+    })?;
+
+    if subscription_ids.len() >= MAX_SUBSCRIPTIONS_PER_CONNECTION {
+        return Err(RpcErr::BadParams(format!(
+            "Too many subscriptions (max {MAX_SUBSCRIPTIONS_PER_CONNECTION})"
+        )));
+    }
 
-        if socket.send(response.into()).await.is_err() {
-            return;
+    match sub_type {
+        "newHeads" => {
+            let ws = context
+                .ws
+                .as_ref()
+                .ok_or_else(|| RpcErr::Internal("WebSocket server not enabled".to_string()))?;
+
+            let id = ws
+                .subscription_manager
+                .subscribe(out_tx.clone())
+                .await
+                .map_err(|e| RpcErr::Internal(format!("Subscription failed: {e}")))?;
+
+            subscription_ids.push(id.clone());
+            Ok(Value::String(id))
         }
+        other => Err(RpcErr::BadParams(format!(
+            "Unsupported subscription type: {other}"
+        ))),
+    }
+}
+
+/// Handle `eth_unsubscribe`.
+///
+/// Delegates to the [`SubscriptionManager`] actor and returns `true` if the
+/// subscription was found and removed, `false` otherwise.
+pub async fn handle_eth_unsubscribe(
+    req: &crate::utils::RpcRequest,
+    context: &RpcApiContext,
+    subscription_ids: &mut Vec<String>,
+) -> Result<Value, RpcErr> {
+    let params = req.params.as_deref().unwrap_or(&[]);
+    let sub_id = params
+        .first()
+        .and_then(|v| v.as_str())
+        .ok_or_else(|| {
+            RpcErr::BadParams("eth_unsubscribe requires a subscription ID parameter".to_string())
+        })?
+        .to_string();
+
+    // Only unsubscribe if the requested ID belongs to this connection.
+    let Some(pos) = subscription_ids.iter().position(|id| id == &sub_id) else {
+        return Ok(Value::Bool(false));
+    };
+
+    let removed = if let Some(ref ws) = context.ws {
+        ws.subscription_manager
+            .unsubscribe(sub_id)
+            .await
+            .unwrap_or(false)
+    } else {
+        false
+    };
+
+    if removed {
+        subscription_ids.swap_remove(pos);
     }
+
+    Ok(Value::Bool(removed))
 }
 
 /// Handle requests that can come from either clients or other users
diff --git a/crates/networking/rpc/subscription_manager.rs b/crates/networking/rpc/subscription_manager.rs
new file mode 100644
index 00000000000..e27a4aed38b
--- /dev/null
+++ b/crates/networking/rpc/subscription_manager.rs
@@ -0,0 +1,168 @@
+//! Actor-based subscription manager for WebSocket `eth_subscribe` connections.
+//!
+//! The `SubscriptionManager` is a GenServer actor that owns all subscription
+//! state. It receives `NewHead` messages from block producers / fork choice
+//! handlers and fans out notifications to all connected WebSocket clients
+//! through per-connection `mpsc` channels.
+//!
+//! Using an actor removes the need for a `broadcast` channel and eliminates
+//! the "lagged subscriber" problem: when a connection drops, its sender is
+//! removed during the next `new_head` fan-out rather than silently accumulating
+//! unread messages.
+
+use ethrex_common::types::BlockHeader;
+use rand::RngCore;
+use serde_json::Value;
+use spawned_concurrency::{
+    actor,
+    error::ActorError,
+    protocol,
+    tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, Response},
+};
+use std::collections::HashMap;
+use tokio::sync::mpsc::Sender;
+use tracing::{debug, warn};
+
+/// Maximum number of buffered notifications per subscriber.
+/// If a subscriber's channel is full (slow WebSocket client), the notification
+/// is dropped rather than blocking the actor. Matches Geth's approach of
+/// dropping slow clients (Geth uses 20,000; we use a smaller buffer since
+/// each notification is already serialized JSON).
+pub const SUBSCRIBER_CHANNEL_CAPACITY: usize = 512;
+
+/// Maximum number of active subscriptions allowed per WebSocket connection.
+pub const MAX_SUBSCRIPTIONS_PER_CONNECTION: usize = 128;
+
+/// Actor that manages all active WebSocket subscriptions.
+///
+/// Each subscription is identified by a hex-encoded string ID and backed by a
+/// bounded `Sender` that delivers serialized notification JSON to the
+/// corresponding WebSocket write-loop.
+#[derive(Default)]
+pub struct SubscriptionManager {
+    subscribers: HashMap<String, Sender<String>>,
+}
+
+/// Messages understood by the [`SubscriptionManager`].
+#[protocol]
+pub trait SubscriptionManagerProtocol: Send + Sync {
+    /// Broadcast a new block header to all `newHeads` subscribers.
+    ///
+    /// The actor handles serialization and hash injection. Callers just
+    /// pass the raw `BlockHeader`. Dead subscribers are removed automatically
+    /// when their channel is closed.
+    fn new_head(&self, header: BlockHeader) -> Result<(), ActorError>;
+
+    /// Register a new subscriber.
+    ///
+    /// Returns the subscription ID that the client should use in subsequent
+    /// `eth_unsubscribe` calls.
+    fn subscribe(&self, sender: Sender<String>) -> Response<String>;
+
+    /// Remove a subscriber by ID.
+    ///
+    /// Returns `true` if the subscription existed and was removed, `false`
+    /// otherwise.
+    fn unsubscribe(&self, id: String) -> Response<bool>;
+}
+
+#[actor(protocol = SubscriptionManagerProtocol)]
+impl SubscriptionManager {
+    /// Spawn the actor and return a handle.
+    pub fn spawn() -> ActorRef<SubscriptionManager> {
+        SubscriptionManager::default().start()
+    }
+
+    #[send_handler]
+    async fn handle_new_head(
+        &mut self,
+        msg: subscription_manager_protocol::NewHead,
+        _ctx: &Context,
+    ) {
+        if self.subscribers.is_empty() {
+            return;
+        }
+
+        // Serialize the header and inject the computed block hash.
+        let header = msg.header;
+        let block_hash = header.hash();
+        let mut header_value = match serde_json::to_value(&header) {
+            Ok(v) => v,
+            Err(e) => {
+                warn!("Failed to serialize block header for newHeads: {e}");
+                return;
+            }
+        };
+        if let Value::Object(ref mut map) = header_value {
+            map.insert(
+                "hash".to_string(),
+                Value::String(format!("{block_hash:#x}")),
+            );
+        }
+
+        let mut dead_ids: Vec<String> = Vec::new();
+
+        for (sub_id, sender) in &self.subscribers {
+            let notification = build_subscription_notification(sub_id, &header_value);
+            match sender.try_send(notification) {
+                Ok(()) => {}
+                Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
+                    dead_ids.push(sub_id.clone());
+                }
+                Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
+                    warn!(sub_id = %sub_id, "Subscriber channel full, dropping notification");
+                }
+            }
+        }
+
+        for id in dead_ids {
+            debug!(sub_id = %id, "Removing closed newHeads subscriber");
+            self.subscribers.remove(&id);
+        }
+    }
+
+    #[request_handler]
+    async fn handle_subscribe(
+        &mut self,
+        msg: subscription_manager_protocol::Subscribe,
+        _ctx: &Context,
+    ) -> String {
+        let id = generate_subscription_id();
+        self.subscribers.insert(id.clone(), msg.sender);
+        id
+    }
+
+    #[request_handler]
+    async fn handle_unsubscribe(
+        &mut self,
+        msg: subscription_manager_protocol::Unsubscribe,
+        _ctx: &Context,
+    ) -> bool {
+        self.subscribers.remove(&msg.id).is_some()
+    }
+}
+
+/// Build the standard Ethereum subscription notification envelope.
+///
+/// `result` is cloned per subscriber — cheap relative to re-serializing the
+/// header. Using `serde_json::json!` avoids hand-rolled string interpolation,
+/// which would silently produce malformed JSON if `sub_id` or the result ever
+/// contained unescaped characters.
+fn build_subscription_notification(sub_id: &str, result: &Value) -> String {
+    serde_json::json!({
+        "jsonrpc": "2.0",
+        "method": "eth_subscription",
+        "params": {
+            "subscription": sub_id,
+            "result": result,
+        },
+    })
+    .to_string()
+}
+
+/// Generate a random hex subscription ID (16 bytes / 128 bits).
+fn generate_subscription_id() -> String {
+    let mut bytes = [0u8; 16];
+    rand::thread_rng().fill_bytes(&mut bytes);
+    format!("0x{}", hex::encode(bytes))
+}
diff --git a/crates/networking/rpc/test_utils.rs b/crates/networking/rpc/test_utils.rs
index a656bc406be..3428efa4ddb 100644
--- a/crates/networking/rpc/test_utils.rs
+++ b/crates/networking/rpc/test_utils.rs
@@ -224,7 +224,6 @@ pub fn example_local_node_record() -> NodeRecord {
 // ```
 pub async fn start_test_api() -> tokio::task::JoinHandle<()> {
     let http_addr: SocketAddr = "127.0.0.1:8500".parse().unwrap();
-    let ws_addr: SocketAddr = "127.0.0.1:8546".parse().unwrap();
     let authrpc_addr: SocketAddr = "127.0.0.1:8501".parse().unwrap();
     let mut storage =
         Store::new("", EngineType::InMemory).expect("Failed to create in-memory storage");
@@ -239,7 +238,7 @@ pub async fn start_test_api() -> tokio::task::JoinHandle<()> {
     tokio::spawn(async move {
         start_api(
             http_addr,
-            Some(ws_addr),
+            None,
             authrpc_addr,
             storage.clone(),
             blockchain.clone(),
@@ -293,6 +292,7 @@ pub async fn default_context_with_storage(storage: Store) -> RpcApiContext {
         log_filter_handler: None,
         gas_ceil: DEFAULT_BUILDER_GAS_CEIL,
         block_worker_channel,
+        ws: None,
     }
 }
 
diff --git a/crates/networking/rpc/utils.rs b/crates/networking/rpc/utils.rs
index 7188fd7dba8..780ba93b9c7 100644
--- a/crates/networking/rpc/utils.rs
+++ b/crates/networking/rpc/utils.rs
@@ -239,7 +239,7 @@ pub enum RpcNamespace {
 ///
 /// Per the JSON-RPC 2.0 spec, request IDs can be either numbers or strings.
 /// The same ID must be returned in the response.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(untagged)]
 pub enum RpcRequestId {
     /// Numeric request ID.
@@ -260,7 +260,7 @@ pub enum RpcRequestId {
 ///     "params": ["0x...", "latest"]
 /// }
 /// ```
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct RpcRequest {
     /// Request identifier, echoed back in the response.
     pub id: RpcRequestId,
diff --git a/test/tests/rpc/mod.rs b/test/tests/rpc/mod.rs
index e117f98f4db..a075dfceb4f 100644
--- a/test/tests/rpc/mod.rs
+++ b/test/tests/rpc/mod.rs
@@ -1 +1,2 @@
 mod client_version_tests;
+mod subscription_manager_tests;
diff --git a/test/tests/rpc/subscription_manager_tests.rs b/test/tests/rpc/subscription_manager_tests.rs
new file mode 100644
index 00000000000..caed1fa1940
--- /dev/null
+++ b/test/tests/rpc/subscription_manager_tests.rs
@@ -0,0 +1,145 @@
+use ethrex_common::types::BlockHeader;
+use ethrex_rpc::subscription_manager::{
+    SUBSCRIBER_CHANNEL_CAPACITY, SubscriptionManager, SubscriptionManagerProtocol,
+};
+use tokio::sync::mpsc;
+
+#[tokio::test]
+async fn subscribe_returns_unique_ids() {
+    let manager = SubscriptionManager::spawn();
+
+    let (tx1, _rx1) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY);
+    let (tx2, _rx2) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY);
+
+    let id1 = manager.subscribe(tx1).await.unwrap();
+    let id2 = manager.subscribe(tx2).await.unwrap();
+
+    assert_ne!(id1, id2);
+    assert!(id1.starts_with("0x"));
+    assert!(id2.starts_with("0x"));
+}
+
+#[tokio::test]
+async fn unsubscribe_existing_returns_true() {
+    let manager = SubscriptionManager::spawn();
+
+    let (tx, _rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY);
+    let id = manager.subscribe(tx).await.unwrap();
+
+    let removed = manager.unsubscribe(id).await.unwrap();
+    assert!(removed);
+}
+
+#[tokio::test]
+async fn unsubscribe_nonexistent_returns_false() {
+    let manager = SubscriptionManager::spawn();
+
+    let removed = manager.unsubscribe("0xdeadbeef".to_string()).await.unwrap();
+    assert!(!removed);
+}
+
+#[tokio::test]
+async fn unsubscribe_twice_returns_false_second_time() {
+    let manager = SubscriptionManager::spawn();
+
+    let (tx, _rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY);
+    let id = manager.subscribe(tx).await.unwrap();
+
+    assert!(manager.unsubscribe(id.clone()).await.unwrap());
+    assert!(!manager.unsubscribe(id).await.unwrap());
+}
+
+#[tokio::test]
+async fn new_head_fans_out_to_all_subscribers() {
+    let manager = SubscriptionManager::spawn();
+
+    let (tx1, mut rx1) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY);
+    let (tx2, mut rx2) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY);
+
+    let id1 = manager.subscribe(tx1).await.unwrap();
+    let id2 = manager.subscribe(tx2).await.unwrap();
+
+    let header = BlockHeader::default();
+    manager.new_head(header).unwrap();
+
+    let msg1 = tokio::time::timeout(std::time::Duration::from_secs(2), rx1.recv())
+        .await
+        .expect("timed out waiting for subscriber 1")
+        .expect("channel closed");
+    let msg2 = tokio::time::timeout(std::time::Duration::from_secs(2), rx2.recv())
+        .await
+        .expect("timed out waiting for subscriber 2")
+        .expect("channel closed");
+
+    // Verify notification envelope structure.
+    let v1: serde_json::Value = serde_json::from_str(&msg1).unwrap();
+    assert_eq!(v1["jsonrpc"], "2.0");
+    assert_eq!(v1["method"], "eth_subscription");
+    assert_eq!(v1["params"]["subscription"], id1);
+    assert!(v1["params"]["result"]["hash"].is_string());
+
+    let v2: serde_json::Value = serde_json::from_str(&msg2).unwrap();
+    assert_eq!(v2["params"]["subscription"], id2);
+}
+
+#[tokio::test]
+async fn new_head_removes_dead_subscribers() {
+    let manager = SubscriptionManager::spawn();
+
+    let (tx_alive, mut rx_alive) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY);
+    let (tx_dead, rx_dead) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY);
+
+    let _id_alive = manager.subscribe(tx_alive).await.unwrap();
+    let id_dead = manager.subscribe(tx_dead).await.unwrap();
+
+    // Drop the receiver so the dead subscriber's channel is closed.
+    drop(rx_dead);
+
+    let header = BlockHeader::default();
+    manager.new_head(header).unwrap();
+
+    // The alive subscriber should still receive the notification.
+    let msg = tokio::time::timeout(std::time::Duration::from_secs(2), rx_alive.recv())
+        .await
+        .expect("timed out waiting for alive subscriber")
+        .expect("channel closed");
+    assert!(!msg.is_empty());
+
+    // The dead subscriber should have been cleaned up — unsubscribe returns false.
+    // Give the actor a moment to process the new_head and clean up.
+    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+    let removed = manager.unsubscribe(id_dead).await.unwrap();
+    assert!(!removed, "dead subscriber should have been cleaned up");
+}
+
+#[tokio::test]
+async fn new_head_with_no_subscribers_does_not_panic() {
+    let manager = SubscriptionManager::spawn();
+    let header = BlockHeader::default();
+    // Should not panic or error.
+    manager.new_head(header).unwrap();
+}
+
+#[tokio::test]
+async fn notification_contains_block_hash() {
+    let manager = SubscriptionManager::spawn();
+
+    let (tx, mut rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY);
+    let _id = manager.subscribe(tx).await.unwrap();
+
+    let header = BlockHeader::default();
+    let expected_hash = format!("{:#x}", header.hash());
+
+    manager.new_head(header).unwrap();
+
+    let msg = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
+        .await
+        .expect("timed out")
+        .expect("channel closed");
+
+    let v: serde_json::Value = serde_json::from_str(&msg).unwrap();
+    assert_eq!(
+        v["params"]["result"]["hash"].as_str().unwrap(),
+        expected_hash
+    );
+}
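
For anyone who wants to poke at the new endpoint by hand, here is a rough client-side sketch of the wire exchange the WS server above implements: an `eth_subscribe("newHeads")` request answered with a hex subscription ID, `eth_subscription` notifications pushed as heads advance, and a final `eth_unsubscribe`. This is illustrative only and not part of the diff; it assumes the `tokio-tungstenite`, `futures-util`, `serde_json`, and `tokio` crates, and a node with its WebSocket server listening on 127.0.0.1:8546 (substitute whatever address you configured).

```rust
// Illustrative client sketch only; not part of this change.
// Assumes tokio-tungstenite + futures-util + serde_json, and an ethrex node
// with its WebSocket server enabled on the address below.
use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (mut ws, _) = connect_async("ws://127.0.0.1:8546").await?;

    // eth_subscribe("newHeads") returns a hex subscription ID.
    let subscribe = r#"{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["newHeads"]}"#;
    ws.send(Message::Text(subscribe.into())).await?;

    let sub_id = match ws.next().await {
        Some(Ok(Message::Text(reply))) => {
            let v: serde_json::Value = serde_json::from_str(&reply)?;
            v["result"].as_str().unwrap_or_default().to_string()
        }
        other => return Err(format!("unexpected subscribe reply: {other:?}").into()),
    };
    println!("subscribed with id {sub_id}");

    // Each new head arrives as an eth_subscription notification whose
    // params.result is the block header plus its computed hash.
    if let Some(Ok(Message::Text(note))) = ws.next().await {
        let v: serde_json::Value = serde_json::from_str(&note)?;
        println!("new head: {}", v["params"]["result"]["hash"]);
    }

    // eth_unsubscribe returns true only for an ID owned by this connection.
    let unsubscribe = format!(
        r#"{{"jsonrpc":"2.0","id":2,"method":"eth_unsubscribe","params":["{sub_id}"]}}"#
    );
    ws.send(Message::Text(unsubscribe.into())).await?;

    Ok(())
}
```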