Write price data to iceberg with backfill#1181

Merged
bbalser merged 6 commits into main from bbalser/price/iceberg
May 6, 2026
Conversation

bbalser (Collaborator) commented Apr 27, 2026

Price oracle: Iceberg integration with optional S3, backfill, and pluggable sinks

Branch layered on top of bbalser/helium-icebeg-batched-writer and rebased to current main. 6 commits, 14 files changed (+664 / −41 LOC).

Overview

Adds an Iceberg sink to the price daemon and a one-shot backfill subcommand that seeds historical price data into Iceberg from existing S3 protobuf files, and refactors the per-tick output path behind a PriceSink trait so that S3 and Iceberg become interchangeable, individually optional sinks. Everything writes to the new tokens.prices table.

Commits

| SHA | Description |
| --- | --- |
| 9abc534f | Initial Iceberg writer + backfill subcommand wired through IcebergTable::write_idempotent |
| bccec1a9 | Cargo.toml cleanup |
| 59b0160c | Switch settings to use file_store::BucketSettings |
| 1adfad7f | Replace per-tick write_idempotent with the new BatchedWriter (size/time-batched, spool-backed) |
| 3010b1b4 | Add price_usd: Decimal column derived via solana::Token::decimals() |
| 6a5f1ead | Introduce PriceSink trait; make S3 output optional on the server |

Changes by area

1. Iceberg writes (new)

  • New module price/src/iceberg/ with:
    • IcebergPriceReport row type (timestamp, price: u64, price_usd: Decimal, token_type: String).
    • connect_table(&helium_iceberg::Settings) -> IcebergTable<IcebergPriceReport> — ensures namespace and table exist.
  • Schema: tokens.prices, partitioned daily on timestamp.

```sql
CREATE TABLE tokens.prices (
    timestamp   timestamptz       NOT NULL,
    price       bigint            NOT NULL,        -- raw scaled u64 (proto compat)
    price_usd   decimal(18, 10)   NOT NULL,        -- USD = price / 10^token.decimals()
    token_type  string            NOT NULL
)
PARTITIONED BY (day(timestamp));
```
  • price_usd derivation uses solana::Token::decimals() (HNT=8, MOBILE/IOT=6, …), removing the hardcoded HNT_DECIMALS for that column.
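The derivation can be sketched as follows. This is illustrative only: `Token` and `decimals()` are simplified stand-ins for `solana::Token::decimals()` (with the counts quoted above), and `f64` stands in for the `Decimal` column type.

```rust
// Sketch of the price_usd derivation. Token is a hypothetical stand-in for
// solana::Token; the real column uses a Decimal type, not f64.
#[derive(Debug, Clone, Copy)]
enum Token {
    Hnt,
    Mobile,
    Iot,
}

impl Token {
    // Decimal counts per the PR description (HNT=8, MOBILE/IOT=6).
    fn decimals(self) -> u32 {
        match self {
            Token::Hnt => 8,
            Token::Mobile | Token::Iot => 6,
        }
    }
}

/// USD value = raw scaled price / 10^token.decimals()
fn price_usd(raw_price: u64, token: Token) -> f64 {
    raw_price as f64 / 10u64.pow(token.decimals()) as f64
}

fn main() {
    // A raw HNT price of 312_345_678 with 8 decimals is 3.12345678 USD.
    println!("{}", price_usd(312_345_678, Token::Hnt));
    println!("{}", price_usd(1_500_000, Token::Mobile));
}
```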

2. BatchedWriter for live and backfill paths

Both the daemon and backfill use helium_iceberg::BatchedWriter (introduced upstream in bbalser/helium-icebeg-batched-writer), which:

  • queues records into an Arrow-IPC spool on disk (kernel-page-cache durable),
  • commits a single Iceberg snapshot when size or time threshold hits,
  • replays leftover spool files on startup so a crash between queue ack and commit doesn't lose records.

Knobs in Settings:

| Setting | Default | Purpose |
| --- | --- | --- |
| iceberg_batch_size | 1000 | Max records buffered before forced commit |
| iceberg_batch_timeout | 10 minutes | Max wall-clock time between commits |

The two writer instances use separate spool dirs (<cache>/iceberg-spool vs <cache>/iceberg-spool-backfill) so they can run on the same host without crossing each other's replay.
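The size-or-time flush rule can be sketched like this; `Batcher` and `should_flush` are hypothetical names, and the real helium_iceberg::BatchedWriter is async and spool-backed, so only the threshold logic is mirrored here.

```rust
// Minimal sketch of the "commit when size OR time threshold hits" rule.
use std::time::{Duration, Instant};

struct Batcher {
    buffered: usize,
    batch_size: usize,       // iceberg_batch_size
    batch_timeout: Duration, // iceberg_batch_timeout
    last_commit: Instant,
}

impl Batcher {
    fn queue(&mut self) {
        self.buffered += 1;
    }

    // A snapshot should be committed when either threshold is reached;
    // an empty buffer never triggers the time-based commit.
    fn should_flush(&self, now: Instant) -> bool {
        self.buffered >= self.batch_size
            || (self.buffered > 0 && now - self.last_commit >= self.batch_timeout)
    }
}

fn main() {
    let mut b = Batcher {
        buffered: 0,
        batch_size: 1000,
        batch_timeout: Duration::from_secs(600),
        last_commit: Instant::now(),
    };
    for _ in 0..1000 {
        b.queue();
    }
    println!("flush: {}", b.should_flush(Instant::now()));
}
```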

3. PriceSink trait — pluggable destinations

New module price/src/sinks/:

```rust
#[async_trait]
pub trait PriceSink: Send + Sync {
    async fn write(&self, report: &PriceReportV1) -> Result<()>;
}
```

Two implementations:

  • S3PriceSink wraps FileSinkClient<PriceReportV1> (rolled .gz files in S3).
  • IcebergPriceSink wraps BatchedWriter<IcebergPriceReport> and owns the proto→iceberg conversion.

PriceGenerator now holds sinks: Vec<Box<dyn PriceSink>> and the per-tick fan-out collapses to:

```rust
for sink in &self.sinks {
    sink.write(&price_report).await?;
}
```
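A synchronous sketch of that fan-out shows the shape of the trait-object dispatch; the real trait is #[async_trait], and `CacheSink` here is a hypothetical stand-in implementation, not one of the PR's two sinks.

```rust
// Synchronous sketch of the per-tick sink fan-out over trait objects.
trait PriceSink {
    fn write(&self, report: &str) -> Result<(), String>;
}

// Hypothetical sink used only to exercise the loop.
struct CacheSink;

impl PriceSink for CacheSink {
    fn write(&self, report: &str) -> Result<(), String> {
        println!("cached: {report}");
        Ok(())
    }
}

// Mirrors the per-tick loop: the first failing sink aborts the tick.
fn fan_out(sinks: &[Box<dyn PriceSink>], report: &str) -> Result<(), String> {
    for sink in sinks {
        sink.write(report)?;
    }
    Ok(())
}

fn main() {
    let sinks: Vec<Box<dyn PriceSink>> = vec![Box::new(CacheSink), Box::new(CacheSink)];
    fan_out(&sinks, "hnt=3.12").unwrap();
}
```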

4. Optional S3 output on the server

Settings::output is now Option<file_store::BucketSettings>, and Server::run builds the sink fan-out from whatever is configured:

| output | iceberg_settings | Behavior |
| --- | --- | --- |
| set | unset | S3 only |
| unset | set | Iceberg only |
| set | set | Both (existing behavior) |
| unset | unset | Error at startup ("no configured sinks") |
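That selection logic can be sketched as a small function; `configured_sinks` and the string stand-ins for the S3 and Iceberg sink types are illustrative, not the PR's actual API.

```rust
// Sketch of the Server::run fan-out decision: build one sink per configured
// destination, and fail at startup when neither is set.
fn configured_sinks(output: Option<&str>, iceberg: Option<&str>) -> Result<Vec<String>, String> {
    let mut sinks = Vec::new();
    if let Some(bucket) = output {
        sinks.push(format!("s3:{bucket}"));
    }
    if let Some(table) = iceberg {
        sinks.push(format!("iceberg:{table}"));
    }
    if sinks.is_empty() {
        return Err("no configured sinks".to_string());
    }
    Ok(sinks)
}

fn main() {
    println!("{:?}", configured_sinks(Some("prices-bucket"), Some("tokens.prices")));
    println!("{:?}", configured_sinks(None, None));
}
```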

The local <cache>/hnt.latest file is always written regardless of sink config (it seeds the stale-price fallback after a restart).

5. backfill subcommand (new)

price backfill --start-after <RFC3339> --stop-after <RFC3339> reads existing PriceReportV1 S3 files via file_source::Continuous and queues them into Iceberg through the BatchedWriter. Bookkeeping:

  • Tracks processed files in a Postgres files_processed table (new migration 1_files_processed.sql).
  • The output (S3) settings are required for backfill, since S3 is the read source.
  • Designed as a one-shot operator-supervised command — --stop-after should be set to the date Iceberg was first enabled in production so it does not overlap with the daemon's live writes.
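The bookkeeping can be sketched like this; a HashSet stands in for the Postgres files_processed table, and `backfill` is a hypothetical name for the loop, with the actual S3 read and Iceberg queueing elided.

```rust
// Sketch of the backfill dedupe: skip files already recorded as processed,
// so re-running the one-shot command does not double-write records.
use std::collections::HashSet;

fn backfill(files: &[&str], processed: &mut HashSet<String>) -> usize {
    let mut written = 0;
    for file in files {
        if processed.contains(*file) {
            continue; // already recorded; safe to re-run the command
        }
        // ... read PriceReportV1 records from S3 and queue them into the
        // BatchedWriter here ...
        processed.insert(file.to_string());
        written += 1;
    }
    written
}

fn main() {
    let mut processed = HashSet::new();
    println!("{}", backfill(&["a.gz", "b.gz"], &mut processed));
    println!("{}", backfill(&["a.gz", "b.gz", "c.gz"], &mut processed));
}
```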

6. Configuration surface

price/pkg/settings-template.toml updates:

  • [output] block now commented out with a note (optional for server, required for backfill).
  • New iceberg_batch_size / iceberg_batch_timeout knobs documented.
  • [iceberg_settings] block documented (tokens.prices table).
  • [database] block documented (only consulted by backfill).
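Put together, the template changes might look roughly like this fragment. The key and block names come from the PR; all values and nested fields are placeholders, not the template's actual contents.

```toml
# Illustrative settings fragment; values are placeholders.

# Optional for the server, required for `backfill` (it is the read source).
# [output]
# bucket = "example-price-bucket"

iceberg_batch_size = 1000
iceberg_batch_timeout = "10 minutes"

[iceberg_settings]
# Connection details for the tokens.prices table go here.

[database]
# Only consulted by the `backfill` subcommand.
```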

Cargo.toml adds: async-trait, solana (workspace path dep — pulls helium_lib::Token), helium-iceberg, trino-rust-client, db-store, sqlx.


Files

```
 price/Cargo.toml                       |  +8
 price/migrations/1_files_processed.sql |  +7   (new)
 price/pkg/settings-template.toml       | +42  -3
 price/src/backfill.rs                  | +266  (new)
 price/src/iceberg/mod.rs               | +27   (new)
 price/src/iceberg/price_report.rs      | +75   (new)
 price/src/lib.rs                       |  +3
 price/src/main.rs                      | +85  -23
 price/src/price_generator.rs           | +19  -8
 price/src/settings.rs                  | +68  -10
 price/src/sinks/mod.rs                 | +31   (new)
 price/src/sinks/file_store.rs          | +29   (new)
 price/src/sinks/iceberg.rs             | +38   (new)
 Cargo.lock                             |  +7
```

Comment thread price/Cargo.toml (Outdated)

Comment thread price/src/settings.rs (Outdated)

```rust
impl Settings {
    /// Build a `BucketClient` for the output bucket using the shared
    /// `file_store` credentials.
    pub async fn output_bucket_client(&self) -> file_store::BucketClient {
```

Contributor: We should probably update Settings.file_store to use file_store::BucketSettings and use the connect function provided by that.

bbalser (Collaborator, Author): Updated in 405640f

Comment thread price/src/main.rs

Comment thread price/src/settings.rs

```rust
/// Database settings. Required when running `backfill`; unused by the
/// server path.
#[serde(default)]
pub database: Option<db_store::Settings>,
```

Contributor: If the database is only used for backfill, should this setting be moved in there?

bbalser (Collaborator, Author): I can move it if you feel strongly, but this is easier and quicker to get things moving.

bbalser force-pushed the bbalser/price/iceberg branch 2 times, most recently from f69ac88 to ae2da3a (May 5, 2026 20:23)
bbalser force-pushed the bbalser/price/iceberg branch from f70c094 to 6a5f1ea (May 6, 2026 13:36)
bbalser marked this pull request as ready for review (May 6, 2026 13:41)
bbalser merged commit 2ff3fa5 into main (May 6, 2026)
29 checks passed
bbalser deleted the bbalser/price/iceberg branch (May 6, 2026 18:44)