diff --git a/Cargo.lock b/Cargo.lock index ce21c6e..81041af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,6 +286,16 @@ dependencies = [ "substreams-ethereum", ] +[[package]] +name = "db-evm-x402" +version = "0.0.0" +dependencies = [ + "common", + "proto", + "substreams", + "substreams-database-change 3.0.0", +] + [[package]] name = "dex-swaps" version = "0.1.0" @@ -1814,6 +1824,16 @@ dependencies = [ "tap", ] +[[package]] +name = "x402" +version = "0.0.0" +dependencies = [ + "proto", + "substreams", + "substreams-abis", + "substreams-ethereum", +] + [[package]] name = "zerocopy" version = "0.8.39" diff --git a/Cargo.toml b/Cargo.toml index c6b4115..8d988cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,8 @@ members = [ "evm-balances", "evm-supply", "evm-transfers", + "x402", + "evm-x402", "evm-dex", "evm-dex-foundational-store", diff --git a/evm-x402/Cargo.toml b/evm-x402/Cargo.toml new file mode 100644 index 0000000..1a93a67 --- /dev/null +++ b/evm-x402/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "db-evm-x402" +description = { workspace = true } +edition = { workspace = true } +version = { workspace = true } + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +substreams = { workspace = true } +substreams-database-change = { workspace = true } + +proto = { path = "../proto" } +common = { path = "../common" } diff --git a/evm-x402/Makefile b/evm-x402/Makefile new file mode 100644 index 0000000..c55893d --- /dev/null +++ b/evm-x402/Makefile @@ -0,0 +1,29 @@ +ENDPOINT ?= base.substreams.pinax.network:443 +START_BLOCK ?= 45690000 +STOP_BLOCK ?= +1000 +PARALLEL_JOBS ?= 2500 +.DEFAULT_GOAL := pack + +.PHONY: build +build: + cargo build -p db-evm-x402 --target wasm32-unknown-unknown --release + +.PHONY: pack +pack: build + substreams pack -o ../spkg/{spkgDefaultName} + +.PHONY: noop +noop: build + substreams-sink-noop $(ENDPOINT) substreams.yaml db_out -H "X-Sf-Substreams-Parallel-Jobs: $(PARALLEL_JOBS)" : + +.PHONY: run +run: build + substreams run -e $(ENDPOINT) substreams.yaml db_out -s $(START_BLOCK) -t $(STOP_BLOCK) + +.PHONY: gui +gui: build + substreams gui -e $(ENDPOINT) substreams.yaml db_out -s $(START_BLOCK) --network mainnet + +.PHONY: prod +prod: build + substreams gui -e $(ENDPOINT) substreams.yaml db_out -s $(START_BLOCK) -t 0 --limit-processed-blocks 0 --production-mode -H "X-Sf-Substreams-Parallel-Jobs: $(PARALLEL_JOBS)" diff --git a/evm-x402/README.md b/evm-x402/README.md new file mode 100644 index 0000000..55f6649 --- /dev/null +++ b/evm-x402/README.md @@ -0,0 +1,5 @@ +# evm-x402 + +`evm-x402` maps normalized `x402` EVM payment events into database changes. + +The event extractor lives in the sibling `x402` package. This package imports `x402-v0.1.0.spkg` and exposes `db_out` for SQL sinks. diff --git a/evm-x402/clickhouse/Makefile b/evm-x402/clickhouse/Makefile new file mode 100644 index 0000000..ea26778 --- /dev/null +++ b/evm-x402/clickhouse/Makefile @@ -0,0 +1,26 @@ +ENDPOINT ?= base.substreams.pinax.network:443 +START_BLOCK ?= 45690000 +STOP_BLOCK ?= 45691000 +PARALLEL_JOBS ?= 1000 +.DEFAULT_GOAL := pack + +.PHONY: schema +schema: + @echo "-- This file is generated. Do not edit." > schema.sql + @echo "" >> schema.sql + @for f in $$(ls schema.*.sql 2>/dev/null | sort); do \ + cat "$$f" >> schema.sql; \ + echo "" >> schema.sql; \ + done + +.PHONY: pack +pack: schema + substreams pack -o ../../spkg/{spkgDefaultName} + +.PHONY: dev +dev: pack + substreams-sink-sql run clickhouse://default:@localhost:9000/default substreams.yaml -e $(ENDPOINT) $(START_BLOCK):$(STOP_BLOCK) --undo-buffer-size 0 --on-module-hash-mistmatch=warn --batch-block-flush-interval 1 --live-block-flush-interval 1 --infinite-retry --development-mode + +.PHONY: setup +setup: pack + substreams-sink-sql setup clickhouse://default:@localhost:9000/default substreams.yaml diff --git a/evm-x402/clickhouse/README.md b/evm-x402/clickhouse/README.md new file mode 100644 index 0000000..ff8e9f0 --- /dev/null +++ b/evm-x402/clickhouse/README.md @@ -0,0 +1,7 @@ +# evm-x402 ClickHouse + +ClickHouse sink package for settled x402 EVM payments. + +The sink writes individual settlements into `x402_payments` with block, transaction, log, and call metadata. + +It maintains materialized aggregate states for facilitator-first totals, recipient-first totals, and time-windowed volume grouped by facilitator, recipient, asset, and settlement type. diff --git a/evm-x402/clickhouse/schema.0.blocks.sql b/evm-x402/clickhouse/schema.0.blocks.sql new file mode 100644 index 0000000..1007122 --- /dev/null +++ b/evm-x402/clickhouse/schema.0.blocks.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS blocks ( + block_num UInt32, + block_hash String, + timestamp DateTime(0, 'UTC'), + minute UInt32 MATERIALIZED toRelativeMinuteNum(timestamp), + + -- PROJECTIONS -- + PROJECTION prj_block_hash ( SELECT * ORDER BY block_hash ), + PROJECTION prj_timestamp ( SELECT * ORDER BY timestamp ) +) +ENGINE = MergeTree +ORDER BY ( block_num ) +COMMENT 'Blocks'; \ No newline at end of file diff --git a/evm-x402/clickhouse/schema.0.templates.sql b/evm-x402/clickhouse/schema.0.templates.sql new file mode 100644 index 0000000..3773446 --- /dev/null +++ b/evm-x402/clickhouse/schema.0.templates.sql @@ -0,0 +1,66 @@ +-- Template Logs -- +CREATE TABLE IF NOT EXISTS TEMPLATE_LOG ( + -- block -- + block_num UInt32, + block_hash String, + timestamp DateTime('UTC'), + minute UInt32 MATERIALIZED toRelativeMinuteNum(timestamp), + + -- transaction -- + tx_index UInt32, -- derived from Substreams + tx_hash String, + tx_from String, + tx_to LowCardinality(String), + tx_nonce UInt64, + tx_gas_price UInt256, + tx_gas_limit UInt64, + tx_gas_used UInt64, + tx_value UInt256, + + -- log -- + log_index UInt32, -- derived from Substreams + log_block_index UInt32 COMMENT 'BlockIndex represents the index of the log relative to the Block.', + log_address LowCardinality(String), + log_ordinal UInt64 COMMENT 'The block global ordinal when the log was recorded.', + log_topics String COMMENT 'Comma-separated list of log topics', + log_topic0 String MATERIALIZED splitByChar(',', log_topics)[1], -- event signature + log_topic1 String MATERIALIZED splitByChar(',', log_topics)[2], -- second topic (topic1), empty string if no topics + log_topic2 String MATERIALIZED splitByChar(',', log_topics)[3], -- third topic (topic2), empty string if no topics + log_topic3 String MATERIALIZED splitByChar(',', log_topics)[4], -- fourth topic (topic3), empty string if no topics + log_data String, + + -- call metadata (only available on chains with DetailLevel: EXTENDED) -- + call_caller String, + call_index UInt32, + call_begin_ordinal UInt64, + call_end_ordinal UInt64, + call_address String, + call_value UInt256, + call_gas_consumed UInt64, + call_gas_limit UInt64, + call_depth UInt32, + call_parent_index UInt32, + call_type LowCardinality(String), + + -- indexes -- + INDEX idx_timestamp (timestamp) TYPE minmax GRANULARITY 1, + INDEX idx_block_num (block_num) TYPE minmax GRANULARITY 1, + + -- count() -- + PROJECTION prj_tx_from_count ( SELECT tx_from, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY tx_from ), + PROJECTION prj_tx_to_count ( SELECT tx_to, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY tx_to ), + PROJECTION prj_tx_hash_count ( SELECT tx_hash, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY tx_hash ), + PROJECTION prj_log_address_count ( SELECT log_address, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY log_address ), + + -- minute + timestamp -- + PROJECTION prj_tx_hash_by_timestamp ( SELECT tx_hash, minute, timestamp GROUP BY tx_hash, minute, timestamp ), + + -- minute -- + PROJECTION prj_tx_from_by_minute ( SELECT tx_from, minute GROUP BY tx_from, minute ), + PROJECTION prj_tx_to_by_minute ( SELECT tx_to, minute GROUP BY tx_to, minute ), + PROJECTION prj_log_address_by_minute ( SELECT log_address, minute GROUP BY log_address, minute ) +) +ENGINE = MergeTree +ORDER BY ( + timestamp, block_num +); diff --git a/evm-x402/clickhouse/schema.1.table.x402_payments.sql b/evm-x402/clickhouse/schema.1.table.x402_payments.sql new file mode 100644 index 0000000..dfb43f6 --- /dev/null +++ b/evm-x402/clickhouse/schema.1.table.x402_payments.sql @@ -0,0 +1,40 @@ +-- Normalized x402 payment settlement logs written directly by `evm-x402` +CREATE TABLE IF NOT EXISTS x402_payments AS TEMPLATE_LOG +COMMENT 'Settled x402 payment events on EVM chains'; + +ALTER TABLE x402_payments + -- payment -- + ADD COLUMN IF NOT EXISTS asset LowCardinality(String), + ADD COLUMN IF NOT EXISTS payer String, + ADD COLUMN IF NOT EXISTS recipient String, + ADD COLUMN IF NOT EXISTS facilitator String, + ADD COLUMN IF NOT EXISTS amount UInt256, + ADD COLUMN IF NOT EXISTS nonce String, + ADD COLUMN IF NOT EXISTS transfer_method LowCardinality(String), + ADD COLUMN IF NOT EXISTS settlement_source LowCardinality(String), + ADD COLUMN IF NOT EXISTS scheme LowCardinality(String), + ADD COLUMN IF NOT EXISTS valid_after UInt256, + ADD COLUMN IF NOT EXISTS valid_before UInt256, + ADD COLUMN IF NOT EXISTS facilitator_allowlist_matched Bool, + ADD COLUMN IF NOT EXISTS confidence LowCardinality(String), + + -- INDEXES -- + ADD INDEX IF NOT EXISTS idx_amount (amount) TYPE minmax GRANULARITY 1, + + -- PROJECTIONS -- + -- count() -- + ADD PROJECTION IF NOT EXISTS prj_facilitator_count ( SELECT facilitator, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY facilitator ), + ADD PROJECTION IF NOT EXISTS prj_recipient_count ( SELECT recipient, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY recipient ), + ADD PROJECTION IF NOT EXISTS prj_payer_count ( SELECT payer, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY payer ), + ADD PROJECTION IF NOT EXISTS prj_asset_count ( SELECT asset, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY asset ), + ADD PROJECTION IF NOT EXISTS prj_transfer_method_count ( SELECT transfer_method, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY transfer_method ), + ADD PROJECTION IF NOT EXISTS prj_settlement_source_count ( SELECT settlement_source, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY settlement_source ), + + -- minute + timestamp -- + ADD PROJECTION IF NOT EXISTS prj_tx_hash_by_timestamp ( SELECT tx_hash, minute, timestamp GROUP BY tx_hash, minute, timestamp ), + + -- minute -- + ADD PROJECTION IF NOT EXISTS prj_facilitator_by_minute ( SELECT facilitator, minute GROUP BY facilitator, minute ), + ADD PROJECTION IF NOT EXISTS prj_recipient_by_minute ( SELECT recipient, minute GROUP BY recipient, minute ), + ADD PROJECTION IF NOT EXISTS prj_payer_by_minute ( SELECT payer, minute GROUP BY payer, minute ), + ADD PROJECTION IF NOT EXISTS prj_asset_by_minute ( SELECT asset, minute GROUP BY asset, minute ); diff --git a/evm-x402/clickhouse/schema.2.mv.state_x402.sql b/evm-x402/clickhouse/schema.2.mv.state_x402.sql new file mode 100644 index 0000000..b266ae9 --- /dev/null +++ b/evm-x402/clickhouse/schema.2.mv.state_x402.sql @@ -0,0 +1,101 @@ +-- x402 totals grouped for facilitator-first access patterns +CREATE TABLE IF NOT EXISTS state_x402 ( + -- timestamp & block number -- + min_timestamp SimpleAggregateFunction(min, DateTime('UTC', 0)) COMMENT 'first timestamp seen', + max_timestamp SimpleAggregateFunction(max, DateTime('UTC', 0)) COMMENT 'last timestamp seen', + min_block_num SimpleAggregateFunction(min, UInt32) COMMENT 'first block number seen', + max_block_num SimpleAggregateFunction(max, UInt32) COMMENT 'last block number seen', + + -- x402 identity -- + facilitator String, + recipient String, + asset LowCardinality(String), + transfer_method LowCardinality(String), + settlement_source LowCardinality(String), + scheme LowCardinality(String), + + -- aggregates -- + payments SimpleAggregateFunction(sum, UInt64), + amount SimpleAggregateFunction(sum, UInt256), + uniq_payer AggregateFunction(uniq, String), + uniq_tx_from AggregateFunction(uniq, String), + uniq_tx_hash AggregateFunction(uniq, String), + + -- indexes -- + INDEX idx_min_timestamp (min_timestamp) TYPE minmax GRANULARITY 1, + INDEX idx_max_timestamp (max_timestamp) TYPE minmax GRANULARITY 1, + INDEX idx_min_block_num (min_block_num) TYPE minmax GRANULARITY 1, + INDEX idx_max_block_num (max_block_num) TYPE minmax GRANULARITY 1, + INDEX idx_facilitator (facilitator) TYPE bloom_filter GRANULARITY 1, + INDEX idx_recipient (recipient) TYPE bloom_filter GRANULARITY 1, + INDEX idx_asset (asset) TYPE set(1024) GRANULARITY 1, + INDEX idx_payments (payments) TYPE minmax GRANULARITY 1, + INDEX idx_amount (amount) TYPE minmax GRANULARITY 1, + + -- projections -- + PROJECTION prj_group_by_facilitator ( + SELECT + min(min_timestamp), + max(max_timestamp), + min(min_block_num), + max(max_block_num), + facilitator, + asset, + transfer_method, + settlement_source, + scheme, + sum(payments), + sum(amount), + uniqMerge(uniq_payer), + uniqMerge(uniq_tx_from), + uniqMerge(uniq_tx_hash) + GROUP BY facilitator, asset, transfer_method, settlement_source, scheme + ), + PROJECTION prj_group_by_facilitator_recipient ( + SELECT + min(min_timestamp), + max(max_timestamp), + min(min_block_num), + max(max_block_num), + facilitator, + recipient, + asset, + transfer_method, + settlement_source, + scheme, + sum(payments), + sum(amount), + uniqMerge(uniq_payer), + uniqMerge(uniq_tx_from), + uniqMerge(uniq_tx_hash) + GROUP BY facilitator, recipient, asset, transfer_method, settlement_source, scheme + ) +) +ENGINE = AggregatingMergeTree +ORDER BY (facilitator, recipient, asset, transfer_method, settlement_source, scheme) +SETTINGS deduplicate_merge_projection_mode = 'rebuild' +COMMENT 'x402 totals grouped by facilitator first and recipient second'; + +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_state_x402 +TO state_x402 +AS +SELECT + min(timestamp) AS min_timestamp, + max(timestamp) AS max_timestamp, + min(block_num) AS min_block_num, + max(block_num) AS max_block_num, + + facilitator, + recipient, + asset, + transfer_method, + settlement_source, + scheme, + + count() AS payments, + sum(amount) AS amount, + uniqState(payer) AS uniq_payer, + uniqState(tx_from) AS uniq_tx_from, + uniqState(tx_hash) AS uniq_tx_hash +FROM x402_payments +GROUP BY facilitator, recipient, asset, transfer_method, settlement_source, scheme; diff --git a/evm-x402/clickhouse/schema.3.view.state_x402.sql b/evm-x402/clickhouse/schema.3.view.state_x402.sql new file mode 100644 index 0000000..798bf83 --- /dev/null +++ b/evm-x402/clickhouse/schema.3.view.state_x402.sql @@ -0,0 +1,31 @@ +-- Finalized read view over the AggregatingMergeTree state table +CREATE VIEW IF NOT EXISTS x402_state AS +SELECT + -- timestamp & block number -- + min(min_timestamp) AS min_timestamp, + max(max_timestamp) AS max_timestamp, + min(min_block_num) AS min_block_num, + max(max_block_num) AS max_block_num, + + -- x402 identity -- + facilitator, + recipient, + asset, + transfer_method, + settlement_source, + scheme, + + -- aggregates -- + sum(payments) AS payments, + sum(amount) AS amount, + uniqMerge(uniq_payer) AS unique_payers, + uniqMerge(uniq_tx_from) AS unique_tx_from, + uniqMerge(uniq_tx_hash) AS unique_tx_hash +FROM state_x402 +GROUP BY + facilitator, + recipient, + asset, + transfer_method, + settlement_source, + scheme; diff --git a/evm-x402/clickhouse/substreams.yaml b/evm-x402/clickhouse/substreams.yaml new file mode 100644 index 0000000..c60b8f1 --- /dev/null +++ b/evm-x402/clickhouse/substreams.yaml @@ -0,0 +1,29 @@ +specVersion: v0.1.0 +package: + name: evm_clickhouse_x402 + version: v0.1.0 + url: https://github.com/pinax-network/substreams-evm + description: x402 EVM payment settlements (ClickHouse) + image: ../../image.png + +imports: + sql: ../../spkg/substreams-sink-sql-protodefs-v1.0.7.spkg + db: ../../spkg/evm-x402-v0.1.0.spkg + +modules: + - name: db_out + use: db:db_out + +sink: + module: db_out + type: sf.substreams.sink.sql.v1.Service + config: + schema: "./schema.sql" + engine: clickhouse + postgraphile_frontend: + enabled: false + +network: mainnet + +params: + db_out: "hex" diff --git a/evm-x402/src/lib.rs b/evm-x402/src/lib.rs new file mode 100644 index 0000000..aa550ce --- /dev/null +++ b/evm-x402/src/lib.rs @@ -0,0 +1,94 @@ +use common::{bytes_to_hex, bytes_to_string, handle_encoding_param}; +use proto::pb::evm::x402::v1 as pb; +use substreams::errors::Error; +use substreams::pb::substreams::Clock; +use substreams_database_change::pb::database::DatabaseChanges; +use substreams_database_change::tables::Tables; + +#[substreams::handlers::map] +pub fn db_out(params: String, clock: Clock, events: pb::Events) -> Result { + let encoding = handle_encoding_param(¶ms); + let mut tables = Tables::new(); + + let seconds = clock.timestamp.as_ref().map(|ts| ts.seconds).unwrap_or_default(); + + for transaction in events.transactions.iter() { + for (log_index, log) in transaction.logs.iter().enumerate() { + let Some(pb::log::Log::Payment(payment)) = &log.log else { + continue; + }; + + let key = [("timestamp", seconds.to_string()), ("block_num", clock.number.to_string())]; + let row = tables.create_row("x402_payments", key); + + row.set("block_num", clock.number); + row.set("block_hash", format!("0x{}", clock.id)); + row.set("timestamp", seconds); + row.set("tx_index", transaction.index); + row.set("tx_hash", bytes_to_hex(&transaction.hash)); + row.set("tx_from", bytes_to_string(&transaction.from, &encoding)); + row.set( + "tx_to", + transaction.to.as_ref().map(|address| bytes_to_string(address, &encoding)).unwrap_or_default(), + ); + row.set("tx_nonce", transaction.nonce); + row.set("tx_gas_price", &transaction.gas_price); + row.set("tx_gas_limit", transaction.gas_limit); + row.set("tx_gas_used", transaction.gas_used); + row.set("tx_value", &transaction.value); + + row.set("log_index", log_index as u32); + row.set("log_block_index", log.block_index); + row.set("log_address", bytes_to_string(&log.address, &encoding)); + row.set("log_ordinal", log.ordinal); + row.set("log_topics", log.topics.iter().map(|topic| bytes_to_hex(topic)).collect::>().join(",")); + row.set("log_data", bytes_to_hex(&log.data)); + + let call = log.call.as_ref(); + row.set("call_caller", call.map(|call| bytes_to_string(&call.caller, &encoding)).unwrap_or_default()); + row.set("call_index", call.map(|call| call.index).unwrap_or_default()); + row.set("call_begin_ordinal", call.map(|call| call.begin_ordinal).unwrap_or_default()); + row.set("call_end_ordinal", call.map(|call| call.end_ordinal).unwrap_or_default()); + row.set("call_address", call.map(|call| bytes_to_string(&call.address, &encoding)).unwrap_or_default()); + row.set("call_value", call.map(|call| call.value.as_str()).unwrap_or_default()); + row.set("call_gas_consumed", call.map(|call| call.gas_consumed).unwrap_or_default()); + row.set("call_gas_limit", call.map(|call| call.gas_limit).unwrap_or_default()); + row.set("call_depth", call.map(|call| call.depth).unwrap_or_default()); + row.set("call_parent_index", call.map(|call| call.parent_index).unwrap_or_default()); + row.set( + "call_type", + call.map(|call| pb::CallType::try_from(call.call_type).unwrap_or_default().as_str_name()) + .unwrap_or_default(), + ); + + row.set("asset", bytes_to_string(&payment.asset, &encoding)); + row.set("payer", bytes_to_string(&payment.payer, &encoding)); + row.set("recipient", bytes_to_string(&payment.recipient, &encoding)); + row.set("facilitator", bytes_to_string(&payment.facilitator, &encoding)); + row.set("amount", &payment.amount); + row.set("nonce", bytes_to_hex(&payment.nonce)); + row.set( + "transfer_method", + pb::TransferMethod::try_from(payment.transfer_method).unwrap_or_default().as_str_name(), + ); + row.set( + "settlement_source", + pb::SettlementSource::try_from(payment.settlement_source).unwrap_or_default().as_str_name(), + ); + row.set("scheme", &payment.scheme); + row.set("valid_after", payment.valid_after.as_deref().unwrap_or("0")); + row.set("valid_before", payment.valid_before.as_deref().unwrap_or("0")); + row.set("facilitator_allowlist_matched", payment.facilitator_allowlist_matched); + row.set("confidence", &payment.confidence); + } + } + + if !tables.tables.is_empty() { + let row = tables.create_row("blocks", [("block_num", clock.number.to_string())]); + row.set("block_num", clock.number); + row.set("block_hash", format!("0x{}", clock.id)); + row.set("timestamp", seconds); + } + + Ok(tables.to_database_changes()) +} diff --git a/evm-x402/substreams.yaml b/evm-x402/substreams.yaml new file mode 100644 index 0000000..7b1489f --- /dev/null +++ b/evm-x402/substreams.yaml @@ -0,0 +1,31 @@ +specVersion: v0.1.0 +package: + name: evm_x402 + version: v0.1.0 + url: https://github.com/pinax-network/substreams-evm + description: x402 EVM payment settlement database changes + image: ../image.png + +imports: + database_changes: ../spkg/substreams-database-change-v2.0.0.spkg + x402: ../spkg/x402-v0.1.0.spkg + +binaries: + default: + type: wasm/rust-v1 + file: ../target/wasm32-unknown-unknown/release/db_evm_x402.wasm + +modules: + - name: db_out + kind: map + inputs: + - params: string + - source: sf.substreams.v1.Clock + - map: x402:map_events + output: + type: proto:sf.substreams.sink.database.v1.DatabaseChanges + +network: mainnet + +params: + db_out: "hex" diff --git a/proto/src/pb/evm.x402.v1.rs b/proto/src/pb/evm.x402.v1.rs new file mode 100644 index 0000000..271797b --- /dev/null +++ b/proto/src/pb/evm.x402.v1.rs @@ -0,0 +1,222 @@ +// @generated +// This file is @generated by prost-build. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Events { + #[prost(message, repeated, tag="1")] + pub transactions: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Transaction { + #[prost(bytes="vec", tag="1")] + pub hash: ::prost::alloc::vec::Vec, + #[prost(bytes="vec", tag="2")] + pub from: ::prost::alloc::vec::Vec, + #[prost(bytes="vec", optional, tag="3")] + pub to: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(uint32, tag="4")] + pub index: u32, + #[prost(uint64, tag="5")] + pub nonce: u64, + /// uint256 + #[prost(string, tag="6")] + pub gas_price: ::prost::alloc::string::String, + #[prost(uint64, tag="7")] + pub gas_limit: u64, + #[prost(uint64, tag="8")] + pub gas_used: u64, + /// uint256 + #[prost(string, tag="9")] + pub value: ::prost::alloc::string::String, + #[prost(message, repeated, tag="10")] + pub logs: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Log { + #[prost(bytes="vec", tag="1")] + pub address: ::prost::alloc::vec::Vec, + #[prost(uint64, tag="2")] + pub ordinal: u64, + #[prost(bytes="vec", repeated, tag="3")] + pub topics: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + #[prost(bytes="vec", tag="4")] + pub data: ::prost::alloc::vec::Vec, + /// Call metadata (only available on chains with DetailLevel: EXTENDED) + #[prost(message, optional, tag="5")] + pub call: ::core::option::Option, + /// Native block/log position fields + #[prost(uint32, tag="6")] + pub block_index: u32, + #[prost(oneof="log::Log", tags="10")] + pub log: ::core::option::Option, +} +/// Nested message and enum types in `Log`. +pub mod log { + #[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Log { + #[prost(message, tag="10")] + Payment(super::Payment), + } +} +/// Call metadata (only available on chains with DetailLevel: EXTENDED) +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Call { + #[prost(uint32, tag="1")] + pub index: u32, + #[prost(uint64, tag="2")] + pub begin_ordinal: u64, + #[prost(uint64, tag="3")] + pub end_ordinal: u64, + #[prost(bytes="vec", tag="4")] + pub caller: ::prost::alloc::vec::Vec, + #[prost(bytes="vec", tag="5")] + pub address: ::prost::alloc::vec::Vec, + /// uint256 + #[prost(string, tag="6")] + pub value: ::prost::alloc::string::String, + #[prost(uint64, tag="7")] + pub gas_consumed: u64, + #[prost(uint64, tag="8")] + pub gas_limit: u64, + #[prost(uint32, tag="9")] + pub depth: u32, + #[prost(uint32, tag="10")] + pub parent_index: u32, + #[prost(enumeration="CallType", tag="11")] + pub call_type: i32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Payment { + #[prost(bytes="vec", tag="1")] + pub asset: ::prost::alloc::vec::Vec, + #[prost(bytes="vec", tag="2")] + pub payer: ::prost::alloc::vec::Vec, + #[prost(bytes="vec", tag="3")] + pub recipient: ::prost::alloc::vec::Vec, + #[prost(bytes="vec", tag="4")] + pub facilitator: ::prost::alloc::vec::Vec, + /// uint256 + #[prost(string, tag="5")] + pub amount: ::prost::alloc::string::String, + #[prost(bytes="vec", tag="6")] + pub nonce: ::prost::alloc::vec::Vec, + #[prost(enumeration="TransferMethod", tag="10")] + pub transfer_method: i32, + #[prost(enumeration="SettlementSource", tag="11")] + pub settlement_source: i32, + #[prost(string, tag="12")] + pub scheme: ::prost::alloc::string::String, + #[prost(string, optional, tag="13")] + pub valid_after: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag="14")] + pub valid_before: ::core::option::Option<::prost::alloc::string::String>, + #[prost(bool, tag="20")] + pub facilitator_allowlist_matched: bool, + #[prost(string, tag="21")] + pub confidence: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum TransferMethod { + Unspecified = 0, + Eip3009 = 1, + Permit2 = 2, +} +impl TransferMethod { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + TransferMethod::Unspecified => "TRANSFER_METHOD_UNSPECIFIED", + TransferMethod::Eip3009 => "TRANSFER_METHOD_EIP3009", + TransferMethod::Permit2 => "TRANSFER_METHOD_PERMIT2", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "TRANSFER_METHOD_UNSPECIFIED" => Some(Self::Unspecified), + "TRANSFER_METHOD_EIP3009" => Some(Self::Eip3009), + "TRANSFER_METHOD_PERMIT2" => Some(Self::Permit2), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum SettlementSource { + Unspecified = 0, + AuthorizationUsed = 1, + Permit2Settled = 2, + Permit2SettledWithPermit = 3, +} +impl SettlementSource { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + SettlementSource::Unspecified => "SETTLEMENT_SOURCE_UNSPECIFIED", + SettlementSource::AuthorizationUsed => "SETTLEMENT_SOURCE_AUTHORIZATION_USED", + SettlementSource::Permit2Settled => "SETTLEMENT_SOURCE_PERMIT2_SETTLED", + SettlementSource::Permit2SettledWithPermit => "SETTLEMENT_SOURCE_PERMIT2_SETTLED_WITH_PERMIT", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SETTLEMENT_SOURCE_UNSPECIFIED" => Some(Self::Unspecified), + "SETTLEMENT_SOURCE_AUTHORIZATION_USED" => Some(Self::AuthorizationUsed), + "SETTLEMENT_SOURCE_PERMIT2_SETTLED" => Some(Self::Permit2Settled), + "SETTLEMENT_SOURCE_PERMIT2_SETTLED_WITH_PERMIT" => Some(Self::Permit2SettledWithPermit), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum CallType { + Unspecified = 0, + Call = 1, + Callcode = 2, + Delegate = 3, + Static = 4, + Create = 5, +} +impl CallType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + CallType::Unspecified => "CALL_TYPE_UNSPECIFIED", + CallType::Call => "CALL_TYPE_CALL", + CallType::Callcode => "CALL_TYPE_CALLCODE", + CallType::Delegate => "CALL_TYPE_DELEGATE", + CallType::Static => "CALL_TYPE_STATIC", + CallType::Create => "CALL_TYPE_CREATE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "CALL_TYPE_UNSPECIFIED" => Some(Self::Unspecified), + "CALL_TYPE_CALL" => Some(Self::Call), + "CALL_TYPE_CALLCODE" => Some(Self::Callcode), + "CALL_TYPE_DELEGATE" => Some(Self::Delegate), + "CALL_TYPE_STATIC" => Some(Self::Static), + "CALL_TYPE_CREATE" => Some(Self::Create), + _ => None, + } + } +} +// @@protoc_insertion_point(module) diff --git a/proto/src/pb/mod.rs b/proto/src/pb/mod.rs index 94dfe21..17e53f6 100644 --- a/proto/src/pb/mod.rs +++ b/proto/src/pb/mod.rs @@ -125,6 +125,13 @@ pub mod evm { // @@protoc_insertion_point(evm.seaport.v1) } } + pub mod x402 { + // @@protoc_insertion_point(attribute:evm.x402.v1) + pub mod v1 { + include!("evm.x402.v1.rs"); + // @@protoc_insertion_point(evm.x402.v1) + } + } } pub mod kyber_elastic { // @@protoc_insertion_point(attribute:kyber_elastic.v1) diff --git a/proto/substreams.yaml b/proto/substreams.yaml index 94590dd..1bdcdf9 100644 --- a/proto/substreams.yaml +++ b/proto/substreams.yaml @@ -24,6 +24,9 @@ protobuf: - v1/native-transfers.proto - v1/balances.proto + # x402 + - v1/x402.proto + # DEX - v1/dex-swaps.proto - v1/dex/sunpump.proto diff --git a/proto/v1/x402.proto b/proto/v1/x402.proto new file mode 100644 index 0000000..7374c1c --- /dev/null +++ b/proto/v1/x402.proto @@ -0,0 +1,92 @@ +syntax = "proto3"; + +package evm.x402.v1; + +message Events { + repeated Transaction transactions = 1; +} + +message Transaction { + bytes hash = 1; + bytes from = 2; + optional bytes to = 3; + uint32 index = 4; + uint64 nonce = 5; + string gas_price = 6; // uint256 + uint64 gas_limit = 7; + uint64 gas_used = 8; + string value = 9; // uint256 + repeated Log logs = 10; +} + +message Log { + bytes address = 1; + uint64 ordinal = 2; + repeated bytes topics = 3; + bytes data = 4; + + // Call metadata (only available on chains with DetailLevel: EXTENDED) + optional Call call = 5; + + // Native block/log position fields + uint32 block_index = 6; + + oneof log { + Payment payment = 10; + } +} + +// Call metadata (only available on chains with DetailLevel: EXTENDED) +message Call { + uint32 index = 1; + uint64 begin_ordinal = 2; + uint64 end_ordinal = 3; + bytes caller = 4; + bytes address = 5; + string value = 6; // uint256 + uint64 gas_consumed = 7; + uint64 gas_limit = 8; + uint32 depth = 9; + uint32 parent_index = 10; + CallType call_type = 11; +} + +message Payment { + bytes asset = 1; + bytes payer = 2; + bytes recipient = 3; + bytes facilitator = 4; + string amount = 5; // uint256 + bytes nonce = 6; + + TransferMethod transfer_method = 10; + SettlementSource settlement_source = 11; + string scheme = 12; + optional string valid_after = 13; + optional string valid_before = 14; + + bool facilitator_allowlist_matched = 20; + string confidence = 21; +} + +enum TransferMethod { + TRANSFER_METHOD_UNSPECIFIED = 0; + TRANSFER_METHOD_EIP3009 = 1; + TRANSFER_METHOD_PERMIT2 = 2; +} + +enum SettlementSource { + SETTLEMENT_SOURCE_UNSPECIFIED = 0; + SETTLEMENT_SOURCE_AUTHORIZATION_USED = 1; + SETTLEMENT_SOURCE_PERMIT2_SETTLED = 2; + SETTLEMENT_SOURCE_PERMIT2_SETTLED_WITH_PERMIT = 3; +} + +enum CallType { + CALL_TYPE_UNSPECIFIED = 0; + CALL_TYPE_CALL = 1; + CALL_TYPE_CALLCODE = 2; + CALL_TYPE_DELEGATE = 3; + CALL_TYPE_STATIC = 4; + CALL_TYPE_CREATE = 5; +} diff --git a/spkg/evm-clickhouse-x402-v0.1.0.spkg b/spkg/evm-clickhouse-x402-v0.1.0.spkg new file mode 100644 index 0000000..e01464d Binary files /dev/null and b/spkg/evm-clickhouse-x402-v0.1.0.spkg differ diff --git a/spkg/evm-x402-v0.1.0.spkg b/spkg/evm-x402-v0.1.0.spkg new file mode 100644 index 0000000..6eff4df Binary files /dev/null and b/spkg/evm-x402-v0.1.0.spkg differ diff --git a/spkg/x402-v0.1.0.spkg b/spkg/x402-v0.1.0.spkg new file mode 100644 index 0000000..22a3244 Binary files /dev/null and b/spkg/x402-v0.1.0.spkg differ diff --git a/x402/Cargo.toml b/x402/Cargo.toml new file mode 100644 index 0000000..b507475 --- /dev/null +++ b/x402/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "x402" +description = { workspace = true } +edition = { workspace = true } +version = { workspace = true } +authors = ["Denis "] + +[lib] +crate-type = ["cdylib"] + +[dependencies] +substreams = { workspace = true } +substreams-ethereum = { workspace = true } +substreams-abis = { workspace = true } +proto = { path = "../proto" } diff --git a/x402/Makefile b/x402/Makefile new file mode 100644 index 0000000..f1f73e8 --- /dev/null +++ b/x402/Makefile @@ -0,0 +1,29 @@ +ENDPOINT ?= base.substreams.pinax.network:443 +START_BLOCK ?= 45690000 +STOP_BLOCK ?= +1000 +PARALLEL_JOBS ?= 500 +.DEFAULT_GOAL := pack + +.PHONY: build +build: + cargo build -p x402 --target wasm32-unknown-unknown --release + +.PHONY: pack +pack: build + substreams pack -o ../spkg/{spkgDefaultName} + +.PHONY: noop +noop: build + substreams-sink-noop $(ENDPOINT) substreams.yaml map_events -H "X-Sf-Substreams-Parallel-Jobs: $(PARALLEL_JOBS)" : + +.PHONY: run +run: build + substreams run -e $(ENDPOINT) substreams.yaml map_events -s $(START_BLOCK) -t $(STOP_BLOCK) + +.PHONY: gui +gui: build + substreams gui -e $(ENDPOINT) substreams.yaml map_events -s $(START_BLOCK) --network mainnet + +.PHONY: prod +prod: build + substreams gui -e $(ENDPOINT) substreams.yaml map_events -s $(START_BLOCK) -t 0 --limit-processed-blocks 0 --production-mode -H "X-Sf-Substreams-Parallel-Jobs: $(PARALLEL_JOBS)" diff --git a/x402/README.md b/x402/README.md new file mode 100644 index 0000000..a8f0da4 --- /dev/null +++ b/x402/README.md @@ -0,0 +1,12 @@ +# x402 + +`x402` extracts settled x402 payment events from EVM chains. + +It currently supports: + +- EIP-3009 settlements by joining `AuthorizationUsed(authorizer, nonce)`, a matching ERC-20 `Transfer`, and decoded `transferWithAuthorization` calldata when traces are available. +- Permit2 settlements from the canonical `x402ExactPermit2Proxy` events `Settled()` and `SettledWithPermit()`, joined to the token transfer in the same transaction. + +The module only sees onchain settlement. HTTP resources, verification attempts, KYT failures, and fulfillment status require facilitator or resource-server logs. + +`map_events` does not apply facilitator filtering. It emits all onchain settlement candidates so stricter facilitator rules can be applied later in ClickHouse queries. diff --git a/x402/src/lib.rs b/x402/src/lib.rs new file mode 100644 index 0000000..1cb022d --- /dev/null +++ b/x402/src/lib.rs @@ -0,0 +1,371 @@ +use proto::pb::evm::x402::v1 as pb; +use substreams::errors::Error; +use substreams::hex; +use substreams_abis::standard::erc20::events as erc20_events; +use substreams_abis::tokens::erc20::usdc::fiattoken_v2_2::{events as usdc_events, functions as usdc_functions}; +use substreams_ethereum::pb::eth::v2::{Block, Call, Log, TransactionTrace}; +use substreams_ethereum::Event; + +const DEFAULT_X402_PERMIT2_PROXY: [u8; 20] = hex!("402085c248eea27d92e8b30b2c58ed07f9e20001"); +const TRANSFER_TOPIC: [u8; 32] = hex!("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"); +const SETTLED_TOPIC: [u8; 32] = hex!("97088ec3606cfe8cc112180570d03fcde05f9b8e1bfef8e27784eaf5dd5691b6"); +const SETTLED_WITH_PERMIT_TOPIC: [u8; 32] = hex!("de5b89d10fc800c459329c382fabfcad0be0ed7e5328e01fae04e507b09ef5d8"); +const TRANSFER_WITH_AUTHORIZATION_BYTES_SELECTOR: [u8; 4] = hex!("cf092995"); +const TRANSFER_WITH_AUTHORIZATION_VRS_SELECTOR: [u8; 4] = hex!("e3ee160e"); + +struct DecodedAuthorization { + from: Vec, + to: Vec, + value: String, + valid_after: String, + valid_before: String, + nonce: Vec, + call_index: u32, +} + +struct TransferLog { + from: Vec, + to: Vec, + amount: String, +} + +#[substreams::handlers::map] +pub fn map_events(block: Block) -> Result { + let mut events = pb::Events::default(); + let mut payment_count = 0; + + for (tx_index, trx) in block.transactions().enumerate() { + let mut transaction = create_transaction(tx_index as u32, trx); + collect_eip3009_payments(trx, &mut transaction); + collect_permit2_payments(trx, &mut transaction); + + if !transaction.logs.is_empty() { + payment_count += transaction.logs.len(); + events.transactions.push(transaction); + } + } + + substreams::log::info!("x402 payments: {}", payment_count); + Ok(events) +} + +fn collect_eip3009_payments(trx: &TransactionTrace, transaction: &mut pb::Transaction) { + let decoded_authorizations = decoded_authorizations_by_call(trx); + let logs_with_calls: Vec<(&Log, Option<&Call>)> = if trx.calls.is_empty() { + trx.receipt().logs().map(|log_view| (log_view.log, None)).collect() + } else { + trx.logs_with_calls().map(|(log, call_view)| (log, Some(call_view.call))).collect() + }; + + for (log, call) in logs_with_calls.iter() { + let Some(authorization_used) = usdc_events::AuthorizationUsed::match_and_decode(log) else { + continue; + }; + + let decoded = call + .as_ref() + .and_then(|call| decode_transfer_with_authorization(call)) + .or_else(|| find_decoded_authorization(&decoded_authorizations, &log.address, &authorization_used.authorizer, &authorization_used.nonce)); + + let transfer = decoded + .as_ref() + .and_then(|auth| find_matching_transfer(trx, &log.address, &auth.from, &auth.to, &auth.value)) + .or_else(|| find_matching_transfer_by_authorizer(trx, &log.address, &authorization_used.authorizer)); + + let Some(transfer) = transfer else { + continue; + }; + + let payer = decoded + .as_ref() + .map(|auth| auth.from.clone()) + .unwrap_or_else(|| authorization_used.authorizer.to_vec()); + let recipient = decoded.as_ref().map(|auth| auth.to.clone()).unwrap_or_else(|| transfer.to.clone()); + let amount = decoded.as_ref().map(|auth| auth.value.clone()).unwrap_or_else(|| transfer.amount.clone()); + let nonce = decoded + .as_ref() + .map(|auth| auth.nonce.clone()) + .unwrap_or_else(|| authorization_used.nonce.to_vec()); + + let call = decoded + .as_ref() + .and_then(|auth| trx.calls.iter().find(|call| call.index == auth.call_index)) + .or(*call); + + transaction.logs.push(create_payment_log( + log, + call, + pb::Payment { + asset: log.address.to_vec(), + payer, + recipient, + facilitator: trx.from.to_vec(), + amount, + nonce, + transfer_method: pb::TransferMethod::Eip3009 as i32, + settlement_source: pb::SettlementSource::AuthorizationUsed as i32, + scheme: "exact".to_string(), + valid_after: decoded.as_ref().map(|auth| auth.valid_after.clone()), + valid_before: decoded.as_ref().map(|auth| auth.valid_before.clone()), + facilitator_allowlist_matched: false, + confidence: "heuristic".to_string(), + }, + )); + } +} + +fn collect_permit2_payments(trx: &TransactionTrace, transaction: &mut pb::Transaction) { + let logs_with_calls: Vec<(&Log, Option<&Call>)> = if trx.calls.is_empty() { + trx.receipt().logs().map(|log_view| (log_view.log, None)).collect() + } else { + trx.logs_with_calls().map(|(log, call_view)| (log, Some(call_view.call))).collect() + }; + + for (log, call) in logs_with_calls { + if log.address.as_slice() != DEFAULT_X402_PERMIT2_PROXY || log.topics.is_empty() { + continue; + } + + let source = if log.topics[0] == SETTLED_TOPIC { + pb::SettlementSource::Permit2Settled + } else if log.topics[0] == SETTLED_WITH_PERMIT_TOPIC { + pb::SettlementSource::Permit2SettledWithPermit + } else { + continue; + }; + + let Some((asset, transfer)) = find_last_erc20_transfer_before(trx, log.ordinal) else { + continue; + }; + + transaction.logs.push(create_payment_log( + log, + call, + pb::Payment { + asset, + payer: transfer.from, + recipient: transfer.to, + facilitator: trx.from.to_vec(), + amount: transfer.amount, + nonce: vec![], + transfer_method: pb::TransferMethod::Permit2 as i32, + settlement_source: source as i32, + scheme: "exact".to_string(), + valid_after: None, + valid_before: None, + facilitator_allowlist_matched: false, + confidence: "high".to_string(), + }, + )); + } +} + +fn create_transaction(tx_index: u32, trx: &TransactionTrace) -> pb::Transaction { + let gas_price = trx.clone().gas_price.unwrap_or_default().with_decimal(0).to_string(); + let value = trx.clone().value.unwrap_or_default().with_decimal(0).to_string(); + + pb::Transaction { + hash: trx.hash.to_vec(), + from: trx.from.to_vec(), + to: if trx.to.is_empty() { None } else { Some(trx.to.to_vec()) }, + index: tx_index, + nonce: trx.nonce, + gas_price, + gas_limit: trx.gas_limit, + gas_used: trx.receipt().receipt.cumulative_gas_used, + value, + logs: vec![], + } +} + +fn create_payment_log(log: &Log, call: Option<&Call>, payment: pb::Payment) -> pb::Log { + pb::Log { + address: log.address.to_vec(), + ordinal: log.ordinal, + topics: log.topics.iter().map(|topic| topic.to_vec()).collect(), + data: log.data.to_vec(), + call: call.map(create_call), + block_index: log.block_index, + log: Some(pb::log::Log::Payment(payment)), + } +} + +fn create_call(call: &Call) -> pb::Call { + pb::Call { + index: call.index, + begin_ordinal: call.begin_ordinal, + end_ordinal: call.end_ordinal, + caller: call.caller.to_vec(), + address: call.address.to_vec(), + value: call.value.clone().unwrap_or_default().with_decimal(0).to_string(), + gas_consumed: call.gas_consumed, + gas_limit: call.gas_limit, + depth: call.depth, + parent_index: call.parent_index, + call_type: call.call_type, + } +} + +fn decoded_authorizations_by_call(trx: &TransactionTrace) -> Vec<(Vec, DecodedAuthorization)> { + trx.calls + .iter() + .filter_map(|call| decode_transfer_with_authorization(call).map(|decoded| (call.address.to_vec(), decoded))) + .collect() +} + +fn decode_transfer_with_authorization(call: &Call) -> Option { + if is_transfer_with_authorization_vrs_call(call) { + let decoded = usdc_functions::TransferWithAuthorization2::decode(call).ok()?; + return Some(DecodedAuthorization { + from: decoded.from, + to: decoded.to, + value: decoded.value.to_string(), + valid_after: decoded.valid_after.to_string(), + valid_before: decoded.valid_before.to_string(), + nonce: decoded.nonce.to_vec(), + call_index: call.index, + }); + } + + if is_transfer_with_authorization_bytes_call(call) { + let decoded = usdc_functions::TransferWithAuthorization1::decode(call).ok()?; + return Some(DecodedAuthorization { + from: decoded.from, + to: decoded.to, + value: decoded.value.to_string(), + valid_after: decoded.valid_after.to_string(), + valid_before: decoded.valid_before.to_string(), + nonce: decoded.nonce.to_vec(), + call_index: call.index, + }); + } + + None +} + +fn find_decoded_authorization( + decoded_authorizations: &[(Vec, DecodedAuthorization)], + asset: &[u8], + authorizer: &[u8], + nonce: &[u8], +) -> Option { + decoded_authorizations + .iter() + .find(|(call_address, auth)| call_address.as_slice() == asset && auth.from.as_slice() == authorizer && auth.nonce.as_slice() == nonce) + .map(|(_, auth)| DecodedAuthorization { + from: auth.from.clone(), + to: auth.to.clone(), + value: auth.value.clone(), + valid_after: auth.valid_after.clone(), + valid_before: auth.valid_before.clone(), + nonce: auth.nonce.clone(), + call_index: auth.call_index, + }) +} + +fn find_matching_transfer(trx: &TransactionTrace, asset: &[u8], from: &[u8], to: &[u8], amount: &str) -> Option { + trx.receipt().logs().find_map(|log_view| { + let log = log_view.log; + if log.address.as_slice() != asset { + return None; + } + let transfer = decode_erc20_transfer(log)?; + if transfer.from.as_slice() == from && transfer.to.as_slice() == to && transfer.value.to_string() == amount { + Some(TransferLog { + from: transfer.from.to_vec(), + to: transfer.to.to_vec(), + amount: transfer.value.to_string(), + }) + } else { + None + } + }) +} + +fn find_matching_transfer_by_authorizer(trx: &TransactionTrace, asset: &[u8], authorizer: &[u8]) -> Option { + trx.receipt().logs().find_map(|log_view| { + let log = log_view.log; + if log.address.as_slice() != asset { + return None; + } + let transfer = decode_erc20_transfer(log)?; + if transfer.from.as_slice() == authorizer { + Some(TransferLog { + from: transfer.from.to_vec(), + to: transfer.to.to_vec(), + amount: transfer.value.to_string(), + }) + } else { + None + } + }) +} + +fn find_last_erc20_transfer_before(trx: &TransactionTrace, ordinal: u64) -> Option<(Vec, TransferLog)> { + trx.receipt() + .logs() + .filter_map(|log_view| { + let log = log_view.log; + if log.ordinal > ordinal { + return None; + } + let transfer = decode_erc20_transfer(log)?; + Some(( + log.address.to_vec(), + TransferLog { + from: transfer.from.to_vec(), + to: transfer.to.to_vec(), + amount: transfer.value.to_string(), + }, + )) + }) + .last() +} + +fn decode_erc20_transfer(log: &Log) -> Option { + if log.topics.len() < 3 || log.topics[0].as_slice() != TRANSFER_TOPIC || log.data.len() < 32 { + return None; + } + erc20_events::Transfer::match_and_decode(log) +} + +fn is_transfer_with_authorization_vrs_call(call: &Call) -> bool { + call.input.starts_with(&TRANSFER_WITH_AUTHORIZATION_VRS_SELECTOR) && call.input.len() >= 4 + (9 * 32) +} + +fn is_transfer_with_authorization_bytes_call(call: &Call) -> bool { + if !call.input.starts_with(&TRANSFER_WITH_AUTHORIZATION_BYTES_SELECTOR) || call.input.len() < 4 + (7 * 32) { + return false; + } + + let args = &call.input[4..]; + let Some(offset) = read_usize_word(args, 6 * 32) else { + return false; + }; + if offset < 7 * 32 || offset % 32 != 0 { + return false; + } + let Some(length) = read_usize_word(args, offset) else { + return false; + }; + let Some(end) = offset.checked_add(32).and_then(|start| start.checked_add(round_up_to_word(length))) else { + return false; + }; + end <= args.len() +} + +fn read_usize_word(bytes: &[u8], start: usize) -> Option { + let end = start.checked_add(32)?; + let word = bytes.get(start..end)?; + if word[..24].iter().any(|byte| *byte != 0) { + return None; + } + + let mut raw = [0u8; 8]; + raw.copy_from_slice(&word[24..32]); + Some(u64::from_be_bytes(raw) as usize) +} + +fn round_up_to_word(value: usize) -> usize { + value.checked_add(31).map(|value| value / 32 * 32).unwrap_or(usize::MAX) +} diff --git a/x402/substreams.yaml b/x402/substreams.yaml new file mode 100644 index 0000000..74c9551 --- /dev/null +++ b/x402/substreams.yaml @@ -0,0 +1,28 @@ +specVersion: v0.1.0 +package: + name: x402 + version: v0.1.0 + url: https://github.com/pinax-network/substreams-evm + description: x402 EVM payment settlement events + image: ../image.png + +binaries: + default: + type: wasm/rust-v1 + file: ../target/wasm32-unknown-unknown/release/x402.wasm + +protobuf: + files: + - v1/x402.proto + importPaths: + - ../proto + +modules: + - name: map_events + kind: map + inputs: + - source: sf.ethereum.type.v2.Block + output: + type: proto:evm.x402.v1.Events + +network: mainnet