Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e4d9702
Add eth_subscribe/eth_unsubscribe WebSocket subscription support
avilagaston9 Apr 16, 2026
273af43
Resolve ws_addr from L1 options inside init_rpc_api instead of passing
avilagaston9 Apr 16, 2026
7dc57d9
Use a single new_heads_sender variable and clone when needed, instead of
avilagaston9 Apr 16, 2026
d66d10d
Refine Credible Layer integration: boolean gate flag, log levels, and…
avilagaston9 Apr 16, 2026
4905895
Combine ws_socket_opts and new_heads_sender into a single conditional…
avilagaston9 Apr 16, 2026
79e31e7
Introduce WebSocketConfig struct to bundle WS address and broadcast
avilagaston9 Apr 16, 2026
0881580
Clean up WebSocketConfig usage: add explicit imports instead of full
avilagaston9 Apr 16, 2026
6cb8d88
Replace broadcast channel with SubscriptionManager actor for WebSocke…
avilagaston9 Apr 16, 2026
687be21
Remove unnecessary delegation comments in L2 WS handler
avilagaston9 Apr 16, 2026
70a5e5b
Move block header serialization into SubscriptionManager actor.
avilagaston9 Apr 16, 2026
3def76e
Subscribe only on eth_subscribe, not on every WS connection.
avilagaston9 Apr 16, 2026
1114402
Deduplicate WS handling: L2 reuses L1's handle_websocket directly.
avilagaston9 Apr 16, 2026
4f89ad4
Remove handle_websocket wrapper in L2 — call L1 directly from the axu…
avilagaston9 Apr 16, 2026
b26ae62
Remove duplicate doc comment on handle_websocket
avilagaston9 Apr 16, 2026
944f02f
Use bounded channel (capacity 512) for subscriber notifications instead
avilagaston9 Apr 16, 2026
01f2e92
Fix import ordering (cargo fmt)
avilagaston9 Apr 16, 2026
15e5d85
Remove accidentally committed .claude directory
avilagaston9 Apr 16, 2026
6842695
Allow too_many_arguments on BlockProducer::spawn
avilagaston9 Apr 16, 2026
92f05f9
Allow multiple subscriptions per WebSocket connection and add Subscri…
avilagaston9 Apr 16, 2026
25e2718
Use correct JSON-RPC error codes in WebSocket handler: return
avilagaston9 Apr 16, 2026
fd67b9b
Fix formatting in subscription_manager_tests.rs (cargo fmt)
avilagaston9 Apr 17, 2026
ea3a57b
Use random subscription IDs, add per-connection subscription limit,
avilagaston9 Apr 17, 2026
abc812f
Make handle_websocket generic over a request-routing function so L2
avilagaston9 Apr 17, 2026
12bcf13
Revert .claude/ addition to .gitignore since it should not be part
avilagaston9 Apr 17, 2026
9d50083
Make build_subscription_notification private since it is only used wi…
avilagaston9 Apr 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ethrex_blockchain::{Blockchain, BlockchainOptions, BlockchainType};
use ethrex_common::fd_limit::raise_fd_limit;
use ethrex_common::types::Genesis;
use ethrex_config::networks::Network;
use ethrex_rpc::WebSocketConfig;

use ethrex_metrics::profiling::{FunctionProfilingLayer, initialize_block_processing_profile};
use ethrex_metrics::rpc::initialize_rpc_metrics;
Expand Down Expand Up @@ -208,15 +209,18 @@ pub async fn init_rpc_api(
)
.await;

