Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/src/client/api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use alloy::{
eips::BlockId,
primitives::{Address, Bytes, B256, U256},
rpc::types::{AccessListResult, EIP1186AccountProofResponse, Filter, Log, SyncStatus},
rpc::types::{
AccessListResult, EIP1186AccountProofResponse, Filter, FilterChanges, Log, SyncStatus,
},
};
use async_trait::async_trait;
use eyre::Result;
Expand Down Expand Up @@ -65,6 +67,7 @@ pub trait HeliosApi<N: NetworkSpec>: Send + Sync + 'static {
// filters and subscriptions
async fn subscribe(&self, sub_type: SubscriptionType) -> Result<SubEventRx<N>>;
async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>>;
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges>;
async fn uninstall_filter(&self, filter_id: U256) -> Result<bool>;
async fn new_filter(&self, filter: &Filter) -> Result<U256>;
async fn new_block_filter(&self) -> Result<U256>;
Expand Down
50 changes: 48 additions & 2 deletions core/src/client/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::sync::Arc;

use alloy::consensus::BlockHeader;
use alloy::eips::{BlockId, BlockNumberOrTag};
use alloy::network::BlockResponse;
use alloy::network::{primitives::HeaderResponse, BlockResponse};
use alloy::primitives::{Address, Bytes, B256, U256};
use alloy::rpc::types::{
AccessListItem, AccessListResult, EIP1186AccountProofResponse, EIP1186StorageProof, Filter,
Log, SyncInfo, SyncStatus,
FilterChanges, Log, SyncInfo, SyncStatus,
};
use async_trait::async_trait;
use eyre::{eyre, Result};
Expand Down Expand Up @@ -375,6 +375,52 @@ impl<N: NetworkSpec, C: Consensus<N::BlockResponse>, E: ExecutionProvider<N>> He
}
}

async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
match self.filter_state.get_filter(filter_id).await {
Some(FilterType::Logs { filter, last_poll }) => {
let current_block = self.get_block_number().await?.try_into()?;
if current_block <= last_poll {
return Ok(FilterChanges::Empty);
}

let mut updated_filter = filter.clone();
updated_filter.block_option = updated_filter
.block_option
.with_from_block((last_poll + 1).into());
let logs = self.get_logs(&updated_filter).await?;

// Update last_poll in filter_state
self.filter_state
.update_last_poll(filter_id, current_block)
.await;
Ok(FilterChanges::Logs(logs))
}
Some(FilterType::Blocks {
start_block: _,
last_poll,
}) => {
let current_block = self.get_block_number().await?.try_into()?;
if current_block <= last_poll {
return Ok(FilterChanges::Empty);
}

let mut block_hashes = Vec::new();
for block_num in (last_poll + 1)..=current_block {
if let Some(block) = self.get_block(block_num.into(), false).await? {
block_hashes.push(block.header().hash());
}
}

// Update last_poll in filter_state
self.filter_state
.update_last_poll(filter_id, current_block)
.await;
Ok(FilterChanges::Hashes(block_hashes))
}
None => Err(eyre!("filter not found")),
}
}

async fn uninstall_filter(&self, filter_id: U256) -> Result<bool> {
Ok(self.filter_state.uninstall_filter(filter_id).await)
}
Expand Down
15 changes: 15 additions & 0 deletions core/src/execution/filter_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,19 @@ impl FilterState {
pub async fn get_filter(&self, id: U256) -> Option<FilterType> {
self.filters.read().await.get(&id).cloned()
}

pub async fn update_last_poll(&self, id: U256, last_poll: u64) {
if let Some(filter_type) = self.filters.write().await.get_mut(&id) {
match filter_type {
FilterType::Logs {
last_poll: ref mut lp,
..
} => *lp = last_poll,
FilterType::Blocks {
last_poll: ref mut lp,
..
} => *lp = last_poll,
}
}
}
}
9 changes: 3 additions & 6 deletions core/src/jsonrpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use alloy::rpc::json_rpc::RpcObject;
use alloy::rpc::types::{
AccessListResult, EIP1186AccountProofResponse, Filter, FilterChanges, Log, SyncStatus,
};
use eyre::{eyre, Result};
use eyre::Result;
use jsonrpsee::{
core::{async_trait, server::Methods, SubscriptionResult},
proc_macros::rpc,
Expand Down Expand Up @@ -357,11 +357,8 @@ impl<N: NetworkSpec>
convert_err(self.client.get_logs(&filter).await)
}

