
feat(l1): add eth_subscribe/eth_unsubscribe WebSocket subscription support#6496

Open
avilagaston9 wants to merge 25 commits into main from feat/l1/ws-subscriptions

Conversation

Contributor

@avilagaston9 avilagaston9 commented Apr 16, 2026

Motivation

Implement eth_subscribe("newHeads") WebSocket subscription support. This is standard Ethereum JSON-RPC functionality needed by external tools (e.g., the Credible Layer sidecar) that track new blocks in real time.

Description

Add a SubscriptionManager GenServer actor that owns all subscription state. When a new block becomes canonical (via fork choice update on L1, or block production on L2), the head BlockHeader is sent to the actor, which serializes it and fans out notifications to all subscribed WebSocket clients through bounded per-connection channels.

  • SubscriptionManager actor (crates/networking/rpc/subscription_manager.rs): handles Subscribe, Unsubscribe, and NewHead messages. Dead subscribers are cleaned up automatically on the next NewHead fan-out.
  • WebSocketConfig struct bundles the WS socket address and the actor handle.
  • handle_websocket is public so L2 reuses it directly — zero duplicate WS code in L2.
  • Subscriptions are created only on explicit eth_subscribe("newHeads"), not on connection open.
  • Bounded per-subscriber channel (capacity 512): slow clients lose notifications via try_send instead of accumulating unbounded memory. The actor is never blocked.
  • L1 broadcasts newHeads from engine/fork_choice.rs after successful canonical head advancement.
  • L2 broadcasts from the block producer after sealing each block.
  • L2 uses L1's existing --ws.enabled / --ws.addr / --ws.port CLI flags — no L2-specific WS flags.
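The drop-on-full behavior of the bounded per-subscriber channel described above can be sketched in Python, with `queue.Queue` standing in for the Rust mpsc channel (names and the two-subscriber setup are illustrative; only the capacity value 512 comes from the PR):

```python
import queue

# Mirrors the PR's per-subscriber channel capacity.
CAPACITY = 512

def fan_out(subscribers, notification):
    """Send to every subscriber; full (slow) queues drop the message instead of blocking."""
    dropped = []
    for sub_id, q in subscribers.items():
        try:
            q.put_nowait(notification)  # analogous to try_send: never blocks the producer
        except queue.Full:
            dropped.append(sub_id)      # slow client loses this notification
    return dropped

subscribers = {"0xaa": queue.Queue(maxsize=CAPACITY), "0xbb": queue.Queue(maxsize=2)}
subscribers["0xbb"].put_nowait("h1")
subscribers["0xbb"].put_nowait("h2")   # 0xbb's queue is now full
dropped = fan_out(subscribers, "h3")
```

The healthy subscriber receives the head while the saturated one is skipped, so the fan-out loop completes in bounded time regardless of client speed.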

How to Test

# Start ethrex with WS enabled
ethrex --ws.enabled --ws.port 8546 ...

# In another terminal, subscribe via websocat or python:
python3 -c "
import asyncio, json, websockets
async def test():
    async with websockets.connect('ws://localhost:8546') as ws:
        await ws.send(json.dumps({'jsonrpc':'2.0','method':'eth_subscribe','params':['newHeads'],'id':1}))
        print('Subscribe:', json.loads(await ws.recv()))
        print('Notification:', json.loads(await asyncio.wait_for(ws.recv(), timeout=15)))
asyncio.run(test())
"

Checklist

  • Code compiles (cargo check -p ethrex --features l2)
  • Zero warnings
  • L2 reuses L1 implementation (no duplicate WS code)
  • Bounded channels prevent memory issues with slow clients
  • CI passing (pending)

Implement eth_subscribe("newHeads") in both the L1 and L2 RPC WebSocket
servers. When a new block is produced, all subscribers receive the block
header as a JSON-RPC notification envelope.
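The notification envelope follows the standard `eth_subscription` shape. A minimal Python sketch (the header dict is a stand-in for a real serialized block header):

```python
import json

def build_notification(sub_id: str, header: dict) -> str:
    # JSON-RPC notification: no "id" field; params carry the subscription ID and result.
    return json.dumps({
        "jsonrpc": "2.0",
        "method": "eth_subscription",
        "params": {"subscription": sub_id, "result": header},
    })

msg = build_notification("0x9ce59a130d1c2f3a", {"number": "0x1b4", "hash": "0xabc"})
```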

L1 RPC (ethrex-rpc):
- Upgrade handle_websocket from a simple request-response loop to a
  select!-based loop that multiplexes incoming messages, subscription
  notification draining, and outbound message sending.
- Add handle_eth_subscribe, handle_eth_unsubscribe, generate_subscription_id,
  drain_subscriptions, build_subscription_notification as public functions
  so L2 can delegate to L1's implementation.
- Add new_heads_sender: Option<broadcast::Sender<Value>> field to
  RpcApiContext; None when WS is disabled or subscriptions not needed.
- Add NEW_HEADS_CHANNEL_CAPACITY constant (128) for the broadcast channel.
- Export all new symbols from lib.rs including tokio::sync::broadcast.

L2 RPC (ethrex-l2-rpc):
- Add optional WS server startup in start_api when ws_addr is Some.
- handle_websocket delegates eth_subscribe/eth_unsubscribe to L1's
  implementations via context.l1_ctx, eliminating duplicate code.
- Add ws_addr: Option<SocketAddr> and new_heads_sender parameters to
  start_api; pass new_heads_sender through to the embedded l1_ctx.
- Re-export NEW_HEADS_CHANNEL_CAPACITY and broadcast from ethrex_rpc.
- Add axum ws feature to Cargo.toml.

L2 CLI:
- Add --l2.ws-enabled, --l2.ws-addr, --l2.ws-port flags to Options.
- Create broadcast channel in init_l2 when WS is enabled; pass sender
  clone to both init_rpc_api and start_l2 (for the block producer).

L2 block producer:
- Add new_heads_sender: Option<broadcast::Sender<Value>> field.
- After sealing and storing each block, serialize the header, inject
  the computed block hash, and broadcast to all WS subscribers.
- Add new_heads_sender parameter to BlockProducer::new and spawn.

Tests:
- 11 unit tests in ethrex-l2-rpc covering subscription ID generation,
  broadcast channel delivery, notification format, and unsubscribe
  behavior (happy path, missing subscription, missing params).

github-actions bot commented Apr 16, 2026

Lines of code report

Total lines added: 316
Total lines removed: 0
Total lines changed: 316

Detailed view
+------------------------------------------------------+-------+------+
| File                                                 | Lines | Diff |
+------------------------------------------------------+-------+------+
| ethrex/cmd/ethrex/initializers.rs                    | 641   | +4   |
+------------------------------------------------------+-------+------+
| ethrex/cmd/ethrex/l2/initializers.rs                 | 380   | +13  |
+------------------------------------------------------+-------+------+
| ethrex/crates/l2/networking/rpc/rpc.rs               | 256   | +28  |
+------------------------------------------------------+-------+------+
| ethrex/crates/l2/sequencer/block_producer.rs         | 323   | +10  |
+------------------------------------------------------+-------+------+
| ethrex/crates/l2/sequencer/mod.rs                    | 277   | +3   |
+------------------------------------------------------+-------+------+
| ethrex/crates/networking/rpc/engine/fork_choice.rs   | 552   | +4   |
+------------------------------------------------------+-------+------+
| ethrex/crates/networking/rpc/lib.rs                  | 30    | +3   |
+------------------------------------------------------+-------+------+
| ethrex/crates/networking/rpc/rpc.rs                  | 1142  | +150 |
+------------------------------------------------------+-------+------+
| ethrex/crates/networking/rpc/subscription_manager.rs | 101   | +101 |
+------------------------------------------------------+-------+------+

it as a parameter. Follows the same pattern as get_http_socket_addr(opts).
creating two separate variables (new_heads_sender + new_heads_sender_for_block_producer).
… doc improvements

- Remove duplicate L2 WS CLI flags (l2.ws-enabled/addr/port) — L2 now
  uses L1's --ws.enabled / --ws.addr / --ws.port flags
- Broadcast newHeads from engine fork_choice after successful canonical
  head advancement
- Create broadcast channel in L1 initializer when WS is enabled
- Update channel capacity from 128 to 10 (matching Geth's chainEvChanSize)
sender together. Remove separate ws_addr and new_heads_sender parameters
from start_api — now a single Option<WebSocketConfig>. Remove duplicate
L2 WS CLI flags (l2.ws-enabled/addr/port); L2 inherits L1 flags.
paths, remove L2-specific comments and log prefixes, drop unnecessary
doc comment about L1 crate delegation.
…t subscriptions

Introduce a SubscriptionManager actor (spawned-concurrency GenServer) that owns
all eth_subscribe("newHeads") subscription state. Each WebSocket connection gets
an mpsc channel and a subscription ID when it connects; the actor fans out new
block headers to all registered channels on new_head messages.

This removes the tokio broadcast channel approach, which had a "lagged subscriber"
problem and required each WS handler to maintain a HashMap of broadcast receivers.
The actor approach is simpler: two branches in the select! loop instead of three,
and dead subscribers are cleaned up automatically when their mpsc channel closes.

Changes:
- Add crates/networking/rpc/subscription_manager.rs with the SubscriptionManager
  actor, SubscriptionManagerProtocol trait (#[protocol] macro), and
  build_subscription_notification helper.
- WebSocketConfig now holds ActorRef<SubscriptionManager> instead of
  broadcast::Sender<Value>. NEW_HEADS_CHANNEL_CAPACITY is removed.
- handle_websocket simplified to 2 select! branches; subscribe on connect,
  unsubscribe on disconnect.
- handle_eth_subscribe/unsubscribe updated to use the actor; broadcast-based
  drain_subscriptions and generate_subscription_id are removed.
- fork_choice.rs sends new heads via actor.new_head() instead of sender.send().
- block_producer.rs replaces broadcast::Sender with ActorRef<SubscriptionManager>.
- Both initializers.rs files spawn the actor instead of creating a broadcast channel.
- L2 sequencer mod.rs and block_producer.rs updated to pass ActorRef through.
- L2 networking rpc updated similarly; tests rewritten against the actor API.
new_head() now accepts BlockHeader directly instead of serde_json::Value.
The actor handles JSON serialization and hash injection, so callers just
pass the raw header.
Previously, every new WebSocket connection was auto-subscribed to
newHeads. Now subscription only happens when the client explicitly
sends eth_subscribe("newHeads"). handle_eth_subscribe calls
subscription_manager.subscribe() and stores the ID. The ID is
cleared on eth_unsubscribe or connection close.
Make handle_websocket public so L2 calls it with context.l1_ctx instead
of duplicating the entire select loop, subscribe/unsubscribe routing,
and cleanup logic. L2-specific RPC methods are not available over WS
for now (only standard Ethereum methods). Remove all WS unit tests
added by this PR.
of unbounded. When a slow client's channel is full, the notification is
dropped via try_send rather than accumulating unbounded memory. The actor
is never blocked.
…ptionManager tests

Change subscription tracking from Option<String> to Vec<String> so each
eth_subscribe call creates a new subscription with its own ID, matching
the Ethereum JSON-RPC spec. Previously a duplicate eth_subscribe("newHeads")
on the same connection returned the existing ID instead of creating a new one.

Also add 8 tests for the SubscriptionManager actor covering subscribe/unsubscribe,
fan-out, dead subscriber cleanup, and notification format.
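The Vec<String>-per-connection bookkeeping can be illustrated with a small Python sketch (class and field names are invented; only the append-a-fresh-ID behavior reflects the change described above):

```python
import secrets

class ConnectionSubs:
    """Per-connection state: each eth_subscribe appends a fresh ID rather than reusing one."""
    def __init__(self):
        self.subscription_ids = []           # the Vec<String> analogue

    def subscribe(self) -> str:
        sub_id = "0x" + secrets.token_hex(16)
        self.subscription_ids.append(sub_id)
        return sub_id

conn = ConnectionSubs()
first, second = conn.subscribe(), conn.subscribe()
```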
-32700 Parse error with null id for unparseable requests (per
JSON-RPC 2.0 spec), and use BadParams instead of Internal for
unsupported subscription types since it is a client error.
and optimize fan-out serialization in SubscriptionManager.

Sequential IDs leaked server activity information; replaced with
128-bit random hex IDs. Added MAX_SUBSCRIPTIONS_PER_CONNECTION (128)
check in handle_eth_subscribe to prevent subscription spam. Optimized
new_head fan-out to serialize the header result JSON once and build
each subscriber's notification envelope via string formatting instead
of repeated serde_json serialization.
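The serialize-once fan-out described above can be sketched in Python (function names are illustrative; the point is that the shared result is serialized a single time and only the cheap envelope formatting runs per subscriber):

```python
import json

def fan_out_envelopes(sub_ids, header):
    result_json = json.dumps(header, separators=(",", ":"))  # serialized once
    return [
        '{"jsonrpc":"2.0","method":"eth_subscription",'
        f'"params":{{"subscription":"{sid}","result":{result_json}}}}}'
        for sid in sub_ids
    ]

msgs = fan_out_envelopes(["0xaa", "0xbb"], {"number": "0x10"})
```

This stays valid JSON as long as the subscription IDs are hex-only and the result is itself serde/`json.dumps` output, which matches the constraints noted in the review below.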
can route L2-specific RPC methods (ethrexL2_*, sponsored txs, etc.)
through its own map_http_requests when serving WebSocket connections.
Previously the L2 WS handler delegated to the L1 context, meaning
only L1 methods were reachable over WebSocket. Added Clone to
RpcRequest and RpcRequestId to support passing requests by value
into the routing closure.
@avilagaston9 avilagaston9 force-pushed the feat/l1/ws-subscriptions branch from 9d040d9 to abc812f on April 17, 2026 15:30
@avilagaston9 avilagaston9 marked this pull request as ready for review April 17, 2026 15:48
@avilagaston9 avilagaston9 requested review from a team, ManuelBilbao and ilitteri as code owners April 17, 2026 15:48
Copilot AI review requested due to automatic review settings April 17, 2026 15:48
@ethrex-project-sync ethrex-project-sync bot moved this to In Review in ethrex_l1 Apr 17, 2026
@github-actions

🤖 Claude Code Review

Code Review: PR #6496: eth_subscribe/eth_unsubscribe WebSocket Subscription Support

Overview

This PR implements the Ethereum WebSocket subscription standard (eth_subscribe("newHeads")) using an actor-based SubscriptionManager. New block headers are broadcast from both L1's fork-choice handler and L2's block producer to subscribed clients via bounded per-connection channels. The architecture is clean and the core design is sound.


Architecture & Design

Strengths:

  • The SubscriptionManager actor removes all shared mutable state — no Mutex, no RwLock, no broadcast-channel "lagged receiver" problem.
  • Per-connection bounded channels (SUBSCRIBER_CHANNEL_CAPACITY = 512) with try_send is the correct approach: slow clients drop messages rather than exerting backpressure on the actor.
  • Per-connection cleanup at disconnect (handle_websocket end) complements lazy dead-subscriber detection on fan-out.
  • The generic route_request closure in handle_websocket is an elegant abstraction that eliminates duplicated WS handling between L1 and L2.
  • Cross-connection unsubscribe is prevented: handle_eth_unsubscribe checks that the subscription ID belongs to the calling connection before delegating to the actor.

Bugs & Correctness Issues

Silent failure on internal serialization error (crates/networking/rpc/rpc.rs, handle_ws_request)

let resp = rpc_response(req.id, result).ok()?;
Some(resp.to_string())

If rpc_response returns Err, ok()? exits the function with None, and handle_websocket sends nothing to the client. The JSON-RPC 2.0 spec requires a response for every request, including an {"error": {"code": -32603, ...}} on internal errors. A client awaiting a response will hang. This affects all three match arms in handle_ws_request. Consider replacing with explicit error serialization:

match rpc_response(req.id, result) {
    Ok(resp) => Some(resp.to_string()),
    Err(e) => Some(/* serialize internal error */),
}

Dead subscriber ID remains in subscription_ids when actor disagrees (handle_eth_unsubscribe, line ~674)

if removed {
    subscription_ids.swap_remove(pos);
}

The position pos was found before calling the actor. If the actor returns false (e.g., it already cleaned up the dead subscriber via the fan-out path), the ID stays in subscription_ids and is not cleaned up. This wastes one slot of MAX_SUBSCRIPTIONS_PER_CONNECTION. More importantly, at disconnect the cleanup loop will call unsubscribe on an already-removed ID, which is harmless but indicates the local state has diverged. Recommend removing the if removed guard — the local list should track what this connection registered, regardless of whether the actor still has it.


Security

Non-cryptographic RNG for subscription IDs (subscription_manager.rs, line ~850)

rand::thread_rng().fill_bytes(&mut bytes);

thread_rng() uses a seeded PRNG, not the OS entropy source. While subscription ID guessing is partially mitigated by the per-connection ownership check, using OsRng from the rand crate is standard practice for externally-visible token generation:

rand::rngs::OsRng.fill_bytes(&mut bytes);
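For comparison, the equivalent guarantee in Python comes from the `secrets` module, which always draws from the OS entropy source (a sketch of the 0x-prefixed 128-bit hex ID format; the function name mirrors the Rust one for readability):

```python
import secrets

def generate_subscription_id() -> str:
    # 16 bytes from the OS CSPRNG -> 32 hex chars, like OsRng.fill_bytes in Rust.
    return "0x" + secrets.token_hex(16)

sub_id = generate_subscription_id()
```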

Performance

Header clone before store_block (block_producer.rs, line ~255)

let block_header = block.header.clone();
self.blockchain.store_block(block, ...)?;

The comment explains this correctly — block is moved into store_block so the clone must happen first. The clone is necessary and appropriately placed.

tokio::try_join! with a single future (crates/l2/networking/rpc/rpc.rs, line ~198)

let _ = tokio::try_join!(http_server)
    .inspect_err(|e| info!("Error shutting down servers: {e:?}"));

tokio::try_join! with one future is equivalent to http_server.await. This is harmless but misleading — it reads like a placeholder for future servers. Just http_server.await? would be clearer.


Code Quality

Fragile manual JSON construction (subscription_manager.rs, build_subscription_notification)

format!(
    r#"{{"jsonrpc":"2.0","method":"eth_subscription","params":{{"subscription":"{sub_id}","result":{result_json}}}}}"#
)

This is safe given that sub_id is always 0x[0-9a-f]+ (hex-encoded) and result_json is produced by serde_json. The performance motivation (avoid re-serializing the header per subscriber) is documented and valid. As the number of notification types grows, consider extracting a proper struct for the envelope to keep this manageable.

axum::routing::any for the WS endpoint (L1 rpc.rs line ~453, L2 rpc.rs line ~184)

.route("/", axum::routing::any(ws_handler))

WebSocket upgrades require a GET request. Using axum::routing::get is more semantically correct and avoids handling non-upgrade POST/PUT requests on the WS port (axum's WebSocketUpgrade extractor rejects them, but returning a 405 early is cleaner).

Defensive error path that should be unreachable (handle_eth_subscribe)

let ws = context
    .ws
    .as_ref()
    .ok_or_else(|| RpcErr::Internal("WebSocket server not enabled".to_string()))?;

handle_eth_subscribe is only reachable from handle_ws_request, which is only called from handle_websocket, which is only called from the WS server route (i.e., only when ws is Some). This path cannot be reached in practice. The defensive guard is understandable but the error message would be confusing in a bug report. A debug_assert!(context.ws.is_some()) with an expect would be more honest.


Test Coverage

  • The actor-level unit tests in subscription_manager_tests.rs are thorough: unique IDs, double-unsubscribe, fan-out, dead-subscriber cleanup, and hash injection are all covered. Good.
  • There are no integration tests for the full JSON-RPC round-trip over WebSocket: connect → eth_subscribe → receive newHeads → eth_unsubscribe. The test API now passes None for ws, so no existing test exercises the subscription JSON-RPC layer.
  • The rpc_response(...).ok()? silent-failure bug (noted above) is not caught by any test because no test sends a request through handle_ws_request that triggers a serialization failure.

Minor Nits

  • pub mod subscription_manager in lib.rs makes the internal actor protocol types public. If only SubscriptionManager, SubscriptionManagerProtocol, and the two constants are intended for external use, consider re-exporting selectively and keeping the module private.
  • The comment on new_head in fork_choice.rs (// Notify all eth_subscribe("newHeads") subscribers.) and in block_producer.rs explain what rather than why — the why (cross-component notification to WS clients) could be captured in one sentence to help future readers understand the coupling.

Summary

The implementation is well-structured and the core actor design is the right approach. The main items to address before merging:

  1. Correctness (medium priority): Fix silent None returns in handle_ws_request — clients must receive a JSON-RPC error response, not silence.
  2. Security (low-medium): Switch thread_rng() to OsRng for subscription ID generation.
  3. Correctness (low): Unconditionally remove the ID from subscription_ids in unsubscribe, regardless of what the actor returns.
  4. Tests: Add at least one integration test exercising eth_subscribe/eth_unsubscribe through the WS JSON-RPC handler layer.

Automated review by Claude (Anthropic) · sonnet · custom prompt

@github-actions

🤖 Codex Code Review

Findings:

  1. L2 newHeads is only wired for locally produced sequencer blocks, so follower/based nodes can silently miss WS head notifications. The subscription manager is passed only into start_l2 and then into BlockProducer (cmd/ethrex/l2/initializers.rs:362, crates/l2/sequencer/block_producer.rs:224), but other canonical-head advance paths still call apply_fork_choice(...) without any notification step, notably the based fetcher (crates/l2/sequencer/mod.rs:199, crates/l2/based/block_fetcher.rs:241) and P2P block import (crates/networking/p2p/rlpx/l2/l2_connection.rs:444). In practice, eth_subscribe("newHeads") will work on the sequencer but not reliably on nodes advancing via fetched/imported L2 blocks. This should be centralized around L2 head canonicalization, or the manager needs to be threaded into those paths too.

  2. The new WS request parser regresses JSON-RPC batch support over WebSocket. handle_ws_request now deserializes each incoming frame as a single RpcRequest (crates/networking/rpc/rpc.rs:754), so a valid batch like [{"id":1,...},{"id":2,...}] will fail with -32700 Parse error at crates/networking/rpc/rpc.rs:766. The previous implementation delegated WS traffic through handle_http_request, which accepted RpcRequestWrapper::Multiple. If batch-over-WS was previously relied on by clients, this is a breaking change. Parsing RpcRequestWrapper here as well would preserve existing behavior.
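The parsing fix suggested in finding 2 can be sketched in Python (names and return shape are illustrative, not the crate's API): classify the frame before responding, so invalid JSON, structurally invalid requests, and batches each take their own path.

```python
import json

def classify_ws_frame(body: str):
    """Return (kind, payload): parse errors, invalid requests, batches, or a single request."""
    try:
        value = json.loads(body)
    except json.JSONDecodeError:
        return ("parse_error", -32700)      # not JSON at all: -32700 with id null
    if isinstance(value, list):
        return ("batch", value)             # JSON-RPC batch: handle each element
    if not (isinstance(value, dict) and isinstance(value.get("method"), str)):
        return ("invalid_request", -32600)  # valid JSON, but not a request object
    return ("single", value)
```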

No other correctness/security problems stood out in the diff itself.

I could not run cargo test in this sandbox because rustup attempted to write under a read-only path.


Automated review by OpenAI Codex · gpt-5.4 · custom prompt

Contributor

Copilot AI left a comment


Pull request overview

Adds Ethereum JSON-RPC WebSocket subscription support (eth_subscribe / eth_unsubscribe) for "newHeads" by introducing an actor-owned subscription registry and wiring canonical-head updates to WS fan-out for both L1 and L2.

Changes:

  • Introduce an actor-based SubscriptionManager with bounded per-subscriber channels and ID-based subscribe/unsubscribe.
  • Extend the WS RPC handler to support eth_subscribe("newHeads") / eth_unsubscribe alongside normal request/response routing, and plumb WS config via WebSocketConfig.
  • Emit newHeads notifications from L1 fork-choice canonical head updates and from the L2 block producer after sealing.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
test/tests/rpc/subscription_manager_tests.rs Adds unit tests for SubscriptionManager subscribe/unsubscribe and fan-out/cleanup behavior.
test/tests/rpc/mod.rs Registers the new subscription manager test module.
crates/networking/rpc/utils.rs Derives Clone for RpcRequestId and RpcRequest to support WS routing flow.
crates/networking/rpc/test_utils.rs Updates test API startup/context helpers for the new ws: Option<WebSocketConfig> plumbing.
crates/networking/rpc/subscription_manager.rs New actor implementing subscription state and newHeads notification fan-out.
crates/networking/rpc/rpc.rs Introduces WebSocketConfig, makes handle_websocket public/generic, and implements WS subscribe/unsubscribe handling.
crates/networking/rpc/lib.rs Exposes subscription manager module and re-exports WS-related APIs/types.
crates/networking/rpc/engine/fork_choice.rs Sends canonical head updates to the subscription manager on successful fork choice updates.
crates/l2/sequencer/mod.rs Threads optional subscription manager handle into L2 startup for block producer notifications.
crates/l2/sequencer/block_producer.rs Broadcasts newHeads after sealing/applying fork choice, using the optional subscription manager.
crates/l2/networking/rpc/rpc.rs Starts an optional WS server in L2 and reuses L1’s handle_websocket with L2 request routing.
crates/l2/networking/rpc/Cargo.toml Enables Axum’s ws feature for L2 RPC crate.
cmd/ethrex/l2/initializers.rs Builds WebSocketConfig/SubscriptionManager when WS is enabled and passes it to both RPC and sequencer.
cmd/ethrex/initializers.rs Builds WebSocketConfig/SubscriptionManager for L1 when WS is enabled and passes it into start_api.


Comment on lines +782 to +786
match req.method.as_str() {
"eth_subscribe" => {
let result = handle_eth_subscribe(&req, context, out_tx, subscription_ids).await;
let resp = rpc_response(req.id, result).ok()?;
Some(resp.to_string())

Copilot AI Apr 17, 2026


New WebSocket behavior (handle_ws_request routing + eth_subscribe/eth_unsubscribe) isn’t covered by tests in this crate. Adding targeted unit/integration tests would help prevent regressions (e.g., batch requests over WS, invalid-request vs parse-error responses, subscription limit enforcement, and unsubscribe scoping to a single connection).

Comment on lines +766 to +779
let req: RpcRequest = match serde_json::from_str(body) {
Ok(r) => r,
Err(_) => {
// JSON-RPC 2.0 spec: parse error responses must have "id": null.
let resp = serde_json::json!({
"jsonrpc": "2.0",
"id": null,
"error": {
"code": -32700,
"message": "Parse error"
}
});
return Some(resp.to_string());
}

Copilot AI Apr 17, 2026


handle_ws_request deserializes directly into RpcRequest and treats any deserialization error as a JSON parse error (-32700). This will incorrectly classify valid JSON that isn’t a valid request object (e.g., missing required fields), and it also regresses prior WS support for batch requests (arrays), which used to work via handle_http_request/RpcRequestWrapper. Consider parsing into RpcRequestWrapper (or serde_json::Value first) so you can: (1) return -32700 only for invalid JSON, (2) return -32600 (Invalid Request) for structurally-invalid JSON-RPC objects, and (3) preserve batch handling over WebSocket.

Comment on lines 488 to 493
#[allow(clippy::too_many_arguments)]
pub async fn start_api(
http_addr: SocketAddr,
ws_addr: Option<SocketAddr>,
ws: Option<WebSocketConfig>,
authrpc_addr: SocketAddr,
storage: Store,

Copilot AI Apr 17, 2026


The start_api doc comment (and argument docs) still refer to ws_addr even though the function now accepts ws: Option<WebSocketConfig>. Updating the docs here will prevent confusion for callers and keep generated docs accurate (e.g., describe WebSocketConfig.addr + subscription_manager instead of a bare socket addr).

msg: subscription_manager_protocol::Subscribe,
_ctx: &Context<Self>,
) -> String {
let id = generate_subscription_id();

Copilot AI Apr 17, 2026


handle_subscribe inserts a freshly-generated random ID into self.subscribers without checking whether that ID already exists. While collisions are unlikely, a collision would overwrite an existing subscription sender and effectively drop a subscriber. Consider looping until an unused ID is generated (or switch to a monotonic/UUID-based ID source) before inserting.

Suggested change
let id = generate_subscription_id();
let id = loop {
    let id = generate_subscription_id();
    if !self.subscribers.contains_key(&id) {
        break id;
    }
};


greptile-apps bot commented Apr 17, 2026

Greptile Summary

Adds eth_subscribe("newHeads") / eth_unsubscribe WebSocket support via a new SubscriptionManager actor that fans out block header notifications to all connected subscribers. L2 reuses L1's implementation by sharing the actor handle between the RPC server and block producer, with no duplicate WebSocket code.

Confidence Score: 4/5

Safe to merge once CI passes; all findings are P2 style/robustness suggestions with no functional blockers.

All identified issues are P2: JSON string interpolation, deprecated thread_rng, async{}/async move{} inconsistency, and the silent non-response edge case. The core logic — actor lifecycle, bounded channels, dead-subscriber cleanup, cleanup on disconnect, L2 reuse — is sound. Score is 4 rather than 5 only because CI is still pending.

crates/networking/rpc/subscription_manager.rs (JSON building and RNG), crates/networking/rpc/rpc.rs (async capture inconsistency)

Important Files Changed

Filename Overview
crates/networking/rpc/subscription_manager.rs New actor managing all eth_subscribe state; clean design with bounded channels and dead-subscriber cleanup, but JSON building uses string interpolation and thread_rng is deprecated in newer rand
crates/networking/rpc/rpc.rs Core WS loop refactored to support eth_subscribe/eth_unsubscribe; L1 handler uses async {} while L2 counterpart uses async move {} — inconsistent but functional; silent non-response on rpc_response failure
crates/networking/rpc/engine/fork_choice.rs Broadcasts newHead after canonical update via fire-and-forget actor send; correctly placed after the chain is updated
crates/l2/sequencer/block_producer.rs Clones header before block is consumed, notifies subscription manager after apply_fork_choice succeeds; correct ordering
cmd/ethrex/l2/initializers.rs Single SubscriptionManager actor cloned and shared between RPC server and block producer correctly; ws_config.clone() is safe since ActorRef is a cheap reference
test/tests/rpc/subscription_manager_tests.rs New unit tests cover subscribe uniqueness, unsubscribe round-trips, dead-subscriber cleanup, fan-out, and hash injection; good coverage of the actor's core paths
crates/l2/networking/rpc/rpc.rs L2 correctly reuses L1's handle_websocket via ctx.l1_ctx; ws config is set on the embedded L1 context so subscriptions are wired up properly
crates/networking/rpc/utils.rs RpcRequest and RpcRequestId gain Clone derives; needed so req.id can be kept after req is moved into route_request

Sequence Diagram

sequenceDiagram
    participant CL as Consensus / BlockProducer
    participant FC as fork_choice / block_producer
    participant SM as SubscriptionManager (actor)
    participant WS as handle_websocket loop
    participant Client as WS Client

    Client->>WS: eth_subscribe("newHeads")
    WS->>SM: subscribe(out_tx)
    SM-->>WS: subscription_id
    WS-->>Client: {"result":"0x..."}

    CL->>FC: engine_forkchoiceUpdated / seal_block
    FC->>SM: new_head(BlockHeader) [fire-and-forget]
    SM->>SM: serialize header + inject hash
    SM->>WS: try_send(notification JSON)
    WS-->>Client: eth_subscription notification

    Client->>WS: eth_unsubscribe("0x...")
    WS->>SM: unsubscribe(id)
    SM-->>WS: true/false
    WS-->>Client: {"result":true}

    Note over WS,SM: On disconnect: cleanup loop unsubscribes all owned IDs

Comments Outside Diff (1)

  1. crates/networking/rpc/rpc.rs, line 579-582 (link)

    P2 Silent non-response on rpc_response serialization failure

    In handle_ws_request, all three match arms use rpc_response(...).ok()?. If rpc_response fails to serialize (extremely unlikely but possible for pathological payloads), the function returns None, and the calling loop sends no reply for the request. The WS client will hang waiting for a response. A fallback error envelope would be more robust:

    let resp = rpc_response(req.id, result).unwrap_or_else(|_| {
        serde_json::json!({"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error"}})
    });
    Some(resp.to_string())
Reviews (1): Last reviewed commit: "Make build_subscription_notification pri..."

Comment on lines +153 to +157
```rust
fn build_subscription_notification(sub_id: &str, result_json: &str) -> String {
    format!(
        r#"{{"jsonrpc":"2.0","method":"eth_subscription","params":{{"subscription":"{sub_id}","result":{result_json}}}}}"#
    )
}
```

P2 — **JSON built via string interpolation**

`build_subscription_notification` embeds `result_json` verbatim into a hand-rolled format string. This works today because `sub_id` is hex-only and `result_json` is serde_json output, but any future change that makes either value contain a `"` or `}` in an unexpected place would silently produce malformed JSON. Prefer proper serialization:

```rust
fn build_subscription_notification(sub_id: &str, result_json: &str) -> String {
    serde_json::json!({
        "jsonrpc": "2.0",
        "method": "eth_subscription",
        "params": {
            "subscription": sub_id,
            "result": serde_json::from_str::<serde_json::Value>(result_json).unwrap_or(serde_json::Value::Null)
        }
    })
    .to_string()
}
```

Alternatively, keep the pre-serialisation optimisation by holding `header_value` (a `serde_json::Value`) and passing it directly to `serde_json::json!` instead of converting to a string and back.
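The fragility described here can be reproduced with the standard library alone. The `escape_json_str` helper below is hypothetical (real code should rely on serde rather than a hand-written escaper); it simply shows how raw interpolation breaks the moment a value contains a quote, while escaping keeps the template well-formed:

```rust
// Hypothetical minimal JSON string escaper, for illustration only.
fn escape_json_str(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

fn main() {
    let sub_id = r#"0xab"cd"#; // pathological id containing a quote

    // Raw interpolation, as in the current format! template: the embedded
    // quote terminates the JSON string early, producing malformed output.
    let raw = format!(r#"{{"subscription":"{sub_id}"}}"#);
    assert_eq!(raw, r#"{"subscription":"0xab"cd"}"#);

    // Escaped embedding keeps the JSON well-formed.
    let safe = format!(r#"{{"subscription":"{}"}}"#, escape_json_str(sub_id));
    assert_eq!(safe, r#"{"subscription":"0xab\"cd"}"#);
}
```

In practice the serde-based version above is preferable to any hand-rolled escaping, since it also handles nesting, numbers, and non-string values.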


Comment on lines +160 to +163
```rust
fn generate_subscription_id() -> String {
    let mut bytes = [0u8; 16];
    rand::thread_rng().fill_bytes(&mut bytes);
    format!("0x{}", hex::encode(bytes))
}
```

P2 — **`rand::thread_rng()` in an actor's async handler**

`rand::thread_rng()` is thread-local. The actor runs under `spawned_concurrency`'s `Backend::Blocking`, which pools threads, so each invocation could draw from a different thread's RNG state. This is fine for randomness quality, but `thread_rng` is deprecated in rand 0.9+. Prefer `rand::rng()` (the rand 0.9 API) or `OsRng` to stay forward-compatible and thread-agnostic:

```suggestion
fn generate_subscription_id() -> String {
    let mut bytes = [0u8; 16];
    rand::rng().fill_bytes(&mut bytes);
    format!("0x{}", hex::encode(bytes))
}
```
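The id format itself (`0x` plus 16 bytes hex-encoded) needs no external crates to describe. A std-only sketch of the encoding half — `encode_id` is a hypothetical helper standing in for `hex::encode`, with the random bytes fixed here purely for illustration:

```rust
// Hypothetical stand-in for `format!("0x{}", hex::encode(bytes))`.
fn encode_id(bytes: &[u8; 16]) -> String {
    let mut s = String::with_capacity(2 + 32);
    s.push_str("0x");
    for b in bytes {
        // Two lowercase hex digits per byte.
        s.push_str(&format!("{b:02x}"));
    }
    s
}

fn main() {
    // 16 bytes -> "0x" + 32 hex chars = 34 chars total.
    let id = encode_id(&[0u8; 16]);
    assert_eq!(id, "0x00000000000000000000000000000000");
    assert_eq!(encode_id(&[0xff; 16]).len(), 34);
}
```

In the real code the 16 bytes come from a CSPRNG; only the source of randomness is under discussion in this comment, not the encoding.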

Comment on lines +596 to 604
```rust
let ws_handler = |ws: WebSocketUpgrade, State(ctx): State<RpcApiContext>| async {
    ws.on_upgrade(|mut socket| async move {
        handle_websocket(&mut socket, &ctx, |req| {
            let c = ctx.clone();
            async move { map_http_requests(&req, c).await }
        })
        .await;
    })
};
```

P2 — **`async {}` vs `async move {}` inconsistency between L1 and L2**

The L1 handler uses `async {}` (capture by reference) while the equivalent L2 handler at `crates/l2/networking/rpc/rpc.rs:164` uses `async move {}`. Both compile because `ctx` ends up being moved into the inner `on_upgrade` callback regardless, but the inconsistency is confusing and the non-`move` form relies on implicit capture semantics. For clarity and alignment with the L2 version:

```suggestion
let ws_handler = |ws: WebSocketUpgrade, State(ctx): State<RpcApiContext>| async move {
    ws.on_upgrade(|mut socket| async move {
        handle_websocket(&mut socket, &ctx, |req| {
            let c = ctx.clone();
            async move { map_http_requests(&req, c).await }
        })
        .await;
    })
};
```
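The capture difference behind this comment can be shown with plain closures, independent of axum or async. A std-only sketch (illustrative, not the ethrex handler):

```rust
fn main() {
    let ctx = String::from("shared context");

    // Without `move`, the closure borrows `ctx` by reference: it cannot
    // outlive `ctx` or be sent to another thread.
    let by_ref = || println!("borrowed: {ctx}");
    by_ref();

    // With `move`, the closure takes ownership of `ctx`, so it can be
    // stored, returned, or run on another thread ('static + Send).
    let owned = move || format!("owned: {ctx}");
    let handle = std::thread::spawn(owned);
    assert_eq!(handle.join().unwrap(), "owned: shared context");
}
```

`async {}` vs `async move {}` follows the same rule for the resulting future, which is why the explicit `move` form makes the ownership transfer visible instead of relying on inference.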


Labels

L1 Ethereum client

Projects

Status: In Review


2 participants