let ws_socket_opts = if opts.ws_enabled {
Some(get_ws_socket_addr(opts))
let ws_config = if opts.ws_enabled {
Some(WebSocketConfig {
addr: get_ws_socket_addr(opts),
subscription_manager: ethrex_rpc::SubscriptionManager::spawn(),
})
} else {
None
};

let rpc_api = ethrex_rpc::start_api(
get_http_socket_addr(opts),
ws_socket_opts,
ws_config,
get_authrpc_socket_addr(opts),
store,
blockchain,
Expand Down
18 changes: 17 additions & 1 deletion cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::cli::Options as L1Options;
use crate::initializers::{
self, get_authrpc_socket_addr, get_http_socket_addr, get_local_node_record, get_local_p2p_node,
get_network, get_signer, init_blockchain, init_network, init_store,
get_network, get_signer, get_ws_socket_addr, init_blockchain, init_network, init_store,
};
use crate::l2::{L2Options, SequencerOptions};
use crate::utils::{
Expand All @@ -22,10 +22,12 @@ use ethrex_p2p::{
sync_manager::SyncManager,
types::{Node, NodeRecord},
};
use ethrex_rpc::{SubscriptionManager, WebSocketConfig};
use ethrex_storage::Store;
use ethrex_storage_rollup::{EngineTypeRollup, StoreRollup};
use eyre::OptionExt;
use secp256k1::SecretKey;

use spawned_concurrency::tasks::ActorRef;
use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration};
use tokio::task::JoinSet;
Expand All @@ -49,11 +51,13 @@ fn init_rpc_api(
rollup_store: StoreRollup,
log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
l2_gas_limit: u64,
ws: Option<WebSocketConfig>,
) {
init_datadir(&opts.datadir);

let rpc_api = ethrex_l2_rpc::start_api(
get_http_socket_addr(opts),
ws,
get_authrpc_socket_addr(opts),
store,
blockchain,
Expand Down Expand Up @@ -318,6 +322,16 @@ pub async fn init_l2(
)
.await?;

// Create WebSocket config when WS is enabled.
let ws_config = if opts.node_opts.ws_enabled {
Some(WebSocketConfig {
addr: get_ws_socket_addr(&opts.node_opts),
subscription_manager: SubscriptionManager::spawn(),
})
} else {
None
};

init_rpc_api(
&opts.node_opts,
&opts,
Expand All @@ -331,6 +345,7 @@ pub async fn init_l2(
rollup_store.clone(),
log_filter_handler,
l2_gas_limit,
ws_config.clone(),
);

// Initialize metrics if enabled
Expand All @@ -354,6 +369,7 @@ pub async fn init_l2(
genesis,
checkpoints_dir,
l2_gas_limit,
ws_config.as_ref().map(|ws| ws.subscription_manager.clone()),
)
.await?;
join_set.spawn(l2_sequencer);
Expand Down
2 changes: 1 addition & 1 deletion crates/l2/networking/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ethrex-l2-common.workspace = true
ethrex-rpc.workspace = true
ethrex-rlp.workspace = true

axum.workspace = true
axum = { features = ["ws"], workspace = true }
tower-http.workspace = true
serde.workspace = true
serde_json = "1.0.117"
Expand Down
37 changes: 33 additions & 4 deletions crates/l2/networking/rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::l2::fees::{
use crate::l2::messages::GetL1MessageProof;
use crate::utils::{RpcErr, RpcNamespace, resolve_namespace};
use axum::extract::State;
use axum::extract::ws::WebSocketUpgrade;
use axum::{Json, Router, http::StatusCode, routing::post};
use bytes::Bytes;
use ethrex_blockchain::Blockchain;
Expand All @@ -20,7 +21,7 @@ use ethrex_p2p::types::NodeRecord;
use ethrex_rpc::RpcHandler as L1RpcHandler;
use ethrex_rpc::debug::execution_witness::ExecutionWitnessRequest;
use ethrex_rpc::{
ClientVersion, GasTipEstimator, NodeData, RpcRequestWrapper,
ClientVersion, GasTipEstimator, NodeData, RpcRequestWrapper, WebSocketConfig,
types::transaction::SendRawTransactionRequest,
utils::{RpcRequest, RpcRequestId},
};
Expand Down Expand Up @@ -74,6 +75,7 @@ pub const FILTER_DURATION: Duration = {
#[expect(clippy::too_many_arguments)]
pub async fn start_api(
http_addr: SocketAddr,
ws: Option<WebSocketConfig>,
authrpc_addr: SocketAddr,
storage: Store,
blockchain: Arc<Blockchain>,
Expand Down Expand Up @@ -118,6 +120,7 @@ pub async fn start_api(
log_filter_handler,
gas_ceil: l2_gas_limit,
block_worker_channel,
ws: ws.clone(),
},
valid_delegation_addresses,
sponsor_pk,
Expand Down Expand Up @@ -145,7 +148,7 @@ pub async fn start_api(

let http_router = Router::new()
.route("/", post(handle_http_request))
.layer(cors)
.layer(cors.clone())
.with_state(service_context.clone());
let http_listener = TcpListener::bind(http_addr)
.await
Expand All @@ -157,8 +160,34 @@ pub async fn start_api(

info!("Not starting Auth-RPC server. The address passed as argument is {authrpc_addr}");

let _ =
tokio::try_join!(http_server).inspect_err(|e| info!("Error shutting down servers: {e:?}"));
if let Some(ref ws_config) = ws {
let ws_handler = |ws: WebSocketUpgrade, State(ctx): State<RpcApiContext>| async move {
ws.on_upgrade(|mut socket| async move {
ethrex_rpc::handle_websocket(&mut socket, &ctx.l1_ctx, |req| {
let c = ctx.clone();
async move { map_http_requests(&req, c).await }
})
.await;
})
};
let ws_router = Router::new()
.route("/", axum::routing::any(ws_handler))
.layer(cors)
.with_state(service_context);
let ws_listener = TcpListener::bind(ws_config.addr)
.await
.map_err(|error| RpcErr::Internal(error.to_string()))?;
let ws_server = axum::serve(ws_listener, ws_router)
.with_graceful_shutdown(ethrex_rpc::shutdown_signal())
.into_future();
info!("Starting WS server at {}", ws_config.addr);

let _ = tokio::try_join!(http_server, ws_server)
.inspect_err(|e| info!("Error shutting down servers: {e:?}"));
} else {
let _ = tokio::try_join!(http_server)
.inspect_err(|e| info!("Error shutting down servers: {e:?}"));
}

Ok(())
}
Expand Down
17 changes: 16 additions & 1 deletion crates/l2/sequencer/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use ethrex_common::H256;
use ethrex_common::{Address, U256};
use ethrex_l2_sdk::calldata::encode_calldata;
use ethrex_rpc::{
EthClient,
EthClient, SubscriptionManager, SubscriptionManagerProtocol,
clients::{EthClientError, Overrides},
};
use ethrex_storage::Store;
Expand All @@ -35,6 +35,7 @@ use tracing::{debug, error, info, warn};

use crate::{BlockProducerConfig, SequencerConfig};
use ethrex_l2_common::sequencer_state::{SequencerState, SequencerStatus};

use std::str::FromStr;

use super::errors::BlockProducerError;
Expand Down Expand Up @@ -63,6 +64,8 @@ pub struct BlockProducer {
block_gas_limit: u64,
eth_client: EthClient,
router_address: Address,
/// Actor handle for sending new block headers to WS subscribers.
subscription_manager: Option<ActorRef<SubscriptionManager>>,
}

#[derive(Clone, Serialize)]
Expand All @@ -84,6 +87,7 @@ impl BlockProducer {
sequencer_state: SequencerState,
router_address: Address,
l2_gas_limit: u64,
subscription_manager: Option<ActorRef<SubscriptionManager>>,
) -> Result<Self, EthClientError> {
let BlockProducerConfig {
block_time_ms,
Expand Down Expand Up @@ -122,6 +126,7 @@ impl BlockProducer {
block_gas_limit: l2_gas_limit,
eth_client,
router_address,
subscription_manager,
})
}

Expand Down Expand Up @@ -201,6 +206,8 @@ impl BlockProducer {
let transactions_count = block.body.transactions.len();
let block_number = block.header.number;
let block_hash = block.hash();
// Save the header for newHeads notifications before block is moved into store_block.
let block_header = block.header.clone();
self.store_fee_config_by_block(block.header.number).await?;
self.blockchain
.store_block(block, account_updates_list, execution_result)?;
Expand All @@ -217,6 +224,11 @@ impl BlockProducer {
// Make the new head be part of the canonical chain
apply_fork_choice(&self.store, block_hash, block_hash, block_hash).await?;

// Notify all eth_subscribe("newHeads") subscribers.
if let Some(ref manager) = self.subscription_manager {
let _ = manager.new_head(block_header);
}

metrics!(
METRICS_BLOCKS.set_block_number(block_number);
#[allow(clippy::as_conversions)]
Expand Down Expand Up @@ -291,6 +303,7 @@ impl BlockProducer {

#[actor(protocol = BlockProducerProtocol)]
impl BlockProducer {
#[expect(clippy::too_many_arguments)]
pub async fn spawn(
store: Store,
rollup_store: StoreRollup,
Expand All @@ -299,6 +312,7 @@ impl BlockProducer {
sequencer_state: SequencerState,
router_address: Address,
l2_gas_limit: u64,
subscription_manager: Option<ActorRef<SubscriptionManager>>,
) -> Result<ActorRef<BlockProducer>, BlockProducerError> {
let block_producer = Self::new(
&cfg.block_producer,
Expand All @@ -309,6 +323,7 @@ impl BlockProducer {
sequencer_state,
router_address,
l2_gas_limit,
subscription_manager,
)?;
let actor_ref = block_producer.start_with_backend(Backend::Blocking);
Ok(actor_ref)
Expand Down
3 changes: 3 additions & 0 deletions crates/l2/sequencer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ethrex_blockchain::Blockchain;
use ethrex_common::types::Genesis;
use ethrex_l2_common::prover::ProverType;
use ethrex_monitor::{EthrexMonitor, MonitorConfig as ExternalMonitorConfig};
use ethrex_rpc::SubscriptionManager;
use ethrex_storage::Store;
use ethrex_storage_rollup::StoreRollup;
use l1_committer::L1Committer;
Expand Down Expand Up @@ -53,6 +54,7 @@ pub async fn start_l2(
genesis: Genesis,
checkpoints_dir: PathBuf,
l2_gas_limit: u64,
subscription_manager: Option<ActorRef<SubscriptionManager>>,
) -> Result<
(
Option<ActorRef<L1Committer>>,
Expand Down Expand Up @@ -160,6 +162,7 @@ pub async fn start_l2(
shared_state.clone(),
cfg.l1_watcher.router_address,
l2_gas_limit,
subscription_manager,
)
.await
.inspect_err(|err| {
Expand Down
6 changes: 6 additions & 0 deletions crates/networking/rpc/engine/fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::{debug, info, warn};

use crate::{
rpc::{RpcApiContext, RpcHandler},
subscription_manager::SubscriptionManagerProtocol,
types::{
fork_choice::{
ForkChoiceResponse, ForkChoiceState, PayloadAttributesV3, PayloadAttributesV4,
Expand Down Expand Up @@ -303,6 +304,11 @@ async fn handle_forkchoice(
}
};

// Notify all eth_subscribe("newHeads") subscribers.
if let Some(ws) = &context.ws {
let _ = ws.subscription_manager.new_head(head.clone());
}

Ok((
Some(head),
ForkChoiceResponse::from(PayloadStatus::valid_with_hash(
Expand Down
5 changes: 4 additions & 1 deletion crates/networking/rpc/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ mod eth;
mod mempool;
mod net;
pub mod rpc;
pub mod subscription_manager;
mod tracing;

pub mod clients;
Expand All @@ -87,7 +88,9 @@ pub use eth::{
transaction::EstimateGasRequest,
};
pub use rpc::{
ClientVersion, NodeData, RpcApiContext, RpcHandler, RpcRequestWrapper, map_debug_requests,
ClientVersion, NodeData, RpcApiContext, RpcHandler, RpcRequestWrapper, WebSocketConfig,
handle_eth_subscribe, handle_eth_unsubscribe, handle_websocket, map_debug_requests,
map_eth_requests, map_http_requests, rpc_response, shutdown_signal,
};
pub use subscription_manager::{SubscriptionManager, SubscriptionManagerProtocol};
pub use utils::{RpcErr, RpcErrorMetadata, RpcNamespace};
Loading
Loading