From 9abc534f3e49fed212375746018edd5a4d5e2b3e Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Fri, 24 Apr 2026 14:38:04 -0400 Subject: [PATCH 1/6] Write price data to iceberg with backfill --- Cargo.lock | 5 + price/Cargo.toml | 6 + price/migrations/1_files_processed.sql | 7 + price/pkg/settings-template.toml | 24 +++ price/src/backfill.rs | 230 +++++++++++++++++++++++++ price/src/iceberg/mod.rs | 39 +++++ price/src/iceberg/price_report.rs | 43 +++++ price/src/lib.rs | 2 + price/src/main.rs | 28 ++- price/src/price_generator.rs | 32 +++- price/src/settings.rs | 40 +++++ 11 files changed, 450 insertions(+), 6 deletions(-) create mode 100644 price/migrations/1_files_processed.sql create mode 100644 price/src/backfill.rs create mode 100644 price/src/iceberg/mod.rs create mode 100644 price/src/iceberg/price_report.rs diff --git a/Cargo.lock b/Cargo.lock index 059ef00d7..b5eb6d950 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6288,23 +6288,28 @@ dependencies = [ "clap", "config", "custom-tracing", + "db-store", "file-store", "file-store-oracles", "futures", + "helium-iceberg", "helium-proto", "humantime-serde", "metrics", "poc-metrics", + "prost", "reqwest 0.12.28", "rust_decimal", "serde", "serde_json", + "sqlx", "task-manager", "temp-env", "tls-init", "tokio", "tracing", "triggered", + "trino-rust-client", ] [[package]] diff --git a/price/Cargo.toml b/price/Cargo.toml index 39c68e515..41acde057 100644 --- a/price/Cargo.toml +++ b/price/Cargo.toml @@ -19,16 +19,22 @@ metrics = { workspace = true } tokio = { workspace = true } chrono = { workspace = true } helium-proto = { workspace = true } +prost = { workspace = true } rust_decimal = { workspace = true } +sqlx = { workspace = true } triggered = { workspace = true } humantime-serde = { workspace = true } custom-tracing = { path = "../custom_tracing" } +db-store = { path = "../db_store" } file-store = { path = "../file_store" } file-store-oracles = { path = "../file_store_oracles" } poc-metrics = { path = "../metrics" } task-manager = { path = "../task_manager" } tls-init = { path = "../tls_init" } +helium-iceberg = { path = "../helium_iceberg", features = ["test-harness"] } +trino-rust-client = { version = "0.9" } + [dev-dependencies] temp-env = "0.3.6" diff --git a/price/migrations/1_files_processed.sql b/price/migrations/1_files_processed.sql new file mode 100644 index 000000000..be836fa43 --- /dev/null +++ b/price/migrations/1_files_processed.sql @@ -0,0 +1,7 @@ +CREATE TABLE files_processed ( + process_name TEXT NOT NULL DEFAULT 'default', + file_name VARCHAR PRIMARY KEY, + file_type VARCHAR NOT NULL, + file_timestamp TIMESTAMPTZ NOT NULL, + processed_at TIMESTAMPTZ NOT NULL +); diff --git a/price/pkg/settings-template.toml b/price/pkg/settings-template.toml index c688e9fe1..6c2836fc5 100644 --- a/price/pkg/settings-template.toml +++ b/price/pkg/settings-template.toml @@ -56,3 +56,27 @@ interval = "60 seconds" # Prometheus scrape endpoint. Default below. # # endpoint = "127.0.0.1:19000" + + +# Database settings. Required only when running the `backfill` subcommand; +# the `server` path does not open any DB connections. Used by the +# file_info_poller to track which S3 files have already been processed. +# +# [database] +# url = "postgres://postgres:postgres@localhost/price" +# max_connections = 10 + + +# Iceberg catalog settings. When provided, the `server` also writes each +# live tick into the Iceberg table `rewards.price`. Required by `backfill`. 
+#
+# [iceberg_settings]
+# catalog_uri = "http://iceberg-catalog:8181"
+# catalog_name = "nova"
+# warehouse = "s3://warehouse-bucket/iceberg"
+#
+# [iceberg_settings.auth]
+# # (optional) OAuth2 / token config
+#
+# [iceberg_settings.s3]
+# region = "us-west-2"
diff --git a/price/src/backfill.rs b/price/src/backfill.rs
new file mode 100644
index 000000000..36b14f73e
--- /dev/null
+++ b/price/src/backfill.rs
@@ -0,0 +1,230 @@
+use crate::{
+    iceberg::{self, IcebergPriceReport, PriceWriter},
+    settings::Settings,
+};
+use anyhow::{Context, Result};
+use chrono::{DateTime, Utc};
+use file_store::{file_info_poller::FileInfoStream, file_source, FileInfo};
+use file_store_oracles::FileType;
+use futures::StreamExt;
+use helium_proto::PriceReportV1;
+use sqlx::{PgPool, Pool, Postgres};
+use task_manager::{ManagedTask, TaskManager};
+use tokio::sync::mpsc::Receiver;
+
+#[derive(Debug, clap::Args)]
+pub struct Cmd {
+    /// Process name for tracking backfill progress in `files_processed`.
+    #[clap(long, default_value = "backfill")]
+    process_name: String,
+
+    /// Start processing files after this timestamp.
+    /// Format: RFC 3339 (e.g. 2024-01-01T00:00:00Z)
+    #[clap(long)]
+    start_after: DateTime<Utc>,
+
+    /// Stop processing files when their timestamp is >= this value.
+    /// Set this to the date Iceberg was first enabled in production so the
+    /// backfiller does not overlap with the daemon's real-time writes.
+    /// Format: RFC 3339 (e.g. 2026-04-24T00:00:00Z)
+    #[clap(long)]
+    stop_after: DateTime<Utc>,
+}
+
+impl Cmd {
+    pub async fn run(self, settings: &Settings) -> Result<()> {
+        let database = settings
+            .database
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("database settings required for backfill"))?;
+        let iceberg_settings = settings
+            .iceberg_settings
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("iceberg_settings required for backfill"))?;
+
+        let pool = database.connect("price-backfill").await?;
+        sqlx::migrate!().run(&pool).await?;
+
+        let writer = iceberg::get_writer(iceberg_settings).await?;
+
+        tracing::info!(
+            process_name = %self.process_name,
+            start_after = %self.start_after,
+            stop_after = %self.stop_after,
+            "starting price backfill"
+        );
+
+        let task = PriceReportBackfiller::create_managed_task(
+            pool,
+            settings.output_bucket_client().await,
+            writer,
+            BackfillOptions {
+                process_name: self.process_name,
+                start_after: self.start_after,
+                stop_after: self.stop_after,
+            },
+        )
+        .await?;
+
+        task.start().await?;
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct BackfillOptions {
+    pub process_name: String,
+    pub start_after: DateTime<Utc>,
+    pub stop_after: DateTime<Utc>,
+}
+
+pub struct PriceReportBackfiller {
+    pool: Pool<Postgres>,
+    reports: Receiver<FileInfoStream<PriceReportV1>>,
+    writer: PriceWriter,
+    done: bool,
+}
+
+impl PriceReportBackfiller {
+    pub fn new(
+        pool: Pool<Postgres>,
+        reports: Receiver<FileInfoStream<PriceReportV1>>,
+        writer: PriceWriter,
+    ) -> Self {
+        Self {
+            pool,
+            reports,
+            writer,
+            done: false,
+        }
+    }
+
+    pub async fn create(
+        pool: PgPool,
+        bucket_client: file_store::BucketClient,
+        writer: PriceWriter,
+        options: BackfillOptions,
+    ) -> Result<(Self, impl ManagedTask)> {
+        let (reports, reports_server) =
+            file_source::Continuous::prost_source::<PriceReportV1>()
+                .state(pool.clone())
+                .bucket_client(bucket_client)
+                .prefix(FileType::PriceReport.to_string())
+                .lookback_start_after(options.start_after)
+                .stop_after(options.stop_after)
+                .process_name(options.process_name)
+                .create()
+                .await?;
+
+        Ok((
+            PriceReportBackfiller::new(pool, reports, writer),
+            reports_server,
+        ))
+    }
+
+    pub async fn create_managed_task(
+        pool: PgPool,
+        bucket_client: file_store::BucketClient,
+        writer: PriceWriter,
+        options: BackfillOptions,
+    ) -> Result<TaskManager> {
+        let (backfiller, server) = Self::create(pool, bucket_client, writer, options).await?;
+
+        Ok(TaskManager::builder()
+            .add_task(server)
+            .add_task(backfiller)
+            .build())
+    }
+
+    async fn run(mut self, mut shutdown: triggered::Listener) -> Result<()> {
+        tracing::info!("price backfiller starting");
+        loop {
+            if self.done {
+                tracing::info!("price backfiller complete");
+                return Ok(());
+            }
+            tokio::select! {
+                biased;
+                _ = &mut shutdown => {
+                    tracing::info!("price backfiller shutting down");
+                    return Ok(());
+                }
+                file = self.reports.recv() => {
+                    self.handle(file).await?;
+                }
+            }
+        }
+    }
+
+    async fn handle(&mut self, file: Option<FileInfoStream<PriceReportV1>>) -> Result<()> {
+        let Some(file_info_stream) = file else {
+            tracing::info!("price backfiller completed (channel closed)");
+            self.done = true;
+            return Ok(());
+        };
+
+        let file_info = file_info_stream.file_info.clone();
+        tracing::info!(
+            file = %file_info,
+            timestamp = %file_info.timestamp,
+            age = %format_file_age(&file_info),
+            "backfilling price report file"
+        );
+
+        let mut txn = self.pool.begin().await?;
+        let write_id = file_info.key.clone();
+        let reports = file_info_stream.into_stream(&mut txn).await?;
+        let all_reports: Vec<_> = reports.collect().await;
+        let total = all_reports.len();
+
+        let iceberg_records: Vec<_> = all_reports
+            .iter()
+            .filter_map(|report| match IcebergPriceReport::try_from(report) {
+                Ok(record) => Some(record),
+                Err(err) => {
+                    tracing::warn!(?err, "skipping invalid price report");
+                    None
+                }
+            })
+            .collect();
+        let valid = iceberg_records.len();
+        let skipped = total - valid;
+
+        self.writer
+            .write_idempotent(&write_id, iceberg_records)
+            .await
+            .context("writing price reports to iceberg")?;
+
+        txn.commit().await.context("committing db transaction")?;
+
+        tracing::info!(
+            file = %file_info,
+            valid,
+            skipped,
+            "backfilled price report file"
+        );
+        Ok(())
+    }
+}
+
+impl ManagedTask for PriceReportBackfiller {
+    fn start_task(self: Box<Self>, shutdown: triggered::Listener) -> task_manager::TaskFuture {
+        task_manager::spawn(self.run(shutdown))
+    }
+}
+
+fn format_file_age(file_info: &FileInfo) -> String {
+    let age = Utc::now().signed_duration_since(file_info.timestamp);
+    let total = age.num_seconds();
+    if total < 0 {
+        return "in the future".to_string();
+    }
+    let days = age.num_days();
+    let hours = age.num_hours() % 24;
+    let minutes = age.num_minutes() % 60;
+    match (days, hours, minutes) {
+        (0, 0, m) => format!("{m}m ago"),
+        (0, h, m) => format!("{h}h {m}m ago"),
+        (d, h, _) => format!("{d}d {h}h ago"),
+    }
+}
diff --git a/price/src/iceberg/mod.rs b/price/src/iceberg/mod.rs
new file mode 100644
index 000000000..16c295848
--- /dev/null
+++ b/price/src/iceberg/mod.rs
@@ -0,0 +1,39 @@
+use anyhow::Context;
+use helium_iceberg::{BoxedDataWriter, IntoBoxedDataWriter};
+use serde::Serialize;
+
+pub mod price_report;
+
+pub use price_report::IcebergPriceReport;
+
+pub const NAMESPACE: &str = "rewards";
+
+pub type PriceWriter = BoxedDataWriter<IcebergPriceReport>;
+
+pub async fn get_writer(settings: &helium_iceberg::Settings) -> anyhow::Result<PriceWriter> {
+    let catalog = settings.connect().await.context("connecting to catalog")?;
+
+    catalog.create_namespace_if_not_exists(NAMESPACE).await?;
+
+    let writer = catalog
+        .create_table_if_not_exists(price_report::table_definition()?)
+        .await?;
+
+    Ok(writer.boxed())
+}
+
+/// Optional idempotent append — no-op when `writer` is `None` (iceberg
+/// writes are optional in some deployments).
+pub async fn maybe_write_idempotent<T: Serialize + Send + Sync>(
+    writer: Option<&BoxedDataWriter<T>>,
+    id: &str,
+    records: Vec<T>,
+) -> anyhow::Result<()> {
+    if let Some(data_writer) = writer {
+        data_writer
+            .write_idempotent(id, records)
+            .await
+            .context("writing idempotent")?;
+    }
+    Ok(())
+}
diff --git a/price/src/iceberg/price_report.rs b/price/src/iceberg/price_report.rs
new file mode 100644
index 000000000..5116ebddd
--- /dev/null
+++ b/price/src/iceberg/price_report.rs
@@ -0,0 +1,43 @@
+use anyhow::anyhow;
+use chrono::{DateTime, FixedOffset, TimeZone, Utc};
+use helium_iceberg::{FieldDefinition, PartitionDefinition, TableDefinition};
+use helium_proto::PriceReportV1;
+use serde::{Deserialize, Serialize};
+use trino_rust_client::Trino;
+
+use super::NAMESPACE;
+
+pub const TABLE_NAME: &str = "price";
+
+#[derive(Debug, Clone, Trino, Serialize, Deserialize, PartialEq)]
+pub struct IcebergPriceReport {
+    pub timestamp: DateTime<FixedOffset>,
+    pub price: u64,
+    pub token_type: String,
+}
+
+pub fn table_definition() -> helium_iceberg::Result<TableDefinition> {
+    TableDefinition::builder(NAMESPACE, TABLE_NAME)
+        .with_fields([
+            FieldDefinition::required_timestamptz("timestamp"),
+            FieldDefinition::required_long("price"),
+            FieldDefinition::required_string("token_type"),
+        ])
+        .with_partition(PartitionDefinition::day("timestamp", "timestamp_day"))
+        .build()
+}
+
+impl TryFrom<&PriceReportV1> for IcebergPriceReport {
+    type Error = anyhow::Error;
+
+    fn try_from(value: &PriceReportV1) -> Result<Self, Self::Error> {
+        let timestamp = Utc
+            .timestamp_opt(value.timestamp as i64, 0)
+            .single()
+            .ok_or_else(|| anyhow!("invalid timestamp {}", value.timestamp))?;
+        Ok(Self {
+            timestamp: timestamp.into(),
+            price: value.price,
+            token_type: value.token_type().as_str_name().to_string(),
+        })
+    }
+}
diff --git a/price/src/lib.rs b/price/src/lib.rs
index 90a8c72d5..d05f20b3c 100644
--- a/price/src/lib.rs
+++ b/price/src/lib.rs
@@ -1,7 +1,9 @@
 extern crate tls_init;
 
+pub mod backfill;
 pub mod cli;
 pub mod hermes;
+pub mod iceberg;
 pub mod metrics;
 pub mod price_generator;
 pub mod settings;
diff --git a/price/src/main.rs b/price/src/main.rs
index 8bd366d97..1574de32e 100644
--- a/price/src/main.rs
+++ b/price/src/main.rs
@@ -3,7 +3,7 @@
 use clap::Parser;
 use file_store::file_upload;
 use file_store_oracles::traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt};
 use helium_proto::PriceReportV1;
-use price::{cli::check, PriceGenerator, Settings};
+use price::{backfill, cli::check, iceberg, PriceGenerator, Settings};
 use std::{
     path::{self, PathBuf},
     time::Duration,
@@ -36,10 +36,11 @@
 pub enum Cmd {
     Server(Server),
     Check(Check),
+    Backfill(backfill::Cmd),
 }
 
 impl Cmd {
-    pub async fn run(&self, config: Option<PathBuf>) -> Result<()> {
+    pub async fn run(self, config: Option<PathBuf>) -> Result<()> {
         match self {
             Self::Server(cmd) => {
                 let settings = Settings::new(config)?;
@@ -48,12 +49,18 @@
                 cmd.run(&settings).await
             }
             Self::Check(options) => {
-                let url = match &options.url {
-                    Some(url) => url.clone(),
+                let url = match options.url {
+                    Some(url) => url,
                     None => Settings::new(config)?.source,
                 };
                 check::run(url).await
             }
+            Self::Backfill(cmd) => {
+                let settings = Settings::new(config)?;
+                custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?;
+                tracing::info!("Settings: {}", serde_json::to_string_pretty(&settings)?);
+                cmd.run(&settings).await
+            }
         }
     }
}
@@ -89,10 +96,21 @@ impl Server {
         )
         .await?;
 
+        let iceberg_writer = match settings.iceberg_settings.as_ref() {
+            Some(iceberg_settings) => {
+                tracing::info!("iceberg settings provided, connecting...");
+                Some(iceberg::get_writer(iceberg_settings).await?)
+            }
+            None => {
+                tracing::info!("no iceberg settings provided");
+                None
+            }
+        };
+
         let mut task_manager = TaskManager::new();
         task_manager.add(file_upload_server);
         task_manager.add(price_sink_server);
-        task_manager.add(PriceGenerator::new(settings, price_sink.clone()).await?);
+        task_manager.add(PriceGenerator::new(settings, price_sink.clone(), iceberg_writer).await?);
 
         task_manager.start().await?;
         Ok(())
diff --git a/price/src/price_generator.rs b/price/src/price_generator.rs
index a2cdb0f6f..2b4ef3a0f 100644
--- a/price/src/price_generator.rs
+++ b/price/src/price_generator.rs
@@ -1,4 +1,9 @@
-use crate::{hermes, metrics::Metrics, Settings};
+use crate::{
+    hermes,
+    iceberg::{self, IcebergPriceReport, PriceWriter},
+    metrics::Metrics,
+    Settings,
+};
 use anyhow::{anyhow, Result};
 use chrono::{DateTime, TimeZone, Utc};
 use file_store::file_sink;
@@ -34,6 +39,7 @@
     stale_price_duration: Duration,
     latest_price_file: PathBuf,
     file_sink: file_sink::FileSinkClient<PriceReportV1>,
+    iceberg_writer: Option<PriceWriter>,
 }
 
 impl ManagedTask for PriceGenerator {
@@ -56,6 +62,7 @@
     pub async fn new(
         settings: &Settings,
         file_sink: file_sink::FileSinkClient<PriceReportV1>,
+        iceberg_writer: Option<PriceWriter>,
     ) -> Result<Self> {
         Ok(Self {
             last_price_opt: None,
@@ -66,6 +73,7 @@
             stale_price_duration: settings.stale_price_duration,
             latest_price_file: settings.cache.join(LATEST_PRICE_FILE),
             file_sink,
+            iceberg_writer,
         })
     }
 
@@ -102,6 +110,28 @@
         tracing::info!(token = TOKEN, price.price, "updating price");
         self.file_sink.write(price_report, []).await?;
 
+        let iceberg_record = match IcebergPriceReport::try_from(&price_report) {
+            Ok(record) => record,
+            Err(err) => {
+                tracing::error!(token = TOKEN, ?err, "invalid iceberg record; skipping");
+                return Ok(());
+            }
+        };
+        let write_id = format!("{TOKEN}_{}", price_report.timestamp);
+        if let Err(err) = iceberg::maybe_write_idempotent(
+            self.iceberg_writer.as_ref(),
+            &write_id,
+            vec![iceberg_record],
+        )
+        .await
+        {
+            tracing::error!(
+                token = TOKEN,
+                ?err,
+                "iceberg write failed; file_sink copy is durable"
+            );
+        }
+
         Ok(())
     }
 
diff --git a/price/src/settings.rs b/price/src/settings.rs
index 845a39eed..799cd5c7a 100644
--- a/price/src/settings.rs
+++ b/price/src/settings.rs
@@ -1,4 +1,5 @@
 use anyhow::Result;
+use chrono::{DateTime, Utc};
 use config::{Config, Environment, File};
 use humantime_serde::re::humantime;
 use serde::{Deserialize, Serialize};
@@ -38,6 +39,32 @@
     /// How long to use a stale price in minutes
     #[serde(with = "humantime_serde", default = "default_stale_price_duration")]
     pub stale_price_duration: Duration,
+    /// Database settings. Required when running `backfill`; unused by the
+    /// server path.
+    #[serde(default)]
+    pub database: Option<db_store::Settings>,
+    /// Iceberg catalog settings. When provided, live ticks also write to
+    /// the `rewards.price` Iceberg table. Required by `backfill`.
+    #[serde(default)]
+    pub iceberg_settings: Option<helium_iceberg::Settings>,
+}
+
+/// Settings controlling the Iceberg backfill window.
+///
+/// Backfill covers [`start_after`, `stop_after`). Set `stop_after` to the date
+/// Iceberg was first enabled in production so the backfiller does not overlap
+/// with the daemon's real-time Iceberg writes.
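+///
+/// A TOML sketch, assuming this struct is mounted as a `[backfill]` table
+/// (that wiring is not shown in this patch; values are illustrative):
+///
+/// ```toml
+/// [backfill]
+/// start_after = "2020-01-01T00:00:00Z"
+/// stop_after = "2026-04-24T00:00:00Z"
+/// ```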
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct BackfillSettings {
+    /// Start of the backfill window. Defaults to UNIX_EPOCH (all available history).
+    #[serde(default = "default_backfill_start_after")]
+    pub start_after: DateTime<Utc>,
+    /// End of the backfill window (exclusive).
+    pub stop_after: DateTime<Utc>,
+}
+
+fn default_backfill_start_after() -> DateTime<Utc> {
+    DateTime::UNIX_EPOCH
 }
 
 fn default_source() -> String {
@@ -61,6 +88,19 @@ fn default_cache() -> PathBuf {
 }
 
 impl Settings {
+    /// Build a `BucketClient` for the output bucket using the shared
+    /// `file_store` credentials.
+    pub async fn output_bucket_client(&self) -> file_store::BucketClient {
+        file_store::BucketClient::new(
+            self.output_bucket.clone(),
+            self.file_store.region.clone(),
+            self.file_store.endpoint.clone(),
+            self.file_store.access_key_id.clone(),
+            self.file_store.secret_access_key.clone(),
+        )
+        .await
+    }
+
     /// Load Settings from a given path. Settings are loaded from a given
     /// optional path and can be overridden with environment variables.
     ///

From bccec1a9575ea11d2636d505b9dbaed8f68cee47 Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Tue, 5 May 2026 09:42:12 -0400
Subject: [PATCH 2/6] Update price/Cargo.toml

Co-authored-by: Michael Jeffrey
---
 price/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/price/Cargo.toml b/price/Cargo.toml
index 41acde057..9cc60f55c 100644
--- a/price/Cargo.toml
+++ b/price/Cargo.toml
@@ -33,7 +33,7 @@
 poc-metrics = { path = "../metrics" }
 task-manager = { path = "../task_manager" }
 tls-init = { path = "../tls_init" }
 
-helium-iceberg = { path = "../helium_iceberg", features = ["test-harness"] }
+helium-iceberg = { path = "../helium_iceberg" }
 trino-rust-client = { version = "0.9" }
 
 [dev-dependencies]

From 59b0160c432e8f47b2fe6a5d09072ab1a289cd39 Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Tue, 5 May 2026 10:42:18 -0400
Subject: [PATCH 3/6] Change settings to accept BucketSettings

---
 price/pkg/settings-template.toml |  5 +++--
 price/src/backfill.rs            |  2 +-
 price/src/main.rs                |  5 +----
 price/src/settings.rs            | 25 +++++--------------------
 4 files changed, 10 insertions(+), 27 deletions(-)

diff --git a/price/pkg/settings-template.toml b/price/pkg/settings-template.toml
index 6c2836fc5..348ab9855 100644
--- a/price/pkg/settings-template.toml
+++ b/price/pkg/settings-template.toml
@@ -7,8 +7,6 @@
 #
 # source = "https://hermes.pyth.network/v2/updates/price/latest?ids[]=649fdd7ec08e8e2a20f425729854e90293dcbe2376abc47197a14da6ff339756"
 
-# S3 bucket to write price reports to. Required.
-output_bucket = "price"
 
 # Folder for the local on-disk price cache / file sink staging area.
 # Default below.
@@ -29,6 +27,9 @@
 #
 # default_price =
 
+# Output bucket.
+[output]
+bucket = "price"
 
 [custom_tracing]
 # File watched for runtime tracing-config reloads. Default below.
diff --git a/price/src/backfill.rs b/price/src/backfill.rs index 36b14f73e..fab917a92 100644 --- a/price/src/backfill.rs +++ b/price/src/backfill.rs @@ -56,7 +56,7 @@ impl Cmd { let task = PriceReportBackfiller::create_managed_task( pool, - settings.output_bucket_client().await, + settings.output.connect().await, writer, BackfillOptions { process_name: self.process_name, diff --git a/price/src/main.rs b/price/src/main.rs index 1574de32e..182af9780 100644 --- a/price/src/main.rs +++ b/price/src/main.rs @@ -80,12 +80,9 @@ impl Server { // Install the prometheus metrics exporter poc_metrics::start_metrics(&settings.metrics)?; - let file_store_client = settings.file_store.connect().await; - // Initialize uploader let (file_upload, file_upload_server) = - file_upload::FileUpload::new(file_store_client.clone(), settings.output_bucket.clone()) - .await; + file_upload::FileUpload::from_bucket_client(settings.output.connect().await).await; let (price_sink, price_sink_server) = PriceReportV1::file_sink( &settings.cache, diff --git a/price/src/settings.rs b/price/src/settings.rs index 799cd5c7a..98238b584 100644 --- a/price/src/settings.rs +++ b/price/src/settings.rs @@ -20,9 +20,7 @@ pub struct Settings { /// parameter for the HNT feed. Required. #[serde(default = "default_source")] pub source: String, - #[serde(default)] - pub file_store: file_store::Settings, - pub output_bucket: String, + pub output: file_store::BucketSettings, /// Folder for local cache of ingest data #[serde(default = "default_cache")] pub cache: PathBuf, @@ -88,19 +86,6 @@ fn default_cache() -> PathBuf { } impl Settings { - /// Build a `BucketClient` for the output bucket using the shared - /// `file_store` credentials. - pub async fn output_bucket_client(&self) -> file_store::BucketClient { - file_store::BucketClient::new( - self.output_bucket.clone(), - self.file_store.region.clone(), - self.file_store.endpoint.clone(), - self.file_store.access_key_id.clone(), - self.file_store.secret_access_key.clone(), - ) - .await - } - /// Load Settings from a given path. Settings are loaded from a given /// optional path and can be overridden with environment variables. 
     ///
@@ -138,14 +123,14 @@
     fn test_default_price_override() -> anyhow::Result<()> {
         let settings = temp_env::with_vars(
             [
-                ("PRICE__OUTPUT_BUCKET", Some("test-bucket".to_string())),
+                ("PRICE__OUTPUT__BUCKET", Some("test-bucket".to_string())),
                 ("PRICE__DEFAULT_PRICE", Some("100000000".to_string())),
             ],
             || Settings::new::<&str>(None),
         )?;
 
         assert_eq!(settings.default_price, Some(100_000_000));
-        assert_eq!(settings.output_bucket, "test-bucket");
+        assert_eq!(settings.output.bucket, "test-bucket");
         Ok(())
     }
 
@@ -159,7 +144,7 @@
 
         assert!(settings.source.contains("hermes.pyth.network"));
-        assert_eq!(settings.output_bucket, "price");
+        assert_eq!(settings.output.bucket, "price");
         assert_eq!(settings.interval, Duration::from_secs(60));
         Ok(())
     }
 
@@ -169,7 +154,7 @@
         let url = "https://example.test/v2/updates/price/latest?ids[]=abc";
         let settings = temp_env::with_vars(
             [
-                ("PRICE__OUTPUT_BUCKET", Some("test-bucket".to_string())),
+                ("PRICE__OUTPUT__BUCKET", Some("test-bucket".to_string())),
                 ("PRICE__SOURCE", Some(url.to_string())),
             ],
             || Settings::new::<&str>(None),

From 1adfad7f2c74e5cce608c04daea8f1574b27eb1d Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Tue, 5 May 2026 15:50:01 -0400
Subject: [PATCH 4/6] Update to use new BatchedWriter

---
 price/pkg/settings-template.toml |  9 +++++++
 price/src/backfill.rs            | 13 +++++-----
 price/src/iceberg/mod.rs         | 34 +++++++++-----------------
 price/src/main.rs                | 22 +++++++++++++----
 price/src/price_generator.rs     | 41 ++++++++++----------------------
 price/src/settings.rs            | 18 ++++++++++++++
 6 files changed, 76 insertions(+), 61 deletions(-)

diff --git a/price/pkg/settings-template.toml b/price/pkg/settings-template.toml
index 348ab9855..22ec72e2f 100644
--- a/price/pkg/settings-template.toml
+++ b/price/pkg/settings-template.toml
@@ -71,6 +71,15 @@
 # Iceberg catalog settings. When provided, the `server` also writes each
 # live tick into the Iceberg table `rewards.price`. Required by `backfill`.
 #
+# Live writes go through a `BatchedWriter` that spools each record to
+# `<cache>/iceberg-spool` for crash durability and commits a single
+# Iceberg snapshot when either `iceberg_batch_size` records or
+# `iceberg_batch_timeout` of wall-clock time has elapsed since the last
+# commit. Defaults below produce ~24 snapshots/day at a 60s tick.
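+# (Worked example: at one record per 60s tick, 60 records take exactly one
+# hour to accumulate, so the size and time thresholds coincide — one
+# snapshot per hour, ~24 per day.)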
+#
+# iceberg_batch_size = 60
+# iceberg_batch_timeout = "1 hour"
+#
 # [iceberg_settings]
 # catalog_uri = "http://iceberg-catalog:8181"
 # catalog_name = "nova"
diff --git a/price/src/backfill.rs b/price/src/backfill.rs
index 36b14f73e..d91054c24 100644
--- a/price/src/backfill.rs
+++ b/price/src/backfill.rs
@@ -1,5 +1,5 @@
 use crate::{
-    iceberg::{self, IcebergPriceReport, PriceWriter},
+    iceberg::{self, IcebergPriceReport, PriceTable},
     settings::Settings,
 };
 use anyhow::{Context, Result};
@@ -7,6 +7,7 @@
 use file_store::{file_info_poller::FileInfoStream, file_source, FileInfo};
 use file_store_oracles::FileType;
 use futures::StreamExt;
+use helium_iceberg::DataWriter;
 use helium_proto::PriceReportV1;
 use sqlx::{PgPool, Pool, Postgres};
 use task_manager::{ManagedTask, TaskManager};
@@ -45,7 +46,7 @@
         let pool = database.connect("price-backfill").await?;
         sqlx::migrate!().run(&pool).await?;
 
-        let writer = iceberg::get_writer(iceberg_settings).await?;
+        let writer = iceberg::connect_table(iceberg_settings).await?;
 
         tracing::info!(
             process_name = %self.process_name,
@@ -82,7 +83,7 @@
 pub struct PriceReportBackfiller {
     pool: Pool<Postgres>,
     reports: Receiver<FileInfoStream<PriceReportV1>>,
-    writer: PriceWriter,
+    writer: PriceTable,
     done: bool,
 }
 
@@ -90,7 +91,7 @@
     pub fn new(
         pool: Pool<Postgres>,
         reports: Receiver<FileInfoStream<PriceReportV1>>,
-        writer: PriceWriter,
+        writer: PriceTable,
     ) -> Self {
         Self {
             pool,
@@ -103,7 +104,7 @@
     pub async fn create(
         pool: PgPool,
         bucket_client: file_store::BucketClient,
-        writer: PriceWriter,
+        writer: PriceTable,
         options: BackfillOptions,
     ) -> Result<(Self, impl ManagedTask)> {
         let (reports, reports_server) =
@@ -125,7 +126,7 @@
     pub async fn create_managed_task(
         pool: PgPool,
         bucket_client: file_store::BucketClient,
-        writer: PriceWriter,
+        writer: PriceTable,
         options: BackfillOptions,
     ) -> Result<TaskManager> {
         let (backfiller, server) = Self::create(pool, bucket_client, writer, options).await?;
diff --git a/price/src/iceberg/mod.rs b/price/src/iceberg/mod.rs
index 16c295848..e1461868f 100644
--- a/price/src/iceberg/mod.rs
+++ b/price/src/iceberg/mod.rs
@@ -1,6 +1,4 @@
 use anyhow::Context;
-use helium_iceberg::{BoxedDataWriter, IntoBoxedDataWriter};
-use serde::Serialize;
 
 pub mod price_report;
 
@@ -8,32 +6,22 @@
 pub use price_report::IcebergPriceReport;
 
 pub const NAMESPACE: &str = "rewards";
 
-pub type PriceWriter = BoxedDataWriter<IcebergPriceReport>;
+/// Type alias for the price-report Iceberg table handle. The server feeds
+/// this into a `BatchedWriter`; the backfiller uses it directly via
+/// `DataWriter::write_idempotent` (one snapshot per S3 file, keyed by file
+/// name for safe replay).
+pub type PriceTable = helium_iceberg::IcebergTable<IcebergPriceReport>;
 
-pub async fn get_writer(settings: &helium_iceberg::Settings) -> anyhow::Result<PriceWriter> {
+/// Connect to the catalog, ensure the namespace + table exist, and return a
+/// table handle.
+pub async fn connect_table(settings: &helium_iceberg::Settings) -> anyhow::Result<PriceTable> {
     let catalog = settings.connect().await.context("connecting to catalog")?;
 
     catalog.create_namespace_if_not_exists(NAMESPACE).await?;
 
-    let writer = catalog
-        .create_table_if_not_exists(price_report::table_definition()?)
+    let table = catalog
+        .create_table_if_not_exists::<IcebergPriceReport>(price_report::table_definition()?)
         .await?;
 
-    Ok(writer.boxed())
-}
-
-/// Optional idempotent append — no-op when `writer` is `None` (iceberg
-/// writes are optional in some deployments).
-pub async fn maybe_write_idempotent<T: Serialize + Send + Sync>(
-    writer: Option<&BoxedDataWriter<T>>,
-    id: &str,
-    records: Vec<T>,
-) -> anyhow::Result<()> {
-    if let Some(data_writer) = writer {
-        data_writer
-            .write_idempotent(id, records)
-            .await
-            .context("writing idempotent")?;
-    }
-    Ok(())
+    Ok(table)
 }
diff --git a/price/src/main.rs b/price/src/main.rs
index 182af9780..c3249095d 100644
--- a/price/src/main.rs
+++ b/price/src/main.rs
@@ -2,6 +2,7 @@
 use anyhow::Result;
 use clap::Parser;
 use file_store::file_upload;
 use file_store_oracles::traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt};
+use helium_iceberg::{BatchedWriter, BatchedWriterConfig};
 use helium_proto::PriceReportV1;
 use price::{backfill, cli::check, iceberg, PriceGenerator, Settings};
 use std::{
@@ -93,10 +94,26 @@
         )
         .await?;
 
+        let mut task_manager = TaskManager::new();
+        task_manager.add(file_upload_server);
+        task_manager.add(price_sink_server);
+
+        // When Iceberg is configured, build a `BatchedWriter` so the
+        // PriceGenerator can `queue` each tick without producing one
+        // snapshot per minute. The `BatchedWriterTask` owns flushing on
+        // size/time thresholds, replays its on-disk spool on startup so a
+        // crash between the file_sink write and the Iceberg commit doesn't
+        // lose records, and drains on graceful shutdown.
         let iceberg_writer = match settings.iceberg_settings.as_ref() {
             Some(iceberg_settings) => {
                 tracing::info!("iceberg settings provided, connecting...");
-                Some(iceberg::get_writer(iceberg_settings).await?)
+                let table = iceberg::connect_table(iceberg_settings).await?;
+                let config = BatchedWriterConfig::new(settings.cache.join("iceberg-spool"))
+                    .with_max_batch_size(settings.iceberg_batch_size)
+                    .with_batch_timeout(settings.iceberg_batch_timeout);
+                let (writer, batched_task) = BatchedWriter::new(table, config);
+                task_manager.add(batched_task);
+                Some(writer)
             }
             None => {
                 tracing::info!("no iceberg settings provided");
@@ -104,9 +121,6 @@
             }
         };
 
-        let mut task_manager = TaskManager::new();
-        task_manager.add(file_upload_server);
-        task_manager.add(price_sink_server);
         task_manager.add(PriceGenerator::new(settings, price_sink.clone(), iceberg_writer).await?);
 
         task_manager.start().await?;
diff --git a/price/src/price_generator.rs b/price/src/price_generator.rs
index 2b4ef3a0f..1cf46f9ac 100644
--- a/price/src/price_generator.rs
+++ b/price/src/price_generator.rs
@@ -1,13 +1,9 @@
-use crate::{
-    hermes,
-    iceberg::{self, IcebergPriceReport, PriceWriter},
-    metrics::Metrics,
-    Settings,
-};
+use crate::{hermes, iceberg::IcebergPriceReport, metrics::Metrics, Settings};
 use anyhow::{anyhow, Result};
 use chrono::{DateTime, TimeZone, Utc};
 use file_store::file_sink;
 use futures::TryFutureExt;
+use helium_iceberg::BatchedWriter;
 use helium_proto::{BlockchainTokenTypeV1, PriceReportV1};
 use serde::{Deserialize, Serialize};
 use std::{path::PathBuf, time::Duration};
@@ -39,7 +35,7 @@
     stale_price_duration: Duration,
     latest_price_file: PathBuf,
     file_sink: file_sink::FileSinkClient<PriceReportV1>,
-    iceberg_writer: Option<PriceWriter>,
+    iceberg_writer: Option<BatchedWriter<IcebergPriceReport>>,
 }
 
 impl ManagedTask for PriceGenerator {
@@ -62,7 +58,7 @@
     pub async fn new(
         settings: &Settings,
         file_sink: file_sink::FileSinkClient<PriceReportV1>,
-        iceberg_writer: Option<PriceWriter>,
+        iceberg_writer: Option<BatchedWriter<IcebergPriceReport>>,
     ) -> Result<Self> {
         Ok(Self {
             last_price_opt: None,
@@ -110,28 +106,17 @@
         tracing::info!(token = TOKEN, price.price, "updating price");
         self.file_sink.write(price_report, []).await?;
 
-        let iceberg_record = match IcebergPriceReport::try_from(&price_report) {
-            Ok(record) => record,
-            Err(err) => {
-                tracing::error!(token = TOKEN, ?err, "invalid iceberg record; skipping");
-                return Ok(());
-            }
-        };
-        let write_id = format!("{TOKEN}_{}", price_report.timestamp);
-        if let Err(err) = iceberg::maybe_write_idempotent(
-            self.iceberg_writer.as_ref(),
-            &write_id,
-            vec![iceberg_record],
-        )
-        .await
-        {
-            tracing::error!(
-                token = TOKEN,
-                ?err,
-                "iceberg write failed; file_sink copy is durable"
-            );
+        if let Some(writer) = &self.iceberg_writer {
+            let iceberg_record = match IcebergPriceReport::try_from(&price_report) {
+                Ok(record) => record,
+                Err(err) => {
+                    tracing::error!(token = TOKEN, ?err, "invalid iceberg record; skipping");
+                    return Ok(());
+                }
+            };
+            writer.queue(iceberg_record).await?;
         }
 
         Ok(())
     }
 
diff --git a/price/src/settings.rs b/price/src/settings.rs
index 98238b584..408afac9d 100644
--- a/price/src/settings.rs
+++ b/price/src/settings.rs
@@ -45,6 +45,16 @@
     /// the `rewards.price` Iceberg table. Required by `backfill`.
     #[serde(default)]
     pub iceberg_settings: Option<helium_iceberg::Settings>,
+    /// Maximum number of records buffered before forcing an Iceberg commit.
+    /// Whichever of `iceberg_batch_size` / `iceberg_batch_timeout` fires
+    /// first triggers the snapshot. With a 60s tick this is also a cap on
+    /// catch-up batch growth. Default = 60.
+    #[serde(default = "default_iceberg_batch_size")]
+    pub iceberg_batch_size: usize,
+    /// Maximum time between Iceberg commits. With the default (1 hour) the
+    /// daemon writes ~24 snapshots/day instead of one per tick. Default = 1 hour.
+    #[serde(with = "humantime_serde", default = "default_iceberg_batch_timeout")]
+    pub iceberg_batch_timeout: Duration,
 }
 
 /// Settings controlling the Iceberg backfill window.
@@ -85,6 +95,14 @@
 fn default_cache() -> PathBuf {
     PathBuf::from("/opt/price/data")
 }
 
+fn default_iceberg_batch_size() -> usize {
+    60
+}
+
+fn default_iceberg_batch_timeout() -> Duration {
+    humantime::parse_duration("1 hour").unwrap()
+}
+
 impl Settings {
     /// Load Settings from a given path. Settings are loaded from a given
     /// optional path and can be overridden with environment variables.
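
[For context on the two settings above: the flush rule they configure is
size-or-time, whichever fires first. A minimal sketch of that policy —
`BatchedWriter`'s real implementation lives in the `helium_iceberg` crate
and is not shown in this series; the names below are illustrative:

    use std::time::{Duration, Instant};

    /// Commit a buffered batch when either threshold is crossed.
    struct FlushPolicy {
        max_batch_size: usize,
        batch_timeout: Duration,
        last_commit: Instant,
    }

    impl FlushPolicy {
        fn should_flush(&self, buffered: usize) -> bool {
            buffered >= self.max_batch_size
                || self.last_commit.elapsed() >= self.batch_timeout
        }
    }

With the defaults — 60 records, 1 hour — a healthy 60s tick hits both
thresholds at about the same moment.]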
From 3010b1b493195db26756d1052d01eb1414d0939d Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Wed, 6 May 2026 07:14:13 -0400
Subject: [PATCH 5/6] Update price to include price_usd

---
 Cargo.lock                        |  1 +
 price/Cargo.toml                  |  1 +
 price/pkg/settings-template.toml  |  2 +-
 price/src/backfill.rs             | 50 ++++++++++++++++++++++++-------
 price/src/iceberg/price_report.rs | 40 ++++++++++++++++++++++---
 price/src/settings.rs             |  4 +--
 6 files changed, 80 insertions(+), 18 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b5eb6d950..b0331c2b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6302,6 +6302,7 @@
  "rust_decimal",
  "serde",
  "serde_json",
+ "solana",
  "sqlx",
  "task-manager",
  "temp-env",
diff --git a/price/Cargo.toml b/price/Cargo.toml
index 9cc60f55c..3b20d6fa0 100644
--- a/price/Cargo.toml
+++ b/price/Cargo.toml
@@ -30,6 +30,7 @@
 db-store = { path = "../db_store" }
 file-store = { path = "../file_store" }
 file-store-oracles = { path = "../file_store_oracles" }
 poc-metrics = { path = "../metrics" }
+solana = { path = "../solana" }
 task-manager = { path = "../task_manager" }
 tls-init = { path = "../tls_init" }
diff --git a/price/pkg/settings-template.toml b/price/pkg/settings-template.toml
index 22ec72e2f..48a17987d 100644
--- a/price/pkg/settings-template.toml
+++ b/price/pkg/settings-template.toml
@@ -69,7 +69,7 @@
 
 # Iceberg catalog settings. When provided, the `server` also writes each
-# live tick into the Iceberg table `rewards.price`. Required by `backfill`.
+# live tick into the Iceberg table `rewards.prices`. Required by `backfill`.
 #
 # Live writes go through a `BatchedWriter` that spools each record to
diff --git a/price/src/backfill.rs b/price/src/backfill.rs
index d91054c24..0142bc574 100644
--- a/price/src/backfill.rs
+++ b/price/src/backfill.rs
@@ -1,5 +1,5 @@
 use crate::{
-    iceberg::{self, IcebergPriceReport, PriceTable},
+    iceberg::{self, IcebergPriceReport},
     settings::Settings,
 };
 use anyhow::{Context, Result};
@@ -7,7 +7,7 @@
 use file_store::{file_info_poller::FileInfoStream, file_source, FileInfo};
 use file_store_oracles::FileType;
 use futures::StreamExt;
-use helium_iceberg::DataWriter;
+use helium_iceberg::{BatchedWriter, BatchedWriterConfig};
 use helium_proto::PriceReportV1;
 use sqlx::{PgPool, Pool, Postgres};
 use task_manager::{ManagedTask, TaskManager};
@@ -46,7 +46,13 @@
         let pool = database.connect("price-backfill").await?;
         sqlx::migrate!().run(&pool).await?;
 
-        let writer = iceberg::connect_table(iceberg_settings).await?;
+        let table = iceberg::connect_table(iceberg_settings).await?;
+        // Use a separate spool dir from the server so the two paths can
+        // coexist on the same host without crossing each other's replay.
+        let config = BatchedWriterConfig::new(settings.cache.join("iceberg-spool-backfill"))
+            .with_max_batch_size(settings.iceberg_batch_size)
+            .with_batch_timeout(settings.iceberg_batch_timeout);
+        let (writer, batched_task) = BatchedWriter::new(table, config);
 
         tracing::info!(
             process_name = %self.process_name,
@@ -59,6 +65,7 @@
             pool,
             settings.output.connect().await,
             writer,
+            batched_task,
             BackfillOptions {
                 process_name: self.process_name,
                 start_after: self.start_after,
@@ -82,7 +89,7 @@
 pub struct PriceReportBackfiller {
     pool: Pool<Postgres>,
     reports: Receiver<FileInfoStream<PriceReportV1>>,
-    writer: PriceTable,
+    writer: BatchedWriter<IcebergPriceReport>,
     done: bool,
 }
 
@@ -90,7 +97,7 @@
     pub fn new(
         pool: Pool<Postgres>,
         reports: Receiver<FileInfoStream<PriceReportV1>>,
-        writer: PriceTable,
+        writer: BatchedWriter<IcebergPriceReport>,
     ) -> Self {
         Self {
             pool,
@@ -103,7 +110,7 @@
     pub async fn create(
         pool: PgPool,
         bucket_client: file_store::BucketClient,
-        writer: PriceTable,
+        writer: BatchedWriter<IcebergPriceReport>,
         options: BackfillOptions,
     ) -> Result<(Self, impl ManagedTask)> {
         let (reports, reports_server) =
@@ -123,15 +130,24 @@
     }
 
-    pub async fn create_managed_task(
+    pub async fn create_managed_task<T: ManagedTask + 'static>(
         pool: PgPool,
         bucket_client: file_store::BucketClient,
-        writer: PriceTable,
+        writer: BatchedWriter<IcebergPriceReport>,
+        batched_task: T,
         options: BackfillOptions,
     ) -> Result<TaskManager> {
         let (backfiller, server) = Self::create(pool, bucket_client, writer, options).await?;
 
+        // Start order is FIFO; shutdown is LIFO. Adding `batched_task` first
+        // means on a signal-triggered shutdown the backfiller stops first,
+        // then the file_source server, and the BatchedWriterTask drains its
+        // spool last. On natural completion (`stop_after` reached), the
+        // file_source closes the channel → backfiller exits and is dropped
+        // → its `BatchedWriter` handle (the only sender) drops → the
+        // BatchedWriterTask sees `None`, drains, and exits cleanly.
         Ok(TaskManager::builder()
+            .add_task(batched_task)
             .add_task(server)
             .add_task(backfiller)
             .build())
@@ -183,7 +189,6 @@
         );
 
         let mut txn = self.pool.begin().await?;
-        let write_id = file_info.key.clone();
         let reports = file_info_stream.into_stream(&mut txn).await?;
         let all_reports: Vec<_> = reports.collect().await;
         let total = all_reports.len();
@@ -191,10 +196,23 @@
         let valid = iceberg_records.len();
         let skipped = total - valid;
 
+        // `queue_all` returns once the records are durable in the spool's
+        // kernel page cache. The actual Iceberg commit happens
+        // asynchronously when the BatchedWriter's size or time threshold
+        // is reached (or on graceful shutdown / channel close).
+        //
+        // Trade-off vs. the previous `write_idempotent` path: there is now
+        // a narrow window between the spool ack and the DB txn commit
+        // where a hard crash can cause a file's records to be replayed
+        // from spool on restart while the file is also re-emitted by the
+        // file_info_poller (since its row never made it into
+        // `files_processed`). Acceptable here because backfill is a
+        // one-shot, operator-supervised command and any duplicates are
+        // recoverable by re-running with a fresh table.
         self.writer
-            .write_idempotent(&write_id, iceberg_records)
+            .queue_all(iceberg_records)
             .await
-            .context("writing price reports to iceberg")?;
+            .context("queueing price reports to iceberg")?;
 
         txn.commit().await.context("committing db transaction")?;
 
diff --git a/price/src/iceberg/price_report.rs b/price/src/iceberg/price_report.rs
index 5116ebddd..4b0844aa5 100644
--- a/price/src/iceberg/price_report.rs
+++ b/price/src/iceberg/price_report.rs
@@ -2,16 +2,31 @@
 use anyhow::anyhow;
 use chrono::{DateTime, FixedOffset, TimeZone, Utc};
 use helium_iceberg::{FieldDefinition, PartitionDefinition, TableDefinition};
 use helium_proto::PriceReportV1;
+use rust_decimal::Decimal;
 use serde::{Deserialize, Serialize};
-use trino_rust_client::Trino;
+use solana::Token;
+use std::str::FromStr;
 
 use super::NAMESPACE;
-pub const TABLE_NAME: &str = "price";
+pub const TABLE_NAME: &str = "prices";
 
-#[derive(Debug, Clone, Trino, Serialize, Deserialize, PartialEq)]
+/// Decimal column scale for `price_usd`. Wider than any token we mint
+/// (HNT=8, MOBILE/IOT=6) so the column doesn't drop digits when we add
+/// new tokens. Precision picked to leave 8 integer digits — comfortably
+/// above any plausible per-token price.
+const PRICE_USD_PRECISION: u32 = 18;
+const PRICE_USD_SCALE: u32 = 10;
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct IcebergPriceReport {
     pub timestamp: DateTime<FixedOffset>,
+    /// Raw price scaled by the token's decimals (e.g. for HNT, USD * 10^8).
+    /// Preserved for backwards compatibility with the protobuf record.
     pub price: u64,
+    /// Same value rendered as plain USD (`price / 10^token.decimals()`).
+    /// Stored as decimal so Trino / downstream consumers don't have to
+    /// know the per-token scaling.
+    pub price_usd: Decimal,
     pub token_type: String,
 }
 
@@ -20,6 +35,7 @@ pub fn table_definition() -> helium_iceberg::Result<TableDefinition> {
         .with_fields([
             FieldDefinition::required_timestamptz("timestamp"),
             FieldDefinition::required_long("price"),
+            FieldDefinition::required_decimal("price_usd", PRICE_USD_PRECISION, PRICE_USD_SCALE),
             FieldDefinition::required_string("token_type"),
         ])
         .with_partition(PartitionDefinition::day("timestamp", "timestamp_day"))
@@ -34,10 +50,26 @@ impl TryFrom<&PriceReportV1> for IcebergPriceReport {
             .timestamp_opt(value.timestamp as i64, 0)
             .single()
             .ok_or_else(|| anyhow!("invalid timestamp {}", value.timestamp))?;
+
+        // The proto's `as_str_name` and `Token::from_str` agree on lowercase
+        // token names ("hnt", "mobile", "iot"), so the round-trip is direct.
+        // `Token::decimals()` is the source of truth for per-token scaling.
+        let token_name = value.token_type().as_str_name();
+        let token = Token::from_str(token_name)
+            .map_err(|err| anyhow!("unknown token {token_name:?}: {err}"))?;
+        let scale = u32::from(token.decimals());
+        let divisor = 10u64
+            .checked_pow(scale)
+            .ok_or_else(|| anyhow!("decimals overflow for {token_name}"))?;
+        let price_usd = Decimal::from(value.price)
+            .checked_div(Decimal::from(divisor))
+            .ok_or_else(|| anyhow!("price scaling overflow for {token_name}"))?;
+
         Ok(Self {
             timestamp: timestamp.into(),
             price: value.price,
-            token_type: value.token_type().as_str_name().to_string(),
+            price_usd,
+            token_type: token_name.to_string(),
         })
     }
 }
diff --git a/price/src/settings.rs b/price/src/settings.rs
index 408afac9d..39870a5bd 100644
--- a/price/src/settings.rs
+++ b/price/src/settings.rs
@@ -42,7 +42,7 @@
     #[serde(default)]
     pub database: Option<db_store::Settings>,
     /// Iceberg catalog settings. When provided, live ticks also write to
-    /// the `rewards.price` Iceberg table. Required by `backfill`.
+    /// the `rewards.prices` Iceberg table. Required by `backfill`.
     #[serde(default)]
     pub iceberg_settings: Option<helium_iceberg::Settings>,
@@ -96,7 +96,7 @@
 fn default_iceberg_batch_size() -> usize {
-    60
+    10_000
 }
 
 fn default_iceberg_batch_timeout() -> Duration {

From 6a5f1ead110c5262cd493536c1f0ffca9aba760c Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Wed, 6 May 2026 08:14:04 -0400
Subject: [PATCH 6/6] Add PriceSink and make both S3 output and Iceberg
 optional

---
 Cargo.lock                       |  1 +
 price/Cargo.toml                 |  1 +
 price/pkg/settings-template.toml | 12 +++--
 price/src/backfill.rs            |  9 +++-
 price/src/iceberg/mod.rs         |  2 +-
 price/src/lib.rs                 |  1 +
 price/src/main.rs                | 92 ++++++++++++++++++--------------
 price/src/price_generator.rs     | 32 +++--------
 price/src/settings.rs            | 21 +++++---
 price/src/sinks/file_store.rs    | 29 ++++++++++
 price/src/sinks/iceberg.rs       | 38 ++++++++++++++
 price/src/sinks/mod.rs           | 31 +++++++++++
 12 files changed, 194 insertions(+), 75 deletions(-)
 create mode 100644 price/src/sinks/file_store.rs
 create mode 100644 price/src/sinks/iceberg.rs
 create mode 100644 price/src/sinks/mod.rs

diff --git a/Cargo.lock b/Cargo.lock
index b0331c2b4..97548a0ec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6284,6 +6284,7 @@
 name = "price"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-trait",
  "chrono",
  "clap",
  "config",
diff --git a/price/Cargo.toml b/price/Cargo.toml
index 3b20d6fa0..ae5df2f59 100644
--- a/price/Cargo.toml
+++ b/price/Cargo.toml
@@ -8,6 +8,7 @@
 
 [dependencies]
 anyhow = { workspace = true }
+async-trait = { workspace = true }
 config = { workspace = true }
 clap = { workspace = true }
 serde = { workspace = true }
diff --git a/price/pkg/settings-template.toml b/price/pkg/settings-template.toml
index 48a17987d..04bf02299 100644
--- a/price/pkg/settings-template.toml
+++ b/price/pkg/settings-template.toml
@@ -27,9 +27,13 @@
 #
 # default_price =
 
-# Output bucket.
-[output]
-bucket = "price"
+# S3 bucket for `PriceReportV1` protobuf files.
+# - Optional for `server`. Omit when running an Iceberg-only deployment;
+#   the server will then fan out only to the Iceberg sink.
+# - Required for `backfill`, which reads these files back to seed Iceberg.
+#
+# [output]
+# bucket = "price"
 
 [custom_tracing]
 # File watched for runtime tracing-config reloads. Default below.
@@ -69,7 +73,7 @@
 
 # Iceberg catalog settings. When provided, the `server` also writes each
-# live tick into the Iceberg table `rewards.prices`. Required by `backfill`.
+# live tick into the Iceberg table `tokens.prices`. Required by `backfill`.
 #
 # Live writes go through a `BatchedWriter` that spools each record to
diff --git a/price/src/backfill.rs b/price/src/backfill.rs
index 0142bc574..89828a4e4 100644
--- a/price/src/backfill.rs
+++ b/price/src/backfill.rs
@@ -42,6 +42,13 @@
             .iceberg_settings
             .as_ref()
             .ok_or_else(|| anyhow::anyhow!("iceberg_settings required for backfill"))?;
+        // Backfill reads `PriceReportV1` files from S3 to seed the
+        // Iceberg history; the bucket is non-optional here even though
+        // the daemon allows Iceberg-only configurations.
+        let output = settings
+            .output
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("`output` (S3 bucket) is required for backfill"))?;
 
         let pool = database.connect("price-backfill").await?;
         sqlx::migrate!().run(&pool).await?;
@@ -68,7 +75,7 @@
 
         let task = PriceReportBackfiller::create_managed_task(
             pool,
-            settings.output.connect().await,
+            output.connect().await,
             writer,
             batched_task,
             BackfillOptions {
diff --git a/price/src/iceberg/mod.rs b/price/src/iceberg/mod.rs
index e1461868f..dcff5f845 100644
--- a/price/src/iceberg/mod.rs
+++ b/price/src/iceberg/mod.rs
@@ -4,7 +4,7 @@
 
 pub use price_report::IcebergPriceReport;
 
-pub const NAMESPACE: &str = "rewards";
+pub const NAMESPACE: &str = "tokens";
 
 /// Type alias for the price-report Iceberg table handle. The server feeds
 /// this into a `BatchedWriter`; the backfiller uses it directly via
diff --git a/price/src/lib.rs b/price/src/lib.rs
index d05f20b3c..ae13e6c06 100644
--- a/price/src/lib.rs
+++ b/price/src/lib.rs
@@ -7,6 +7,7 @@
 pub mod iceberg;
 pub mod metrics;
 pub mod price_generator;
 pub mod settings;
+pub mod sinks;
 
 pub use price_generator::PriceGenerator;
 pub use settings::Settings;
diff --git a/price/src/main.rs b/price/src/main.rs
index c3249095d..d8a0ca129 100644
--- a/price/src/main.rs
+++ b/price/src/main.rs
@@ -4,7 +4,13 @@
 use file_store::file_upload;
 use file_store_oracles::traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt};
 use helium_iceberg::{BatchedWriter, BatchedWriterConfig};
 use helium_proto::PriceReportV1;
-use price::{backfill, cli::check, iceberg, PriceGenerator, Settings};
+use price::{
+    backfill,
+    cli::check,
+    iceberg,
+    sinks::{IcebergPriceSink, PriceSink, S3PriceSink},
+    PriceGenerator, Settings,
+};
 use std::{
     path::{self, PathBuf},
     time::Duration,
@@ -81,47 +87,55 @@
         // Install the prometheus metrics exporter
         poc_metrics::start_metrics(&settings.metrics)?;
 
-        // Initialize uploader
-        let (file_upload, file_upload_server) =
-            file_upload::FileUpload::from_bucket_client(settings.output.connect().await).await;
+        let mut task_manager = TaskManager::new();
+        let mut sinks: Vec<Box<dyn PriceSink>> = Vec::new();
+
+        // S3 sink: only enabled when `output` is configured. Both
+        // file_upload and price_sink (the FileSinkClient)
+        // are spawned together; their handles drive the upload pipeline
+        // and the PriceReportV1 file roll. The `S3PriceSink` wrapper
+        // gives us a uniform `PriceSink` interface.
+        if let Some(output) = settings.output.as_ref() {
+            tracing::info!("output bucket configured, starting file_sink");
+            let (file_upload, file_upload_server) =
+                file_upload::FileUpload::from_bucket_client(output.connect().await).await;
+            let (price_sink, price_sink_server) = PriceReportV1::file_sink(
+                &settings.cache,
+                file_upload.clone(),
+                FileSinkCommitStrategy::Automatic,
+                FileSinkRollTime::Duration(Duration::from_secs(PRICE_SINK_ROLL_SECS)),
+                env!("CARGO_PKG_NAME"),
+            )
+            .await?;
+            task_manager.add(file_upload_server);
+            task_manager.add(price_sink_server);
+            sinks.push(Box::new(S3PriceSink::new(price_sink)));
+        }
 
-        let (price_sink, price_sink_server) = PriceReportV1::file_sink(
-            &settings.cache,
-            file_upload.clone(),
-            FileSinkCommitStrategy::Automatic,
-            FileSinkRollTime::Duration(Duration::from_secs(PRICE_SINK_ROLL_SECS)),
-            env!("CARGO_PKG_NAME"),
-        )
-        .await?;
+        // Iceberg sink: optional. The `BatchedWriter` owns batching,
+        // on-disk spool durability, and the snapshot commit. The task
+        // is registered after the S3 servers so shutdown LIFO drains
+        // pending records into Iceberg before tearing down upstream
+        // pieces.
+        if let Some(iceberg_settings) = settings.iceberg_settings.as_ref() {
+            tracing::info!("iceberg settings provided, connecting...");
+            let table = iceberg::connect_table(iceberg_settings).await?;
+            let config = BatchedWriterConfig::new(settings.cache.join("iceberg-spool"))
+                .with_max_batch_size(settings.iceberg_batch_size)
+                .with_batch_timeout(settings.iceberg_batch_timeout);
+            let (writer, batched_task) = BatchedWriter::new(table, config);
+            task_manager.add(batched_task);
+            sinks.push(Box::new(IcebergPriceSink::new(writer)));
+        }
 
-        let mut task_manager = TaskManager::new();
-        task_manager.add(file_upload_server);
-        task_manager.add(price_sink_server);
-
-        // When Iceberg is configured, build a `BatchedWriter` so the
-        // PriceGenerator can `queue` each tick without producing one
-        // snapshot per minute. The `BatchedWriterTask` owns flushing on
-        // size/time thresholds, replays its on-disk spool on startup so a
-        // crash between the file_sink write and the Iceberg commit doesn't
-        // lose records, and drains on graceful shutdown.
-        let iceberg_writer = match settings.iceberg_settings.as_ref() {
-            Some(iceberg_settings) => {
-                tracing::info!("iceberg settings provided, connecting...");
-                let table = iceberg::connect_table(iceberg_settings).await?;
-                let config = BatchedWriterConfig::new(settings.cache.join("iceberg-spool"))
-                    .with_max_batch_size(settings.iceberg_batch_size)
-                    .with_batch_timeout(settings.iceberg_batch_timeout);
-                let (writer, batched_task) = BatchedWriter::new(table, config);
-                task_manager.add(batched_task);
-                Some(writer)
-            }
-            None => {
-                tracing::info!("no iceberg settings provided");
-                None
-            }
-        };
+        if sinks.is_empty() {
+            anyhow::bail!(
+                "price server has no configured sinks: set `output` (S3) and/or \
+                 `iceberg_settings` to a non-empty value"
+            );
+        }
 
-        task_manager.add(PriceGenerator::new(settings, price_sink.clone(), iceberg_writer).await?);
+        task_manager.add(PriceGenerator::new(settings, sinks).await?);
 
         task_manager.start().await?;
         Ok(())
diff --git a/price/src/price_generator.rs b/price/src/price_generator.rs
index 1cf46f9ac..38d58babc 100644
--- a/price/src/price_generator.rs
+++ b/price/src/price_generator.rs
@@ -1,9 +1,7 @@
-use crate::{hermes, iceberg::IcebergPriceReport, metrics::Metrics, Settings};
+use crate::{hermes, metrics::Metrics, sinks::PriceSink, Settings};
 use anyhow::{anyhow, Result};
 use chrono::{DateTime, TimeZone, Utc};
-use file_store::file_sink;
 use futures::TryFutureExt;
-use helium_iceberg::BatchedWriter;
 use helium_proto::{BlockchainTokenTypeV1, PriceReportV1};
 use serde::{Deserialize, Serialize};
 use std::{path::PathBuf, time::Duration};
@@ -32,8 +30,9 @@
     default_price: Option<u64>,
     stale_price_duration: Duration,
     latest_price_file: PathBuf,
-    file_sink: file_sink::FileSinkClient<PriceReportV1>,
-    iceberg_writer: Option<BatchedWriter<IcebergPriceReport>>,
+    /// Configured destinations. Empty in unit tests; populated by
+    /// `Server::run` based on `output` and `iceberg_settings`.
+ sinks: Vec>, } impl ManagedTask for PriceGenerator { @@ -55,11 +54,7 @@ impl From<&Price> for PriceReportV1 { } impl PriceGenerator { - pub async fn new( - settings: &Settings, - file_sink: file_sink::FileSinkClient, - iceberg_writer: Option>, - ) -> Result { + pub async fn new(settings: &Settings, sinks: Vec>) -> Result { Ok(Self { last_price_opt: None, http: reqwest::Client::new(), @@ -68,8 +63,7 @@ impl PriceGenerator { interval_duration: settings.interval, stale_price_duration: settings.stale_price_duration, latest_price_file: settings.cache.join(LATEST_PRICE_FILE), - file_sink, - iceberg_writer, + sinks, }) } @@ -104,19 +98,9 @@ impl PriceGenerator { async fn write_price_to_sink(&self, price: &Price) -> Result<()> { let price_report = PriceReportV1::from(price); tracing::info!(token = TOKEN, price.price, "updating price"); - self.file_sink.write(price_report, []).await?; - - if let Some(writer) = &self.iceberg_writer { - let iceberg_record = match IcebergPriceReport::try_from(&price_report) { - Ok(record) => record, - Err(err) => { - tracing::error!(token = TOKEN, ?err, "invalid iceberg record; skipping"); - return Ok(()); - } - }; - writer.queue(iceberg_record).await?; + for sink in &self.sinks { + sink.write(&price_report).await?; } - Ok(()) } diff --git a/price/src/settings.rs b/price/src/settings.rs index 39870a5bd..994c35872 100644 --- a/price/src/settings.rs +++ b/price/src/settings.rs @@ -20,7 +20,13 @@ pub struct Settings { /// parameter for the HNT feed. Required. #[serde(default = "default_source")] pub source: String, - pub output: file_store::BucketSettings, + /// S3 bucket for `PriceReportV1` protobuf files. Optional for + /// `server`; required for `backfill` (which reads these files + /// back). When unset on the server, the file_sink/FileUpload tasks + /// are not started and ticks go only to the configured Iceberg + /// table. + #[serde(default)] + pub output: Option, /// Folder for local cache of ingest data #[serde(default = "default_cache")] pub cache: PathBuf, @@ -42,7 +48,7 @@ pub struct Settings { #[serde(default)] pub database: Option, /// Iceberg catalog settings. When provided, live ticks also write to - /// the `rewards.prices` Iceberg table. Required by `backfill`. + /// the `tokens.prices` Iceberg table. Required by `backfill`. #[serde(default)] pub iceberg_settings: Option, /// Maximum number of records buffered before forcing an Iceberg commit. @@ -148,21 +154,24 @@ mod tests { )?; assert_eq!(settings.default_price, Some(100_000_000)); - assert_eq!(settings.output.bucket, "test-bucket"); + assert_eq!( + settings.output.as_ref().expect("output").bucket, + "test-bucket" + ); Ok(()) } #[test] fn test_settings_template_parses() -> anyhow::Result<()> { let template = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("pkg/settings-template.toml"); - // The template intentionally leaves output_bucket populated; no env - // overrides so we exercise pure file parsing. + // The template ships with `[output]` commented out — `output` + // is optional for the server and required only for backfill. 
         let settings = temp_env::with_vars(Vec::<(&str, Option<String>)>::new(), || {
             Settings::new(Some(&template))
         })?;
 
         assert!(settings.source.contains("hermes.pyth.network"));
-        assert_eq!(settings.output.bucket, "price");
+        assert!(settings.output.is_none());
         assert_eq!(settings.interval, Duration::from_secs(60));
         Ok(())
     }
diff --git a/price/src/sinks/file_store.rs b/price/src/sinks/file_store.rs
new file mode 100644
index 000000000..c2bf582e2
--- /dev/null
+++ b/price/src/sinks/file_store.rs
@@ -0,0 +1,29 @@
+use crate::sinks::PriceSink;
+use anyhow::Result;
+use async_trait::async_trait;
+use file_store::file_sink::FileSinkClient;
+use helium_proto::PriceReportV1;
+
+/// Writes each tick to S3 via the existing `file_sink` (rolled into
+/// `PriceReportV1.<timestamp>.gz` files on the schedule configured in
+/// `Server::run`).
+pub struct S3PriceSink {
+    sink: FileSinkClient<PriceReportV1>,
+}
+
+impl S3PriceSink {
+    pub fn new(sink: FileSinkClient<PriceReportV1>) -> Self {
+        Self { sink }
+    }
+}
+
+#[async_trait]
+impl PriceSink for S3PriceSink {
+    async fn write(&self, report: &PriceReportV1) -> Result<()> {
+        // `PriceReportV1` is a small `Copy` struct — deref to hand it
+        // by value into the file_sink without taking ownership at the
+        // trait surface.
+        self.sink.write(*report, []).await?;
+        Ok(())
+    }
+}
diff --git a/price/src/sinks/iceberg.rs b/price/src/sinks/iceberg.rs
new file mode 100644
index 000000000..5cc6c5eaf
--- /dev/null
+++ b/price/src/sinks/iceberg.rs
@@ -0,0 +1,38 @@
+use crate::iceberg::IcebergPriceReport;
+use crate::sinks::PriceSink;
+use anyhow::Result;
+use async_trait::async_trait;
+use helium_iceberg::BatchedWriter;
+use helium_proto::PriceReportV1;
+
+/// Queues each tick into a `BatchedWriter` keyed on the
+/// `tokens.prices` Iceberg table. The actual snapshot commit is async
+/// and governed by the writer's batch-size / batch-timeout knobs.
+pub struct IcebergPriceSink {
+    writer: BatchedWriter<IcebergPriceReport>,
+}
+
+impl IcebergPriceSink {
+    pub fn new(writer: BatchedWriter<IcebergPriceReport>) -> Self {
+        Self { writer }
+    }
+}
+
+#[async_trait]
+impl PriceSink for IcebergPriceSink {
+    async fn write(&self, report: &PriceReportV1) -> Result<()> {
+        // `try_from` is the proto → iceberg conversion (sets the
+        // `price_usd` Decimal column from the token's decimals). A
+        // conversion failure is data-shape, not sink-shape; log and
+        // skip rather than crashing the daemon.
+        let record = match IcebergPriceReport::try_from(report) {
+            Ok(record) => record,
+            Err(err) => {
+                tracing::error!(?err, "invalid iceberg record; skipping");
+                return Ok(());
+            }
+        };
+        self.writer.queue(record).await?;
+        Ok(())
+    }
+}
diff --git a/price/src/sinks/mod.rs b/price/src/sinks/mod.rs
new file mode 100644
index 000000000..e8b3cea99
--- /dev/null
+++ b/price/src/sinks/mod.rs
@@ -0,0 +1,31 @@
+//! Pluggable destinations for `PriceReportV1` records emitted by the
+//! daemon. Each tick is fanned out to every configured sink in
+//! `PriceGenerator`. Today we have two implementations:
+//!
+//! - [`S3PriceSink`] writes proto records to S3 via `file_sink`.
+//! - [`IcebergPriceSink`] queues records into a `BatchedWriter` for
+//!   commit to the `tokens.prices` Iceberg table.
+//!
+//! Adding another destination (e.g. a webhook, a metric) is a matter of
+//! one new impl — `PriceGenerator` doesn't need to learn about it.
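+//!
+//! For instance, a hypothetical logging sink (illustrative only, not
+//! part of this change) would be fully self-contained:
+//!
+//! ```ignore
+//! struct LogPriceSink;
+//!
+//! #[async_trait]
+//! impl PriceSink for LogPriceSink {
+//!     async fn write(&self, report: &PriceReportV1) -> Result<()> {
+//!         tracing::info!(
+//!             price = report.price,
+//!             token = report.token_type().as_str_name(),
+//!             "price tick"
+//!         );
+//!         Ok(())
+//!     }
+//! }
+//! ```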
+ +use anyhow::Result; +use async_trait::async_trait; +use helium_proto::PriceReportV1; + +mod file_store; +mod iceberg; + +pub use file_store::S3PriceSink; +pub use iceberg::IcebergPriceSink; + +#[async_trait] +pub trait PriceSink: Send + Sync { + /// Persist a price report. Returning `Err` is fatal — the daemon + /// crashes so the orchestrator can restart it. Both + /// `FileSinkClient::write` and `BatchedWriter::queue` only fail + /// when their backing task is dead, which is unrecoverable in + /// process. Soft skips (e.g. invalid records) should log and + /// return `Ok(())`. + async fn write(&self, report: &PriceReportV1) -> Result<()>; +}
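
[A quick sanity check of the patch-5 conversion, as a sketch — it assumes,
per the comments in that patch, that `BlockchainTokenTypeV1::Hnt.as_str_name()`
is "hnt" and that `solana::Token` reports 8 decimals for HNT; the test module
below is illustrative and not part of the series:

    #[cfg(test)]
    mod conversion_tests {
        use super::*;
        use rust_decimal::Decimal;

        #[test]
        fn price_usd_scales_by_token_decimals() {
            let report = PriceReportV1 {
                timestamp: 1_700_000_000,
                price: 1_234_567_890, // raw HNT price = USD * 10^8
                token_type: BlockchainTokenTypeV1::Hnt.into(),
            };
            let record = IcebergPriceReport::try_from(&report).unwrap();
            // 1_234_567_890 / 10^8 = 12.3456789 USD
            assert_eq!(record.price_usd, Decimal::new(123_456_789, 7));
            assert_eq!(record.token_type, "hnt");
        }
    }
]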