Skip to content
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
fd8d6cd
fee collector indexer + send packet indexer
RustNinja May 6, 2024
c19fc7c
Merge pull request #155 from ComposableFi/rustdev/fee-collector-send-…
RustNinja May 6, 2024
6128e21
introduced DatabaseFeeCollector to save the fee collected/withdrawn
RustNinja May 7, 2024
69a4e47
Merge pull request #156 from ComposableFi/rustdev/db-fee-collector
RustNinja May 7, 2024
61015fe
Decode SendPacketData to use an object to store to db.
RustNinja May 7, 2024
66a385f
parse and use event of send packet to store into db
RustNinja May 8, 2024
34923a3
Merge pull request #157 from ComposableFi/rustdev/db-send-packet-event
RustNinja May 8, 2024
81b8b81
extract relayer tx that submit calls to the ibc-core contract.
RustNinja May 8, 2024
85ec404
Introduce coinmarketcap api to fetch a price and add metrics fee usd.
RustNinja May 8, 2024
ae4ec00
Merge pull request #158 from ComposableFi/rustdev/coinmarketcap-price…
RustNinja May 8, 2024
a588239
add migration for indexer db and find the tx timestamp for fee collector
RustNinja May 8, 2024
6776314
update migration + feecollectors table
RustNinja May 8, 2024
6498c6c
collect send packet events into vector for storing to db
RustNinja May 8, 2024
dc0d3f5
added store_relayertransactions method to store relayer tx
RustNinja May 8, 2024
9aad530
add sendpacketevents into migration
RustNinja May 8, 2024
94d54df
added store_feecollectors method
RustNinja May 8, 2024
ce1b338
use the column count from corresponding database struct
RustNinja May 8, 2024
8bca713
add store_sendpacketevents method
RustNinja May 8, 2024
abb649e
Rename to DatabaseRelayerTransaction, update store_data method
RustNinja May 8, 2024
6fc42e8
update sql insert script into sendpacketevents table.
RustNinja May 8, 2024
d8b0850
remove commented not necessary code.
RustNinja May 8, 2024
403f755
parse call batch methods to store into db actions from relayer.
RustNinja May 9, 2024
7d026c1
remove commented code.
RustNinja May 9, 2024
a72ad4c
remove commented code.
RustNinja May 9, 2024
8f15f86
remove "continue" that will push the fee collector event anyway
RustNinja May 9, 2024
87d08a2
fix errors when insert to database.
RustNinja May 9, 2024
99c6e00
introduce args for metric indexer config instead of hardcoded varibables
RustNinja May 9, 2024
3f618d8
Merge pull request #160 from ComposableFi/rustdev/parse-batch-call-me…
RustNinja May 9, 2024
0565e9e
Merge pull request #161 from ComposableFi/rustdev/metrics-args
RustNinja May 9, 2024
079f5c8
Add block number to fee collector, fix bug with decoding, extract to fn
RustNinja May 10, 2024
d46272f
Merge pull request #162 from ComposableFi/rustdev/metrics-refactoring
RustNinja May 10, 2024
a6ad620
introduce incremental id field for feecollectors + store usd for fee
RustNinja May 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions hyperspace/ethereum/evm-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "2.0.2"
edition = "2021"

[dependencies]
chrono = "0.4"
anyhow = "1"
array-bytes = "6.0.0"
async-trait = "0.1"
Expand Down
4 changes: 3 additions & 1 deletion hyperspace/ethereum/evm-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ docker build . -t indexer
```

3. Copy the `.env.example` file to `.env` and add your environment variables.

4. Run the image

`TODO: programs and flags.`

5. Usage
`DATABASE_URL="pg://postgres:password@localhost/postgres" REDIS_URL="redis://localhost:6379" cargo run --package evm-indexer --bin evm-indexer -- --chain=mainnet --start-block=19789739 --rpcs=https://geth-execution-0.ethereum-mainnet.sre-scratchpad-349209.composablenodes.tech:443 --contract-addresses=0xd856f0f9efa054896fe3596e05978bbe686de131 --ibc-core-address=0xd856f0f9efa054896fe3596e05978bbe686de131 --relayer-address=0x6c57e54C379b5716999ae9AAE4917c9B35AC2CB9 --fee-collector-address=0x6f11fb4d4178c5d5e3ee4bc7820d8cfed0b59ce7 --ics20-transfer-bank-address=0x148acd3cd4d6a17cd2abbecd0745b09b62c64f84 --coinmarketcap-api-key=449a42d7-b66f-4b85-b957-cc6599b8c1e6`

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a real api-key ?