async fn get_filter_changes(
&self,
_filter_id: U256,
) -> Result<FilterChanges, ErrorObjectOwned> {
convert_err(Err(eyre!("not implemented")))
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges, ErrorObjectOwned> {
convert_err(self.client.get_filter_changes(filter_id).await)
}

async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>, ErrorObjectOwned> {
Expand Down
95 changes: 94 additions & 1 deletion tests/rpc_equivalence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ use helios_verifiable_api_server::server::{
//
// Log/Event Methods:
// - eth_getLogs (get_logs, get_logs_by_address, get_logs_by_topic, get_logs_block_range)
// - eth_getFilterChanges (get_filter_changes, get_filter_changes_empty)
// - eth_newFilter (used in filter tests)
// - eth_newBlockFilter (used in filter tests)
// - eth_uninstallFilter (used in filter tests)
//
// Historical Data:
// - Historical block access (get_historical_block)
Expand Down Expand Up @@ -1134,12 +1138,98 @@ async fn test_get_logs_block_range(helios: &RootProvider, expected: &RootProvide
target_block,
block_num
));
}
}
}
Ok(())
}

async fn test_get_filter_changes(helios: &RootProvider, expected: &RootProvider) -> Result<()> {
// Create a log filter
let latest_block = helios.get_block_number().await?;
let target_block = latest_block.saturating_sub(2); // A few blocks back for more activity

let filter = Filter::new()
.from_block(target_block)
.to_block(target_block); // from == to (single block)

// Create filter on both providers
let helios_filter_id = helios.new_filter(&filter).await?;
let expected_filter_id = expected.new_filter(&filter).await?;

// Get filter changes (should return logs from the target block)
let helios_changes = helios.get_filter_changes(helios_filter_id).await?;
let expected_changes = expected.get_filter_changes(expected_filter_id).await?;

// Both should return the same type of changes
match (&helios_changes, &expected_changes) {
(alloy::rpc::types::FilterChanges::Logs(helios_logs), alloy::rpc::types::FilterChanges::Logs(expected_logs)) => {
ensure_eq!(
helios_logs.len(),
expected_logs.len(),
"Filter changes logs count mismatch: expected {}, got {}",
expected_logs.len(),
helios_logs.len()
);
}
(alloy::rpc::types::FilterChanges::Empty, alloy::rpc::types::FilterChanges::Empty) => {
// Both empty - this is fine
}
_ => {
return Err(eyre::eyre!(
"Filter changes type mismatch: helios={:?}, expected={:?}",
helios_changes,
expected_changes
));
}
}

// Clean up filters
helios.uninstall_filter(helios_filter_id).await?;
expected.uninstall_filter(expected_filter_id).await?;

Ok(())
}

async fn test_get_filter_changes_empty(helios: &RootProvider, expected: &RootProvider) -> Result<()> {
// Create a block filter
let helios_filter_id = helios.new_block_filter().await?;
let expected_filter_id = expected.new_block_filter().await?;

// Get filter changes immediately (should be empty since no new blocks)
let helios_changes = helios.get_filter_changes(helios_filter_id).await?;
let expected_changes = expected.get_filter_changes(expected_filter_id).await?;

// Both should return empty or the same type
match (&helios_changes, &expected_changes) {
(alloy::rpc::types::FilterChanges::Empty, alloy::rpc::types::FilterChanges::Empty) => {
// Both empty - this is expected
}
(alloy::rpc::types::FilterChanges::Hashes(helios_hashes), alloy::rpc::types::FilterChanges::Hashes(expected_hashes)) => {
// Both have hashes - they should match
ensure_eq!(
helios_hashes.len(),
expected_hashes.len(),
"Filter changes block hashes count mismatch: expected {}, got {}",
expected_hashes.len(),
helios_hashes.len()
);
}
_ => {
return Err(eyre::eyre!(
"Filter changes type mismatch: helios={:?}, expected={:?}",
helios_changes,
expected_changes
));
}
}

// Clean up filters
helios.uninstall_filter(helios_filter_id).await?;
expected.uninstall_filter(expected_filter_id).await?;

Ok(())
}

async fn test_call_complex_contract(helios: &RootProvider, expected: &RootProvider) -> Result<()> {
let block_num = helios.get_block_number().await?;
let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); // WETH contract
Expand Down Expand Up @@ -1451,6 +1541,9 @@ async fn rpc_equivalence_tests() {
spawn_test!(test_get_logs_by_address, "get_logs_by_address"),
spawn_test!(test_get_logs_by_topic, "get_logs_by_topic"),
spawn_test!(test_get_logs_block_range, "get_logs_block_range"),
// Filter Methods
spawn_test!(test_get_filter_changes, "get_filter_changes"),
spawn_test!(test_get_filter_changes_empty, "get_filter_changes_empty"),
// Historical Data
spawn_test!(test_get_historical_block, "get_historical_block"),
spawn_test!(test_get_too_old_block, "get_too_old_block"),
Expand Down