Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion mobile_verifier/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use file_store_oracles::FileType;
use futures::StreamExt;
use helium_iceberg::BoxedDataWriter;
use sqlx::PgPool;
use std::time::Duration;
use std::time::{Duration, Instant};
use task_manager::ManagedTask;
use tokio::sync::mpsc::Receiver;

Expand Down Expand Up @@ -104,6 +104,8 @@ impl<C: IcebergBackfill> Backfiller<C> {
return Ok(());
};

let start = Instant::now();

let file_info = file.file_info.clone();
let write_id = file_info.key.clone();
let mut txn = self.pool.begin().await?;
Expand All @@ -125,6 +127,7 @@ impl<C: IcebergBackfill> Backfiller<C> {
written,
skipped = total - written,
age = %file_age(&file_info),
duration = ?start.elapsed(),
"backfilled {} file",
C::FILE_TYPE
);
Expand Down
72 changes: 72 additions & 0 deletions mobile_verifier/src/cli/backfill_speedtest_avg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use crate::{iceberg, speedtests_average::SpeedtestAvgBackfiller, Settings};
use anyhow::Result;
use chrono::{DateTime, Utc};
use task_manager::TaskManager;

// One-shot CLI arguments for backfilling historical SpeedtestAvg files into
// the poc.speedtest_avgs Iceberg table. The `///` field docs below double as
// clap-generated help text, so they are left untouched.
#[derive(Debug, clap::Args)]
pub struct Cmd {
    /// Process name for tracking speedtest_avg backfill (avoids conflict with daemon)
    #[clap(long, default_value = "speedtest-avg-backfill")]
    process_name: String,

    /// Start processing files after this timestamp.
    /// Format: RFC 3339 (e.g., 2024-01-01T00:00:00Z)
    #[clap(long)]
    start_after: DateTime<Utc>,

    /// Stop processing files when their timestamp is > this value.
    /// Use this to avoid reprocessing files that the daemon has already handled.
    /// Format: RFC 3339 (e.g., 2025-02-25T00:00:00Z)
    #[clap(long)]
    stop_after: DateTime<Utc>,
}

impl Cmd {
    /// Run the one-shot speedtest_avg backfill: connect to Postgres, apply
    /// migrations, build the Iceberg writer, then drive the backfiller and its
    /// companion server until the configured time window is exhausted.
    ///
    /// Errors if the database is unreachable, `iceberg_settings` is absent, or
    /// no speedtest_avg writer could be constructed.
    pub async fn run(self, settings: &Settings) -> Result<()> {
        let pool = settings
            .database
            .connect("mobile-verifier-speedtest-avg-backfill")
            .await?;
        sqlx::migrate!().run(&pool).await?;

        let iceberg_settings = settings.iceberg_settings.as_ref().ok_or_else(|| {
            anyhow::anyhow!("iceberg_settings required for speedtest_avg backfill")
        })?;

        // Propagate a missing writer as an error instead of panicking, matching
        // the `ok_or_else` handling used for `iceberg_settings` above.
        let speedtest_avg_writer = iceberg::PocWriters::from_settings(iceberg_settings)
            .await?
            .speedtest_avg
            .ok_or_else(|| {
                anyhow::anyhow!("speedtest_avg iceberg writer not configured")
            })?;

        tracing::info!(
            process_name = %self.process_name,
            start_after = %self.start_after,
            stop_after = %self.stop_after,
            "starting speedtest_avg backfill"
        );

        // No poll duration / idle timeout: process the window once and exit.
        let opts = crate::backfill::BackfillOptions {
            process_name: self.process_name,
            start_after: self.start_after,
            stop_after: self.stop_after,
            poll_duration: None,
            idle_timeout: None,
        };

        let (backfiller, server) = SpeedtestAvgBackfiller::create(
            pool,
            settings.buckets.output.connect().await,
            Some(speedtest_avg_writer),
            Some(opts),
        )
        .await?;

        TaskManager::builder()
            .add_task(server)
            .add_task(backfiller)
            .build()
            .start()
            .await?;
        Ok(())
    }
}
1 change: 1 addition & 0 deletions mobile_verifier/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod backfill_ban;
pub mod backfill_speedtest;
pub mod backfill_speedtest_avg;
pub mod backfill_unique_connections;
pub mod reward_from_db;
pub mod server;
Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl Cmd {
ingest_bucket_client.clone(),
gateway_client.clone(),
poc_writers.speedtest,
poc_writers.speedtest_avg,
)
.await?,
)
Expand Down
10 changes: 10 additions & 0 deletions mobile_verifier/src/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ pub mod radio_reward;
pub mod radio_reward_covered_hex;
pub mod service_provider_reward;
pub mod speedtest;
pub mod speedtest_avg;
pub mod unallocated_reward;
pub mod unique_connections;