@RustNinja RustNinja May 10, 2024

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

real key. without any funds. 10k monthly credits calls.
example.
repo is private. you can substitute with your key if you wish

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

58 changes: 57 additions & 1 deletion hyperspace/ethereum/evm-indexer/bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,68 @@ use evm_indexer::{
configs::indexer_config::EVMIndexerConfig, db::db::Database, indexer, rpc::rpc::Rpc,
};
use log::*;
use serde::Deserialize;
use simple_logger::SimpleLogger;
use std::time::Duration;
use tokio::time::sleep;

use std::collections::HashMap;
use reqwest::{Client, Url};
use chrono::Utc;


async fn fetch_cryptocurrency_data(api_key: &str) -> Result<String, reqwest::Error> {
//price-performance-stats is not allowed for free plan
// let mut url = Url::parse("https://pro-api.coinmarketcap.com/v2/cryptocurrency/price-performance-stats/latest").unwrap();
//only quotes/latest is allowed for free plan
let mut url = Url::parse("https://pro-api.coinmarketcap.com/v2/cryptocurrency/quotes/latest").unwrap();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe, this should be configurable? not hardcodeded, in case coinmarketcap changes endpoint

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are so many params for this indexer to set as a dependency.
if endpoint will change then the code probably will not work at all and need to change the logic inside of source code.


let mut query_params = HashMap::new();
query_params.insert("symbol", "ETH");

url.set_query(Some(&query_params.iter().map(|(k, v)| format!("{}={}", k, v)).collect::<Vec<_>>().join("&")));

let client = Client::new();
let resp = client.get(url)
.header("X-CMC_PRO_API_KEY", api_key)
.header("Accepts", "application/json")
.send().await.expect("Failed to send request")
.text().await?;

Ok(resp)
}

async fn price_from_cmc_data(cmc_api_key : &str) -> Option<f64> {
let str_cmc_data = fetch_cryptocurrency_data(cmc_api_key).await.unwrap();
let data: serde_json::Value = serde_json::from_str(&str_cmc_data).unwrap();
data["data"]["ETH"][0]["quote"]["USD"]["price"].as_f64()
}

#[derive(Debug)]
struct Cryptocurrency {
name: String,
price: f64,
last_timestamp: i64,
}

