From be038cbb5979be3f1223b905b9f8466711d8e139 Mon Sep 17 00:00:00 2001 From: Matjaz Domen Pecan Date: Wed, 29 Apr 2026 15:38:58 +0200 Subject: [PATCH 1/3] feat(server): persist & backfill v1_hash on filters table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the `v1_hash` column to the `filters` table (nullable; backfilled operationally) and ships a service-token-protected backfill endpoint that populates it for legacy rows by re-reading their TOML from R2 and applying `tokf_common::canonical_v1::hash` (ADR-0002). The publish path now computes v1 alongside `content_hash` and rejects new submissions whose v1 collides with an existing row, returning the original author and `is_new = false`. Stops new publishes from re-splitting canonically equivalent filters across content_hash variants — the going-forward fix for #350. Out of scope (explicit follow-up PR): - Dedup migration for existing duplicate rows - UNIQUE(v1_hash) constraint - v1_hash NOT NULL tightening Also dedupes test boilerplate by extracting `post_json` into filters/test_helpers.rs, which both backfill_tests and regenerate_tests now use. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../migrations/20260428000000_add_v1_hash.sql | 17 ++ .../src/routes/filters/backfill.rs | 118 ++++++++ .../src/routes/filters/backfill_tests.rs | 269 ++++++++++++++++++ crates/tokf-server/src/routes/filters/mod.rs | 5 +- .../src/routes/filters/publish/mod.rs | 37 ++- .../src/routes/filters/publish/stdlib/mod.rs | 7 +- .../src/routes/filters/publish/tests.rs | 111 ++++++++ .../src/routes/filters/regenerate_tests.rs | 22 +- .../src/routes/filters/test_helpers.rs | 21 ++ crates/tokf-server/src/routes/mod.rs | 4 + 10 files changed, 588 insertions(+), 23 deletions(-) create mode 100644 crates/tokf-server/migrations/20260428000000_add_v1_hash.sql create mode 100644 crates/tokf-server/src/routes/filters/backfill_tests.rs diff --git a/crates/tokf-server/migrations/20260428000000_add_v1_hash.sql b/crates/tokf-server/migrations/20260428000000_add_v1_hash.sql new file mode 100644 index 00000000..319e48ec --- /dev/null +++ b/crates/tokf-server/migrations/20260428000000_add_v1_hash.sql @@ -0,0 +1,17 @@ +-- Add `v1_hash` column for the schema-independent canonical TOML hash (ADR-0002). +-- +-- Nullable on purpose: existing rows are NULL until the backfill endpoint +-- (`POST /api/filters/backfill-v1-hashes`) has populated them by re-reading +-- each row's TOML from R2. Adding NOT NULL or UNIQUE here would either fail +-- the migration or break the backfill mid-run as soon as the first +-- historical duplicate is encountered. +-- +-- The dedup migration (separate PR) will collapse duplicate v1 rows and may +-- add UNIQUE alongside. +-- +-- A non-partial index is used because CockroachDB cannot create a partial +-- index on a column added in the same migration ("column ... is not public", +-- error 0A000). The space cost of indexing NULL is negligible (NULLs are +-- already collapsed in B-tree indexes). 
+ALTER TABLE filters ADD COLUMN v1_hash TEXT; +CREATE INDEX filters_v1_hash_idx ON filters(v1_hash); diff --git a/crates/tokf-server/src/routes/filters/backfill.rs b/crates/tokf-server/src/routes/filters/backfill.rs index 4307e578..eac34da8 100644 --- a/crates/tokf-server/src/routes/filters/backfill.rs +++ b/crates/tokf-server/src/routes/filters/backfill.rs @@ -99,3 +99,121 @@ pub async fn backfill_versions( Json(BackfillVersionsResponse { updated, skipped }), )) } + +// ── v1_hash backfill ────────────────────────────────────────────────────── + +#[derive(Debug, Deserialize)] +pub struct BackfillV1Request { + /// Maximum rows to process in this call. Operator iterates until + /// `processed == 0`. Defaults to 100; capped at `MAX_BACKFILL_V1_LIMIT`. + #[serde(default = "default_v1_batch_size")] + pub limit: usize, +} + +const fn default_v1_batch_size() -> usize { + 100 +} + +#[derive(Debug, Serialize)] +pub struct BackfillV1Response { + pub processed: usize, + pub updated: usize, + pub failed: Vec, +} + +#[derive(Debug, Serialize)] +pub struct BackfillV1Failure { + pub content_hash: String, + pub error: String, +} + +const MAX_BACKFILL_V1_LIMIT: usize = 500; + +/// `POST /api/filters/backfill-v1-hashes` — populate `v1_hash` for legacy rows. +/// +/// Service-token auth. Idempotent. Picks up to `limit` rows where +/// `v1_hash IS NULL`, fetches the TOML from R2, computes the v1 hash +/// (ADR-0002), writes it back. Rows whose R2 object is missing or whose +/// TOML fails to canonicalise are reported in `failed` and skipped — the +/// operator inspects the response and triages those manually. +/// +/// Operator runbook: invoke repeatedly until `processed == 0`. Failures +/// don't stop the batch. 
+pub async fn backfill_v1_hashes( + _auth: ServiceAuth, + State(state): State, + Json(req): Json, +) -> Result<(StatusCode, Json), AppError> { + let limit = req.limit.clamp(1, MAX_BACKFILL_V1_LIMIT); + // Safe: `limit` is clamped to [1, MAX_BACKFILL_V1_LIMIT] (= 500) which fits in i64. + #[allow(clippy::cast_possible_wrap)] + let limit_i64 = limit as i64; + + let rows: Vec<(String, String)> = sqlx::query_as( + "SELECT content_hash, r2_key FROM filters + WHERE v1_hash IS NULL + ORDER BY created_at ASC + LIMIT $1", + ) + .bind(limit_i64) + .fetch_all(&state.db) + .await?; + + tracing::info!( + candidates = rows.len(), + limit, + "backfill-v1-hashes request received" + ); + + let mut updated = 0usize; + let mut failed = Vec::new(); + + for (content_hash, r2_key) in &rows { + match compute_and_store_v1(&state, content_hash, r2_key).await { + Ok(()) => updated += 1, + Err(e) => { + tracing::warn!(hash = %content_hash, "backfill-v1 failed: {e}"); + failed.push(BackfillV1Failure { + content_hash: content_hash.clone(), + error: e.to_string(), + }); + } + } + } + + tracing::info!( + processed = rows.len(), + updated, + failed = failed.len(), + "backfill-v1-hashes complete" + ); + + Ok(( + StatusCode::OK, + Json(BackfillV1Response { + processed: rows.len(), + updated, + failed, + }), + )) +} + +async fn compute_and_store_v1( + state: &AppState, + content_hash: &str, + r2_key: &str, +) -> anyhow::Result<()> { + let bytes = state + .storage + .get(r2_key) + .await? 
+ .ok_or_else(|| anyhow::anyhow!("R2 object missing: {r2_key}"))?; + let toml_str = std::str::from_utf8(&bytes)?; + let v1 = tokf_common::canonical_v1::hash(toml_str)?; + sqlx::query("UPDATE filters SET v1_hash = $1 WHERE content_hash = $2") + .bind(&v1) + .bind(content_hash) + .execute(&state.db) + .await?; + Ok(()) +} diff --git a/crates/tokf-server/src/routes/filters/backfill_tests.rs b/crates/tokf-server/src/routes/filters/backfill_tests.rs new file mode 100644 index 00000000..4c67733e --- /dev/null +++ b/crates/tokf-server/src/routes/filters/backfill_tests.rs @@ -0,0 +1,269 @@ +use std::sync::Arc; + +use axum::{ + body::Body, + http::{Request, StatusCode}, +}; +use http_body_util::BodyExt; +use sqlx::PgPool; +use tower::ServiceExt; + +use crate::routes::test_helpers::insert_service_token; +use crate::storage::StorageClient as _; +use crate::storage::mock::InMemoryStorageClient; + +use super::test_helpers::{ + insert_test_user, make_state_with_storage, post_json, publish_filter_helper, +}; + +const VALID_FILTER_TOML: &[u8] = b"command = \"my-tool\"\n"; + +const URI: &str = "/api/filters/backfill-v1-hashes"; + +/// Publish a filter, then NULL out its `v1_hash` to simulate a row created +/// before this PR shipped. 
+async fn publish_then_null_v1( + pool: &PgPool, + storage: Arc, + user_token: &str, + toml: &[u8], +) -> String { + let state = make_state_with_storage(pool.clone(), storage); + let app = crate::routes::create_router(state); + let hash = publish_filter_helper(app, user_token, toml, &[]).await; + sqlx::query("UPDATE filters SET v1_hash = NULL WHERE content_hash = $1") + .bind(&hash) + .execute(pool) + .await + .unwrap(); + hash +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_populates_null_rows(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let (_, alice) = insert_test_user(&pool, "alice_bf").await; + let (_, bob) = insert_test_user(&pool, "bob_bf").await; + let service_token = insert_service_token(&pool, "bf-test").await; + + let alice_toml: &[u8] = b"command = \"alice-tool\"\n"; + let bob_toml: &[u8] = b"command = \"bob-tool\"\n"; + + let alice_hash = publish_then_null_v1(&pool, Arc::clone(&storage), &alice, alice_toml).await; + let bob_hash = publish_then_null_v1(&pool, Arc::clone(&storage), &bob, bob_toml).await; + + let app = + crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage))); + let resp = post_json( + app, + &service_token, + URI, + &serde_json::json!({ "limit": 100 }), + ) + .await; + assert_eq!(resp.status(), StatusCode::OK); + + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["processed"], 2); + assert_eq!(json["updated"], 2); + assert!(json["failed"].as_array().unwrap().is_empty()); + + let alice_v1: Option = + sqlx::query_scalar("SELECT v1_hash FROM filters WHERE content_hash = $1") + .bind(&alice_hash) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + alice_v1.unwrap(), + tokf_common::canonical_v1::hash(std::str::from_utf8(alice_toml).unwrap()).unwrap() + ); + + let bob_v1: Option = + sqlx::query_scalar("SELECT v1_hash FROM filters 
WHERE content_hash = $1") + .bind(&bob_hash) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + bob_v1.unwrap(), + tokf_common::canonical_v1::hash(std::str::from_utf8(bob_toml).unwrap()).unwrap() + ); +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_skips_already_populated(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let (_, user) = insert_test_user(&pool, "alice_skip").await; + let service_token = insert_service_token(&pool, "bf-skip").await; + + publish_then_null_v1(&pool, Arc::clone(&storage), &user, VALID_FILTER_TOML).await; + + let app = + crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage))); + let resp = post_json( + app, + &service_token, + URI, + &serde_json::json!({ "limit": 10 }), + ) + .await; + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let first: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(first["processed"], 1); + assert_eq!(first["updated"], 1); + + let app = + crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage))); + let resp = post_json( + app, + &service_token, + URI, + &serde_json::json!({ "limit": 10 }), + ) + .await; + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let second: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!( + second["processed"], 0, + "second invocation should find no candidates" + ); +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_reports_failure_when_r2_object_missing(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let (_, user) = insert_test_user(&pool, "alice_missing").await; + let service_token = insert_service_token(&pool, "bf-missing").await; + + let hash = publish_then_null_v1(&pool, Arc::clone(&storage), &user, 
VALID_FILTER_TOML).await; + + // Wipe the R2 object so the backfill can't find it. + let r2_key: String = sqlx::query_scalar("SELECT r2_key FROM filters WHERE content_hash = $1") + .bind(&hash) + .fetch_one(&pool) + .await + .unwrap(); + storage.delete(&r2_key).await.unwrap(); + + let app = + crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage))); + let resp = post_json( + app, + &service_token, + URI, + &serde_json::json!({ "limit": 10 }), + ) + .await; + assert_eq!(resp.status(), StatusCode::OK); + + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["processed"], 1); + assert_eq!(json["updated"], 0); + let failed = json["failed"].as_array().unwrap(); + assert_eq!(failed.len(), 1); + assert_eq!(failed[0]["content_hash"], hash); + + // v1_hash should still be NULL since compute failed. + let v1: Option = + sqlx::query_scalar("SELECT v1_hash FROM filters WHERE content_hash = $1") + .bind(&hash) + .fetch_one(&pool) + .await + .unwrap(); + assert!( + v1.is_none(), + "v1_hash must remain NULL when backfill failed" + ); +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_requires_service_token(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let app = crate::routes::create_router(make_state_with_storage(pool, storage)); + + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/filters/backfill-v1-hashes") + .header("content-type", "application/json") + .body(Body::from(b"{}".to_vec())) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_rejects_invalid_bearer(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let app = crate::routes::create_router(make_state_with_storage(pool, storage)); + + let 
resp = post_json( + app, + "not-a-real-token", + URI, + &serde_json::json!({ "limit": 10 }), + ) + .await; + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_respects_limit(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let (_, user) = insert_test_user(&pool, "alice_limit").await; + let service_token = insert_service_token(&pool, "bf-limit").await; + + // Publish 5 distinct filters; null their v1_hash. + for i in 0..5 { + let toml = format!("command = \"tool-{i}\"\n"); + publish_then_null_v1(&pool, Arc::clone(&storage), &user, toml.as_bytes()).await; + } + + let app = + crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage))); + let resp = post_json(app, &service_token, URI, &serde_json::json!({ "limit": 2 })).await; + assert_eq!(resp.status(), StatusCode::OK); + + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["processed"], 2, "should respect requested limit"); + assert_eq!(json["updated"], 2); + + let still_null: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM filters WHERE v1_hash IS NULL") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(still_null, 3, "3 rows should still be unprocessed"); +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_caps_limit_at_max(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let service_token = insert_service_token(&pool, "bf-cap").await; + let app = + crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage))); + + // No filters seeded; just confirm the call succeeds with an out-of-range + // limit (clamped, not rejected). 
+ let resp = post_json( + app, + &service_token, + URI, + &serde_json::json!({ "limit": 1_000_000 }), + ) + .await; + assert_eq!(resp.status(), StatusCode::OK); + + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["processed"], 0); +} diff --git a/crates/tokf-server/src/routes/filters/mod.rs b/crates/tokf-server/src/routes/filters/mod.rs index 68befa3b..583310af 100644 --- a/crates/tokf-server/src/routes/filters/mod.rs +++ b/crates/tokf-server/src/routes/filters/mod.rs @@ -1,4 +1,7 @@ mod backfill; +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] +mod backfill_tests; mod publish; mod regenerate; mod search; @@ -9,7 +12,7 @@ mod search_tests; pub mod test_helpers; mod update_tests; -pub use backfill::backfill_versions; +pub use backfill::{backfill_v1_hashes, backfill_versions}; pub use publish::publish_filter; pub use publish::stdlib::publish_stdlib; pub use regenerate::regenerate_examples; diff --git a/crates/tokf-server/src/routes/filters/publish/mod.rs b/crates/tokf-server/src/routes/filters/publish/mod.rs index d6d4a786..1a4f4524 100644 --- a/crates/tokf-server/src/routes/filters/publish/mod.rs +++ b/crates/tokf-server/src/routes/filters/publish/mod.rs @@ -88,6 +88,7 @@ async fn parse_multipart( /// Grouped fields for inserting a filter record. struct FilterInsert<'a> { content_hash: &'a str, + v1_hash: &'a str, command_pattern: &'a str, canonical_command: &'a str, author_id: i64, @@ -101,17 +102,44 @@ struct FilterInsert<'a> { /// Uploading to R2 before this call means orphaned objects are possible on DB /// failure, but they are harmless (no user-visible state and R2 uploads are /// idempotent on retry). +/// +/// Performs a v1-collision pre-check: if a row already exists with the same +/// `v1_hash` but a different `content_hash`, returns that row's author and +/// does not insert. 
This stops new publishes from re-splitting canonically +/// equivalent filters across `content_hash` variants. Legacy rows with NULL +/// `v1_hash` are excluded from the check until they are backfilled. async fn upsert_filter_record( state: &AppState, insert: &FilterInsert<'_>, author_username: &str, ) -> Result<(String, bool), AppError> { + if let Some((existing_hash, existing_author)) = sqlx::query_as::<_, (String, String)>( + "SELECT f.content_hash, u.username FROM filters f + JOIN users u ON u.id = f.author_id + WHERE f.v1_hash = $1 AND f.content_hash <> $2 + LIMIT 1", + ) + .bind(insert.v1_hash) + .bind(insert.content_hash) + .fetch_optional(&state.db) + .await? + { + tracing::info!( + new_hash = %insert.content_hash, + existing_hash = %existing_hash, + v1 = %insert.v1_hash, + "publish rejected as v1-equivalent of existing filter", + ); + return Ok((existing_author, false)); + } + let result = sqlx::query( - "INSERT INTO filters (content_hash, command_pattern, canonical_command, author_id, r2_key, safety_passed) - VALUES ($1, $2, $3, $4, $5, $6) + "INSERT INTO filters (content_hash, v1_hash, command_pattern, canonical_command, author_id, r2_key, safety_passed) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (content_hash) DO NOTHING", ) .bind(insert.content_hash) + .bind(insert.v1_hash) .bind(insert.command_pattern) .bind(insert.canonical_command) .bind(insert.author_id) @@ -195,6 +223,7 @@ pub(super) async fn set_examples_generated_at(pool: &sqlx::PgPool, content_hash: /// Validated, hashed filter ready for storage. 
pub(super) struct PreparedFilter { pub(super) content_hash: String, + pub(super) v1_hash: String, pub(super) command_pattern: String, pub(super) canonical_command: String, pub(super) config: FilterConfig, @@ -252,8 +281,11 @@ pub(super) fn validate_and_prepare( } let content_hash = canonical_hash(&config).map_err(|e| format!("hash error: {e}"))?; + let v1_hash = + tokf_common::canonical_v1::hash(toml_str).map_err(|e| format!("v1 hash error: {e}"))?; Ok(PreparedFilter { content_hash, + v1_hash, command_pattern, canonical_command, config, @@ -411,6 +443,7 @@ pub async fn publish_filter( let insert = FilterInsert { content_hash: &prepared.content_hash, + v1_hash: &prepared.v1_hash, command_pattern: &prepared.command_pattern, canonical_command: &prepared.canonical_command, author_id: auth.user_id, diff --git a/crates/tokf-server/src/routes/filters/publish/stdlib/mod.rs b/crates/tokf-server/src/routes/filters/publish/stdlib/mod.rs index 3f782d78..b3ff2c86 100644 --- a/crates/tokf-server/src/routes/filters/publish/stdlib/mod.rs +++ b/crates/tokf-server/src/routes/filters/publish/stdlib/mod.rs @@ -369,11 +369,12 @@ async fn persist_filter( upload_tests(state, &prepared.content_hash, prepared.test_files.clone()).await?; sqlx::query( - "INSERT INTO filters (content_hash, command_pattern, canonical_command, author_id, r2_key, is_stdlib) - VALUES ($1, $2, $3, $4, $5, TRUE) - ON CONFLICT (content_hash) DO UPDATE SET is_stdlib = TRUE", + "INSERT INTO filters (content_hash, v1_hash, command_pattern, canonical_command, author_id, r2_key, is_stdlib) + VALUES ($1, $2, $3, $4, $5, $6, TRUE) + ON CONFLICT (content_hash) DO UPDATE SET is_stdlib = TRUE, v1_hash = COALESCE(filters.v1_hash, EXCLUDED.v1_hash)", ) .bind(&prepared.content_hash) + .bind(&prepared.v1_hash) .bind(&prepared.command_pattern) .bind(&prepared.canonical_command) .bind(author_id) diff --git a/crates/tokf-server/src/routes/filters/publish/tests.rs b/crates/tokf-server/src/routes/filters/publish/tests.rs index 
aae86b80..6aca12be 100644 --- a/crates/tokf-server/src/routes/filters/publish/tests.rs +++ b/crates/tokf-server/src/routes/filters/publish/tests.rs @@ -536,3 +536,114 @@ async fn publish_filter_stores_examples_in_r2(pool: PgPool) { examples_json["safety"] ); } + +// ── v1_hash persistence + collision tests ───────────────────────────────── + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn publish_stores_v1_hash_on_new_row(pool: PgPool) { + let (_, token) = insert_test_user(&pool, "alice_v1_store").await; + let app = crate::routes::create_router(make_state(pool.clone())); + + let resp = post_filter( + app, + &token, + &[ + ("filter", VALID_FILTER_TOML), + MIT_ACCEPT, + DEFAULT_PASSING_TEST, + ], + ) + .await; + assert_eq!(resp.status(), StatusCode::CREATED); + + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let content_hash = json["content_hash"].as_str().unwrap(); + + let v1_hash: Option = + sqlx::query_scalar("SELECT v1_hash FROM filters WHERE content_hash = $1") + .bind(content_hash) + .fetch_one(&pool) + .await + .unwrap(); + let v1_hash = v1_hash.expect("v1_hash should be populated on publish"); + + let expected = tokf_common::canonical_v1::hash(std::str::from_utf8(VALID_FILTER_TOML).unwrap()) + .expect("canonical_v1::hash should succeed for valid TOML"); + assert_eq!( + v1_hash, expected, + "stored v1_hash should match canonical_v1 of the published TOML" + ); +} + +/// Two byte-different but canonically-equivalent filters published by +/// different authors: the second is rejected as a v1-equivalent of the +/// first; `is_new` is false; only one row exists in the DB; the response +/// reports the original author. 
+#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn publish_v1_collision_returns_existing_author(pool: PgPool) { + let (_, alice_token) = insert_test_user(&pool, "alice_v1").await; + let (_, bob_token) = insert_test_user(&pool, "bob_v1").await; + + // Whitespace + leading comment differ; v1 canonicalisation collapses both. + let alice_toml: &[u8] = b"command = \"my-tool\"\n"; + let bob_toml: &[u8] = b"# bob's slightly different version\n\ncommand = \"my-tool\"\n\n"; + + // Sanity check: same v1, different bytes. + let v1_alice = + tokf_common::canonical_v1::hash(std::str::from_utf8(alice_toml).unwrap()).unwrap(); + let v1_bob = tokf_common::canonical_v1::hash(std::str::from_utf8(bob_toml).unwrap()).unwrap(); + assert_eq!( + v1_alice, v1_bob, + "test setup: filters must canonicalise to the same v1 hash" + ); + assert_ne!(alice_toml, bob_toml, "test setup: filter bytes must differ"); + + let app = crate::routes::create_router(make_state(pool.clone())); + let alice_resp = post_filter( + app, + &alice_token, + &[("filter", alice_toml), MIT_ACCEPT, DEFAULT_PASSING_TEST], + ) + .await; + assert_eq!(alice_resp.status(), StatusCode::CREATED); + let alice_body = alice_resp.into_body().collect().await.unwrap().to_bytes(); + let alice_json: serde_json::Value = serde_json::from_slice(&alice_body).unwrap(); + let alice_content_hash = alice_json["content_hash"].as_str().unwrap().to_string(); + + let app = crate::routes::create_router(make_state(pool.clone())); + let bob_resp = post_filter( + app, + &bob_token, + &[("filter", bob_toml), MIT_ACCEPT, DEFAULT_PASSING_TEST], + ) + .await; + assert_eq!( + bob_resp.status(), + StatusCode::OK, + "v1-equivalent republish should be 200 OK (treated as duplicate)" + ); + + let bob_body = bob_resp.into_body().collect().await.unwrap().to_bytes(); + let bob_json: serde_json::Value = serde_json::from_slice(&bob_body).unwrap(); + assert_eq!( + bob_json["author"], "alice_v1", + "response should report the original author, not 
bob" + ); + assert_eq!( + bob_json["content_hash"].as_str().unwrap(), + alice_content_hash, + "response should report the existing content_hash, not bob's" + ); + + // Only one row exists in DB. + let row_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM filters WHERE v1_hash = $1") + .bind(&v1_alice) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + row_count, 1, + "only one row should exist for canonically-equivalent filters" + ); +} diff --git a/crates/tokf-server/src/routes/filters/regenerate_tests.rs b/crates/tokf-server/src/routes/filters/regenerate_tests.rs index de5da728..26907d37 100644 --- a/crates/tokf-server/src/routes/filters/regenerate_tests.rs +++ b/crates/tokf-server/src/routes/filters/regenerate_tests.rs @@ -1,23 +1,21 @@ use std::sync::Arc; -use axum::{ - body::Body, - http::{Request, StatusCode}, -}; +use axum::http::StatusCode; use http_body_util::BodyExt; use sqlx::PgPool; -use tower::ServiceExt; use crate::routes::test_helpers::insert_service_token; use crate::storage::StorageClient as _; use crate::storage::mock::InMemoryStorageClient; use super::super::test_helpers::{ - insert_test_user, make_state_with_storage, publish_filter_helper, + insert_test_user, make_state_with_storage, post_json, publish_filter_helper, }; const VALID_FILTER_TOML: &[u8] = b"command = \"my-tool\"\n"; +const URI: &str = "/api/filters/regenerate-examples"; + fn make_state_with_mem_storage( pool: PgPool, storage: Arc, @@ -30,17 +28,7 @@ async fn post_regenerate( token: &str, body: &serde_json::Value, ) -> axum::response::Response { - app.oneshot( - Request::builder() - .method("POST") - .uri("/api/filters/regenerate-examples") - .header("authorization", format!("Bearer {token}")) - .header("content-type", "application/json") - .body(Body::from(serde_json::to_vec(body).unwrap())) - .unwrap(), - ) - .await - .unwrap() + post_json(app, token, URI, body).await } // ── Tests ──────────────────────────────────────────────────────────────────── diff --git 
a/crates/tokf-server/src/routes/filters/test_helpers.rs b/crates/tokf-server/src/routes/filters/test_helpers.rs index 09efd16c..798b7144 100644 --- a/crates/tokf-server/src/routes/filters/test_helpers.rs +++ b/crates/tokf-server/src/routes/filters/test_helpers.rs @@ -148,6 +148,27 @@ pub async fn get_request(app: axum::Router, token: &str, uri: &str) -> axum::res .unwrap() } +/// POST a JSON body to a URI with a bearer token. Used by every endpoint +/// test whose handler accepts `Json<...>` over `application/json`. +pub async fn post_json( + app: axum::Router, + token: &str, + uri: &str, + body: &serde_json::Value, +) -> axum::response::Response { + app.oneshot( + Request::builder() + .method("POST") + .uri(uri) + .header("authorization", format!("Bearer {token}")) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(body).unwrap())) + .unwrap(), + ) + .await + .unwrap() +} + /// POST `/api/filters` helper that returns the full response (for publish-specific tests). 
pub async fn post_filter( app: axum::Router, diff --git a/crates/tokf-server/src/routes/mod.rs b/crates/tokf-server/src/routes/mod.rs index 002635df..8c862c15 100644 --- a/crates/tokf-server/src/routes/mod.rs +++ b/crates/tokf-server/src/routes/mod.rs @@ -55,6 +55,10 @@ pub fn create_router(state: AppState) -> Router { "/api/filters/backfill-versions", post(filters::backfill_versions), ) + .route( + "/api/filters/backfill-v1-hashes", + post(filters::backfill_v1_hashes), + ) .route("/api/sync", post(sync::sync_usage)) .route("/api/catalog/refresh", post(catalog::refresh_catalog)) .route("/api/catalog/grouped", get(catalog::get_grouped_catalog)) From 8df4a1d35629cbe0beb91754b2806017ff1e9c24 Mon Sep 17 00:00:00 2001 From: Matjaz Domen Pecan Date: Wed, 29 Apr 2026 15:47:53 +0200 Subject: [PATCH 2/3] fix(server): correct v1-collision response and tighten coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The original `publish_v1_collision_returns_existing_author` test was broken: alice's and bob's TOMLs (whitespace-only differences) parsed to identical `FilterConfig` values and therefore identical `content_hash`, so bob's submission was caught by the existing byte-duplicate path rather than the new v1-collision branch. The test passed for the wrong reason, leaving the headline behaviour of this PR untested. Fixed the fixture to use `command = "x"` vs `command = ["x"]`, which parses to different `CommandPattern` enum variants (different content_hash) but canonical_v1 collapses both into the single-string form (same v1_hash). This actually triggers the v1-collision branch in `upsert_filter_record`. Doing so surfaced a real bug: the response was returning the rejected publisher's `content_hash` instead of the existing row's. Fixed by threading the resolved hash back through `UpsertResult { author, content_hash, is_new }`. 
Adds two tests Agent 4 flagged as missing: - `publish_stdlib_stores_v1_hash` — verifies stdlib publish populates v1 - `backfill_v1_reports_failure_when_r2_object_unparseable` — covers the invalid-UTF-8 / unparseable-TOML failure path in `compute_and_store_v1` Extracts `run_backfill_expecting_one_failure` to dedupe the two backfill-failure tests. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/routes/filters/backfill_tests.rs | 82 +++++++++++++------ .../src/routes/filters/publish/mod.rs | 64 +++++++++++---- .../routes/filters/publish/stdlib/tests.rs | 20 +++++ .../src/routes/filters/publish/tests.rs | 23 +++++- 4 files changed, 142 insertions(+), 47 deletions(-) diff --git a/crates/tokf-server/src/routes/filters/backfill_tests.rs b/crates/tokf-server/src/routes/filters/backfill_tests.rs index 4c67733e..138d704d 100644 --- a/crates/tokf-server/src/routes/filters/backfill_tests.rs +++ b/crates/tokf-server/src/routes/filters/backfill_tests.rs @@ -133,31 +133,19 @@ async fn backfill_v1_skips_already_populated(pool: PgPool) { ); } -#[crdb_test_macro::crdb_test(migrations = "./migrations")] -async fn backfill_v1_reports_failure_when_r2_object_missing(pool: PgPool) { - let storage = Arc::new(InMemoryStorageClient::new()); - let (_, user) = insert_test_user(&pool, "alice_missing").await; - let service_token = insert_service_token(&pool, "bf-missing").await; - - let hash = publish_then_null_v1(&pool, Arc::clone(&storage), &user, VALID_FILTER_TOML).await; - - // Wipe the R2 object so the backfill can't find it. - let r2_key: String = sqlx::query_scalar("SELECT r2_key FROM filters WHERE content_hash = $1") - .bind(&hash) - .fetch_one(&pool) - .await - .unwrap(); - storage.delete(&r2_key).await.unwrap(); - +/// Drive a backfill against a single corrupted-R2 row and assert that: +/// the call succeeds with `processed=1, updated=0`, the row appears in +/// `failed[]`, and the row's `v1_hash` remains NULL. Used by every +/// per-failure-mode test below. 
+async fn run_backfill_expecting_one_failure( + pool: &PgPool, + storage: Arc, + service_token: &str, + hash: &str, +) { let app = crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage))); - let resp = post_json( - app, - &service_token, - URI, - &serde_json::json!({ "limit": 10 }), - ) - .await; + let resp = post_json(app, service_token, URI, &serde_json::json!({ "limit": 10 })).await; assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_body().collect().await.unwrap().to_bytes(); @@ -168,11 +156,10 @@ async fn backfill_v1_reports_failure_when_r2_object_missing(pool: PgPool) { assert_eq!(failed.len(), 1); assert_eq!(failed[0]["content_hash"], hash); - // v1_hash should still be NULL since compute failed. let v1: Option = sqlx::query_scalar("SELECT v1_hash FROM filters WHERE content_hash = $1") - .bind(&hash) - .fetch_one(&pool) + .bind(hash) + .fetch_one(pool) .await .unwrap(); assert!( @@ -181,6 +168,49 @@ async fn backfill_v1_reports_failure_when_r2_object_missing(pool: PgPool) { ); } +async fn r2_key_for(pool: &PgPool, hash: &str) -> String { + sqlx::query_scalar("SELECT r2_key FROM filters WHERE content_hash = $1") + .bind(hash) + .fetch_one(pool) + .await + .unwrap() +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_reports_failure_when_r2_object_missing(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let (_, user) = insert_test_user(&pool, "alice_missing").await; + let service_token = insert_service_token(&pool, "bf-missing").await; + + let hash = publish_then_null_v1(&pool, Arc::clone(&storage), &user, VALID_FILTER_TOML).await; + storage + .delete(&r2_key_for(&pool, &hash).await) + .await + .unwrap(); + + run_backfill_expecting_one_failure(&pool, storage, &service_token, &hash).await; +} + +/// Exercises both UTF-8 and TOML-parse failure branches in +/// `compute_and_store_v1` by writing bytes that are invalid as both. 
+#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_reports_failure_when_r2_object_unparseable(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let (_, user) = insert_test_user(&pool, "alice_corrupt").await; + let service_token = insert_service_token(&pool, "bf-corrupt").await; + + let hash = publish_then_null_v1(&pool, Arc::clone(&storage), &user, VALID_FILTER_TOML).await; + storage + .put( + &r2_key_for(&pool, &hash).await, + b"\xff\xfe not valid toml [[[".to_vec(), + ) + .await + .unwrap(); + + run_backfill_expecting_one_failure(&pool, storage, &service_token, &hash).await; +} + #[crdb_test_macro::crdb_test(migrations = "./migrations")] async fn backfill_v1_requires_service_token(pool: PgPool) { let storage = Arc::new(InMemoryStorageClient::new()); diff --git a/crates/tokf-server/src/routes/filters/publish/mod.rs b/crates/tokf-server/src/routes/filters/publish/mod.rs index 1a4f4524..9563156f 100644 --- a/crates/tokf-server/src/routes/filters/publish/mod.rs +++ b/crates/tokf-server/src/routes/filters/publish/mod.rs @@ -96,23 +96,37 @@ struct FilterInsert<'a> { safety_passed: bool, } -/// Returns `(author_username, was_new)` for the filter with `content_hash`. +/// Result of upserting a published filter. +struct UpsertResult { + /// The username of the row's author. For new inserts this matches the + /// publisher; for duplicates it's the original author. + author: String, + /// The canonical `content_hash` of the row in the DB. Equals + /// `insert.content_hash` for new inserts and byte-identical + /// resubmissions; for v1-equivalent duplicates this is the *existing* + /// row's `content_hash`, not the rejected publisher's. + content_hash: String, + /// True when a new row was inserted, false for any duplicate path. + is_new: bool, +} + +/// Insert or detect an existing filter row. /// -/// Attempts an INSERT and uses the result to distinguish new vs duplicate. 
/// Uploading to R2 before this call means orphaned objects are possible on DB /// failure, but they are harmless (no user-visible state and R2 uploads are /// idempotent on retry). /// /// Performs a v1-collision pre-check: if a row already exists with the same /// `v1_hash` but a different `content_hash`, returns that row's author and -/// does not insert. This stops new publishes from re-splitting canonically -/// equivalent filters across `content_hash` variants. Legacy rows with NULL -/// `v1_hash` are excluded from the check until they are backfilled. +/// canonical hash without inserting. This stops new publishes from +/// re-splitting canonically equivalent filters across `content_hash` +/// variants. Legacy rows with NULL `v1_hash` are excluded from the check +/// until they are backfilled. async fn upsert_filter_record( state: &AppState, insert: &FilterInsert<'_>, author_username: &str, -) -> Result<(String, bool), AppError> { +) -> Result { if let Some((existing_hash, existing_author)) = sqlx::query_as::<_, (String, String)>( "SELECT f.content_hash, u.username FROM filters f JOIN users u ON u.id = f.author_id @@ -130,7 +144,11 @@ async fn upsert_filter_record( v1 = %insert.v1_hash, "publish rejected as v1-equivalent of existing filter", ); - return Ok((existing_author, false)); + return Ok(UpsertResult { + author: existing_author, + content_hash: existing_hash, + is_new: false, + }); } let result = sqlx::query( @@ -149,7 +167,7 @@ async fn upsert_filter_record( .await?; if result.rows_affected() == 0 { - // Duplicate — fetch the original author's username + // Byte-identical duplicate — fetch the original author's username. 
let existing_author: String = sqlx::query_scalar( "SELECT u.username FROM filters f JOIN users u ON u.id = f.author_id @@ -158,10 +176,18 @@ async fn upsert_filter_record( .bind(insert.content_hash) .fetch_one(&state.db) .await?; - return Ok((existing_author, false)); + return Ok(UpsertResult { + author: existing_author, + content_hash: insert.content_hash.to_string(), + is_new: false, + }); } - Ok((author_username.to_string(), true)) + Ok(UpsertResult { + author: author_username.to_string(), + content_hash: insert.content_hash.to_string(), + is_new: true, + }) } pub async fn upload_tests( @@ -450,9 +476,13 @@ pub async fn publish_filter( r2_key: &r2_key, safety_passed, }; - let (author, is_new) = upsert_filter_record(&state, &insert, &auth.username).await?; + let upserted = upsert_filter_record(&state, &insert, &auth.username).await?; - // Upload examples to R2 AFTER insert so set_examples_generated_at finds the row. + // For v1-collision duplicates, `upserted.content_hash` is the existing + // row's canonical hash, not `prepared.content_hash`. Examples and test + // rows below are still keyed by `prepared.content_hash` because: + // - examples upload is best-effort and orphaned R2 objects are harmless, + // - `insert_filter_tests` is gated on `is_new` so it only runs for genuinely new rows. 
if let Ok((examples_json, _)) = examples_result { if let Err(e) = storage::upload_examples(&*state.storage, &prepared.content_hash, examples_json).await @@ -463,7 +493,7 @@ pub async fn publish_filter( } } - if is_new { + if upserted.is_new { insert_filter_tests(&state.db, &prepared.content_hash, &test_r2_keys).await?; // Fire-and-forget: materialize per-filter metadata + catalog index to R2 @@ -474,8 +504,8 @@ pub async fn publish_filter( ); } - let registry_url = format!("{}/filters/{}", state.public_url, prepared.content_hash); - let status = if is_new { + let registry_url = format!("{}/filters/{}", state.public_url, upserted.content_hash); + let status = if upserted.is_new { StatusCode::CREATED } else { StatusCode::OK @@ -484,9 +514,9 @@ pub async fn publish_filter( status, crate::routes::ip::rate_limit_headers(&rl), Json(PublishFilterResponse { - content_hash: prepared.content_hash, + content_hash: upserted.content_hash, command_pattern: prepared.command_pattern, - author, + author: upserted.author, registry_url, }), )) diff --git a/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs b/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs index 375853bb..74fe3ef1 100644 --- a/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs +++ b/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs @@ -303,3 +303,23 @@ async fn publish_stdlib_rejects_invalid_username(pool: PgPool) { "error should mention username: {error}" ); } + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn publish_stdlib_stores_v1_hash(pool: PgPool) { + let token = insert_service_token(&pool, "ci-test").await; + let state = make_state(pool.clone()); + let app = crate::routes::create_router(state); + + let req = make_valid_request(); + let resp = post_stdlib(app, &token, &req).await; + assert_eq!(resp.status(), StatusCode::CREATED); + + // Stdlib publish goes through its own INSERT; assert v1_hash is populated. 
+ let v1_hash: Option = sqlx::query_scalar("SELECT v1_hash FROM filters LIMIT 1") + .fetch_one(&pool) + .await + .unwrap(); + let v1_hash = v1_hash.expect("stdlib publish must populate v1_hash"); + let expected = tokf_common::canonical_v1::hash(&req.filters[0].filter_toml).unwrap(); + assert_eq!(v1_hash, expected); +} diff --git a/crates/tokf-server/src/routes/filters/publish/tests.rs b/crates/tokf-server/src/routes/filters/publish/tests.rs index 6aca12be..53fa47c6 100644 --- a/crates/tokf-server/src/routes/filters/publish/tests.rs +++ b/crates/tokf-server/src/routes/filters/publish/tests.rs @@ -585,11 +585,17 @@ async fn publish_v1_collision_returns_existing_author(pool: PgPool) { let (_, alice_token) = insert_test_user(&pool, "alice_v1").await; let (_, bob_token) = insert_test_user(&pool, "bob_v1").await; - // Whitespace + leading comment differ; v1 canonicalisation collapses both. + // `command = "x"` vs `command = ["x"]` — these parse to different + // `CommandPattern` enum variants (`Single` vs `Multiple`), so + // `canonical_hash` (which serialises the parsed FilterConfig) yields + // different content_hashes. canonical_v1 collapses both into the + // single-string form, so they share a v1_hash. This is the smallest + // fixture that exercises the v1-collision branch in + // `upsert_filter_record` rather than the byte-equal duplicate path. let alice_toml: &[u8] = b"command = \"my-tool\"\n"; - let bob_toml: &[u8] = b"# bob's slightly different version\n\ncommand = \"my-tool\"\n\n"; + let bob_toml: &[u8] = b"command = [\"my-tool\"]\n"; - // Sanity check: same v1, different bytes. + // Sanity check: same v1, different content_hash. 
let v1_alice = tokf_common::canonical_v1::hash(std::str::from_utf8(alice_toml).unwrap()).unwrap(); let v1_bob = tokf_common::canonical_v1::hash(std::str::from_utf8(bob_toml).unwrap()).unwrap(); @@ -597,7 +603,16 @@ async fn publish_v1_collision_returns_existing_author(pool: PgPool) { v1_alice, v1_bob, "test setup: filters must canonicalise to the same v1 hash" ); - assert_ne!(alice_toml, bob_toml, "test setup: filter bytes must differ"); + let alice_cfg: tokf_common::config::types::FilterConfig = + toml::from_str(std::str::from_utf8(alice_toml).unwrap()).unwrap(); + let bob_cfg: tokf_common::config::types::FilterConfig = + toml::from_str(std::str::from_utf8(bob_toml).unwrap()).unwrap(); + assert_ne!( + tokf_common::hash::canonical_hash(&alice_cfg).unwrap(), + tokf_common::hash::canonical_hash(&bob_cfg).unwrap(), + "test setup: content_hashes must differ — otherwise the test exercises \ + the byte-duplicate path, not the v1-collision branch", + ); let app = crate::routes::create_router(make_state(pool.clone())); let alice_resp = post_filter( From ec889d78aae28ecaf8b6238381d0fae6dc673836 Mon Sep 17 00:00:00 2001 From: Matjaz Domen Pecan Date: Wed, 29 Apr 2026 16:01:49 +0200 Subject: [PATCH 3/3] refactor(server): simplify per /simplify review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cleanup pass identified by three review agents: - Extract `storage::get_utf8` helper to share the R2-fetch + UTF-8 decode pattern; use it from `compute_and_store_v1`. Three older call sites (search, regenerate, update_tests) keep the inline form for now — scope-limited migration is a follow-up. - Collapse `MAX_BACKFILL_ENTRIES` and `MAX_BACKFILL_V1_LIMIT` (both 500) into one `MAX_BACKFILL_BATCH` constant. - Add `expected_v1` test helper (accepts &[u8] or &str) to dedupe the `tokf_common::canonical_v1::hash(std::str::from_utf8(...).unwrap()).unwrap()` pattern across 5 test sites. 
- Combine `backfill_v1_reports_failure_when_r2_object_{missing,unparseable}` into a single parameterised test driving both `CorruptR2` modes. - Trim verbose comments on `UpsertResult`, `upsert_filter_record`, and the v1-collision-handoff narrative in `publish_filter` (kept the WHY content, dropped the WHAT-restating prose). - Tighten the migration SQL header. - Reshape `publish_stores_v1_hash_on_new_row` to use the existing `publish_filter_helper`, dropping similarity to other publish tests. Quality gate: 267 server tests pass, fmt + clippy + dupes (0.4% exact / 0.3% near, both under 0.5% limit) all clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../migrations/20260428000000_add_v1_hash.sql | 19 ++--- .../src/routes/filters/backfill.rs | 26 +++--- .../src/routes/filters/backfill_tests.rs | 85 ++++++++++--------- .../src/routes/filters/publish/mod.rs | 39 ++++----- .../routes/filters/publish/stdlib/tests.rs | 5 +- .../src/routes/filters/publish/tests.rs | 39 ++------- .../src/routes/filters/test_helpers.rs | 8 ++ crates/tokf-server/src/storage/mod.rs | 15 ++++ 8 files changed, 113 insertions(+), 123 deletions(-) diff --git a/crates/tokf-server/migrations/20260428000000_add_v1_hash.sql b/crates/tokf-server/migrations/20260428000000_add_v1_hash.sql index 319e48ec..8d76d50e 100644 --- a/crates/tokf-server/migrations/20260428000000_add_v1_hash.sql +++ b/crates/tokf-server/migrations/20260428000000_add_v1_hash.sql @@ -1,17 +1,14 @@ --- Add `v1_hash` column for the schema-independent canonical TOML hash (ADR-0002). +-- Schema-independent canonical TOML hash (ADR-0002). -- -- Nullable on purpose: existing rows are NULL until the backfill endpoint -- (`POST /api/filters/backfill-v1-hashes`) has populated them by re-reading --- each row's TOML from R2. Adding NOT NULL or UNIQUE here would either fail --- the migration or break the backfill mid-run as soon as the first --- historical duplicate is encountered. +-- each row's TOML from R2. 
NOT NULL or UNIQUE here would either fail the +-- migration or break the backfill mid-run on the first historical duplicate; +-- both belong to the dedup migration (separate PR) which also collapses +-- duplicate v1 rows. -- --- The dedup migration (separate PR) will collapse duplicate v1 rows and may --- add UNIQUE alongside. --- --- A non-partial index is used because CockroachDB cannot create a partial --- index on a column added in the same migration ("column ... is not public", --- error 0A000). The space cost of indexing NULL is negligible (NULLs are --- already collapsed in B-tree indexes). +-- Non-partial index because CockroachDB rejects a partial index on a column +-- added in the same migration ("column ... is not public", error 0A000). +-- B-tree NULL handling makes the space cost negligible. ALTER TABLE filters ADD COLUMN v1_hash TEXT; CREATE INDEX filters_v1_hash_idx ON filters(v1_hash); diff --git a/crates/tokf-server/src/routes/filters/backfill.rs b/crates/tokf-server/src/routes/filters/backfill.rs index eac34da8..057341a8 100644 --- a/crates/tokf-server/src/routes/filters/backfill.rs +++ b/crates/tokf-server/src/routes/filters/backfill.rs @@ -27,8 +27,9 @@ pub struct BackfillVersionsResponse { pub skipped: usize, } -/// Maximum number of entries in a single backfill request. -const MAX_BACKFILL_ENTRIES: usize = 500; +/// Maximum batch size for either backfill endpoint. Bounds operator load +/// against the DB / R2. +const MAX_BACKFILL_BATCH: usize = 500; /// `POST /api/filters/backfill-versions` — Backfill version data for filters. 
/// @@ -39,9 +40,9 @@ pub async fn backfill_versions( State(state): State, Json(req): Json, ) -> Result<(StatusCode, Json), AppError> { - if req.entries.len() > MAX_BACKFILL_ENTRIES { + if req.entries.len() > MAX_BACKFILL_BATCH { return Err(AppError::BadRequest(format!( - "batch size {} exceeds maximum of {MAX_BACKFILL_ENTRIES}", + "batch size {} exceeds maximum of {MAX_BACKFILL_BATCH}", req.entries.len() ))); } @@ -105,7 +106,7 @@ pub async fn backfill_versions( #[derive(Debug, Deserialize)] pub struct BackfillV1Request { /// Maximum rows to process in this call. Operator iterates until - /// `processed == 0`. Defaults to 100; capped at `MAX_BACKFILL_V1_LIMIT`. + /// `processed == 0`. Defaults to 100; capped at `MAX_BACKFILL_BATCH`. #[serde(default = "default_v1_batch_size")] pub limit: usize, } @@ -127,8 +128,6 @@ pub struct BackfillV1Failure { pub error: String, } -const MAX_BACKFILL_V1_LIMIT: usize = 500; - /// `POST /api/filters/backfill-v1-hashes` — populate `v1_hash` for legacy rows. /// /// Service-token auth. Idempotent. Picks up to `limit` rows where @@ -144,8 +143,8 @@ pub async fn backfill_v1_hashes( State(state): State, Json(req): Json, ) -> Result<(StatusCode, Json), AppError> { - let limit = req.limit.clamp(1, MAX_BACKFILL_V1_LIMIT); - // Safe: `limit` is clamped to [1, MAX_BACKFILL_V1_LIMIT] (= 500) which fits in i64. + let limit = req.limit.clamp(1, MAX_BACKFILL_BATCH); + // Safe: `limit` is clamped to [1, MAX_BACKFILL_BATCH] (= 500) which fits in i64. #[allow(clippy::cast_possible_wrap)] let limit_i64 = limit as i64; @@ -203,13 +202,8 @@ async fn compute_and_store_v1( content_hash: &str, r2_key: &str, ) -> anyhow::Result<()> { - let bytes = state - .storage - .get(r2_key) - .await? 
- .ok_or_else(|| anyhow::anyhow!("R2 object missing: {r2_key}"))?; - let toml_str = std::str::from_utf8(&bytes)?; - let v1 = tokf_common::canonical_v1::hash(toml_str)?; + let toml_str = crate::storage::get_utf8(&*state.storage, r2_key).await?; + let v1 = tokf_common::canonical_v1::hash(&toml_str)?; sqlx::query("UPDATE filters SET v1_hash = $1 WHERE content_hash = $2") .bind(&v1) .bind(content_hash) diff --git a/crates/tokf-server/src/routes/filters/backfill_tests.rs b/crates/tokf-server/src/routes/filters/backfill_tests.rs index 138d704d..bdebfad5 100644 --- a/crates/tokf-server/src/routes/filters/backfill_tests.rs +++ b/crates/tokf-server/src/routes/filters/backfill_tests.rs @@ -13,7 +13,7 @@ use crate::storage::StorageClient as _; use crate::storage::mock::InMemoryStorageClient; use super::test_helpers::{ - insert_test_user, make_state_with_storage, post_json, publish_filter_helper, + expected_v1, insert_test_user, make_state_with_storage, post_json, publish_filter_helper, }; const VALID_FILTER_TOML: &[u8] = b"command = \"my-tool\"\n"; @@ -75,10 +75,7 @@ async fn backfill_v1_populates_null_rows(pool: PgPool) { .fetch_one(&pool) .await .unwrap(); - assert_eq!( - alice_v1.unwrap(), - tokf_common::canonical_v1::hash(std::str::from_utf8(alice_toml).unwrap()).unwrap() - ); + assert_eq!(alice_v1.unwrap(), expected_v1(alice_toml)); let bob_v1: Option = sqlx::query_scalar("SELECT v1_hash FROM filters WHERE content_hash = $1") @@ -86,10 +83,7 @@ async fn backfill_v1_populates_null_rows(pool: PgPool) { .fetch_one(&pool) .await .unwrap(); - assert_eq!( - bob_v1.unwrap(), - tokf_common::canonical_v1::hash(std::str::from_utf8(bob_toml).unwrap()).unwrap() - ); + assert_eq!(bob_v1.unwrap(), expected_v1(bob_toml)); } #[crdb_test_macro::crdb_test(migrations = "./migrations")] @@ -176,39 +170,52 @@ async fn r2_key_for(pool: &PgPool, hash: &str) -> String { .unwrap() } -#[crdb_test_macro::crdb_test(migrations = "./migrations")] -async fn 
backfill_v1_reports_failure_when_r2_object_missing(pool: PgPool) { - let storage = Arc::new(InMemoryStorageClient::new()); - let (_, user) = insert_test_user(&pool, "alice_missing").await; - let service_token = insert_service_token(&pool, "bf-missing").await; - - let hash = publish_then_null_v1(&pool, Arc::clone(&storage), &user, VALID_FILTER_TOML).await; - storage - .delete(&r2_key_for(&pool, &hash).await) - .await - .unwrap(); - - run_backfill_expecting_one_failure(&pool, storage, &service_token, &hash).await; +/// Each kind of corrupted R2 state that `compute_and_store_v1` should +/// surface as a per-row failure (without aborting the batch or writing +/// `v1_hash`). +enum CorruptR2 { + /// Object deleted — `storage.get` returns None. + Missing, + /// Bytes that fail both UTF-8 decode and TOML parse, exercising both + /// post-fetch failure branches in `compute_and_store_v1`. + Unparseable, } -/// Exercises both UTF-8 and TOML-parse failure branches in -/// `compute_and_store_v1` by writing bytes that are invalid as both. 
#[crdb_test_macro::crdb_test(migrations = "./migrations")] -async fn backfill_v1_reports_failure_when_r2_object_unparseable(pool: PgPool) { - let storage = Arc::new(InMemoryStorageClient::new()); - let (_, user) = insert_test_user(&pool, "alice_corrupt").await; - let service_token = insert_service_token(&pool, "bf-corrupt").await; - - let hash = publish_then_null_v1(&pool, Arc::clone(&storage), &user, VALID_FILTER_TOML).await; - storage - .put( - &r2_key_for(&pool, &hash).await, - b"\xff\xfe not valid toml [[[".to_vec(), - ) - .await - .unwrap(); - - run_backfill_expecting_one_failure(&pool, storage, &service_token, &hash).await; +async fn backfill_v1_reports_per_row_failures(pool: PgPool) { + let service_token = insert_service_token(&pool, "bf-failures").await; + + for (idx, kind) in [CorruptR2::Missing, CorruptR2::Unparseable] + .into_iter() + .enumerate() + { + let storage = Arc::new(InMemoryStorageClient::new()); + let (_, user) = insert_test_user(&pool, &format!("alice_fail_{idx}")).await; + let hash = + publish_then_null_v1(&pool, Arc::clone(&storage), &user, VALID_FILTER_TOML).await; + let r2_key = r2_key_for(&pool, &hash).await; + match kind { + CorruptR2::Missing => storage.delete(&r2_key).await.unwrap(), + CorruptR2::Unparseable => { + storage + .put(&r2_key, b"\xff\xfe not valid toml [[[".to_vec()) + .await + .unwrap(); + } + } + run_backfill_expecting_one_failure(&pool, storage, &service_token, &hash).await; + // Reset for next iteration so the second case starts from a clean + // "v1_hash IS NULL" set (the failure left the row NULL — we just + // want to make sure the backfill's `LIMIT` finds the right row). 
+ sqlx::query("DELETE FROM filter_tests") + .execute(&pool) + .await + .unwrap(); + sqlx::query("DELETE FROM filters") + .execute(&pool) + .await + .unwrap(); + } } #[crdb_test_macro::crdb_test(migrations = "./migrations")] diff --git a/crates/tokf-server/src/routes/filters/publish/mod.rs b/crates/tokf-server/src/routes/filters/publish/mod.rs index 9563156f..1184298d 100644 --- a/crates/tokf-server/src/routes/filters/publish/mod.rs +++ b/crates/tokf-server/src/routes/filters/publish/mod.rs @@ -96,32 +96,26 @@ struct FilterInsert<'a> { safety_passed: bool, } -/// Result of upserting a published filter. +/// Resolved view of a publish attempt: the author and `content_hash` of +/// the row that ends up in the registry. For v1-equivalent duplicates +/// these come from the *existing* row, not the rejected submission. struct UpsertResult { - /// The username of the row's author. For new inserts this matches the - /// publisher; for duplicates it's the original author. author: String, - /// The canonical `content_hash` of the row in the DB. Equals - /// `insert.content_hash` for new inserts and byte-identical - /// resubmissions; for v1-equivalent duplicates this is the *existing* - /// row's `content_hash`, not the rejected publisher's. content_hash: String, - /// True when a new row was inserted, false for any duplicate path. is_new: bool, } -/// Insert or detect an existing filter row. +/// Insert a new filter row, or resolve a duplicate. /// -/// Uploading to R2 before this call means orphaned objects are possible on DB -/// failure, but they are harmless (no user-visible state and R2 uploads are -/// idempotent on retry). +/// Two duplicate paths: byte-identical (caught by `ON CONFLICT (content_hash)`) +/// and v1-equivalent (caught by the pre-check below — same canonical TOML +/// shape, different `content_hash`). Both return the existing row's author +/// and canonical hash. 
Legacy rows with NULL `v1_hash` are excluded from the +/// v1-equivalence check via SQL three-valued logic until they are backfilled. /// -/// Performs a v1-collision pre-check: if a row already exists with the same -/// `v1_hash` but a different `content_hash`, returns that row's author and -/// canonical hash without inserting. This stops new publishes from -/// re-splitting canonically equivalent filters across `content_hash` -/// variants. Legacy rows with NULL `v1_hash` are excluded from the check -/// until they are backfilled. +/// Uploading to R2 before this call means orphaned objects are possible on +/// DB failure, but they are harmless (no user-visible state and R2 uploads +/// are idempotent on retry). async fn upsert_filter_record( state: &AppState, insert: &FilterInsert<'_>, @@ -478,11 +472,10 @@ pub async fn publish_filter( }; let upserted = upsert_filter_record(&state, &insert, &auth.username).await?; - // For v1-collision duplicates, `upserted.content_hash` is the existing - // row's canonical hash, not `prepared.content_hash`. Examples and test - // rows below are still keyed by `prepared.content_hash` because: - // - examples upload is best-effort and orphaned R2 objects are harmless, - // - `insert_filter_tests` is gated on `is_new` so it only runs for genuinely new rows. + // Below, examples + test rows are keyed by `prepared.content_hash` (the + // submitted hash) while the response uses `upserted.content_hash` (the + // existing-row hash on a v1-collision). Safe because examples uploads + // tolerate orphans and `insert_filter_tests` is gated on `is_new`. 
if let Ok((examples_json, _)) = examples_result { if let Err(e) = storage::upload_examples(&*state.storage, &prepared.content_hash, examples_json).await diff --git a/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs b/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs index 74fe3ef1..6e387436 100644 --- a/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs +++ b/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs @@ -7,7 +7,7 @@ use axum::{ use http_body_util::BodyExt; use tower::ServiceExt; -use crate::routes::filters::test_helpers::make_state; +use crate::routes::filters::test_helpers::{expected_v1, make_state}; use crate::routes::test_helpers::insert_service_token; use crate::storage::mock::InMemoryStorageClient; @@ -320,6 +320,5 @@ async fn publish_stdlib_stores_v1_hash(pool: PgPool) { .await .unwrap(); let v1_hash = v1_hash.expect("stdlib publish must populate v1_hash"); - let expected = tokf_common::canonical_v1::hash(&req.filters[0].filter_toml).unwrap(); - assert_eq!(v1_hash, expected); + assert_eq!(v1_hash, expected_v1(&req.filters[0].filter_toml)); } diff --git a/crates/tokf-server/src/routes/filters/publish/tests.rs b/crates/tokf-server/src/routes/filters/publish/tests.rs index 53fa47c6..618764b8 100644 --- a/crates/tokf-server/src/routes/filters/publish/tests.rs +++ b/crates/tokf-server/src/routes/filters/publish/tests.rs @@ -15,8 +15,8 @@ use crate::storage::mock::InMemoryStorageClient; use crate::storage::StorageClient as _; use super::super::test_helpers::{ - DEFAULT_PASSING_TEST, MIT_ACCEPT, insert_test_user, make_multipart, make_state, - make_state_with_storage, post_filter, + DEFAULT_PASSING_TEST, MIT_ACCEPT, expected_v1, insert_test_user, make_multipart, make_state, + make_state_with_storage, post_filter, publish_filter_helper, }; const VALID_FILTER_TOML: &[u8] = b"command = \"my-tool\"\n"; @@ -543,37 +543,15 @@ async fn publish_filter_stores_examples_in_r2(pool: PgPool) { async fn 
publish_stores_v1_hash_on_new_row(pool: PgPool) { let (_, token) = insert_test_user(&pool, "alice_v1_store").await; let app = crate::routes::create_router(make_state(pool.clone())); + let hash = publish_filter_helper(app, &token, VALID_FILTER_TOML, &[]).await; - let resp = post_filter( - app, - &token, - &[ - ("filter", VALID_FILTER_TOML), - MIT_ACCEPT, - DEFAULT_PASSING_TEST, - ], - ) - .await; - assert_eq!(resp.status(), StatusCode::CREATED); - - let body = resp.into_body().collect().await.unwrap().to_bytes(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); - let content_hash = json["content_hash"].as_str().unwrap(); - - let v1_hash: Option = + let v1: Option = sqlx::query_scalar("SELECT v1_hash FROM filters WHERE content_hash = $1") - .bind(content_hash) + .bind(&hash) .fetch_one(&pool) .await .unwrap(); - let v1_hash = v1_hash.expect("v1_hash should be populated on publish"); - - let expected = tokf_common::canonical_v1::hash(std::str::from_utf8(VALID_FILTER_TOML).unwrap()) - .expect("canonical_v1::hash should succeed for valid TOML"); - assert_eq!( - v1_hash, expected, - "stored v1_hash should match canonical_v1 of the published TOML" - ); + assert_eq!(v1.unwrap(), expected_v1(VALID_FILTER_TOML)); } /// Two byte-different but canonically-equivalent filters published by @@ -596,9 +574,8 @@ async fn publish_v1_collision_returns_existing_author(pool: PgPool) { let bob_toml: &[u8] = b"command = [\"my-tool\"]\n"; // Sanity check: same v1, different content_hash. 
- let v1_alice = - tokf_common::canonical_v1::hash(std::str::from_utf8(alice_toml).unwrap()).unwrap(); - let v1_bob = tokf_common::canonical_v1::hash(std::str::from_utf8(bob_toml).unwrap()).unwrap(); + let v1_alice = expected_v1(alice_toml); + let v1_bob = expected_v1(bob_toml); assert_eq!( v1_alice, v1_bob, "test setup: filters must canonicalise to the same v1 hash" diff --git a/crates/tokf-server/src/routes/filters/test_helpers.rs b/crates/tokf-server/src/routes/filters/test_helpers.rs index 798b7144..eb031b9c 100644 --- a/crates/tokf-server/src/routes/filters/test_helpers.rs +++ b/crates/tokf-server/src/routes/filters/test_helpers.rs @@ -148,6 +148,14 @@ pub async fn get_request(app: axum::Router, token: &str, uri: &str) -> axum::res .unwrap() } +/// Compute the canonical-v1 hash of TOML for assertion in tests. Accepts +/// `&[u8]` or `&str`. Panics if the bytes aren't UTF-8 or +/// `canonical_v1::hash` errors — both impossible for the small inline +/// fixtures used here. +pub fn expected_v1(toml: impl AsRef<[u8]>) -> String { + tokf_common::canonical_v1::hash(std::str::from_utf8(toml.as_ref()).unwrap()).unwrap() +} + /// POST a JSON body to a URI with a bearer token. Used by every endpoint /// test whose handler accepts `Json<...>` over `application/json`. pub async fn post_json( diff --git a/crates/tokf-server/src/storage/mod.rs b/crates/tokf-server/src/storage/mod.rs index 0ca4f742..690d1084 100644 --- a/crates/tokf-server/src/storage/mod.rs +++ b/crates/tokf-server/src/storage/mod.rs @@ -18,6 +18,21 @@ pub trait StorageClient: Send + Sync { async fn delete(&self, key: &str) -> anyhow::Result<()>; } +/// Fetch an object as a UTF-8 string. Errors when the key is missing or +/// when the bytes are not valid UTF-8. +/// +/// # Errors +/// +/// Returns an error if the storage call fails, the key does not exist, or +/// the content is not valid UTF-8. 
+pub async fn get_utf8(storage: &dyn StorageClient, key: &str) -> anyhow::Result<String> {
+    let bytes = storage
+        .get(key)
+        .await?
+        .ok_or_else(|| anyhow::anyhow!("storage object missing: {key}"))?;
+    String::from_utf8(bytes).map_err(|e| anyhow::anyhow!("storage object not UTF-8: {e}"))
+}
+
 /// Upload a filter TOML if not already stored. Returns the R2 key.
 ///
 /// Key format: `filters/{content_hash}/filter.toml`