14 changes: 14 additions & 0 deletions crates/tokf-server/migrations/20260428000000_add_v1_hash.sql
@@ -0,0 +1,14 @@
-- Schema-independent canonical TOML hash (ADR-0002).
--
-- Nullable on purpose: existing rows stay NULL until the backfill endpoint
-- (`POST /api/filters/backfill-v1-hashes`) has populated them by re-reading
-- each row's TOML from R2. A NOT NULL constraint would fail this migration
-- on the existing rows, and UNIQUE would abort the backfill mid-run on the
-- first historical duplicate; both constraints belong to the dedup
-- migration (separate PR), which also collapses duplicate v1 rows.
--
-- Non-partial index because CockroachDB rejects a partial index on a column
-- added in the same migration ("column ... is not public", error 0A000).
-- B-tree NULL handling makes the space cost negligible.
ALTER TABLE filters ADD COLUMN v1_hash TEXT;
CREATE INDEX filters_v1_hash_idx ON filters(v1_hash);
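For reviewers tracking the rollout, backfill progress and the duplicates the follow-up dedup migration will collapse can be eyeballed with ad-hoc queries along these lines (illustrative only, derived from the schema above; not part of this migration):

-- How many rows the backfill endpoint still has to visit.
SELECT count(*) AS remaining FROM filters WHERE v1_hash IS NULL;

-- Preview the duplicate v1 rows the dedup migration (separate PR) will collapse.
SELECT v1_hash, count(*) AS copies
FROM filters
WHERE v1_hash IS NOT NULL
GROUP BY v1_hash
HAVING count(*) > 1
ORDER BY copies DESC;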
120 changes: 116 additions & 4 deletions crates/tokf-server/src/routes/filters/backfill.rs
@@ -27,8 +27,9 @@ pub struct BackfillVersionsResponse {
    pub skipped: usize,
}

-/// Maximum number of entries in a single backfill request.
-const MAX_BACKFILL_ENTRIES: usize = 500;
+/// Maximum batch size for either backfill endpoint; bounds the load a
+/// single operator request can put on the DB and R2.
+const MAX_BACKFILL_BATCH: usize = 500;

/// `POST /api/filters/backfill-versions` — Backfill version data for filters.
///
@@ -39,9 +40,9 @@ pub async fn backfill_versions(
    State(state): State<AppState>,
    Json(req): Json<BackfillVersionsRequest>,
) -> Result<(StatusCode, Json<BackfillVersionsResponse>), AppError> {
-    if req.entries.len() > MAX_BACKFILL_ENTRIES {
+    if req.entries.len() > MAX_BACKFILL_BATCH {
        return Err(AppError::BadRequest(format!(
-            "batch size {} exceeds maximum of {MAX_BACKFILL_ENTRIES}",
+            "batch size {} exceeds maximum of {MAX_BACKFILL_BATCH}",
            req.entries.len()
        )));
    }
@@ -99,3 +100,114 @@ pub async fn backfill_versions(
        Json(BackfillVersionsResponse { updated, skipped }),
    ))
}

// ── v1_hash backfill ──────────────────────────────────────────────────────

#[derive(Debug, Deserialize)]
pub struct BackfillV1Request {
    /// Maximum rows to process in this call. The operator iterates until
    /// `processed == 0`. Defaults to 100; capped at `MAX_BACKFILL_BATCH`.
    #[serde(default = "default_v1_batch_size")]
    pub limit: usize,
}

const fn default_v1_batch_size() -> usize {
    100
}

#[derive(Debug, Serialize)]
pub struct BackfillV1Response {
    pub processed: usize,
    pub updated: usize,
    pub failed: Vec<BackfillV1Failure>,
}

#[derive(Debug, Serialize)]
pub struct BackfillV1Failure {
    pub content_hash: String,
    pub error: String,
}

/// `POST /api/filters/backfill-v1-hashes` — populate `v1_hash` for legacy rows.
///
/// Service-token auth. Idempotent. Picks up to `limit` rows where
/// `v1_hash IS NULL`, fetches the TOML from R2, computes the v1 hash
/// (ADR-0002), writes it back. Rows whose R2 object is missing or whose
/// TOML fails to canonicalise are reported in `failed` and skipped — the
/// operator inspects the response and triages those manually.
///
/// Operator runbook: invoke repeatedly until `processed == 0`. Failures
/// don't stop the batch, but note that failed rows keep `v1_hash = NULL`
/// and are re-selected on the next call, so a backlog consisting only of
/// permanent failures never reaches `processed == 0`; stop once a call
/// reports `updated == 0` and triage the `failed` list instead.
pub async fn backfill_v1_hashes(
    _auth: ServiceAuth,
    State(state): State<AppState>,
    Json(req): Json<BackfillV1Request>,
) -> Result<(StatusCode, Json<BackfillV1Response>), AppError> {
    let limit = req.limit.clamp(1, MAX_BACKFILL_BATCH);
    // Safe: `limit` is clamped to [1, MAX_BACKFILL_BATCH] (= 500) which fits in i64.
    #[allow(clippy::cast_possible_wrap)]
    let limit_i64 = limit as i64;

    let rows: Vec<(String, String)> = sqlx::query_as(
        "SELECT content_hash, r2_key FROM filters
         WHERE v1_hash IS NULL
         ORDER BY created_at ASC
         LIMIT $1",
    )
    .bind(limit_i64)
    .fetch_all(&state.db)
    .await?;

    tracing::info!(
        candidates = rows.len(),
        limit,
        "backfill-v1-hashes batch selected"
    );

    let mut updated = 0usize;
    let mut failed = Vec::new();

    for (content_hash, r2_key) in &rows {
        match compute_and_store_v1(&state, content_hash, r2_key).await {
            Ok(()) => updated += 1,
            Err(e) => {
                tracing::warn!(hash = %content_hash, "backfill-v1 failed: {e}");
                failed.push(BackfillV1Failure {
                    content_hash: content_hash.clone(),
                    error: e.to_string(),
                });
            }
        }
    }

    tracing::info!(
        processed = rows.len(),
        updated,
        failed = failed.len(),
        "backfill-v1-hashes complete"
    );

    Ok((
        StatusCode::OK,
        Json(BackfillV1Response {
            processed: rows.len(),
            updated,
            failed,
        }),
    ))
}

/// Fetch the row's TOML from R2, compute the schema-independent canonical
/// v1 hash (ADR-0002), and persist it on the matching `filters` row.
async fn compute_and_store_v1(
    state: &AppState,
    content_hash: &str,
    r2_key: &str,
) -> anyhow::Result<()> {
    let toml_str = crate::storage::get_utf8(&*state.storage, r2_key).await?;
    let v1 = tokf_common::canonical_v1::hash(&toml_str)?;
    sqlx::query("UPDATE filters SET v1_hash = $1 WHERE content_hash = $2")
        .bind(&v1)
        .bind(content_hash)
        .execute(&state.db)
        .await?;
    Ok(())
}
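For reference, a sketch of the runbook loop described in the handler's doc comment, written against this PR's endpoint. The base URL and the bearer-token shape of the service auth are assumptions for illustration (the PR only shows that a `ServiceAuth` extractor exists), as are the env-var names and the `reqwest`/`tokio`/`serde_json`/`anyhow` dependencies:

// Illustrative operator script, not part of this PR.
// Assumes: reqwest (with the `json` feature), tokio, serde_json, anyhow.
use serde_json::{json, Value};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let base = std::env::var("TOKF_BASE_URL")?; // assumed env var
    let token = std::env::var("TOKF_SERVICE_TOKEN")?; // assumed env var
    let client = reqwest::Client::new();

    loop {
        let resp: Value = client
            .post(format!("{base}/api/filters/backfill-v1-hashes"))
            .bearer_auth(&token) // assumed service-token scheme
            .json(&json!({ "limit": 500 }))
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?;

        let processed = resp["processed"].as_u64().unwrap_or(0);
        let updated = resp["updated"].as_u64().unwrap_or(0);
        let failed = resp["failed"].as_array().map_or(0, Vec::len);
        println!("processed={processed} updated={updated} failed={failed}");

        // `processed == 0` means no NULL rows remain. `updated == 0` with
        // `processed > 0` means every selected row failed; those rows stay
        // NULL and would be re-selected forever, so stop and triage instead.
        if processed == 0 || updated == 0 {
            break;
        }
    }
    Ok(())
}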