pub use ban::IcebergBan;
pub use heartbeat::IcebergHeartbeat;
pub use speedtest::IcebergSpeedtest;
pub use speedtest_avg::IcebergSpeedtestAvg;
pub use unique_connections::IcebergUniqueConnections;

pub const NAMESPACE: &str = "poc";
Expand All @@ -23,12 +25,14 @@ pub const REWARDS_NAMESPACE: &str = "rewards";
pub type BanWriter = BoxedDataWriter<IcebergBan>;
pub type HeartbeatWriter = BoxedDataWriter<IcebergHeartbeat>;
pub type SpeedtestWriter = BoxedDataWriter<IcebergSpeedtest>;
pub type SpeedtestAvgWriter = BoxedDataWriter<IcebergSpeedtestAvg>;
pub type UniqueConnectionsWriter = BoxedDataWriter<IcebergUniqueConnections>;

// Bundle of Iceberg data writers for the PoC tables. Each writer is wrapped
// in `Option`: `empty()` leaves all of them `None`, while `from_settings`
// populates every one of them.
pub struct PocWriters {
    pub ban: Option<BanWriter>,
    pub heartbeat: Option<HeartbeatWriter>,
    pub speedtest: Option<SpeedtestWriter>,
    pub speedtest_avg: Option<SpeedtestAvgWriter>,
    pub unique_connections: Option<UniqueConnectionsWriter>,
}

Expand All @@ -38,6 +42,7 @@ impl PocWriters {
ban: None,
heartbeat: None,
speedtest: None,
speedtest_avg: None,
unique_connections: None,
}
}
Expand All @@ -59,6 +64,10 @@ impl PocWriters {
.create_table_if_not_exists(speedtest::table_definition()?)
.await?
.boxed();
let speedtest_avg = catalog
.create_table_if_not_exists(speedtest_avg::table_definition()?)
.await?
.boxed();
let unique_connections = catalog
.create_table_if_not_exists(unique_connections::table_definition()?)
.await?
Expand All @@ -68,6 +77,7 @@ impl PocWriters {
ban: Some(ban),
heartbeat: Some(heartbeat),
speedtest: Some(speedtest),
speedtest_avg: Some(speedtest_avg),
unique_connections: Some(unique_connections),
})
}
Expand Down
173 changes: 173 additions & 0 deletions mobile_verifier/src/iceberg/speedtest_avg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use chrono::{DateTime, FixedOffset, Utc};
use file_store_oracles::mobile::speedtest::cli::{
SpeedtestAverage as FileStoreSpeedtestAverage, SpeedtestAverageEntry,
};
use helium_iceberg::{
FieldDefinition, FieldKind, PartitionDefinition, SortFieldDefinition, TableDefinition,
};
use serde::{Deserialize, Serialize};
use trino_rust_client::Trino;

use crate::speedtests_average::SpeedtestAverage as InMemorySpeedtestAverage;

pub use super::NAMESPACE;
pub const TABLE_NAME: &str = "speedtest_avgs";

/// One raw speedtest measurement embedded in an [`IcebergSpeedtestAvg`] row
/// (an element of its `speedtests` list column). Field order mirrors
/// [`sample_fields`].
#[derive(Debug, Clone, Trino, Serialize, Deserialize, PartialEq)]
pub struct IcebergSpeedtestAvgSample {
    // Upload throughput, in bits per second (per the `_bps` suffix).
    pub upload_speed_bps: u64,
    // Download throughput, in bits per second.
    pub download_speed_bps: u64,
    // Round-trip latency in milliseconds.
    pub latency_ms: u32,
    // When this individual speedtest was taken.
    pub timestamp: DateTime<FixedOffset>,
}

/// A single row of the `poc.speedtest_avgs` Iceberg table: one hotspot's
/// speedtest average plus the raw samples behind it. Field order mirrors the
/// columns declared in [`table_definition`].
#[derive(Debug, Clone, Trino, Serialize, Deserialize, PartialEq)]
pub struct IcebergSpeedtestAvg {
    pub hotspot_pubkey: String,
    pub upload_speed_avg_bps: u64,
    pub download_speed_avg_bps: u64,
    pub latency_avg_ms: u32,
    // Multiplier applied to rewards based on speedtest performance.
    pub reward_multiplier: f32,
    // Number of raw samples in `speedtests` (set to `speedtests.len()` by the
    // `From` conversions below).
    pub sample_count: u32,
    pub timestamp: DateTime<FixedOffset>,
    // The raw measurements the average was computed from.
    pub speedtests: Vec<IcebergSpeedtestAvgSample>,
}

