diff --git a/core/src/client/api.rs b/core/src/client/api.rs index 9b348346..7815f70d 100644 --- a/core/src/client/api.rs +++ b/core/src/client/api.rs @@ -1,7 +1,9 @@ use alloy::{ eips::BlockId, primitives::{Address, Bytes, B256, U256}, - rpc::types::{AccessListResult, EIP1186AccountProofResponse, Filter, Log, SyncStatus}, + rpc::types::{ + AccessListResult, EIP1186AccountProofResponse, Filter, FilterChanges, Log, SyncStatus, + }, }; use async_trait::async_trait; use eyre::Result; @@ -65,6 +67,7 @@ pub trait HeliosApi: Send + Sync + 'static { // filters and subscriptions async fn subscribe(&self, sub_type: SubscriptionType) -> Result>; async fn get_filter_logs(&self, filter_id: U256) -> Result>; + async fn get_filter_changes(&self, filter_id: U256) -> Result; async fn uninstall_filter(&self, filter_id: U256) -> Result; async fn new_filter(&self, filter: &Filter) -> Result; async fn new_block_filter(&self) -> Result; diff --git a/core/src/client/node.rs b/core/src/client/node.rs index e83f3001..29e93fdd 100644 --- a/core/src/client/node.rs +++ b/core/src/client/node.rs @@ -3,11 +3,11 @@ use std::sync::Arc; use alloy::consensus::BlockHeader; use alloy::eips::{BlockId, BlockNumberOrTag}; -use alloy::network::BlockResponse; +use alloy::network::{primitives::HeaderResponse, BlockResponse}; use alloy::primitives::{Address, Bytes, B256, U256}; use alloy::rpc::types::{ AccessListItem, AccessListResult, EIP1186AccountProofResponse, EIP1186StorageProof, Filter, - Log, SyncInfo, SyncStatus, + FilterChanges, Log, SyncInfo, SyncStatus, }; use async_trait::async_trait; use eyre::{eyre, Result}; @@ -375,6 +375,52 @@ impl, E: ExecutionProvider> He } } + async fn get_filter_changes(&self, filter_id: U256) -> Result { + match self.filter_state.get_filter(filter_id).await { + Some(FilterType::Logs { filter, last_poll }) => { + let current_block = self.get_block_number().await?.try_into()?; + if current_block <= last_poll { + return Ok(FilterChanges::Empty); + } + + let mut updated_filter = filter.clone(); + updated_filter.block_option = updated_filter + .block_option + .with_from_block((last_poll + 1).into()); + let logs = self.get_logs(&updated_filter).await?; + + // Update last_poll in filter_state + self.filter_state + .update_last_poll(filter_id, current_block) + .await; + Ok(FilterChanges::Logs(logs)) + } + Some(FilterType::Blocks { + start_block: _, + last_poll, + }) => { + let current_block = self.get_block_number().await?.try_into()?; + if current_block <= last_poll { + return Ok(FilterChanges::Empty); + } + + let mut block_hashes = Vec::new(); + for block_num in (last_poll + 1)..=current_block { + if let Some(block) = self.get_block(block_num.into(), false).await? { + block_hashes.push(block.header().hash()); + } + } + + // Update last_poll in filter_state + self.filter_state + .update_last_poll(filter_id, current_block) + .await; + Ok(FilterChanges::Hashes(block_hashes)) + } + None => Err(eyre!("filter not found")), + } + } + async fn uninstall_filter(&self, filter_id: U256) -> Result { Ok(self.filter_state.uninstall_filter(filter_id).await) } diff --git a/core/src/execution/filter_state.rs b/core/src/execution/filter_state.rs index 07357f01..664fe857 100644 --- a/core/src/execution/filter_state.rs +++ b/core/src/execution/filter_state.rs @@ -46,4 +46,19 @@ impl FilterState { pub async fn get_filter(&self, id: U256) -> Option { self.filters.read().await.get(&id).cloned() } + + pub async fn update_last_poll(&self, id: U256, last_poll: u64) { + if let Some(filter_type) = self.filters.write().await.get_mut(&id) { + match filter_type { + FilterType::Logs { + last_poll: ref mut lp, + .. + } => *lp = last_poll, + FilterType::Blocks { + last_poll: ref mut lp, + .. + } => *lp = last_poll, + } + } + } } diff --git a/core/src/jsonrpc/mod.rs b/core/src/jsonrpc/mod.rs index ba161c4f..19528bbb 100644 --- a/core/src/jsonrpc/mod.rs +++ b/core/src/jsonrpc/mod.rs @@ -9,7 +9,7 @@ use alloy::rpc::json_rpc::RpcObject; use alloy::rpc::types::{ AccessListResult, EIP1186AccountProofResponse, Filter, FilterChanges, Log, SyncStatus, }; -use eyre::{eyre, Result}; +use eyre::Result; use jsonrpsee::{ core::{async_trait, server::Methods, SubscriptionResult}, proc_macros::rpc, @@ -357,11 +357,8 @@ impl convert_err(self.client.get_logs(&filter).await) } - async fn get_filter_changes( - &self, - _filter_id: U256, - ) -> Result { - convert_err(Err(eyre!("not implemented"))) + async fn get_filter_changes(&self, filter_id: U256) -> Result { + convert_err(self.client.get_filter_changes(filter_id).await) } async fn get_filter_logs(&self, filter_id: U256) -> Result, ErrorObjectOwned> { diff --git a/tests/rpc_equivalence.rs b/tests/rpc_equivalence.rs index 629ca25c..3e58f40a 100644 --- a/tests/rpc_equivalence.rs +++ b/tests/rpc_equivalence.rs @@ -58,6 +58,10 @@ use helios_verifiable_api_server::server::{ // // Log/Event Methods: // - eth_getLogs (get_logs, get_logs_by_address, get_logs_by_topic, get_logs_block_range) +// - eth_getFilterChanges (get_filter_changes, get_filter_changes_empty) +// - eth_newFilter (used in filter tests) +// - eth_newBlockFilter (used in filter tests) +// - eth_uninstallFilter (used in filter tests) // // Historical Data: // - Historical block access (get_historical_block) @@ -1134,12 +1138,98 @@ async fn test_get_logs_block_range(helios: &RootProvider, expected: &RootProvide target_block, block_num )); - } } } Ok(()) } +async fn test_get_filter_changes(helios: &RootProvider, expected: &RootProvider) -> Result<()> { + // Create a log filter + let latest_block = helios.get_block_number().await?; + let target_block = latest_block.saturating_sub(2); // A few blocks back for more activity + + let filter = Filter::new() + .from_block(target_block) + .to_block(target_block); // from == to (single block) + + // Create filter on both providers + let helios_filter_id = helios.new_filter(&filter).await?; + let expected_filter_id = expected.new_filter(&filter).await?; + + // Get filter changes (should return logs from the target block) + let helios_changes = helios.get_filter_changes(helios_filter_id).await?; + let expected_changes = expected.get_filter_changes(expected_filter_id).await?; + + // Both should return the same type of changes + match (&helios_changes, &expected_changes) { + (alloy::rpc::types::FilterChanges::Logs(helios_logs), alloy::rpc::types::FilterChanges::Logs(expected_logs)) => { + ensure_eq!( + helios_logs.len(), + expected_logs.len(), + "Filter changes logs count mismatch: expected {}, got {}", + expected_logs.len(), + helios_logs.len() + ); + } + (alloy::rpc::types::FilterChanges::Empty, alloy::rpc::types::FilterChanges::Empty) => { + // Both empty - this is fine + } + _ => { + return Err(eyre::eyre!( + "Filter changes type mismatch: helios={:?}, expected={:?}", + helios_changes, + expected_changes + )); + } + } + + // Clean up filters + helios.uninstall_filter(helios_filter_id).await?; + expected.uninstall_filter(expected_filter_id).await?; + + Ok(()) +} + +async fn test_get_filter_changes_empty(helios: &RootProvider, expected: &RootProvider) -> Result<()> { + // Create a block filter + let helios_filter_id = helios.new_block_filter().await?; + let expected_filter_id = expected.new_block_filter().await?; + + // Get filter changes immediately (should be empty since no new blocks) + let helios_changes = helios.get_filter_changes(helios_filter_id).await?; + let expected_changes = expected.get_filter_changes(expected_filter_id).await?; + + // Both should return empty or the same type + match (&helios_changes, &expected_changes) { + (alloy::rpc::types::FilterChanges::Empty, alloy::rpc::types::FilterChanges::Empty) => { + // Both empty - this is expected + } + (alloy::rpc::types::FilterChanges::Hashes(helios_hashes), alloy::rpc::types::FilterChanges::Hashes(expected_hashes)) => { + // Both have hashes - they should match + ensure_eq!( + helios_hashes.len(), + expected_hashes.len(), + "Filter changes block hashes count mismatch: expected {}, got {}", + expected_hashes.len(), + helios_hashes.len() + ); + } + _ => { + return Err(eyre::eyre!( + "Filter changes type mismatch: helios={:?}, expected={:?}", + helios_changes, + expected_changes + )); + } + } + + // Clean up filters + helios.uninstall_filter(helios_filter_id).await?; + expected.uninstall_filter(expected_filter_id).await?; + + Ok(()) +} + async fn test_call_complex_contract(helios: &RootProvider, expected: &RootProvider) -> Result<()> { let block_num = helios.get_block_number().await?; let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); // WETH contract @@ -1451,6 +1541,9 @@ async fn rpc_equivalence_tests() { spawn_test!(test_get_logs_by_address, "get_logs_by_address"), spawn_test!(test_get_logs_by_topic, "get_logs_by_topic"), spawn_test!(test_get_logs_block_range, "get_logs_block_range"), + // Filter Methods + spawn_test!(test_get_filter_changes, "get_filter_changes"), + spawn_test!(test_get_filter_changes_empty, "get_filter_changes_empty"), // Historical Data spawn_test!(test_get_historical_block, "get_historical_block"), spawn_test!(test_get_too_old_block, "get_too_old_block"),