Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ members = [
"evm-balances",
"evm-supply",
"evm-transfers",
"x402",
"evm-x402",
"evm-dex",
"evm-dex-foundational-store",

Expand Down
15 changes: 15 additions & 0 deletions evm-x402/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "db-evm-x402"
description = { workspace = true }
edition = { workspace = true }
version = { workspace = true }

[lib]
crate-type = ["cdylib", "rlib"]

[dependencies]
substreams = { workspace = true }
substreams-database-change = { workspace = true }

proto = { path = "../proto" }
common = { path = "../common" }
29 changes: 29 additions & 0 deletions evm-x402/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
ENDPOINT ?= base.substreams.pinax.network:443
START_BLOCK ?= 45690000
STOP_BLOCK ?= +1000
PARALLEL_JOBS ?= 2500
.DEFAULT_GOAL := pack

.PHONY: build
build:
cargo build -p db-evm-x402 --target wasm32-unknown-unknown --release

.PHONY: pack
pack: build
substreams pack -o ../spkg/{spkgDefaultName}

.PHONY: noop
noop: build
substreams-sink-noop $(ENDPOINT) substreams.yaml db_out -H "X-Sf-Substreams-Parallel-Jobs: $(PARALLEL_JOBS)" :

.PHONY: run
run: build
substreams run -e $(ENDPOINT) substreams.yaml db_out -s $(START_BLOCK) -t $(STOP_BLOCK)

.PHONY: gui
gui: build
substreams gui -e $(ENDPOINT) substreams.yaml db_out -s $(START_BLOCK) --network mainnet

.PHONY: prod
prod: build
substreams gui -e $(ENDPOINT) substreams.yaml db_out -s $(START_BLOCK) -t 0 --limit-processed-blocks 0 --production-mode -H "X-Sf-Substreams-Parallel-Jobs: $(PARALLEL_JOBS)"
5 changes: 5 additions & 0 deletions evm-x402/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# evm-x402

`evm-x402` maps normalized `x402` EVM payment events into database changes.

The event extractor lives in the sibling `x402` package. This package imports `x402-v0.1.0.spkg` and exposes `db_out` for SQL sinks.
26 changes: 26 additions & 0 deletions evm-x402/clickhouse/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
ENDPOINT ?= base.substreams.pinax.network:443
START_BLOCK ?= 45690000
STOP_BLOCK ?= 45691000
PARALLEL_JOBS ?= 1000
.DEFAULT_GOAL := pack

.PHONY: schema
schema:
@echo "-- This file is generated. Do not edit." > schema.sql
@echo "" >> schema.sql
@for f in $$(ls schema.*.sql 2>/dev/null | sort); do \
cat "$$f" >> schema.sql; \
echo "" >> schema.sql; \
done

.PHONY: pack
pack: schema
substreams pack -o ../../spkg/{spkgDefaultName}

.PHONY: dev
dev: pack
substreams-sink-sql run clickhouse://default:@localhost:9000/default substreams.yaml -e $(ENDPOINT) $(START_BLOCK):$(STOP_BLOCK) --undo-buffer-size 0 --on-module-hash-mistmatch=warn --batch-block-flush-interval 1 --live-block-flush-interval 1 --infinite-retry --development-mode

.PHONY: setup
setup: pack
substreams-sink-sql setup clickhouse://default:@localhost:9000/default substreams.yaml
7 changes: 7 additions & 0 deletions evm-x402/clickhouse/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# evm-x402 ClickHouse

ClickHouse sink package for settled x402 EVM payments.

The sink writes individual settlements into `x402_payments` with block, transaction, log, and call metadata.

It maintains materialized aggregate states for facilitator-first totals, recipient-first totals, and time-windowed volume grouped by facilitator, recipient, asset, and settlement type.
13 changes: 13 additions & 0 deletions evm-x402/clickhouse/schema.0.blocks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS blocks (
block_num UInt32,
block_hash String,
timestamp DateTime(0, 'UTC'),
minute UInt32 MATERIALIZED toRelativeMinuteNum(timestamp),

-- PROJECTIONS --
PROJECTION prj_block_hash ( SELECT * ORDER BY block_hash ),
PROJECTION prj_timestamp ( SELECT * ORDER BY timestamp )
)
ENGINE = MergeTree
ORDER BY ( block_num )
COMMENT 'Blocks';
66 changes: 66 additions & 0 deletions evm-x402/clickhouse/schema.0.templates.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
-- Template Logs --
CREATE TABLE IF NOT EXISTS TEMPLATE_LOG (
-- block --
block_num UInt32,
block_hash String,
timestamp DateTime('UTC'),
minute UInt32 MATERIALIZED toRelativeMinuteNum(timestamp),

-- transaction --
tx_index UInt32, -- derived from Substreams
tx_hash String,
tx_from String,
tx_to LowCardinality(String),
tx_nonce UInt64,
tx_gas_price UInt256,
tx_gas_limit UInt64,
tx_gas_used UInt64,
tx_value UInt256,

-- log --
log_index UInt32, -- derived from Substreams
log_block_index UInt32 COMMENT 'BlockIndex represents the index of the log relative to the Block.',
log_address LowCardinality(String),
log_ordinal UInt64 COMMENT 'The block global ordinal when the log was recorded.',
log_topics String COMMENT 'Comma-separated list of log topics',
log_topic0 String MATERIALIZED splitByChar(',', log_topics)[1], -- event signature
log_topic1 String MATERIALIZED splitByChar(',', log_topics)[2], -- second topic (topic1), empty string if no topics
log_topic2 String MATERIALIZED splitByChar(',', log_topics)[3], -- third topic (topic2), empty string if no topics
log_topic3 String MATERIALIZED splitByChar(',', log_topics)[4], -- fourth topic (topic3), empty string if no topics
log_data String,

-- call metadata (only available on chains with DetailLevel: EXTENDED) --
call_caller String,
call_index UInt32,
call_begin_ordinal UInt64,
call_end_ordinal UInt64,
call_address String,
call_value UInt256,
call_gas_consumed UInt64,
call_gas_limit UInt64,
call_depth UInt32,
call_parent_index UInt32,
call_type LowCardinality(String),

-- indexes --
INDEX idx_timestamp (timestamp) TYPE minmax GRANULARITY 1,
INDEX idx_block_num (block_num) TYPE minmax GRANULARITY 1,

-- count() --
PROJECTION prj_tx_from_count ( SELECT tx_from, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY tx_from ),
PROJECTION prj_tx_to_count ( SELECT tx_to, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY tx_to ),
PROJECTION prj_tx_hash_count ( SELECT tx_hash, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY tx_hash ),
PROJECTION prj_log_address_count ( SELECT log_address, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY log_address ),

-- minute + timestamp --
PROJECTION prj_tx_hash_by_timestamp ( SELECT tx_hash, minute, timestamp GROUP BY tx_hash, minute, timestamp ),

-- minute --
PROJECTION prj_tx_from_by_minute ( SELECT tx_from, minute GROUP BY tx_from, minute ),
PROJECTION prj_tx_to_by_minute ( SELECT tx_to, minute GROUP BY tx_to, minute ),
PROJECTION prj_log_address_by_minute ( SELECT log_address, minute GROUP BY log_address, minute )
)
ENGINE = MergeTree
ORDER BY (
timestamp, block_num
);
40 changes: 40 additions & 0 deletions evm-x402/clickhouse/schema.1.table.x402_payments.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Normalized x402 payment settlement logs written directly by `evm-x402`
CREATE TABLE IF NOT EXISTS x402_payments AS TEMPLATE_LOG
COMMENT 'Settled x402 payment events on EVM chains';

ALTER TABLE x402_payments
-- payment --
ADD COLUMN IF NOT EXISTS asset LowCardinality(String),
ADD COLUMN IF NOT EXISTS payer String,
ADD COLUMN IF NOT EXISTS recipient String,
ADD COLUMN IF NOT EXISTS facilitator String,
ADD COLUMN IF NOT EXISTS amount UInt256,
ADD COLUMN IF NOT EXISTS nonce String,
ADD COLUMN IF NOT EXISTS transfer_method LowCardinality(String),
ADD COLUMN IF NOT EXISTS settlement_source LowCardinality(String),
ADD COLUMN IF NOT EXISTS scheme LowCardinality(String),
ADD COLUMN IF NOT EXISTS valid_after UInt256,
ADD COLUMN IF NOT EXISTS valid_before UInt256,
ADD COLUMN IF NOT EXISTS facilitator_allowlist_matched Bool,
ADD COLUMN IF NOT EXISTS confidence LowCardinality(String),

-- INDEXES --
ADD INDEX IF NOT EXISTS idx_amount (amount) TYPE minmax GRANULARITY 1,

-- PROJECTIONS --
-- count() --
ADD PROJECTION IF NOT EXISTS prj_facilitator_count ( SELECT facilitator, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY facilitator ),
ADD PROJECTION IF NOT EXISTS prj_recipient_count ( SELECT recipient, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY recipient ),
ADD PROJECTION IF NOT EXISTS prj_payer_count ( SELECT payer, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY payer ),
ADD PROJECTION IF NOT EXISTS prj_asset_count ( SELECT asset, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY asset ),
ADD PROJECTION IF NOT EXISTS prj_transfer_method_count ( SELECT transfer_method, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY transfer_method ),
ADD PROJECTION IF NOT EXISTS prj_settlement_source_count ( SELECT settlement_source, count(), min(block_num), max(block_num), min(timestamp), max(timestamp), min(minute), max(minute) GROUP BY settlement_source ),

-- minute + timestamp --
ADD PROJECTION IF NOT EXISTS prj_tx_hash_by_timestamp ( SELECT tx_hash, minute, timestamp GROUP BY tx_hash, minute, timestamp ),

-- minute --
ADD PROJECTION IF NOT EXISTS prj_facilitator_by_minute ( SELECT facilitator, minute GROUP BY facilitator, minute ),
ADD PROJECTION IF NOT EXISTS prj_recipient_by_minute ( SELECT recipient, minute GROUP BY recipient, minute ),
ADD PROJECTION IF NOT EXISTS prj_payer_by_minute ( SELECT payer, minute GROUP BY payer, minute ),
ADD PROJECTION IF NOT EXISTS prj_asset_by_minute ( SELECT asset, minute GROUP BY asset, minute );
101 changes: 101 additions & 0 deletions evm-x402/clickhouse/schema.2.mv.state_x402.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
-- x402 totals grouped for facilitator-first access patterns
CREATE TABLE IF NOT EXISTS state_x402 (
-- timestamp & block number --
min_timestamp SimpleAggregateFunction(min, DateTime('UTC', 0)) COMMENT 'first timestamp seen',
max_timestamp SimpleAggregateFunction(max, DateTime('UTC', 0)) COMMENT 'last timestamp seen',
min_block_num SimpleAggregateFunction(min, UInt32) COMMENT 'first block number seen',
max_block_num SimpleAggregateFunction(max, UInt32) COMMENT 'last block number seen',

-- x402 identity --
facilitator String,
recipient String,
asset LowCardinality(String),
transfer_method LowCardinality(String),
settlement_source LowCardinality(String),
scheme LowCardinality(String),

-- aggregates --
payments SimpleAggregateFunction(sum, UInt64),
amount SimpleAggregateFunction(sum, UInt256),
uniq_payer AggregateFunction(uniq, String),
uniq_tx_from AggregateFunction(uniq, String),
uniq_tx_hash AggregateFunction(uniq, String),

-- indexes --
INDEX idx_min_timestamp (min_timestamp) TYPE minmax GRANULARITY 1,
INDEX idx_max_timestamp (max_timestamp) TYPE minmax GRANULARITY 1,
INDEX idx_min_block_num (min_block_num) TYPE minmax GRANULARITY 1,
INDEX idx_max_block_num (max_block_num) TYPE minmax GRANULARITY 1,
INDEX idx_facilitator (facilitator) TYPE bloom_filter GRANULARITY 1,
INDEX idx_recipient (recipient) TYPE bloom_filter GRANULARITY 1,
INDEX idx_asset (asset) TYPE set(1024) GRANULARITY 1,
INDEX idx_payments (payments) TYPE minmax GRANULARITY 1,
INDEX idx_amount (amount) TYPE minmax GRANULARITY 1,

-- projections --
PROJECTION prj_group_by_facilitator (
SELECT
min(min_timestamp),
max(max_timestamp),
min(min_block_num),
max(max_block_num),
facilitator,
asset,
transfer_method,
settlement_source,
scheme,
sum(payments),
sum(amount),
uniqMerge(uniq_payer),
uniqMerge(uniq_tx_from),
uniqMerge(uniq_tx_hash)
GROUP BY facilitator, asset, transfer_method, settlement_source, scheme
),
PROJECTION prj_group_by_facilitator_recipient (
SELECT
min(min_timestamp),
max(max_timestamp),
min(min_block_num),
max(max_block_num),
facilitator,
recipient,
asset,
transfer_method,
settlement_source,
scheme,
sum(payments),
sum(amount),
uniqMerge(uniq_payer),
uniqMerge(uniq_tx_from),
uniqMerge(uniq_tx_hash)
GROUP BY facilitator, recipient, asset, transfer_method, settlement_source, scheme
)
)
ENGINE = AggregatingMergeTree
ORDER BY (facilitator, recipient, asset, transfer_method, settlement_source, scheme)
SETTINGS deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'x402 totals grouped by facilitator first and recipient second';

CREATE MATERIALIZED VIEW IF NOT EXISTS mv_state_x402
TO state_x402
AS
SELECT
min(timestamp) AS min_timestamp,
max(timestamp) AS max_timestamp,
min(block_num) AS min_block_num,
max(block_num) AS max_block_num,

facilitator,
recipient,
asset,
transfer_method,
settlement_source,
scheme,

count() AS payments,
sum(amount) AS amount,
uniqState(payer) AS uniq_payer,
uniqState(tx_from) AS uniq_tx_from,
uniqState(tx_hash) AS uniq_tx_hash
FROM x402_payments
GROUP BY facilitator, recipient, asset, transfer_method, settlement_source, scheme;
31 changes: 31 additions & 0 deletions evm-x402/clickhouse/schema.3.view.state_x402.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Finalized read view over the AggregatingMergeTree state table
CREATE VIEW IF NOT EXISTS x402_state AS
SELECT
-- timestamp & block number --
min(min_timestamp) AS min_timestamp,
max(max_timestamp) AS max_timestamp,
min(min_block_num) AS min_block_num,
max(max_block_num) AS max_block_num,

-- x402 identity --
facilitator,
recipient,
asset,
transfer_method,
settlement_source,
scheme,

-- aggregates --
sum(payments) AS payments,
sum(amount) AS amount,
uniqMerge(uniq_payer) AS unique_payers,
uniqMerge(uniq_tx_from) AS unique_tx_from,
uniqMerge(uniq_tx_hash) AS unique_tx_hash
FROM state_x402
GROUP BY
facilitator,
recipient,
asset,
transfer_method,
settlement_source,
scheme;
Loading
Loading