/// Iceberg field layout for one raw speedtest sample; the field order mirrors
/// `IcebergSpeedtestAvgSample`.
fn sample_fields() -> [FieldDefinition; 4] {
    let upload = FieldDefinition::required_long("upload_speed_bps");
    let download = FieldDefinition::required_long("download_speed_bps");
    let latency = FieldDefinition::required_long("latency_ms");
    let taken_at = FieldDefinition::required_timestamptz("timestamp");
    [upload, download, latency, taken_at]
}

/// Build the Iceberg table definition for `poc.speedtest_avgs`: one row per
/// hotspot average, day-partitioned on `timestamp` and sorted by
/// (`hotspot_pubkey`, `timestamp`).
pub fn table_definition() -> helium_iceberg::Result<TableDefinition> {
    // Column order mirrors the `IcebergSpeedtestAvg` struct above.
    let columns = [
        FieldDefinition::required_string("hotspot_pubkey"),
        FieldDefinition::required_long("upload_speed_avg_bps"),
        FieldDefinition::required_long("download_speed_avg_bps"),
        FieldDefinition::required_long("latency_avg_ms"),
        FieldDefinition::required_float("reward_multiplier"),
        FieldDefinition::required_long("sample_count"),
        FieldDefinition::required_timestamptz("timestamp"),
        FieldDefinition::required_list("speedtests", FieldKind::struct_type(sample_fields())),
    ];
    let sort_order = [
        SortFieldDefinition::ascending("hotspot_pubkey"),
        SortFieldDefinition::ascending("timestamp"),
    ];
    TableDefinition::builder(NAMESPACE, TABLE_NAME)
        .with_fields(columns)
        .with_partition(PartitionDefinition::day("timestamp", "timestamp_day"))
        .with_sort_fields(sort_order)
        .build()
}

pub async fn get_all(
trino: &trino_rust_client::Client,
) -> anyhow::Result<Vec<IcebergSpeedtestAvg>> {
let all = match trino
.get_all(format!("SELECT * from {NAMESPACE}.{TABLE_NAME}"))
.await
{
Ok(all) => all.into_vec(),
Err(trino_rust_client::error::Error::EmptyData) => vec![],
Err(err) => return Err(err.into()),
};
Ok(all)
}

impl From<&SpeedtestAverageEntry> for IcebergSpeedtestAvgSample {
fn from(value: &SpeedtestAverageEntry) -> Self {
Self {
upload_speed_bps: value.upload_speed_bps,
download_speed_bps: value.download_speed_bps,
latency_ms: value.latency_ms,
timestamp: value.timestamp.into(),
}
}
}

impl From<&FileStoreSpeedtestAverage> for IcebergSpeedtestAvg {
fn from(value: &FileStoreSpeedtestAverage) -> Self {
Self {
hotspot_pubkey: value.pub_key.to_string(),
upload_speed_avg_bps: value.upload_speed_avg_bps,
download_speed_avg_bps: value.download_speed_avg_bps,
latency_avg_ms: value.latency_avg_ms,
reward_multiplier: value.reward_multiplier,
sample_count: value.speedtests.len() as u32,
timestamp: value.timestamp.into(),
speedtests: value
.speedtests
.iter()
.map(IcebergSpeedtestAvgSample::from)
.collect(),
}
}
}