#[tokio::main()]
async fn main() {
dotenv().ok();



let log = SimpleLogger::new().with_level(LevelFilter::Info);

let mut config = EVMIndexerConfig::new();

let mut eth_price = Cryptocurrency {
name: "ETH".to_string(),
price: price_from_cmc_data(&config.coinmarketcap_api_key).await.unwrap(),
last_timestamp: Utc::now().timestamp(),
};

println!("{:?}", eth_price);

if config.debug {
log.with_level(LevelFilter::Debug).init().unwrap();
} else {
Expand Down Expand Up @@ -46,7 +96,13 @@ async fn main() {
None => rpc.get_last_block().await.unwrap(),
};
loop {
indexer::sync_chain(&rpc, &db, &mut config, &mut indexed_blocks, from_block).await;
//check that timestamp is not older than 60 mins
if eth_price.last_timestamp < Utc::now().timestamp() - 60 * 60 {
eth_price.price = price_from_cmc_data(&config.coinmarketcap_api_key).await.unwrap();
eth_price.last_timestamp = Utc::now().timestamp();
println!("new eth price {:?}", eth_price);
}
indexer::sync_chain(&rpc, &db, &mut config, &mut indexed_blocks, from_block, eth_price.price).await;
sleep(Duration::from_millis(50)).await;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
CREATE TABLE relayertransactions (
block_hash TEXT NOT NULL,
block_number BIGINT NOT NULL,
chain TEXT NOT NULL,
from_address TEXT NOT NULL,
gas TEXT NOT NULL,
gas_price TEXT NOT NULL,
fee_usd TEXT NOT NULL,
hash TEXT NOT NULL,
input TEXT NOT NULL,
max_fee_per_gas TEXT,
max_priority_fee_per_gas TEXT,
method TEXT NOT NULL,
call_batch_methods TEXT NOT NULL,
nonce TEXT NOT NULL,
timestamp TEXT NOT NULL,
to_address TEXT NOT NULL,
transaction_index BIGINT NOT NULL,
transaction_type BIGINT,
value TEXT NOT NULL,
CONSTRAINT relayertransactions_pkey PRIMARY KEY (hash)
);

CREATE INDEX IF NOT EXISTS relayertransactions_by_block_number ON relayertransactions (block_number DESC);

CREATE INDEX IF NOT EXISTS relayertransactions_by_sender ON relayertransactions (from_address); --; -- STORING (to_address);

CREATE INDEX IF NOT EXISTS relayertransactions_by_receiver ON relayertransactions (to_address); --; -- STORING (from_address);

CREATE INDEX IF NOT EXISTS relayertransactions_by_chain ON relayertransactions (chain);

CREATE INDEX IF NOT EXISTS relayertransactions_by_timestamp ON relayertransactions (timestamp DESC);

CREATE TABLE feecollectors (
token_address TEXT NOT NULL,
transaction_hash TEXT NOT NULL,
address TEXT NOT NULL,
block_number BIGINT NOT NULL,
amount TEXT NOT NULL,
timestamp TEXT NOT NULL,
is_withdrawn TEXT NOT NULL,
CONSTRAINT feecollectors_pkey PRIMARY KEY (token_address, transaction_hash, address)
);

CREATE INDEX IF NOT EXISTS feecollectors_by_timestamp ON feecollectors (timestamp DESC);

CREATE INDEX IF NOT EXISTS feecollectors_by_address ON feecollectors (address);

CREATE INDEX IF NOT EXISTS feecollectors_by_receiver ON feecollectors (token_address);


CREATE TABLE sendpacketevents (
transaction_hash TEXT NOT NULL,
sequence BIGINT NOT NULL,
source_port TEXT NOT NULL,
source_channel TEXT NOT NULL,
source_port_indexed TEXT NOT NULL,
source_channel_indexed TEXT NOT NULL,
timeout_revision_number BIGINT NOT NULL,
timeout_revision_height BIGINT NOT NULL,
timeout_timestamp BIGINT NOT NULL,
data TEXT NOT NULL,
amount TEXT NOT NULL,
denom TEXT NOT NULL,
receiver TEXT NOT NULL,
sender TEXT NOT NULL,
memo TEXT NOT NULL,
CONSTRAINT sendpacketevents_pkey PRIMARY KEY (transaction_hash)
);

CREATE INDEX IF NOT EXISTS sendpacketevents_by_sequence ON sendpacketevents (sequence);

CREATE INDEX IF NOT EXISTS sendpacketevents_by_transaction_hash ON sendpacketevents (transaction_hash);

CREATE INDEX IF NOT EXISTS sendpacketevents_by_receiver ON sendpacketevents (receiver);

CREATE INDEX IF NOT EXISTS sendpacketevents_by_denom ON sendpacketevents (denom);

CREATE INDEX IF NOT EXISTS sendpacketevents_by_sender ON sendpacketevents (sender);

CREATE INDEX IF NOT EXISTS sendpacketevents_by_amount ON sendpacketevents (amount);

25 changes: 25 additions & 0 deletions hyperspace/ethereum/evm-indexer/src/configs/indexer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ pub struct EVMIndexerArgs {

#[arg(long, help = "Comma separated list of contract addresses to index.")]
pub contract_addresses: Vec<Address>,

#[arg(long, help = "Relayer address to track and index his transactions.")]
pub relayer_address: Address,

#[arg(long, help = "Fee Collector Address")]
pub fee_collector_address: Address,

#[arg(long, help = "Ics20 Transfer Bank Address")]
pub ics20_transfer_bank_address: Address,

#[arg(long, help = "Ibc Core Address")]
pub ibc_core_address: Address,

#[arg(long, help = "Coinmarketcap Api Key to fetch a currency price and store into database.")]
pub coinmarketcap_api_key: String,
}

#[derive(Debug, Clone)]
Expand All @@ -43,6 +58,11 @@ pub struct EVMIndexerConfig {
pub recalc_blocks_indexer: bool,
pub contract_addresses: Vec<Address>,
pub block_confirmation_length: usize,
pub relayer_address: Address,
pub fee_collector_address: Address,
pub ics20_transfer_bank_address: Address,
pub ibc_core_address: Address,
pub coinmarketcap_api_key: String,
}

impl EVMIndexerConfig {
Expand Down Expand Up @@ -71,6 +91,11 @@ impl EVMIndexerConfig {
recalc_blocks_indexer: args.recalculate_indexed_blocks,
contract_addresses: args.contract_addresses,
block_confirmation_length: 14,
relayer_address: args.relayer_address,
fee_collector_address: args.fee_collector_address,
ics20_transfer_bank_address: args.ics20_transfer_bank_address,
ibc_core_address: args.ibc_core_address,
coinmarketcap_api_key: args.coinmarketcap_api_key,
}
}
}
125 changes: 123 additions & 2 deletions hyperspace/ethereum/evm-indexer/src/db/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use sqlx::{
use crate::{chains::chains::Chain, db::migration::migrate};

use super::models::models::{
DatabaseBlock, DatabaseChainIndexedState, DatabaseContract, DatabaseContractInformation,
DatabaseIBCEventData, DatabaseLog, DatabaseMethod, DatabaseReceipt, DatabaseTransaction,
DatabaseBlock, DatabaseChainIndexedState, DatabaseContract, DatabaseContractInformation, DatabaseFeeCollector, DatabaseIBCEventData, DatabaseLog, DatabaseMethod, DatabaseReceipt, DatabaseSendPacketEvent, DatabaseTransaction, DatabaseRelayerTransaction
};

pub const MAX_DIESEL_PARAM_SIZE: u16 = u16::MAX;
Expand Down Expand Up @@ -117,6 +116,9 @@ impl Database {
logs: &Vec<DatabaseLog>,
contracts: &Vec<DatabaseContract>,
ibc_events: &Vec<DatabaseIBCEventData>,
store_relayertransactions: &Vec<DatabaseRelayerTransaction>,
store_feecollectors: &Vec<DatabaseFeeCollector>,
store_sendpacketevents: &Vec<DatabaseSendPacketEvent>,
) {
if contracts.len() > 0 {
self.store_contracts(&contracts).await.unwrap();
Expand All @@ -142,6 +144,18 @@ impl Database {
self.store_ibc_events(&ibc_events).await.unwrap();
}

if store_relayertransactions.len() > 0 {
self.store_relayertransactions(&store_relayertransactions).await.unwrap();
}

if store_feecollectors.len() > 0 {
self.store_feecollectors(&store_feecollectors).await.unwrap();
}

if store_sendpacketevents.len() > 0 {
self.store_sendpacketevents(&store_sendpacketevents).await.unwrap();
}

info!(
"Inserted: blocks ({}) transactions ({}) receipts ({}) logs ({}) contracts ({}) ibc events ({}) for chain {}",
blocks.len(),
Expand Down Expand Up @@ -235,6 +249,113 @@ impl Database {
Ok(())
}

async fn store_relayertransactions(&self, relayertransactions: &Vec<DatabaseRelayerTransaction>) -> Result<()> {
let connection = self.get_connection();

let chunks = get_chunks(relayertransactions.len(), DatabaseRelayerTransaction::field_count());

for (start, end) in chunks {
let mut query_builder = QueryBuilder::new("INSERT INTO relayertransactions (block_hash, block_number, chain, from_address, gas, gas_price, fee_usd, max_priority_fee_per_gas, max_fee_per_gas, hash, input, method, call_batch_methods, nonce, timestamp, to_address, transaction_index, transaction_type, value) ");

query_builder.push_values(&relayertransactions[start..end], |mut row, transaction| {
row.push_bind(transaction.block_hash.clone())
.push_bind(transaction.block_number)
.push_bind(transaction.chain.clone())
.push_bind(transaction.from_address.clone())
.push_bind(transaction.gas.clone())
.push_bind(transaction.gas_price.clone())
.push_bind(transaction.fee_usd.clone())
.push_bind(transaction.max_priority_fee_per_gas.clone())
.push_bind(transaction.max_fee_per_gas.clone())
.push_bind(transaction.hash.clone())
.push_bind(transaction.input.clone())
.push_bind(transaction.method.clone())
.push_bind(transaction.call_batch_methods.clone())
.push_bind(transaction.nonce.clone())
.push_bind(transaction.timestamp.clone())
.push_bind(transaction.to_address.clone())
.push_bind(transaction.transaction_index)
.push_bind(transaction.transaction_type)
.push_bind(transaction.value.clone());
});

let query = query_builder.build();

query
.execute(connection)
.await
.expect("Unable to store relayertransactions into database");
}

Ok(())
}

async fn store_feecollectors(&self, fee_collector_events: &Vec<DatabaseFeeCollector>) -> Result<()> {
let connection = self.get_connection();

let chunks = get_chunks(fee_collector_events.len(), DatabaseFeeCollector::field_count());

for (start, end) in chunks {
let mut query_builder = QueryBuilder::new("INSERT INTO feecollectors (token_address, transaction_hash, address, block_number, amount, timestamp, is_withdrawn) ");

query_builder.push_values(&fee_collector_events[start..end], |mut row, transaction| {
row.push_bind(transaction.token_address.clone())
.push_bind(transaction.transaction_hash.clone())
.push_bind(transaction.address.clone())
.push_bind(transaction.block_number)
.push_bind(transaction.amount.clone())
.push_bind(transaction.timestamp.clone())
.push_bind(transaction.is_withdrawn);
});

let query = query_builder.build();

query
.execute(connection)
.await
.expect("Unable to store feecollectors into database");
}

Ok(())
}

async fn store_sendpacketevents(&self, sendpacketevents: &Vec<DatabaseSendPacketEvent>) -> Result<()> {
let connection = self.get_connection();

let chunks = get_chunks(sendpacketevents.len(), DatabaseSendPacketEvent::field_count());

for (start, end) in chunks {
let mut query_builder = QueryBuilder::new("INSERT INTO sendpacketevents (transaction_hash, sequence, source_port, source_channel, source_port_indexed, source_channel_indexed, timeout_revision_number, timeout_revision_height, timeout_timestamp, data, amount, denom, receiver, sender, memo) ");

query_builder.push_values(&sendpacketevents[start..end], |mut row, transaction| {
row.push_bind(transaction.transaction_hash.clone())
.push_bind(transaction.sequence)
.push_bind(transaction.source_port.clone())
.push_bind(transaction.source_channel.clone())
.push_bind(transaction.source_port_indexed.clone())
.push_bind(transaction.source_channel_indexed.clone())
.push_bind(transaction.timeout_revision_number)
.push_bind(transaction.timeout_revision_height)
.push_bind(transaction.timeout_timestamp)
.push_bind(transaction.data.clone())
.push_bind(transaction.amount.clone())
.push_bind(transaction.denom.clone())
.push_bind(transaction.receiver.clone())
.push_bind(transaction.sender.clone())
.push_bind(transaction.memo.clone());
});

let query = query_builder.build();

query
.execute(connection)
.await
.expect("Unable to store sendpacketevents into database");
}

Ok(())
}

async fn store_transactions_receipts(&self, receipts: &Vec<DatabaseReceipt>) -> Result<()> {
let connection = self.get_connection();

Expand Down
Loading