Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion common/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use alloy::{
consensus::TrieAccount,
primitives::{Bytes, B256, U256},
rpc::types::EIP1186StorageProof,
rpc::types::{EIP1186StorageProof, Log},
sol_types::decode_revert_reason,
};
use eyre::Report;
Expand Down Expand Up @@ -48,6 +48,7 @@ pub enum SubscriptionType {
#[serde(untagged)]
pub enum SubscriptionEvent<N: NetworkSpec> {
NewHeads(N::BlockResponse),
Logs(Log),
}

pub type SubEventRx<N> = Receiver<SubscriptionEvent<N>>;
Expand Down
6 changes: 5 additions & 1 deletion core/src/client/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ pub trait HeliosApi<N: NetworkSpec>: Send + Sync + 'static {
// logs
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>>;
// filters and subscriptions
async fn subscribe(&self, sub_type: SubscriptionType) -> Result<SubEventRx<N>>;
async fn subscribe(
&self,
sub_type: SubscriptionType,
filter: Option<Filter>,
) -> Result<SubEventRx<N>>;
async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>>;
async fn uninstall_filter(&self, filter_id: U256) -> Result<bool>;
async fn new_filter(&self, filter: &Filter) -> Result<U256>;
Expand Down
56 changes: 51 additions & 5 deletions core/src/client/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use async_trait::async_trait;
use eyre::{eyre, Result};
use revm::context::result::ExecutionResult;
use revm::context_interface::block::BlobExcessGasAndPrice;
use tokio::sync::Mutex;
use tokio::{select, sync::broadcast::Sender};
use tracing::{info, warn};

Expand All @@ -30,11 +31,14 @@ use crate::time::{SystemTime, UNIX_EPOCH};

use super::api::HeliosApi;

type LogSubscribers<N> = Arc<Mutex<Vec<(Filter, Sender<SubscriptionEvent<N>>)>>>;

pub struct Node<N: NetworkSpec, C: Consensus<N::BlockResponse>, E: ExecutionProvider<N>> {
pub consensus: C,
pub execution: Arc<E>,
filter_state: FilterState,
block_broadcast: Sender<SubscriptionEvent<N>>,
log_subscribers: LogSubscribers<N>,
fork_schedule: ForkSchedule,
phantom: PhantomData<N>,
}
Expand All @@ -47,6 +51,8 @@ impl<N: NetworkSpec, C: Consensus<N::BlockResponse>, E: ExecutionProvider<N>> No
let execution_ref = execution.clone();
let block_broadcast = Sender::new(100);
let block_broadcast_ref = block_broadcast.clone();
let log_subscribers: LogSubscribers<N> = Arc::new(Mutex::new(Vec::new()));
let log_subscribers_ref = log_subscribers.clone();

#[cfg(not(target_arch = "wasm32"))]
let run = tokio::spawn;
Expand All @@ -64,7 +70,6 @@ impl<N: NetworkSpec, C: Consensus<N::BlockResponse>, E: ExecutionProvider<N>> No
let block_number = block.header().number();
let timestamp = block.header().timestamp();

// Calculate age of the block
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
Expand All @@ -85,9 +90,39 @@ impl<N: NetworkSpec, C: Consensus<N::BlockResponse>, E: ExecutionProvider<N>> No
).await;

_ = block_broadcast_ref.send(SubscriptionEvent::NewHeads(block));

// send logs to subscribers that match their filter.
// clone the list and drop the lock so new subscriptions
// aren't blocked while we fetch logs.
let active_subs = {
let mut subs = log_subscribers_ref.lock().await;
subs.retain(|(_, sender)| sender.receiver_count() > 0);
subs.clone()
};