impl From<&InMemorySpeedtestAverage> for IcebergSpeedtestAvg {
    /// Convert the verifier's in-memory speedtest average into an Iceberg row.
    ///
    /// Unlike the file-store conversion above, the row timestamp is taken as
    /// `Utc::now()` at conversion time — presumably because the in-memory
    /// average has no single report timestamp of its own (TODO confirm).
    fn from(value: &InMemorySpeedtestAverage) -> Self {
        // Stamp the row with the conversion time, not a report time.
        let timestamp = Utc::now();
        let speedtests = value
            .speedtests
            .iter()
            .map(|st| IcebergSpeedtestAvgSample {
                upload_speed_bps: st.report.upload_speed,
                download_speed_bps: st.report.download_speed,
                latency_ms: st.report.latency,
                timestamp: st.report.timestamp.into(),
            })
            .collect();
        Self {
            hotspot_pubkey: value.pubkey.to_string(),
            upload_speed_avg_bps: value.upload_speed_avg_bps,
            download_speed_avg_bps: value.download_speed_avg_bps,
            latency_avg_ms: value.latency_avg_ms,
            // NOTE(review): a failed multiplier conversion silently becomes a
            // 0.0 reward multiplier here — confirm that is intended rather
            // than something that should surface as an error or log line.
            reward_multiplier: value.reward_multiplier.try_into().unwrap_or(0.0),
            sample_count: value.speedtests.len() as u32,
            timestamp: timestamp.into(),
            speedtests,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::SubsecRound;
    use helium_crypto::PublicKeyBinary;
    use helium_proto::services::poc_mobile::SpeedtestAvgValidity;

    /// Round-trips a file-store average through `IcebergSpeedtestAvg::from`
    /// and checks every scalar field plus the nested sample contents — the
    /// timestamp and per-sample fields were previously unasserted.
    #[test]
    fn convert_file_store_speedtest_average() {
        let pub_key: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"
            .parse()
            .unwrap();
        // Truncate to millisecond precision so timestamp round-trips compare
        // exactly after the Utc -> FixedOffset conversion.
        let now = Utc::now().trunc_subsecs(3);
        let avg = FileStoreSpeedtestAverage {
            pub_key: pub_key.clone(),
            upload_speed_avg_bps: 10_000_000,
            download_speed_avg_bps: 100_000_000,
            latency_avg_ms: 25,
            validity: SpeedtestAvgValidity::Valid,
            speedtests: vec![SpeedtestAverageEntry {
                upload_speed_bps: 11_000_000,
                download_speed_bps: 110_000_000,
                latency_ms: 20,
                timestamp: now,
            }],
            timestamp: now,
            reward_multiplier: 1.0,
        };

        let row = IcebergSpeedtestAvg::from(&avg);
        assert_eq!(row.hotspot_pubkey, pub_key.to_string());
        assert_eq!(row.upload_speed_avg_bps, 10_000_000);
        assert_eq!(row.download_speed_avg_bps, 100_000_000);
        assert_eq!(row.latency_avg_ms, 25);
        assert_eq!(row.reward_multiplier, 1.0);
        assert_eq!(row.sample_count, 1);
        // Chrono compares DateTimes across timezone types by instant.
        assert_eq!(row.timestamp, now);

        // The nested sample must be carried over verbatim.
        assert_eq!(row.speedtests.len(), 1);
        let sample = &row.speedtests[0];
        assert_eq!(sample.upload_speed_bps, 11_000_000);
        assert_eq!(sample.download_speed_bps, 110_000_000);
        assert_eq!(sample.latency_ms, 20);
        assert_eq!(sample.timestamp, now);
    }
}
7 changes: 5 additions & 2 deletions mobile_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use anyhow::Result;
use clap::Parser;
use mobile_verifier::{
cli::{
backfill_ban, backfill_speedtest, backfill_unique_connections, reward_from_db, server,
verify_disktree,
backfill_ban, backfill_speedtest, backfill_speedtest_avg, backfill_unique_connections,
reward_from_db, server, verify_disktree,
},
Settings,
};
Expand Down Expand Up @@ -43,6 +43,8 @@ pub enum Cmd {
VerifyDisktree(verify_disktree::Cmd),
/// Backfill historical VerifiedSpeedtest files to the poc.speedtests iceberg table.
BackfillSpeedtest(backfill_speedtest::Cmd),
/// Backfill historical SpeedtestAvg files to the poc.speedtest_avgs iceberg table.
BackfillSpeedtestAvg(backfill_speedtest_avg::Cmd),
/// Backfill historical VerifiedUniqueConnections files to the poc.unique_connections iceberg table.
BackfillUniqueConnections(backfill_unique_connections::Cmd),
/// Backfill historical VerifiedMobileBanReport files to the poc.bans iceberg table.
Expand All @@ -56,6 +58,7 @@ impl Cmd {
Self::RewardFromDb(cmd) => cmd.run(&settings).await,
Self::VerifyDisktree(cmd) => cmd.run(&settings).await,
Self::BackfillSpeedtest(cmd) => cmd.run(&settings).await,
Self::BackfillSpeedtestAvg(cmd) => cmd.run(&settings).await,
Self::BackfillUniqueConnections(cmd) => cmd.run(&settings).await,
Self::BackfillBan(cmd) => cmd.run(&settings).await,
}
Expand Down
Loading
Loading