diff --git a/mobile_verifier/src/backfill.rs b/mobile_verifier/src/backfill.rs index 3374fd295..5c3110e20 100644 --- a/mobile_verifier/src/backfill.rs +++ b/mobile_verifier/src/backfill.rs @@ -8,7 +8,7 @@ use file_store_oracles::FileType; use futures::StreamExt; use helium_iceberg::BoxedDataWriter; use sqlx::PgPool; -use std::time::Duration; +use std::time::{Duration, Instant}; use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; @@ -104,6 +104,8 @@ impl Backfiller { return Ok(()); }; + let start = Instant::now(); + let file_info = file.file_info.clone(); let write_id = file_info.key.clone(); let mut txn = self.pool.begin().await?; @@ -125,6 +127,7 @@ impl Backfiller { written, skipped = total - written, age = %file_age(&file_info), + duration = ?start.elapsed(), "backfilled {} file", C::FILE_TYPE ); diff --git a/mobile_verifier/src/cli/backfill_speedtest_avg.rs b/mobile_verifier/src/cli/backfill_speedtest_avg.rs new file mode 100644 index 000000000..bf1aca20e --- /dev/null +++ b/mobile_verifier/src/cli/backfill_speedtest_avg.rs @@ -0,0 +1,72 @@ +use crate::{iceberg, speedtests_average::SpeedtestAvgBackfiller, Settings}; +use anyhow::Result; +use chrono::{DateTime, Utc}; +use task_manager::TaskManager; + +#[derive(Debug, clap::Args)] +pub struct Cmd { + /// Process name for tracking speedtest_avg backfill (avoids conflict with daemon) + #[clap(long, default_value = "speedtest-avg-backfill")] + process_name: String, + + /// Start processing files after this timestamp. + /// Format: RFC 3339 (e.g., 2024-01-01T00:00:00Z) + #[clap(long)] + start_after: DateTime, + + /// Stop processing files when their timestamp is > this value. + /// Use this to avoid reprocessing files that the daemon has already handled. + /// Format: RFC 3339 (e.g., 2025-02-25T00:00:00Z) + #[clap(long)] + stop_after: DateTime, +} + +impl Cmd { + pub async fn run(self, settings: &Settings) -> Result<()> { + let pool = settings + .database + .connect("mobile-verifier-speedtest-avg-backfill") + .await?; + sqlx::migrate!().run(&pool).await?; + + let iceberg_settings = settings.iceberg_settings.as_ref().ok_or_else(|| { + anyhow::anyhow!("iceberg_settings required for speedtest_avg backfill") + })?; + + let speedtest_avg_writer = iceberg::PocWriters::from_settings(iceberg_settings) + .await? + .speedtest_avg + .expect("iceberg writer"); + + tracing::info!( + process_name = %self.process_name, + start_after = %self.start_after, + stop_after = %self.stop_after, + "starting speedtest_avg backfill" + ); + + let opts = crate::backfill::BackfillOptions { + process_name: self.process_name, + start_after: self.start_after, + stop_after: self.stop_after, + poll_duration: None, + idle_timeout: None, + }; + + let (backfiller, server) = SpeedtestAvgBackfiller::create( + pool, + settings.buckets.output.connect().await, + Some(speedtest_avg_writer), + Some(opts), + ) + .await?; + + TaskManager::builder() + .add_task(server) + .add_task(backfiller) + .build() + .start() + .await?; + Ok(()) + } +} diff --git a/mobile_verifier/src/cli/mod.rs b/mobile_verifier/src/cli/mod.rs index 4afe578ba..142dd7b79 100644 --- a/mobile_verifier/src/cli/mod.rs +++ b/mobile_verifier/src/cli/mod.rs @@ -1,5 +1,6 @@ pub mod backfill_ban; pub mod backfill_speedtest; +pub mod backfill_speedtest_avg; pub mod backfill_unique_connections; pub mod reward_from_db; pub mod server; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index ca967be79..498b3aa25 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -116,6 +116,7 @@ impl Cmd { ingest_bucket_client.clone(), gateway_client.clone(), poc_writers.speedtest, + poc_writers.speedtest_avg, ) .await?, ) diff --git a/mobile_verifier/src/iceberg/mod.rs b/mobile_verifier/src/iceberg/mod.rs index 7ee8a7d70..003bbb0b2 100644 --- a/mobile_verifier/src/iceberg/mod.rs +++ b/mobile_verifier/src/iceberg/mod.rs @@ -9,12 +9,14 @@ pub mod radio_reward; pub mod radio_reward_covered_hex; pub mod service_provider_reward; pub mod speedtest; +pub mod speedtest_avg; pub mod unallocated_reward; pub mod unique_connections; pub use ban::IcebergBan; pub use heartbeat::IcebergHeartbeat; pub use speedtest::IcebergSpeedtest; +pub use speedtest_avg::IcebergSpeedtestAvg; pub use unique_connections::IcebergUniqueConnections; pub const NAMESPACE: &str = "poc"; @@ -23,12 +25,14 @@ pub const REWARDS_NAMESPACE: &str = "rewards"; pub type BanWriter = BoxedDataWriter; pub type HeartbeatWriter = BoxedDataWriter; pub type SpeedtestWriter = BoxedDataWriter; +pub type SpeedtestAvgWriter = BoxedDataWriter; pub type UniqueConnectionsWriter = BoxedDataWriter; pub struct PocWriters { pub ban: Option, pub heartbeat: Option, pub speedtest: Option, + pub speedtest_avg: Option, pub unique_connections: Option, } @@ -38,6 +42,7 @@ impl PocWriters { ban: None, heartbeat: None, speedtest: None, + speedtest_avg: None, unique_connections: None, } } @@ -59,6 +64,10 @@ impl PocWriters { .create_table_if_not_exists(speedtest::table_definition()?) .await? .boxed(); + let speedtest_avg = catalog + .create_table_if_not_exists(speedtest_avg::table_definition()?) + .await? + .boxed(); let unique_connections = catalog .create_table_if_not_exists(unique_connections::table_definition()?) .await? @@ -68,6 +77,7 @@ impl PocWriters { ban: Some(ban), heartbeat: Some(heartbeat), speedtest: Some(speedtest), + speedtest_avg: Some(speedtest_avg), unique_connections: Some(unique_connections), }) } diff --git a/mobile_verifier/src/iceberg/speedtest_avg.rs b/mobile_verifier/src/iceberg/speedtest_avg.rs new file mode 100644 index 000000000..f8737525c --- /dev/null +++ b/mobile_verifier/src/iceberg/speedtest_avg.rs @@ -0,0 +1,173 @@ +use chrono::{DateTime, FixedOffset, Utc}; +use file_store_oracles::mobile::speedtest::cli::{ + SpeedtestAverage as FileStoreSpeedtestAverage, SpeedtestAverageEntry, +}; +use helium_iceberg::{ + FieldDefinition, FieldKind, PartitionDefinition, SortFieldDefinition, TableDefinition, +}; +use serde::{Deserialize, Serialize}; +use trino_rust_client::Trino; + +use crate::speedtests_average::SpeedtestAverage as InMemorySpeedtestAverage; + +pub use super::NAMESPACE; +pub const TABLE_NAME: &str = "speedtest_avgs"; + +#[derive(Debug, Clone, Trino, Serialize, Deserialize, PartialEq)] +pub struct IcebergSpeedtestAvgSample { + pub upload_speed_bps: u64, + pub download_speed_bps: u64, + pub latency_ms: u32, + pub timestamp: DateTime, +} + +#[derive(Debug, Clone, Trino, Serialize, Deserialize, PartialEq)] +pub struct IcebergSpeedtestAvg { + pub hotspot_pubkey: String, + pub upload_speed_avg_bps: u64, + pub download_speed_avg_bps: u64, + pub latency_avg_ms: u32, + pub reward_multiplier: f32, + pub sample_count: u32, + pub timestamp: DateTime, + pub speedtests: Vec, +} + +fn sample_fields() -> [FieldDefinition; 4] { + [ + FieldDefinition::required_long("upload_speed_bps"), + FieldDefinition::required_long("download_speed_bps"), + FieldDefinition::required_long("latency_ms"), + FieldDefinition::required_timestamptz("timestamp"), + ] +} + +pub fn table_definition() -> helium_iceberg::Result { + TableDefinition::builder(NAMESPACE, TABLE_NAME) + .with_fields([ + FieldDefinition::required_string("hotspot_pubkey"), + FieldDefinition::required_long("upload_speed_avg_bps"), + FieldDefinition::required_long("download_speed_avg_bps"), + FieldDefinition::required_long("latency_avg_ms"), + FieldDefinition::required_float("reward_multiplier"), + FieldDefinition::required_long("sample_count"), + FieldDefinition::required_timestamptz("timestamp"), + FieldDefinition::required_list("speedtests", FieldKind::struct_type(sample_fields())), + ]) + .with_partition(PartitionDefinition::day("timestamp", "timestamp_day")) + .with_sort_fields([ + SortFieldDefinition::ascending("hotspot_pubkey"), + SortFieldDefinition::ascending("timestamp"), + ]) + .build() +} + +pub async fn get_all( + trino: &trino_rust_client::Client, +) -> anyhow::Result> { + let all = match trino + .get_all(format!("SELECT * from {NAMESPACE}.{TABLE_NAME}")) + .await + { + Ok(all) => all.into_vec(), + Err(trino_rust_client::error::Error::EmptyData) => vec![], + Err(err) => return Err(err.into()), + }; + Ok(all) +} + +impl From<&SpeedtestAverageEntry> for IcebergSpeedtestAvgSample { + fn from(value: &SpeedtestAverageEntry) -> Self { + Self { + upload_speed_bps: value.upload_speed_bps, + download_speed_bps: value.download_speed_bps, + latency_ms: value.latency_ms, + timestamp: value.timestamp.into(), + } + } +} + +impl From<&FileStoreSpeedtestAverage> for IcebergSpeedtestAvg { + fn from(value: &FileStoreSpeedtestAverage) -> Self { + Self { + hotspot_pubkey: value.pub_key.to_string(), + upload_speed_avg_bps: value.upload_speed_avg_bps, + download_speed_avg_bps: value.download_speed_avg_bps, + latency_avg_ms: value.latency_avg_ms, + reward_multiplier: value.reward_multiplier, + sample_count: value.speedtests.len() as u32, + timestamp: value.timestamp.into(), + speedtests: value + .speedtests + .iter() + .map(IcebergSpeedtestAvgSample::from) + .collect(), + } + } +} + +impl From<&InMemorySpeedtestAverage> for IcebergSpeedtestAvg { + fn from(value: &InMemorySpeedtestAverage) -> Self { + let timestamp = Utc::now(); + let speedtests = value + .speedtests + .iter() + .map(|st| IcebergSpeedtestAvgSample { + upload_speed_bps: st.report.upload_speed, + download_speed_bps: st.report.download_speed, + latency_ms: st.report.latency, + timestamp: st.report.timestamp.into(), + }) + .collect(); + Self { + hotspot_pubkey: value.pubkey.to_string(), + upload_speed_avg_bps: value.upload_speed_avg_bps, + download_speed_avg_bps: value.download_speed_avg_bps, + latency_avg_ms: value.latency_avg_ms, + reward_multiplier: value.reward_multiplier.try_into().unwrap_or(0.0), + sample_count: value.speedtests.len() as u32, + timestamp: timestamp.into(), + speedtests, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::SubsecRound; + use helium_crypto::PublicKeyBinary; + use helium_proto::services::poc_mobile::SpeedtestAvgValidity; + + #[test] + fn convert_file_store_speedtest_average() { + let pub_key: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .unwrap(); + let now = Utc::now().trunc_subsecs(3); + let avg = FileStoreSpeedtestAverage { + pub_key: pub_key.clone(), + upload_speed_avg_bps: 10_000_000, + download_speed_avg_bps: 100_000_000, + latency_avg_ms: 25, + validity: SpeedtestAvgValidity::Valid, + speedtests: vec![SpeedtestAverageEntry { + upload_speed_bps: 11_000_000, + download_speed_bps: 110_000_000, + latency_ms: 20, + timestamp: now, + }], + timestamp: now, + reward_multiplier: 1.0, + }; + + let row = IcebergSpeedtestAvg::from(&avg); + assert_eq!(row.hotspot_pubkey, pub_key.to_string()); + assert_eq!(row.upload_speed_avg_bps, 10_000_000); + assert_eq!(row.download_speed_avg_bps, 100_000_000); + assert_eq!(row.latency_avg_ms, 25); + assert_eq!(row.reward_multiplier, 1.0); + assert_eq!(row.sample_count, 1); + assert_eq!(row.speedtests.len(), 1); + } +} diff --git a/mobile_verifier/src/main.rs b/mobile_verifier/src/main.rs index 5f2b33047..6489326a1 100644 --- a/mobile_verifier/src/main.rs +++ b/mobile_verifier/src/main.rs @@ -2,8 +2,8 @@ use anyhow::Result; use clap::Parser; use mobile_verifier::{ cli::{ - backfill_ban, backfill_speedtest, backfill_unique_connections, reward_from_db, server, - verify_disktree, + backfill_ban, backfill_speedtest, backfill_speedtest_avg, backfill_unique_connections, + reward_from_db, server, verify_disktree, }, Settings, }; @@ -43,6 +43,8 @@ pub enum Cmd { VerifyDisktree(verify_disktree::Cmd), /// Backfill historical VerifiedSpeedtest files to the poc.speedtests iceberg table. BackfillSpeedtest(backfill_speedtest::Cmd), + /// Backfill historical SpeedtestAvg files to the poc.speedtest_avgs iceberg table. + BackfillSpeedtestAvg(backfill_speedtest_avg::Cmd), /// Backfill historical VerifiedUniqueConnections files to the poc.unique_connections iceberg table. BackfillUniqueConnections(backfill_unique_connections::Cmd), /// Backfill historical VerifiedMobileBanReport files to the poc.bans iceberg table. @@ -56,6 +58,7 @@ impl Cmd { Self::RewardFromDb(cmd) => cmd.run(&settings).await, Self::VerifyDisktree(cmd) => cmd.run(&settings).await, Self::BackfillSpeedtest(cmd) => cmd.run(&settings).await, + Self::BackfillSpeedtestAvg(cmd) => cmd.run(&settings).await, Self::BackfillUniqueConnections(cmd) => cmd.run(&settings).await, Self::BackfillBan(cmd) => cmd.run(&settings).await, } diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index e778c0381..aa39c4382 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -18,7 +18,7 @@ use file_store_oracles::{ use futures::stream::{StreamExt, TryStreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ - SpeedtestAvg as SpeedtestAvgProto, SpeedtestIngestReportV1, + SpeedtestAvg as SpeedtestAvgProto, SpeedtestAvgValidity, SpeedtestIngestReportV1, SpeedtestVerificationResult as SpeedtestResult, VerifiedSpeedtest as VerifiedSpeedtestProto, }; use mobile_config::gateway::client::GatewayInfoResolver; @@ -84,6 +84,7 @@ pub struct SpeedtestDaemon { speedtest_avg_file_sink: FileSinkClient, verified_speedtest_file_sink: FileSinkClient, iceberg_writer: Option, + speedtest_avg_iceberg_writer: Option, } impl SpeedtestDaemon @@ -97,6 +98,7 @@ where bucket_client: BucketClient, gateway_resolver: GIR, iceberg_writer: Option, + speedtest_avg_iceberg_writer: Option, ) -> anyhow::Result { let (speedtests_avg, speedtests_avg_server) = SpeedtestAvgProto::file_sink( &settings.cache, @@ -131,6 +133,7 @@ where speedtests_avg, speedtests_validity, iceberg_writer, + speedtest_avg_iceberg_writer, ); Ok(TaskManager::builder() @@ -148,6 +151,7 @@ where speedtest_avg_file_sink: FileSinkClient, verified_speedtest_file_sink: FileSinkClient, iceberg_writer: Option, + speedtest_avg_iceberg_writer: Option, ) -> Self { Self { pool, @@ -156,6 +160,7 @@ where speedtest_avg_file_sink, verified_speedtest_file_sink, iceberg_writer, + speedtest_avg_iceberg_writer, } } @@ -189,6 +194,7 @@ where let mut speedtests = file.into_stream(&mut transaction).await?; let mut iceberg_records = Vec::new(); + let mut iceberg_avg_records = Vec::new(); while let Some(speedtest_report) = speedtests.next().await { let result = self.validate_speedtest(&speedtest_report).await?; @@ -206,6 +212,11 @@ where if self.iceberg_writer.is_some() { iceberg_records.push(iceberg::IcebergSpeedtest::from(&speedtest_report)); } + if self.speedtest_avg_iceberg_writer.is_some() + && average.validity == SpeedtestAvgValidity::Valid + { + iceberg_avg_records.push(iceberg::IcebergSpeedtestAvg::from(&average)); + } } // write out paper trail of speedtest validity self.write_verified_speedtest(speedtest_report, result) @@ -214,6 +225,12 @@ where iceberg::maybe_write_idempotent(self.iceberg_writer.as_ref(), &write_id, iceberg_records) .await?; + iceberg::maybe_write_idempotent( + self.speedtest_avg_iceberg_writer.as_ref(), + &write_id, + iceberg_avg_records, + ) + .await?; self.speedtest_avg_file_sink.commit().await?; self.verified_speedtest_file_sink.commit().await?; diff --git a/mobile_verifier/src/speedtests_average.rs b/mobile_verifier/src/speedtests_average.rs index 444662b53..0ede9427a 100644 --- a/mobile_verifier/src/speedtests_average.rs +++ b/mobile_verifier/src/speedtests_average.rs @@ -1,13 +1,34 @@ -use crate::speedtests::{self, Speedtest}; +use crate::{ + backfill::{Backfiller, IcebergBackfill}, + iceberg, + speedtests::{self, Speedtest}, +}; use chrono::{DateTime, Utc}; use file_store::{file_sink::FileSinkClient, traits::TimestampEncode}; -use file_store_oracles::traits::MsgTimestamp; +use file_store_oracles::{traits::MsgTimestamp, FileType}; use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_mobile as proto; +use helium_proto::services::poc_mobile::{self as proto, SpeedtestAvgValidity}; use rust_decimal::Decimal; use rust_decimal_macros::dec; use std::collections::HashMap; +// ── SpeedtestAvg backfill ───────────────────────────────────────────────────── + +pub struct SpeedtestAvgConverter; + +impl IcebergBackfill for SpeedtestAvgConverter { + type FileRecord = file_store_oracles::mobile::speedtest::cli::SpeedtestAverage; + type IcebergRow = iceberg::IcebergSpeedtestAvg; + const FILE_TYPE: FileType = FileType::SpeedtestAvg; + + fn convert(record: Self::FileRecord) -> Option { + (record.validity == SpeedtestAvgValidity::Valid) + .then(|| iceberg::IcebergSpeedtestAvg::from(&record)) + } +} + +pub type SpeedtestAvgBackfiller = Backfiller; + pub const SPEEDTEST_LAPSE: i64 = 48; const MIN_DOWNLOAD: u64 = mbps(30); const MIN_UPLOAD: u64 = mbps(2); diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 98036d4c3..49cb666b1 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -440,6 +440,7 @@ pub async fn setup_iceberg() -> anyhow::Result { mobile_verifier::iceberg::ban::table_definition()?, mobile_verifier::iceberg::heartbeat::table_definition()?, mobile_verifier::iceberg::speedtest::table_definition()?, + mobile_verifier::iceberg::speedtest_avg::table_definition()?, mobile_verifier::iceberg::unique_connections::table_definition()?, ]) .await?; diff --git a/mobile_verifier/tests/integrations/main.rs b/mobile_verifier/tests/integrations/main.rs index ef466a562..d9fed4625 100644 --- a/mobile_verifier/tests/integrations/main.rs +++ b/mobile_verifier/tests/integrations/main.rs @@ -12,6 +12,7 @@ mod modeled_coverage; mod rewarder_poc_dc; mod rewarder_sp_rewards; mod seniority; +mod speedtest_avgs_iceberg; mod speedtests; mod speedtests_iceberg; mod unique_connections_iceberg; diff --git a/mobile_verifier/tests/integrations/speedtest_avgs_iceberg.rs b/mobile_verifier/tests/integrations/speedtest_avgs_iceberg.rs new file mode 100644 index 000000000..045bcad7a --- /dev/null +++ b/mobile_verifier/tests/integrations/speedtest_avgs_iceberg.rs @@ -0,0 +1,301 @@ +use chrono::{DateTime, Duration, SubsecRound, Utc}; +use file_store::{aws_local::AwsLocal, traits::TimestampEncode}; +use file_store_oracles::FileType; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::{Speedtest, SpeedtestAvg, SpeedtestAvgValidity}; +use mobile_verifier::{ + backfill::BackfillOptions, + iceberg::{self, speedtest_avg::IcebergSpeedtestAvg}, + speedtests_average::SpeedtestAvgBackfiller, +}; +use sqlx::PgPool; + +const TEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); + +fn test_backfill_options( + process_name: &str, + start_after: DateTime, + stop_after: DateTime, +) -> BackfillOptions { + BackfillOptions { + process_name: process_name.to_string(), + start_after, + stop_after, + poll_duration: Some(std::time::Duration::from_millis(100)), + idle_timeout: Some(std::time::Duration::from_millis(500)), + } +} + +fn make_iceberg_speedtest_avg(seed: u8) -> IcebergSpeedtestAvg { + let pubkey = PublicKeyBinary::from(vec![seed, seed, seed]); + let now = Utc::now().trunc_subsecs(3); + IcebergSpeedtestAvg { + hotspot_pubkey: pubkey.to_string(), + upload_speed_avg_bps: 10_000_000, + download_speed_avg_bps: 100_000_000, + latency_avg_ms: 25, + reward_multiplier: 1.0, + sample_count: 2, + timestamp: now.into(), + speedtests: vec![], + } +} + +fn make_speedtest_avg_proto( + timestamp: DateTime, + validity: SpeedtestAvgValidity, +) -> SpeedtestAvg { + let pubkey: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .unwrap(); + let ts_s = timestamp.encode_timestamp(); + SpeedtestAvg { + pub_key: pubkey.as_ref().to_vec(), + upload_speed_avg_bps: 10_000_000, + download_speed_avg_bps: 100_000_000, + latency_avg_ms: 25, + timestamp: ts_s, + speedtests: vec![Speedtest { + upload_speed_bps: 11_000_000, + download_speed_bps: 110_000_000, + latency_ms: 20, + timestamp: ts_s, + }], + validity: validity as i32, + reward_multiplier: 1.0, + } +} + +// ── Pure write tests (no DB needed) ────────────────────────────────────────── + +#[tokio::test] +async fn write_single_speedtest_avg() -> anyhow::Result<()> { + let harness = crate::common::setup_iceberg().await?; + let writer = harness + .get_table_writer::(iceberg::speedtest_avg::TABLE_NAME) + .await?; + + let record = make_iceberg_speedtest_avg(1); + + writer + .write_idempotent("test_single", vec![record.clone()]) + .await?; + + let all = iceberg::speedtest_avg::get_all(harness.trino()).await?; + assert_eq!(all, vec![record]); + + Ok(()) +} + +#[tokio::test] +async fn write_multiple_speedtest_avgs() -> anyhow::Result<()> { + let harness = crate::common::setup_iceberg().await?; + let writer = harness + .get_table_writer::(iceberg::speedtest_avg::TABLE_NAME) + .await?; + + let records: Vec = (1u8..=5).map(make_iceberg_speedtest_avg).collect(); + + writer.write_idempotent("test_multiple", records).await?; + + let all = iceberg::speedtest_avg::get_all(harness.trino()).await?; + assert_eq!(all.len(), 5); + + Ok(()) +} + +#[tokio::test] +async fn idempotent_write_deduplicates() -> anyhow::Result<()> { + let harness = crate::common::setup_iceberg().await?; + let writer = harness + .get_table_writer::(iceberg::speedtest_avg::TABLE_NAME) + .await?; + + let record = make_iceberg_speedtest_avg(1); + + writer + .write_idempotent("same_id", vec![record.clone()]) + .await?; + writer + .write_idempotent("same_id", vec![record.clone()]) + .await?; + + let all = iceberg::speedtest_avg::get_all(harness.trino()).await?; + assert_eq!(all.len(), 1, "second write with same id should be a no-op"); + + Ok(()) +} + +// ── Backfill tests (DB needed for file-tracking state) ──────────────────────── + +#[sqlx::test] +async fn backfill_writes_speedtest_avgs_to_iceberg(pool: PgPool) -> anyhow::Result<()> { + let harness = crate::common::setup_iceberg().await?; + let writer = harness + .get_table_writer::(iceberg::speedtest_avg::TABLE_NAME) + .await?; + + let awsl = AwsLocal::new().await; + awsl.create_bucket().await?; + + let base_time = Utc::now() - Duration::hours(1); + let start_time = base_time - Duration::minutes(1); + let file1_time = base_time; + let file2_time = base_time + Duration::minutes(5); + let end_time = base_time + Duration::days(42); + + awsl.put_protos_at_time( + FileType::SpeedtestAvg.to_string(), + vec![make_speedtest_avg_proto( + file1_time, + SpeedtestAvgValidity::Valid, + )], + file1_time, + ) + .await?; + + awsl.put_protos_at_time( + FileType::SpeedtestAvg.to_string(), + vec![make_speedtest_avg_proto( + file2_time, + SpeedtestAvgValidity::Valid, + )], + file2_time, + ) + .await?; + + let opts = test_backfill_options("test-avg-backfill", start_time, end_time); + let (backfiller, server) = + SpeedtestAvgBackfiller::create(pool, awsl.bucket_client(), Some(writer), Some(opts)) + .await?; + + tokio::time::timeout( + TEST_TIMEOUT, + task_manager::TaskManager::builder() + .add_task(server) + .add_task(backfiller) + .build() + .start(), + ) + .await + .map_err(|_| anyhow::anyhow!("backfill timed out after {:?}", TEST_TIMEOUT))??; + + let rows = iceberg::speedtest_avg::get_all(harness.trino()).await?; + assert_eq!(rows.len(), 2, "expected 2 speedtest_avgs in iceberg"); + + awsl.cleanup().await?; + Ok(()) +} + +#[sqlx::test] +async fn backfill_stops_at_timestamp(pool: PgPool) -> anyhow::Result<()> { + let harness = crate::common::setup_iceberg().await?; + let writer = harness + .get_table_writer::(iceberg::speedtest_avg::TABLE_NAME) + .await?; + + let awsl = AwsLocal::new().await; + awsl.create_bucket().await?; + + let base_time = Utc::now() - Duration::hours(2); + let start_time = base_time - Duration::minutes(1); + let file1_time = base_time; + let file2_time = base_time + Duration::minutes(30); + let stop_time = base_time + Duration::minutes(45); + let file3_time = base_time + Duration::hours(1); // should be skipped + + for (time, label) in [ + (file1_time, "file1"), + (file2_time, "file2"), + (file3_time, "file3"), + ] { + awsl.put_protos_at_time( + FileType::SpeedtestAvg.to_string(), + vec![make_speedtest_avg_proto(time, SpeedtestAvgValidity::Valid)], + time, + ) + .await + .unwrap_or_else(|e| panic!("failed to put {label}: {e}")); + } + + let opts = test_backfill_options("test-avg-backfill-stop", start_time, stop_time); + let (backfiller, server) = + SpeedtestAvgBackfiller::create(pool, awsl.bucket_client(), Some(writer), Some(opts)) + .await?; + + tokio::time::timeout( + TEST_TIMEOUT, + task_manager::TaskManager::builder() + .add_task(server) + .add_task(backfiller) + .build() + .start(), + ) + .await + .map_err(|_| anyhow::anyhow!("backfill timed out after {:?}", TEST_TIMEOUT))??; + + let rows = iceberg::speedtest_avg::get_all(harness.trino()).await?; + assert_eq!( + rows.len(), + 2, + "expected 2 speedtest_avgs (file3 after stop_after should be skipped)" + ); + + awsl.cleanup().await?; + Ok(()) +} + +#[sqlx::test] +async fn backfill_filters_invalid_speedtest_avgs(pool: PgPool) -> anyhow::Result<()> { + let harness = crate::common::setup_iceberg().await?; + let writer = harness + .get_table_writer::(iceberg::speedtest_avg::TABLE_NAME) + .await?; + + let awsl = AwsLocal::new().await; + awsl.create_bucket().await?; + + let base_time = Utc::now() - Duration::hours(1); + let start_time = base_time - Duration::minutes(1); + let file_time = base_time; + let end_time = base_time + Duration::days(42); + + awsl.put_protos_at_time( + FileType::SpeedtestAvg.to_string(), + vec![ + make_speedtest_avg_proto(file_time, SpeedtestAvgValidity::Valid), + make_speedtest_avg_proto(file_time, SpeedtestAvgValidity::TooFewSamples), + make_speedtest_avg_proto(file_time, SpeedtestAvgValidity::SlowDownloadSpeed), + make_speedtest_avg_proto(file_time, SpeedtestAvgValidity::SlowUploadSpeed), + make_speedtest_avg_proto(file_time, SpeedtestAvgValidity::HighLatency), + ], + file_time, + ) + .await?; + + let opts = test_backfill_options("test-avg-backfill-filter", start_time, end_time); + let (backfiller, server) = + SpeedtestAvgBackfiller::create(pool, awsl.bucket_client(), Some(writer), Some(opts)) + .await?; + + tokio::time::timeout( + TEST_TIMEOUT, + task_manager::TaskManager::builder() + .add_task(server) + .add_task(backfiller) + .build() + .start(), + ) + .await + .map_err(|_| anyhow::anyhow!("backfill timed out after {:?}", TEST_TIMEOUT))??; + + let rows = iceberg::speedtest_avg::get_all(harness.trino()).await?; + assert_eq!( + rows.len(), + 1, + "only valid speedtest_avgs should be written to iceberg" + ); + + awsl.cleanup().await?; + Ok(()) +} diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index aded86ff5..161290f9f 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -75,6 +75,7 @@ async fn speedtests_average_should_only_include_last_48_hours( speedtest_avg_client, verified_client, None, + None, ); daemon.process_file(stream).await?; @@ -106,6 +107,7 @@ async fn speedtest_upload_exceeds_300megabits_ps_limit(pool: Pool) -> speedtest_avg_client, verified_client, None, + None, ); let speedtest_report = CellSpeedtestIngestReport { @@ -145,6 +147,7 @@ async fn speedtest_download_exceeds_300_megabits_ps_limit( speedtest_avg_client, verified_client, None, + None, ); // Create speedtest with download speed > 300Mbits @@ -185,6 +188,7 @@ async fn speedtest_both_speeds_exceed_300_megabits_ps_limit( speedtest_avg_client, verified_client, None, + None, ); // Create speedtest with both speeds > 300Mbits @@ -225,6 +229,7 @@ async fn speedtest_within_300_megabits_ps_limit_should_be_valid( speedtest_avg_client, verified_client, None, + None, ); // Create speedtest with both speeds within 300Mbits limit @@ -265,6 +270,7 @@ async fn speedtest_exactly_300_megabits_ps_limit_should_be_valid( speedtest_avg_client, verified_client, None, + None, ); // Create speedtest with speeds exactly at 300Mbits limit @@ -369,6 +375,7 @@ async fn invalid_speedtests_should_not_affect_average(pool: Pool) -> a speedtest_avg_client, verified_client, None, + None, ); daemon.process_file(stream).await?;