for (filter, sender) in active_subs.iter() {
let block_filter = filter.clone()
.from_block(block_number)
.to_block(block_number);
match execution_ref.get_logs(&block_filter).await {
Ok(logs) => {
for log in logs {
let _ = sender.send(
SubscriptionEvent::Logs(log),
);
}
}
Err(e) => {
warn!(
target: "helios::client",
"failed to get logs for subscription: {}",
e
);
}
}
}
}
None => {
// Sender dropped, consensus task has exited - client is no longer usable
warn!(target: "helios::client", "consensus client stopped, shut Helios down manually");
break;
}
Expand All @@ -102,7 +137,6 @@ impl<N: NetworkSpec, C: Consensus<N::BlockResponse>, E: ExecutionProvider<N>> No
if let Some(block) = block {
let block_number = block.header().number();

// Only log if this is a new finalized block
if last_finalized_block_number != Some(block_number) {
info!(
target: "helios::client",
Expand All @@ -127,6 +161,7 @@ impl<N: NetworkSpec, C: Consensus<N::BlockResponse>, E: ExecutionProvider<N>> No
execution,
filter_state: FilterState::default(),
block_broadcast,
log_subscribers,
fork_schedule,
phantom: PhantomData,
}
Expand Down Expand Up @@ -503,10 +538,21 @@ impl<N: NetworkSpec, C: Consensus<N::BlockResponse>, E: ExecutionProvider<N>> He
Ok(Address::ZERO)
}

async fn subscribe(&self, sub_type: SubscriptionType) -> Result<SubEventRx<N>> {
async fn subscribe(
&self,
sub_type: SubscriptionType,
filter: Option<Filter>,
) -> Result<SubEventRx<N>> {
match sub_type {
SubscriptionType::NewHeads => Ok(self.block_broadcast.subscribe()),
_ => Err(eyre::eyre!("Unsupported subscription type: {:?}", sub_type)),
SubscriptionType::Logs => {
let filter = filter.ok_or_else(|| eyre!("logs subscription requires a filter"))?;
let sender = Sender::new(100);
let rx = sender.subscribe();
self.log_subscribers.lock().await.push((filter, sender));
Ok(rx)
}
_ => Err(eyre!("Unsupported subscription type: {:?}", sub_type)),
}
}

Expand Down
9 changes: 7 additions & 2 deletions core/src/jsonrpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ trait EthRpc<
#[method(name = "syncing")]
async fn syncing(&self) -> Result<SyncStatus, ErrorObjectOwned>;
#[subscription(name = "subscribe", unsubscribe = "unsubscribe", item = String)]
async fn subscribe(&self, event_type: SubscriptionType) -> SubscriptionResult;
async fn subscribe(
&self,
event_type: SubscriptionType,
filter: Option<Filter>,
) -> SubscriptionResult;
}

#[rpc(client, server, namespace = "net")]
Expand Down Expand Up @@ -440,8 +444,9 @@ impl<N: NetworkSpec>
&self,
pending: PendingSubscriptionSink,
event_type: SubscriptionType,
filter: Option<Filter>,
) -> SubscriptionResult {
let maybe_rx = self.client.subscribe(event_type).await;
let maybe_rx = self.client.subscribe(event_type, filter).await;

handle_eth_subscription(pending, maybe_rx).await
}
Expand Down
4 changes: 2 additions & 2 deletions helios-ts/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ export class HeliosProvider {
async #handleSubscribe(req: Request) {
try {
const id = uuidv4();
// Capture only the emitter reference, not `this`
const emitter = this.#eventEmitter;
// params[1] is the filter object for "logs" subscriptions, undefined otherwise
await this.#client.subscribe(req.params[0], id, (data: any, subId: string) => {
const result = data instanceof Map ? mapToObj(data) : data;
const payload = {
Expand All @@ -327,7 +327,7 @@ export class HeliosProvider {
},
};
emitter.emit("message", payload);
});
}, req.params[1]);
this.#subscriptionIds.add(id);
return id;
} catch (err) {
Expand Down
8 changes: 7 additions & 1 deletion helios-ts/src/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,15 @@ impl EthereumClient {
sub_type: JsValue,
id: String,
callback: Function,
filter: JsValue,
) -> Result<bool, JsError> {
let sub_type: SubscriptionType = serde_wasm_bindgen::from_value(sub_type)?;
let rx = map_err(self.inner.subscribe(sub_type).await)?;
let filter: Option<Filter> = if filter.is_undefined() || filter.is_null() {
None
} else {
Some(serde_wasm_bindgen::from_value(filter)?)
};
let rx = map_err(self.inner.subscribe(sub_type, filter).await)?;

let subscription = Subscription::<Ethereum>::spawn_listener(id.clone(), rx, callback);
self.active_subscriptions.insert(id, subscription);
Expand Down
8 changes: 7 additions & 1 deletion helios-ts/src/linea.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,15 @@ impl LineaClient {
sub_type: JsValue,
id: String,
callback: Function,
filter: JsValue,
) -> Result<bool, JsError> {
let sub_type: SubscriptionType = serde_wasm_bindgen::from_value(sub_type)?;
let rx = map_err(self.inner.subscribe(sub_type).await)?;
let filter: Option<Filter> = if filter.is_undefined() || filter.is_null() {
None
} else {
Some(serde_wasm_bindgen::from_value(filter)?)
};
let rx = map_err(self.inner.subscribe(sub_type, filter).await)?;

let subscription = Subscription::<Linea>::spawn_listener(id.clone(), rx, callback);
self.active_subscriptions.insert(id, subscription);
Expand Down
8 changes: 7 additions & 1 deletion helios-ts/src/opstack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,15 @@ impl OpStackClient {
sub_type: JsValue,
id: String,
callback: Function,
filter: JsValue,
) -> Result<bool, JsError> {
let sub_type: SubscriptionType = serde_wasm_bindgen::from_value(sub_type)?;
let rx = map_err(self.inner.subscribe(sub_type).await)?;
let filter: Option<Filter> = if filter.is_undefined() || filter.is_null() {
None
} else {
Some(serde_wasm_bindgen::from_value(filter)?)
};
let rx = map_err(self.inner.subscribe(sub_type, filter).await)?;

let subscription = Subscription::<OpStack>::spawn_listener(id.clone(), rx, callback);
self.active_subscriptions.insert(id, subscription);
Expand Down
2 changes: 1 addition & 1 deletion rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ Helios provides a variety of RPC methods for interacting with the Ethereum netwo
| `eth_getProof` | `get_proof` | Returns the merkle proof for a given account and optionally some storage keys. | `client.get_proof(&self, address: &str, slots: [H256], block: BlockTag)` |
| `eth_coinbase` | `get_coinbase` | Returns the client coinbase address. | `client.get_coinbase(&self)` |
| `eth_syncing` | `syncing` | Returns an object with data about the sync status or false. | `client.syncing(&self)` |
| `eth_subscribe` | `subscribe` | Subscribes to events. Only "newHeads" is currently supported. | `client.subscribe(&self, event_type: &str)` |
| `eth_subscribe` | `subscribe` | Subscribes to events. Supports "newHeads" and "logs" (with a filter parameter). | `client.subscribe(&self, event_type: &str, filter: Option<Filter>)` |
| `web3_clientVersion` | `client_version` | Returns the current version of the chain client. | `client.client_version(&self)` |