diff --git a/crates/tokf-server/migrations/20260428000000_add_v1_hash.sql b/crates/tokf-server/migrations/20260428000000_add_v1_hash.sql
new file mode 100644
index 00000000..8d76d50e
--- /dev/null
+++ b/crates/tokf-server/migrations/20260428000000_add_v1_hash.sql
@@ -0,0 +1,14 @@
+-- Schema-independent canonical TOML hash (ADR-0002).
+--
+-- Nullable on purpose: existing rows are NULL until the backfill endpoint
+-- (`POST /api/filters/backfill-v1-hashes`) has populated them by re-reading
+-- each row's TOML from R2. NOT NULL or UNIQUE here would either fail the
+-- migration or break the backfill mid-run on the first historical duplicate;
+-- both constraints belong in the dedup migration (separate PR), which also
+-- collapses duplicate v1 rows.
+--
+-- Non-partial index because CockroachDB rejects a partial index on a column
+-- added in the same migration ("column ... is not public", error 0A000).
+-- B-tree NULL handling makes the space cost negligible.
+ALTER TABLE filters ADD COLUMN v1_hash TEXT;
+CREATE INDEX filters_v1_hash_idx ON filters(v1_hash);
diff --git a/crates/tokf-server/src/routes/filters/backfill.rs b/crates/tokf-server/src/routes/filters/backfill.rs
index 4307e578..057341a8 100644
--- a/crates/tokf-server/src/routes/filters/backfill.rs
+++ b/crates/tokf-server/src/routes/filters/backfill.rs
@@ -27,8 +27,9 @@ pub struct BackfillVersionsResponse {
     pub skipped: usize,
 }
 
-/// Maximum number of entries in a single backfill request.
-const MAX_BACKFILL_ENTRIES: usize = 500;
+/// Maximum batch size for either backfill endpoint. Bounds the load a
+/// single operator request can place on the DB and R2.
+const MAX_BACKFILL_BATCH: usize = 500;
 
 /// `POST /api/filters/backfill-versions` — Backfill version data for filters.
 ///
@@ -39,9 +40,9 @@ pub async fn backfill_versions(
     State(state): State<AppState>,
     Json(req): Json<BackfillVersionsRequest>,
 ) -> Result<(StatusCode, Json<BackfillVersionsResponse>), AppError> {
-    if req.entries.len() > MAX_BACKFILL_ENTRIES {
+    if req.entries.len() > MAX_BACKFILL_BATCH {
         return Err(AppError::BadRequest(format!(
-            "batch size {} exceeds maximum of {MAX_BACKFILL_ENTRIES}",
+            "batch size {} exceeds maximum of {MAX_BACKFILL_BATCH}",
             req.entries.len()
         )));
     }
@@ -99,3 +100,114 @@ pub async fn backfill_versions(
         Json(BackfillVersionsResponse { updated, skipped }),
     ))
 }
+
+// ── v1_hash backfill ──────────────────────────────────────────────────────
+
+#[derive(Debug, Deserialize)]
+pub struct BackfillV1Request {
+    /// Maximum rows to process in this call. Operator iterates until
+    /// `processed == 0`. Defaults to 100; capped at `MAX_BACKFILL_BATCH`.
+    #[serde(default = "default_v1_batch_size")]
+    pub limit: usize,
+}
+
+const fn default_v1_batch_size() -> usize {
+    100
+}
+
+#[derive(Debug, Serialize)]
+pub struct BackfillV1Response {
+    pub processed: usize,
+    pub updated: usize,
+    pub failed: Vec<BackfillV1Failure>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct BackfillV1Failure {
+    pub content_hash: String,
+    pub error: String,
+}
+
+/// `POST /api/filters/backfill-v1-hashes` — populate `v1_hash` for legacy rows.
+///
+/// Service-token auth. Idempotent. Picks up to `limit` rows where
+/// `v1_hash IS NULL`, fetches the TOML from R2, computes the v1 hash
+/// (ADR-0002), and writes it back. Rows whose R2 object is missing or whose
+/// TOML fails to canonicalise are reported in `failed` and skipped — the
+/// operator inspects the response and triages those manually.
+///
+/// Operator runbook: invoke repeatedly until `processed == 0`. Failures
+/// don't stop the batch.
+pub async fn backfill_v1_hashes(
+    _auth: ServiceAuth,
+    State(state): State<AppState>,
+    Json(req): Json<BackfillV1Request>,
+) -> Result<(StatusCode, Json<BackfillV1Response>), AppError> {
+    let limit = req.limit.clamp(1, MAX_BACKFILL_BATCH);
+    // Safe: `limit` is clamped to [1, MAX_BACKFILL_BATCH] (= 500), which fits in i64.
+    #[allow(clippy::cast_possible_wrap)]
+    let limit_i64 = limit as i64;
+
+    let rows: Vec<(String, String)> = sqlx::query_as(
+        "SELECT content_hash, r2_key FROM filters
+         WHERE v1_hash IS NULL
+         ORDER BY created_at ASC
+         LIMIT $1",
+    )
+    .bind(limit_i64)
+    .fetch_all(&state.db)
+    .await?;
+
+    tracing::info!(
+        candidates = rows.len(),
+        limit,
+        "backfill-v1-hashes request received"
+    );
+
+    let mut updated = 0usize;
+    let mut failed = Vec::new();
+
+    for (content_hash, r2_key) in &rows {
+        match compute_and_store_v1(&state, content_hash, r2_key).await {
+            Ok(()) => updated += 1,
+            Err(e) => {
+                tracing::warn!(hash = %content_hash, "backfill-v1 failed: {e}");
+                failed.push(BackfillV1Failure {
+                    content_hash: content_hash.clone(),
+                    error: e.to_string(),
+                });
+            }
+        }
+    }
+
+    tracing::info!(
+        processed = rows.len(),
+        updated,
+        failed = failed.len(),
+        "backfill-v1-hashes complete"
+    );
+
+    Ok((
+        StatusCode::OK,
+        Json(BackfillV1Response {
+            processed: rows.len(),
+            updated,
+            failed,
+        }),
+    ))
+}
+
+async fn compute_and_store_v1(
+    state: &AppState,
+    content_hash: &str,
+    r2_key: &str,
+) -> anyhow::Result<()> {
+    let toml_str = crate::storage::get_utf8(&*state.storage, r2_key).await?;
+    let v1 = tokf_common::canonical_v1::hash(&toml_str)?;
+    sqlx::query("UPDATE filters SET v1_hash = $1 WHERE content_hash = $2")
+        .bind(&v1)
+        .bind(content_hash)
+        .execute(&state.db)
+        .await?;
+    Ok(())
+}
diff --git a/crates/tokf-server/src/routes/filters/backfill_tests.rs b/crates/tokf-server/src/routes/filters/backfill_tests.rs
new file mode 100644
index 00000000..bdebfad5
--- /dev/null
+++ b/crates/tokf-server/src/routes/filters/backfill_tests.rs
@@ -0,0 +1,306 @@
+use std::sync::Arc;
+
+use axum::{
+    body::Body,
+    http::{Request, StatusCode},
+};
+use http_body_util::BodyExt;
+use sqlx::PgPool;
+use tower::ServiceExt;
+
+use crate::routes::test_helpers::insert_service_token;
+use crate::storage::StorageClient as _;
+use crate::storage::mock::InMemoryStorageClient;
+
+use super::test_helpers::{
+    expected_v1, insert_test_user, make_state_with_storage, post_json, publish_filter_helper,
+};
+
+const VALID_FILTER_TOML: &[u8] = b"command = \"my-tool\"\n";
+
+const URI: &str = "/api/filters/backfill-v1-hashes";
+
+/// Publish a filter, then NULL out its `v1_hash` to simulate a row created
+/// before this PR shipped.
+async fn publish_then_null_v1(
+    pool: &PgPool,
+    storage: Arc<InMemoryStorageClient>,
+    user_token: &str,
+    toml: &[u8],
+) -> String {
+    let state = make_state_with_storage(pool.clone(), storage);
+    let app = crate::routes::create_router(state);
+    let hash = publish_filter_helper(app, user_token, toml, &[]).await;
+    sqlx::query("UPDATE filters SET v1_hash = NULL WHERE content_hash = $1")
+        .bind(&hash)
+        .execute(pool)
+        .await
+        .unwrap();
+    hash
+}
+
+#[crdb_test_macro::crdb_test(migrations = "./migrations")]
+async fn backfill_v1_populates_null_rows(pool: PgPool) {
+    let storage = Arc::new(InMemoryStorageClient::new());
+    let (_, alice) = insert_test_user(&pool, "alice_bf").await;
+    let (_, bob) = insert_test_user(&pool, "bob_bf").await;
+    let service_token = insert_service_token(&pool, "bf-test").await;
+
+    let alice_toml: &[u8] = b"command = \"alice-tool\"\n";
+    let bob_toml: &[u8] = b"command = \"bob-tool\"\n";
+
+    let alice_hash = publish_then_null_v1(&pool, Arc::clone(&storage), &alice, alice_toml).await;
+    let bob_hash = publish_then_null_v1(&pool, Arc::clone(&storage), &bob, bob_toml).await;
+
+    let app =
+        crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage)));
+    let resp = post_json(
+        app,
+        &service_token,
+        URI,
+        &serde_json::json!({ "limit": 100 }),
+    )
+    .await;
+    assert_eq!(resp.status(), StatusCode::OK);
+
+    let body = resp.into_body().collect().await.unwrap().to_bytes();
+    let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+    assert_eq!(json["processed"], 2);
+    assert_eq!(json["updated"], 2);
+    assert!(json["failed"].as_array().unwrap().is_empty());
+
+    let alice_v1: Option<String> =
+        sqlx::query_scalar("SELECT v1_hash FROM filters WHERE content_hash = $1")
+            .bind(&alice_hash)
+            .fetch_one(&pool)
+            .await
+            .unwrap();
+    assert_eq!(alice_v1.unwrap(), expected_v1(alice_toml));
+
+    let bob_v1: Option<String> =
+        sqlx::query_scalar("SELECT v1_hash FROM filters WHERE content_hash = $1")
+            .bind(&bob_hash)
+            .fetch_one(&pool)
+            .await
+            .unwrap();
+    assert_eq!(bob_v1.unwrap(), expected_v1(bob_toml));
+}
+
+#[crdb_test_macro::crdb_test(migrations = "./migrations")]
+async fn backfill_v1_skips_already_populated(pool: PgPool) {
+    let storage = Arc::new(InMemoryStorageClient::new());
+    let (_, user) = insert_test_user(&pool, "alice_skip").await;
+    let service_token = insert_service_token(&pool, "bf-skip").await;
+
+    publish_then_null_v1(&pool, Arc::clone(&storage), &user, VALID_FILTER_TOML).await;
+
+    let app =
+        crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage)));
+    let resp = post_json(
+        app,
+        &service_token,
+        URI,
+        &serde_json::json!({ "limit": 10 }),
+    )
+    .await;
+    assert_eq!(resp.status(), StatusCode::OK);
+    let body = resp.into_body().collect().await.unwrap().to_bytes();
+    let first: serde_json::Value = serde_json::from_slice(&body).unwrap();
+    assert_eq!(first["processed"], 1);
+    assert_eq!(first["updated"], 1);
+
+    let app =
+        crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage)));
+    let resp = post_json(
+        app,
+        &service_token,
+        URI,
+        &serde_json::json!({ "limit": 10 }),
+    )
+    .await;
+    assert_eq!(resp.status(), StatusCode::OK);
+    let body = resp.into_body().collect().await.unwrap().to_bytes();
+    let second: serde_json::Value = serde_json::from_slice(&body).unwrap();
+    assert_eq!(
+        second["processed"], 0,
+        "second invocation should find no candidates"
+    );
+}
+
+/// Drive a backfill against a single corrupted-R2 row and assert that:
+/// the call succeeds with `processed=1, updated=0`, the row appears in
+/// `failed[]`, and the row's `v1_hash` remains NULL. Used by every
+/// per-failure-mode test below.
+async fn run_backfill_expecting_one_failure(
+    pool: &PgPool,
+    storage: Arc<InMemoryStorageClient>,
+    service_token: &str,
+    hash: &str,
+) {
+    let app =
+        crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage)));
+    let resp = post_json(app, service_token, URI, &serde_json::json!({ "limit": 10 })).await;
+    assert_eq!(resp.status(), StatusCode::OK);
+
+    let body = resp.into_body().collect().await.unwrap().to_bytes();
+    let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+    assert_eq!(json["processed"], 1);
+    assert_eq!(json["updated"], 0);
+    let failed = json["failed"].as_array().unwrap();
+    assert_eq!(failed.len(), 1);
+    assert_eq!(failed[0]["content_hash"], hash);
+
+    let v1: Option<String> =
+        sqlx::query_scalar("SELECT v1_hash FROM filters WHERE content_hash = $1")
+            .bind(hash)
+            .fetch_one(pool)
+            .await
+            .unwrap();
+    assert!(
+        v1.is_none(),
+        "v1_hash must remain NULL when backfill failed"
+    );
+}
+
+async fn r2_key_for(pool: &PgPool, hash: &str) -> String {
+    sqlx::query_scalar("SELECT r2_key FROM filters WHERE content_hash = $1")
+        .bind(hash)
+        .fetch_one(pool)
+        .await
+        .unwrap()
+}
+
+/// Each kind of corrupted R2 state that `compute_and_store_v1` should
+/// surface as a per-row failure (without aborting the batch or writing
+/// `v1_hash`).
+enum CorruptR2 {
+    /// Object deleted — `storage.get` returns None.
+    Missing,
+    /// Bytes that fail UTF-8 decoding (and would fail TOML parsing too),
+    /// exercising the post-fetch failure path in `compute_and_store_v1`.
+    Unparseable,
+}
+
+#[crdb_test_macro::crdb_test(migrations = "./migrations")]
+async fn backfill_v1_reports_per_row_failures(pool: PgPool) {
+    let service_token = insert_service_token(&pool, "bf-failures").await;
+
+    for (idx, kind) in [CorruptR2::Missing, CorruptR2::Unparseable]
+        .into_iter()
+        .enumerate()
+    {
+        let storage = Arc::new(InMemoryStorageClient::new());
+        let (_, user) = insert_test_user(&pool, &format!("alice_fail_{idx}")).await;
+        let hash =
+            publish_then_null_v1(&pool, Arc::clone(&storage), &user, VALID_FILTER_TOML).await;
+        let r2_key = r2_key_for(&pool, &hash).await;
+        match kind {
+            CorruptR2::Missing => storage.delete(&r2_key).await.unwrap(),
+            CorruptR2::Unparseable => {
+                storage
+                    .put(&r2_key, b"\xff\xfe not valid toml [[[".to_vec())
+                    .await
+                    .unwrap();
+            }
+        }
+        run_backfill_expecting_one_failure(&pool, storage, &service_token, &hash).await;
+        // Reset for next iteration so the second case starts from a clean
+        // "v1_hash IS NULL" set (the failure left the row NULL — we just
+        // want to make sure the backfill's `LIMIT` finds the right row).
+ sqlx::query("DELETE FROM filter_tests") + .execute(&pool) + .await + .unwrap(); + sqlx::query("DELETE FROM filters") + .execute(&pool) + .await + .unwrap(); + } +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_requires_service_token(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let app = crate::routes::create_router(make_state_with_storage(pool, storage)); + + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/filters/backfill-v1-hashes") + .header("content-type", "application/json") + .body(Body::from(b"{}".to_vec())) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_rejects_invalid_bearer(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let app = crate::routes::create_router(make_state_with_storage(pool, storage)); + + let resp = post_json( + app, + "not-a-real-token", + URI, + &serde_json::json!({ "limit": 10 }), + ) + .await; + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_respects_limit(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let (_, user) = insert_test_user(&pool, "alice_limit").await; + let service_token = insert_service_token(&pool, "bf-limit").await; + + // Publish 5 distinct filters; null their v1_hash. + for i in 0..5 { + let toml = format!("command = \"tool-{i}\"\n"); + publish_then_null_v1(&pool, Arc::clone(&storage), &user, toml.as_bytes()).await; + } + + let app = + crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage))); + let resp = post_json(app, &service_token, URI, &serde_json::json!({ "limit": 2 })).await; + assert_eq!(resp.status(), StatusCode::OK); + + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["processed"], 2, "should respect requested limit"); + assert_eq!(json["updated"], 2); + + let still_null: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM filters WHERE v1_hash IS NULL") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(still_null, 3, "3 rows should still be unprocessed"); +} + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn backfill_v1_caps_limit_at_max(pool: PgPool) { + let storage = Arc::new(InMemoryStorageClient::new()); + let service_token = insert_service_token(&pool, "bf-cap").await; + let app = + crate::routes::create_router(make_state_with_storage(pool.clone(), Arc::clone(&storage))); + + // No filters seeded; just confirm the call succeeds with an out-of-range + // limit (clamped, not rejected). 
+    let resp = post_json(
+        app,
+        &service_token,
+        URI,
+        &serde_json::json!({ "limit": 1_000_000 }),
+    )
+    .await;
+    assert_eq!(resp.status(), StatusCode::OK);
+
+    let body = resp.into_body().collect().await.unwrap().to_bytes();
+    let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+    assert_eq!(json["processed"], 0);
+}
diff --git a/crates/tokf-server/src/routes/filters/mod.rs b/crates/tokf-server/src/routes/filters/mod.rs
index 68befa3b..583310af 100644
--- a/crates/tokf-server/src/routes/filters/mod.rs
+++ b/crates/tokf-server/src/routes/filters/mod.rs
@@ -1,4 +1,7 @@
 mod backfill;
+#[cfg(test)]
+#[allow(clippy::unwrap_used, clippy::expect_used)]
+mod backfill_tests;
 mod publish;
 mod regenerate;
 mod search;
@@ -9,7 +12,7 @@ mod search_tests;
 pub mod test_helpers;
 mod update_tests;
 
-pub use backfill::backfill_versions;
+pub use backfill::{backfill_v1_hashes, backfill_versions};
 pub use publish::publish_filter;
 pub use publish::stdlib::publish_stdlib;
 pub use regenerate::regenerate_examples;
diff --git a/crates/tokf-server/src/routes/filters/publish/mod.rs b/crates/tokf-server/src/routes/filters/publish/mod.rs
index d6d4a786..1184298d 100644
--- a/crates/tokf-server/src/routes/filters/publish/mod.rs
+++ b/crates/tokf-server/src/routes/filters/publish/mod.rs
@@ -88,6 +88,7 @@ async fn parse_multipart(
 /// Grouped fields for inserting a filter record.
 struct FilterInsert<'a> {
     content_hash: &'a str,
+    v1_hash: &'a str,
     command_pattern: &'a str,
     canonical_command: &'a str,
     author_id: i64,
@@ -95,23 +96,62 @@ struct FilterInsert<'a> {
     safety_passed: bool,
 }
 
-/// Returns `(author_username, was_new)` for the filter with `content_hash`.
+/// Resolved view of a publish attempt: the author and `content_hash` of
+/// the row that ends up in the registry. For v1-equivalent duplicates
+/// these come from the *existing* row, not the rejected submission.
+struct UpsertResult {
+    author: String,
+    content_hash: String,
+    is_new: bool,
+}
+
+/// Insert a new filter row, or resolve a duplicate.
+///
+/// Two duplicate paths: byte-identical (caught by `ON CONFLICT (content_hash)`)
+/// and v1-equivalent (caught by the pre-check below — same canonical TOML
+/// shape, different `content_hash`). Both return the existing row's author
+/// and canonical hash. Legacy rows with NULL `v1_hash` are excluded from the
+/// v1-equivalence check via SQL three-valued logic until they are backfilled.
 ///
-/// Attempts an INSERT and uses the result to distinguish new vs duplicate.
-/// Uploading to R2 before this call means orphaned objects are possible on DB
-/// failure, but they are harmless (no user-visible state and R2 uploads are
-/// idempotent on retry).
+/// Uploading to R2 before this call means orphaned objects are possible on
+/// DB failure, but they are harmless (no user-visible state and R2 uploads
+/// are idempotent on retry).
 async fn upsert_filter_record(
     state: &AppState,
     insert: &FilterInsert<'_>,
     author_username: &str,
-) -> Result<(String, bool), AppError> {
+) -> Result<UpsertResult, AppError> {
+    if let Some((existing_hash, existing_author)) = sqlx::query_as::<_, (String, String)>(
+        "SELECT f.content_hash, u.username FROM filters f
+         JOIN users u ON u.id = f.author_id
+         WHERE f.v1_hash = $1 AND f.content_hash <> $2
+         LIMIT 1",
+    )
+    .bind(insert.v1_hash)
+    .bind(insert.content_hash)
+    .fetch_optional(&state.db)
+    .await?
+ { + tracing::info!( + new_hash = %insert.content_hash, + existing_hash = %existing_hash, + v1 = %insert.v1_hash, + "publish rejected as v1-equivalent of existing filter", + ); + return Ok(UpsertResult { + author: existing_author, + content_hash: existing_hash, + is_new: false, + }); + } + let result = sqlx::query( - "INSERT INTO filters (content_hash, command_pattern, canonical_command, author_id, r2_key, safety_passed) - VALUES ($1, $2, $3, $4, $5, $6) + "INSERT INTO filters (content_hash, v1_hash, command_pattern, canonical_command, author_id, r2_key, safety_passed) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (content_hash) DO NOTHING", ) .bind(insert.content_hash) + .bind(insert.v1_hash) .bind(insert.command_pattern) .bind(insert.canonical_command) .bind(insert.author_id) @@ -121,7 +161,7 @@ async fn upsert_filter_record( .await?; if result.rows_affected() == 0 { - // Duplicate — fetch the original author's username + // Byte-identical duplicate — fetch the original author's username. let existing_author: String = sqlx::query_scalar( "SELECT u.username FROM filters f JOIN users u ON u.id = f.author_id @@ -130,10 +170,18 @@ async fn upsert_filter_record( .bind(insert.content_hash) .fetch_one(&state.db) .await?; - return Ok((existing_author, false)); + return Ok(UpsertResult { + author: existing_author, + content_hash: insert.content_hash.to_string(), + is_new: false, + }); } - Ok((author_username.to_string(), true)) + Ok(UpsertResult { + author: author_username.to_string(), + content_hash: insert.content_hash.to_string(), + is_new: true, + }) } pub async fn upload_tests( @@ -195,6 +243,7 @@ pub(super) async fn set_examples_generated_at(pool: &sqlx::PgPool, content_hash: /// Validated, hashed filter ready for storage. pub(super) struct PreparedFilter { pub(super) content_hash: String, + pub(super) v1_hash: String, pub(super) command_pattern: String, pub(super) canonical_command: String, pub(super) config: FilterConfig, @@ -252,8 +301,11 @@ pub(super) fn validate_and_prepare( } let content_hash = canonical_hash(&config).map_err(|e| format!("hash error: {e}"))?; + let v1_hash = + tokf_common::canonical_v1::hash(toml_str).map_err(|e| format!("v1 hash error: {e}"))?; Ok(PreparedFilter { content_hash, + v1_hash, command_pattern, canonical_command, config, @@ -411,15 +463,19 @@ pub async fn publish_filter( let insert = FilterInsert { content_hash: &prepared.content_hash, + v1_hash: &prepared.v1_hash, command_pattern: &prepared.command_pattern, canonical_command: &prepared.canonical_command, author_id: auth.user_id, r2_key: &r2_key, safety_passed, }; - let (author, is_new) = upsert_filter_record(&state, &insert, &auth.username).await?; + let upserted = upsert_filter_record(&state, &insert, &auth.username).await?; - // Upload examples to R2 AFTER insert so set_examples_generated_at finds the row. + // Below, examples + test rows are keyed by `prepared.content_hash` (the + // submitted hash) while the response uses `upserted.content_hash` (the + // existing-row hash on a v1-collision). Safe because examples uploads + // tolerate orphans and `insert_filter_tests` is gated on `is_new`. 
if let Ok((examples_json, _)) = examples_result { if let Err(e) = storage::upload_examples(&*state.storage, &prepared.content_hash, examples_json).await @@ -430,7 +486,7 @@ pub async fn publish_filter( } } - if is_new { + if upserted.is_new { insert_filter_tests(&state.db, &prepared.content_hash, &test_r2_keys).await?; // Fire-and-forget: materialize per-filter metadata + catalog index to R2 @@ -441,8 +497,8 @@ pub async fn publish_filter( ); } - let registry_url = format!("{}/filters/{}", state.public_url, prepared.content_hash); - let status = if is_new { + let registry_url = format!("{}/filters/{}", state.public_url, upserted.content_hash); + let status = if upserted.is_new { StatusCode::CREATED } else { StatusCode::OK @@ -451,9 +507,9 @@ pub async fn publish_filter( status, crate::routes::ip::rate_limit_headers(&rl), Json(PublishFilterResponse { - content_hash: prepared.content_hash, + content_hash: upserted.content_hash, command_pattern: prepared.command_pattern, - author, + author: upserted.author, registry_url, }), )) diff --git a/crates/tokf-server/src/routes/filters/publish/stdlib/mod.rs b/crates/tokf-server/src/routes/filters/publish/stdlib/mod.rs index 3f782d78..b3ff2c86 100644 --- a/crates/tokf-server/src/routes/filters/publish/stdlib/mod.rs +++ b/crates/tokf-server/src/routes/filters/publish/stdlib/mod.rs @@ -369,11 +369,12 @@ async fn persist_filter( upload_tests(state, &prepared.content_hash, prepared.test_files.clone()).await?; sqlx::query( - "INSERT INTO filters (content_hash, command_pattern, canonical_command, author_id, r2_key, is_stdlib) - VALUES ($1, $2, $3, $4, $5, TRUE) - ON CONFLICT (content_hash) DO UPDATE SET is_stdlib = TRUE", + "INSERT INTO filters (content_hash, v1_hash, command_pattern, canonical_command, author_id, r2_key, is_stdlib) + VALUES ($1, $2, $3, $4, $5, $6, TRUE) + ON CONFLICT (content_hash) DO UPDATE SET is_stdlib = TRUE, v1_hash = COALESCE(filters.v1_hash, EXCLUDED.v1_hash)", ) .bind(&prepared.content_hash) + .bind(&prepared.v1_hash) .bind(&prepared.command_pattern) .bind(&prepared.canonical_command) .bind(author_id) diff --git a/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs b/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs index 375853bb..6e387436 100644 --- a/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs +++ b/crates/tokf-server/src/routes/filters/publish/stdlib/tests.rs @@ -7,7 +7,7 @@ use axum::{ use http_body_util::BodyExt; use tower::ServiceExt; -use crate::routes::filters::test_helpers::make_state; +use crate::routes::filters::test_helpers::{expected_v1, make_state}; use crate::routes::test_helpers::insert_service_token; use crate::storage::mock::InMemoryStorageClient; @@ -303,3 +303,22 @@ async fn publish_stdlib_rejects_invalid_username(pool: PgPool) { "error should mention username: {error}" ); } + +#[crdb_test_macro::crdb_test(migrations = "./migrations")] +async fn publish_stdlib_stores_v1_hash(pool: PgPool) { + let token = insert_service_token(&pool, "ci-test").await; + let state = make_state(pool.clone()); + let app = crate::routes::create_router(state); + + let req = make_valid_request(); + let resp = post_stdlib(app, &token, &req).await; + assert_eq!(resp.status(), StatusCode::CREATED); + + // Stdlib publish goes through its own INSERT; assert v1_hash is populated. 
+    let v1_hash: Option<String> = sqlx::query_scalar("SELECT v1_hash FROM filters LIMIT 1")
+        .fetch_one(&pool)
+        .await
+        .unwrap();
+    let v1_hash = v1_hash.expect("stdlib publish must populate v1_hash");
+    assert_eq!(v1_hash, expected_v1(&req.filters[0].filter_toml));
+}
diff --git a/crates/tokf-server/src/routes/filters/publish/tests.rs b/crates/tokf-server/src/routes/filters/publish/tests.rs
index aae86b80..618764b8 100644
--- a/crates/tokf-server/src/routes/filters/publish/tests.rs
+++ b/crates/tokf-server/src/routes/filters/publish/tests.rs
@@ -15,8 +15,8 @@ use crate::storage::mock::InMemoryStorageClient;
 use crate::storage::StorageClient as _;
 
 use super::super::test_helpers::{
-    DEFAULT_PASSING_TEST, MIT_ACCEPT, insert_test_user, make_multipart, make_state,
-    make_state_with_storage, post_filter,
+    DEFAULT_PASSING_TEST, MIT_ACCEPT, expected_v1, insert_test_user, make_multipart, make_state,
+    make_state_with_storage, post_filter, publish_filter_helper,
 };
 
 const VALID_FILTER_TOML: &[u8] = b"command = \"my-tool\"\n";
@@ -536,3 +536,106 @@ async fn publish_filter_stores_examples_in_r2(pool: PgPool) {
         examples_json["safety"]
     );
 }
+
+// ── v1_hash persistence + collision tests ─────────────────────────────────
+
+#[crdb_test_macro::crdb_test(migrations = "./migrations")]
+async fn publish_stores_v1_hash_on_new_row(pool: PgPool) {
+    let (_, token) = insert_test_user(&pool, "alice_v1_store").await;
+    let app = crate::routes::create_router(make_state(pool.clone()));
+    let hash = publish_filter_helper(app, &token, VALID_FILTER_TOML, &[]).await;
+
+    let v1: Option<String> =
+        sqlx::query_scalar("SELECT v1_hash FROM filters WHERE content_hash = $1")
+            .bind(&hash)
+            .fetch_one(&pool)
+            .await
+            .unwrap();
+    assert_eq!(v1.unwrap(), expected_v1(VALID_FILTER_TOML));
+}
+
+/// Two byte-different but canonically-equivalent filters published by
+/// different authors: the second is rejected as a v1-equivalent of the
+/// first; `is_new` is false; only one row exists in the DB; the response
+/// reports the original author.
+#[crdb_test_macro::crdb_test(migrations = "./migrations")]
+async fn publish_v1_collision_returns_existing_author(pool: PgPool) {
+    let (_, alice_token) = insert_test_user(&pool, "alice_v1").await;
+    let (_, bob_token) = insert_test_user(&pool, "bob_v1").await;
+
+    // `command = "x"` vs `command = ["x"]` — these parse to different
+    // `CommandPattern` enum variants (`Single` vs `Multiple`), so
+    // `canonical_hash` (which serialises the parsed FilterConfig) yields
+    // different content_hashes. canonical_v1 collapses both into the
+    // single-string form, so they share a v1_hash. This is the smallest
+    // fixture that exercises the v1-collision branch in
+    // `upsert_filter_record` rather than the byte-equal duplicate path.
+    let alice_toml: &[u8] = b"command = \"my-tool\"\n";
+    let bob_toml: &[u8] = b"command = [\"my-tool\"]\n";
+
+    // Sanity check: same v1, different content_hash.
+ let v1_alice = expected_v1(alice_toml); + let v1_bob = expected_v1(bob_toml); + assert_eq!( + v1_alice, v1_bob, + "test setup: filters must canonicalise to the same v1 hash" + ); + let alice_cfg: tokf_common::config::types::FilterConfig = + toml::from_str(std::str::from_utf8(alice_toml).unwrap()).unwrap(); + let bob_cfg: tokf_common::config::types::FilterConfig = + toml::from_str(std::str::from_utf8(bob_toml).unwrap()).unwrap(); + assert_ne!( + tokf_common::hash::canonical_hash(&alice_cfg).unwrap(), + tokf_common::hash::canonical_hash(&bob_cfg).unwrap(), + "test setup: content_hashes must differ — otherwise the test exercises \ + the byte-duplicate path, not the v1-collision branch", + ); + + let app = crate::routes::create_router(make_state(pool.clone())); + let alice_resp = post_filter( + app, + &alice_token, + &[("filter", alice_toml), MIT_ACCEPT, DEFAULT_PASSING_TEST], + ) + .await; + assert_eq!(alice_resp.status(), StatusCode::CREATED); + let alice_body = alice_resp.into_body().collect().await.unwrap().to_bytes(); + let alice_json: serde_json::Value = serde_json::from_slice(&alice_body).unwrap(); + let alice_content_hash = alice_json["content_hash"].as_str().unwrap().to_string(); + + let app = crate::routes::create_router(make_state(pool.clone())); + let bob_resp = post_filter( + app, + &bob_token, + &[("filter", bob_toml), MIT_ACCEPT, DEFAULT_PASSING_TEST], + ) + .await; + assert_eq!( + bob_resp.status(), + StatusCode::OK, + "v1-equivalent republish should be 200 OK (treated as duplicate)" + ); + + let bob_body = bob_resp.into_body().collect().await.unwrap().to_bytes(); + let bob_json: serde_json::Value = serde_json::from_slice(&bob_body).unwrap(); + assert_eq!( + bob_json["author"], "alice_v1", + "response should report the original author, not bob" + ); + assert_eq!( + bob_json["content_hash"].as_str().unwrap(), + alice_content_hash, + "response should report the existing content_hash, not bob's" + ); + + // Only one row exists in DB. 
+ let row_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM filters WHERE v1_hash = $1") + .bind(&v1_alice) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + row_count, 1, + "only one row should exist for canonically-equivalent filters" + ); +} diff --git a/crates/tokf-server/src/routes/filters/regenerate_tests.rs b/crates/tokf-server/src/routes/filters/regenerate_tests.rs index de5da728..26907d37 100644 --- a/crates/tokf-server/src/routes/filters/regenerate_tests.rs +++ b/crates/tokf-server/src/routes/filters/regenerate_tests.rs @@ -1,23 +1,21 @@ use std::sync::Arc; -use axum::{ - body::Body, - http::{Request, StatusCode}, -}; +use axum::http::StatusCode; use http_body_util::BodyExt; use sqlx::PgPool; -use tower::ServiceExt; use crate::routes::test_helpers::insert_service_token; use crate::storage::StorageClient as _; use crate::storage::mock::InMemoryStorageClient; use super::super::test_helpers::{ - insert_test_user, make_state_with_storage, publish_filter_helper, + insert_test_user, make_state_with_storage, post_json, publish_filter_helper, }; const VALID_FILTER_TOML: &[u8] = b"command = \"my-tool\"\n"; +const URI: &str = "/api/filters/regenerate-examples"; + fn make_state_with_mem_storage( pool: PgPool, storage: Arc, @@ -30,17 +28,7 @@ async fn post_regenerate( token: &str, body: &serde_json::Value, ) -> axum::response::Response { - app.oneshot( - Request::builder() - .method("POST") - .uri("/api/filters/regenerate-examples") - .header("authorization", format!("Bearer {token}")) - .header("content-type", "application/json") - .body(Body::from(serde_json::to_vec(body).unwrap())) - .unwrap(), - ) - .await - .unwrap() + post_json(app, token, URI, body).await } // ── Tests ──────────────────────────────────────────────────────────────────── diff --git a/crates/tokf-server/src/routes/filters/test_helpers.rs b/crates/tokf-server/src/routes/filters/test_helpers.rs index 09efd16c..eb031b9c 100644 --- a/crates/tokf-server/src/routes/filters/test_helpers.rs +++ b/crates/tokf-server/src/routes/filters/test_helpers.rs @@ -148,6 +148,35 @@ pub async fn get_request(app: axum::Router, token: &str, uri: &str) -> axum::res .unwrap() } +/// Compute the canonical-v1 hash of TOML for assertion in tests. Accepts +/// `&[u8]` or `&str`. Panics if the bytes aren't UTF-8 or +/// `canonical_v1::hash` errors — both impossible for the small inline +/// fixtures used here. +pub fn expected_v1(toml: impl AsRef<[u8]>) -> String { + tokf_common::canonical_v1::hash(std::str::from_utf8(toml.as_ref()).unwrap()).unwrap() +} + +/// POST a JSON body to a URI with a bearer token. Used by every endpoint +/// test whose handler accepts `Json<...>` over `application/json`. +pub async fn post_json( + app: axum::Router, + token: &str, + uri: &str, + body: &serde_json::Value, +) -> axum::response::Response { + app.oneshot( + Request::builder() + .method("POST") + .uri(uri) + .header("authorization", format!("Bearer {token}")) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(body).unwrap())) + .unwrap(), + ) + .await + .unwrap() +} + /// POST `/api/filters` helper that returns the full response (for publish-specific tests). 
 pub async fn post_filter(
     app: axum::Router,
diff --git a/crates/tokf-server/src/routes/mod.rs b/crates/tokf-server/src/routes/mod.rs
index 002635df..8c862c15 100644
--- a/crates/tokf-server/src/routes/mod.rs
+++ b/crates/tokf-server/src/routes/mod.rs
@@ -55,6 +55,10 @@ pub fn create_router(state: AppState) -> Router {
             "/api/filters/backfill-versions",
             post(filters::backfill_versions),
         )
+        .route(
+            "/api/filters/backfill-v1-hashes",
+            post(filters::backfill_v1_hashes),
+        )
         .route("/api/sync", post(sync::sync_usage))
         .route("/api/catalog/refresh", post(catalog::refresh_catalog))
         .route("/api/catalog/grouped", get(catalog::get_grouped_catalog))
diff --git a/crates/tokf-server/src/storage/mod.rs b/crates/tokf-server/src/storage/mod.rs
index 0ca4f742..690d1084 100644
--- a/crates/tokf-server/src/storage/mod.rs
+++ b/crates/tokf-server/src/storage/mod.rs
@@ -18,6 +18,21 @@ pub trait StorageClient: Send + Sync {
     async fn delete(&self, key: &str) -> anyhow::Result<()>;
 }
 
+/// Fetch an object as a UTF-8 string. Errors when the key is missing or
+/// when the bytes are not valid UTF-8.
+///
+/// # Errors
+///
+/// Returns an error if the storage call fails, the key does not exist, or
+/// the content is not valid UTF-8.
+pub async fn get_utf8(storage: &dyn StorageClient, key: &str) -> anyhow::Result<String> {
+    let bytes = storage
+        .get(key)
+        .await?
+        .ok_or_else(|| anyhow::anyhow!("storage object missing: {key}"))?;
+    String::from_utf8(bytes).map_err(|e| anyhow::anyhow!("storage object not UTF-8: {e}"))
+}
+
 /// Upload a filter TOML if not already stored. Returns the R2 key.
 ///
 /// Key format: `filters/{content_hash}/filter.toml`
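Operator-side sketch of the runbook for the new endpoint (illustrative only, not part of the patch): assuming a small reqwest/tokio/serde_json/anyhow binary and hypothetical TOKF_BASE_URL / TOKF_SERVICE_TOKEN environment variables, it re-POSTs /api/filters/backfill-v1-hashes until `processed == 0`, stopping early to triage if any `failed[]` entries appear.

// Hypothetical driver for the backfill runbook -- illustrative only, not part
// of this patch. Assumes reqwest (with the "json" feature), tokio, anyhow and
// serde_json as dependencies, plus TOKF_BASE_URL / TOKF_SERVICE_TOKEN env vars.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let base = std::env::var("TOKF_BASE_URL")?;
    let token = std::env::var("TOKF_SERVICE_TOKEN")?;
    let client = reqwest::Client::new();

    loop {
        // Same request shape as BackfillV1Request; 500 matches MAX_BACKFILL_BATCH.
        let resp: serde_json::Value = client
            .post(format!("{base}/api/filters/backfill-v1-hashes"))
            .bearer_auth(&token)
            .json(&serde_json::json!({ "limit": 500 }))
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?;

        let processed = resp["processed"].as_u64().unwrap_or(0);
        let failed = resp["failed"].as_array().map_or(0, |f| f.len());
        println!(
            "processed={processed} updated={} failed={failed}",
            resp["updated"]
        );

        // Failed rows keep a NULL v1_hash and would be re-selected next batch,
        // so stop and triage them rather than spinning; otherwise keep going
        // until nothing is left to backfill.
        if failed > 0 || processed == 0 {
            break;
        }
    }
    Ok(())
}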