diff --git a/migrations/20260623000001_ingestion_retry_budget.sql b/migrations/20260623000001_ingestion_retry_budget.sql new file mode 100644 index 000000000..d86b807a6 --- /dev/null +++ b/migrations/20260623000001_ingestion_retry_budget.sql @@ -0,0 +1,6 @@ +-- Retry budget for ingestion: bound retries, back them off, and quarantine +-- files that keep failing so the poll loop stops re-processing them forever. +ALTER TABLE ingestion_files ADD COLUMN attempts INTEGER NOT NULL DEFAULT 0; +ALTER TABLE ingestion_files ADD COLUMN next_attempt_at TIMESTAMP; +-- status now also takes the terminal value 'quarantined' (no CHECK constraint +-- exists on this column, so no schema change is needed beyond documenting it). diff --git a/src/agent/ingestion.rs b/src/agent/ingestion.rs index 0642b630d..b4a39cfbf 100644 --- a/src/agent/ingestion.rs +++ b/src/agent/ingestion.rs @@ -29,6 +29,39 @@ use std::sync::Arc; use std::time::Duration; use uuid::Uuid; +/// Max ingestion attempts before a file is quarantined. +const MAX_INGEST_ATTEMPTS: i64 = 3; +/// Base backoff between failed ingestion attempts (seconds). +const INGEST_BACKOFF_BASE_SECS: i64 = 60; +/// Cap on the exponential backoff (seconds) — 1 hour. +const MAX_INGEST_BACKOFF_SECS: i64 = 3600; + +/// Exponential backoff for the Nth failed attempt (1-based), capped. +fn backoff_secs(attempts: i64) -> i64 { + let exp = attempts.saturating_sub(1).min(20) as u32; + INGEST_BACKOFF_BASE_SECS + .saturating_mul(2_i64.saturating_pow(exp)) + .min(MAX_INGEST_BACKOFF_SECS) +} + +#[derive(Debug, PartialEq, Eq)] +enum RetryDecision { + Retry { backoff_secs: i64 }, + Quarantine, +} + +/// Decide what to do after a failed attempt. `attempts` is the count AFTER +/// incrementing for this failure. +fn retry_decision(attempts: i64) -> RetryDecision { + if attempts >= MAX_INGEST_ATTEMPTS { + RetryDecision::Quarantine + } else { + RetryDecision::Retry { + backoff_secs: backoff_secs(attempts), + } + } +} + /// Spawn the ingestion polling loop for an agent. /// /// Runs until the returned JoinHandle is dropped or aborted. Scans the ingest @@ -185,6 +218,18 @@ async fn process_file( } let hash = content_hash(&content); + + if let Some(state) = load_retry_state(&deps.sqlite_pool, &hash).await? { + if state.status == "quarantined" { + tracing::debug!(file = %filename, "skipping quarantined ingest file"); + return Ok(()); + } + if state.not_due { + tracing::debug!(file = %filename, "ingest file still backing off, skipping this cycle"); + return Ok(()); + } + } + let file_size = content.len() as i64; let chunks = chunk_text(&content, config.chunk_size); let total_chunks = chunks.len(); @@ -263,10 +308,6 @@ async fn process_file( } } - // Mark file as completed (or failed if any chunk errored) - let final_status = if had_failure { "failed" } else { "completed" }; - complete_ingestion_file(&deps.sqlite_pool, &hash, final_status).await?; - #[cfg(feature = "metrics")] { let result = if had_failure { "failure" } else { "success" }; @@ -277,26 +318,52 @@ async fn process_file( } if had_failure { - // Keep the source file and progress rows so the next poll cycle can - // resume from where it left off. Deleting on failure would cause data - // loss when a provider error interrupts mid-ingestion (fixes #48). - tracing::warn!( - file = %filename, - chunks = total_chunks, - "file ingestion had failures — keeping file and progress for retry" - ); + let attempts: i64 = sqlx::query_scalar( + "UPDATE ingestion_files SET attempts = attempts + 1 WHERE content_hash = ? RETURNING attempts", + ) + .bind(&hash) + .fetch_one(&deps.sqlite_pool) + .await + .context("failed to bump ingestion attempts")?; + + match retry_decision(attempts) { + RetryDecision::Quarantine => { + complete_ingestion_file(&deps.sqlite_pool, &hash, "quarantined").await?; + tracing::error!( + file = %filename, attempts, + "ingest file quarantined after exhausting retry budget — use the UI retry action to re-arm" + ); + } + RetryDecision::Retry { backoff_secs } => { + sqlx::query( + "UPDATE ingestion_files SET status = 'failed', \ + next_attempt_at = datetime('now', '+' || ? || ' seconds'), \ + completed_at = CURRENT_TIMESTAMP WHERE content_hash = ?", + ) + .bind(backoff_secs) + .bind(&hash) + .execute(&deps.sqlite_pool) + .await + .context("failed to schedule ingestion retry")?; + tracing::warn!( + file = %filename, attempts, backoff_secs, + "ingest file had failures — scheduled for retry with backoff" + ); + } + } return Ok(()); } - // Full success: clean up progress rows and remove the source file. - delete_progress(&deps.sqlite_pool, &hash).await?; - + // Full success: mark completed, remove the source file, then clear progress. + // Order matters: remove the file BEFORE clearing chunk progress. If remove_file + // fails, the progress rows must survive so the next poll skips already-ingested + // chunks instead of re-processing them (which would re-create their memories). + complete_ingestion_file(&deps.sqlite_pool, &hash, "completed").await?; tokio::fs::remove_file(path) .await .with_context(|| format!("failed to delete ingested file: {}", path.display()))?; - - tracing::info!(file = %filename, chunks = total_chunks, status = final_status, "file ingestion complete, file deleted"); - + delete_progress(&deps.sqlite_pool, &hash).await?; + tracing::info!(file = %filename, chunks = total_chunks, status = "completed", "file ingestion complete, file deleted"); Ok(()) } @@ -379,6 +446,32 @@ async fn delete_progress(pool: &SqlitePool, hash: &str) -> anyhow::Result<()> { // -- File-level tracking queries ------------------------------------------------ +struct IngestRetryState { + status: String, + not_due: bool, +} + +/// Load the retry gate for a file: its status and whether it is still backing +/// off (`next_attempt_at` in the future). Returns None if no row exists yet. +async fn load_retry_state( + pool: &SqlitePool, + hash: &str, +) -> anyhow::Result> { + let row = sqlx::query_as::<_, (String, Option)>( + "SELECT status, CAST((julianday(next_attempt_at) - julianday('now')) * 86400 AS INTEGER) \ + FROM ingestion_files WHERE content_hash = ?", + ) + .bind(hash) + .fetch_optional(pool) + .await + .context("failed to load ingestion retry state")?; + + Ok(row.map(|(status, secs_until_due)| IngestRetryState { + status, + not_due: secs_until_due.map(|s| s > 0).unwrap_or(false), + })) +} + /// Record that a file is now being processed. If a `queued` record already /// exists (from the upload handler), update it with chunk info and flip to /// `processing`. Otherwise insert a fresh `processing` record. @@ -532,11 +625,20 @@ async fn process_chunk( let result = hook.prompt_once(&agent, &mut history, &user_prompt).await; classify_chunk_prompt_result(result, filename, chunk_number, total_chunks)?; - if !contract_state.has_terminal_outcome() { - return Err(anyhow::anyhow!( - "ingestion chunk {chunk_number}/{total_chunks} for {filename} completed without memory_persistence_complete signal" - )); - } + // A chunk is complete when the LLM run returns Ok — that is a deterministic + // fact the harness owns. We do NOT gate completion on the LLM calling + // memory_persistence_complete: a weak model may not emit that signal, yet + // any memory_save calls it made are already committed. Gating on the signal + // turned "model forgot to call the tool" into a permanent chunk failure, + // which the poll loop then retried forever (re-saving duplicates). The + // contract state is kept only for observability. + let saved = contract_state.saved_memory_ids().len(); + tracing::info!( + file = %filename, + chunk = %format!("{chunk_number}/{total_chunks}"), + saved_memories = saved, + "chunk processed" + ); Ok(()) } @@ -692,4 +794,42 @@ mod tests { "max turns must be treated as chunk failure for retry" ); } + + #[test] + fn test_chunk_success_does_not_require_terminal_contract_outcome() { + // After the fix, chunk success is decided by classify_chunk_prompt_result + // alone (the LLM run returning Ok), NOT by the memory_persistence contract. + // A run that returns Ok with no memory_persistence_complete signal is a + // legitimate success (the chunk may have had nothing worth saving, or the + // small model simply didn't emit the signal — memories it DID save are + // already committed). + let result = + classify_chunk_prompt_result(Ok("processed chunk".to_string()), "notes.txt", 1, 3); + assert!( + result.is_ok(), + "Ok prompt result must classify as chunk success" + ); + } + + #[test] + fn test_backoff_secs_is_exponential_and_capped() { + assert_eq!(backoff_secs(1), 60); // base 60s + assert_eq!(backoff_secs(2), 120); + assert_eq!(backoff_secs(3), 240); + assert_eq!(backoff_secs(20), MAX_INGEST_BACKOFF_SECS); // capped + } + + #[test] + fn test_retry_decision_quarantines_at_cap() { + // attempts is the count AFTER this failure. + assert!(matches!(retry_decision(1), RetryDecision::Retry { .. })); + assert!(matches!( + retry_decision(MAX_INGEST_ATTEMPTS - 1), + RetryDecision::Retry { .. } + )); + assert!(matches!( + retry_decision(MAX_INGEST_ATTEMPTS), + RetryDecision::Quarantine + )); + } } diff --git a/src/api/ingest.rs b/src/api/ingest.rs index da5ef1a17..65b594fb5 100644 --- a/src/api/ingest.rs +++ b/src/api/ingest.rs @@ -221,6 +221,52 @@ pub(super) async fn upload_ingest_file( Ok(Json(IngestUploadResponse { uploaded })) } +/// Remove an ingest file from the source of truth (disk) and purge its tracking +/// rows. The disk file is the loop's input; deleting only the DB row lets the +/// next poll cycle re-discover the file and re-create the row ("reappears"). +pub(super) async fn purge_ingest_file( + pool: &sqlx::SqlitePool, + ingest_dir: &Path, + content_hash: &str, +) -> anyhow::Result<()> { + // Look up the on-disk filename for this hash, then remove the file. + if let Some(filename) = sqlx::query_scalar::<_, String>( + "SELECT filename FROM ingestion_files WHERE content_hash = ?", + ) + .bind(content_hash) + .fetch_optional(pool) + .await? + { + // Defense-in-depth: the filename comes from the DB and is sanitized on + // write, but this is a destructive delete — reject any non-normal path + // component (`..`, absolute paths) before joining so we cannot escape + // ingest_dir. + let filename_path = Path::new(&filename); + if filename_path + .components() + .any(|component| !matches!(component, std::path::Component::Normal(_))) + { + anyhow::bail!("refusing to delete ingest file with unsafe path: {filename}"); + } + let path = ingest_dir.join(filename_path); + match tokio::fs::remove_file(&path).await { + Ok(()) => {} + Err(error) if error.kind() == std::io::ErrorKind::NotFound => {} + Err(error) => return Err(error.into()), + } + } + + sqlx::query("DELETE FROM ingestion_progress WHERE content_hash = ?") + .bind(content_hash) + .execute(pool) + .await?; + sqlx::query("DELETE FROM ingestion_files WHERE content_hash = ?") + .bind(content_hash) + .execute(pool) + .await?; + Ok(()) +} + /// Delete a completed ingestion file record from history. #[utoipa::path( delete, @@ -242,15 +288,63 @@ pub(super) async fn delete_ingest_file( ) -> Result, StatusCode> { let pools = state.agent_pools.load(); let pool = pools.get(&query.agent_id).ok_or(StatusCode::NOT_FOUND)?; + let workspaces = state.agent_workspaces.load(); + let workspace = workspaces + .get(&query.agent_id) + .ok_or(StatusCode::NOT_FOUND)?; + let ingest_dir = workspace.join("ingest"); - sqlx::query("DELETE FROM ingestion_files WHERE content_hash = ?") - .bind(&query.content_hash) - .execute(pool) + purge_ingest_file(pool, &ingest_dir, &query.content_hash) .await .map_err(|error| { - tracing::warn!(%error, "failed to delete ingest file record"); + tracing::warn!(%error, "failed to purge ingest file"); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(IngestDeleteResponse { success: true })) } + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::sqlite::SqlitePoolOptions; + + #[tokio::test] + async fn test_purge_removes_disk_file_and_rows() { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let dir = tempfile::tempdir().unwrap(); + let ingest_dir = dir.path().to_path_buf(); + let file = ingest_dir.join("notes.txt"); + tokio::fs::write(&file, b"hello").await.unwrap(); + let hash = crate::agent::ingestion::content_hash("hello"); + + sqlx::query("INSERT INTO ingestion_files (content_hash, filename, file_size, total_chunks, status) VALUES (?, 'notes.txt', 5, 1, 'failed')") + .bind(&hash).execute(&pool).await.unwrap(); + sqlx::query("INSERT INTO ingestion_progress (content_hash, chunk_index, total_chunks, filename) VALUES (?, 0, 1, 'notes.txt')") + .bind(&hash).execute(&pool).await.unwrap(); + + purge_ingest_file(&pool, &ingest_dir, &hash).await.unwrap(); + + assert!(!file.exists(), "disk file must be removed"); + let files: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM ingestion_files WHERE content_hash = ?") + .bind(&hash) + .fetch_one(&pool) + .await + .unwrap(); + let prog: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM ingestion_progress WHERE content_hash = ?") + .bind(&hash) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(files, 0, "ingestion_files row must be deleted"); + assert_eq!(prog, 0, "ingestion_progress rows must be deleted"); + } +} diff --git a/src/memory/maintenance.rs b/src/memory/maintenance.rs index 67b935497..ffb0b39bb 100644 --- a/src/memory/maintenance.rs +++ b/src/memory/maintenance.rs @@ -315,26 +315,22 @@ fn merged_memory_content(winner: String, loser: &str) -> String { let winner_trimmed = winner.trim_end(); let loser_trimmed = loser.trim_end(); - if loser_trimmed.is_empty() { - return winner_trimmed.to_string(); - } - - if winner_trimmed.contains(loser_trimmed) { - return winner_trimmed.to_string(); - } - - let merged = if winner_trimmed.is_empty() { - loser_trimmed.to_string() + // Near-duplicates (merge fires at 0.95 cosine similarity) are NOT glued + // together — concatenation produced bloated memories holding many reworded + // copies of the same fact. Keep the importance-winner's content (chosen by + // choose_merge_pair) as the single canonical version, falling back to the + // loser only if the winner is empty. Either way the size cap below applies. + let canonical = if winner_trimmed.is_empty() { + loser_trimmed } else { - format!("{winner_trimmed}\n\n{loser_trimmed}") + winner_trimmed }; - if merged.len() <= MAX_MERGED_MEMORY_CONTENT_BYTES { - return merged; + if canonical.len() <= MAX_MERGED_MEMORY_CONTENT_BYTES { + return canonical.to_string(); } - - let boundary = merged.floor_char_boundary(MAX_MERGED_MEMORY_CONTENT_BYTES); - merged[..boundary].to_string() + let boundary = canonical.floor_char_boundary(MAX_MERGED_MEMORY_CONTENT_BYTES); + canonical[..boundary].to_string() } async fn merge_pair( @@ -604,11 +600,9 @@ mod tests { .expect("failed to load survivor") .expect("survivor should exist"); assert_eq!(updated_survivor.id, survivor.id); - assert!( - updated_survivor - .content - .contains("rust memory maintenance updated") - ); + // Canonical merge keeps the winner's content (importance-winner = survivor at 0.9). + // The loser ("rust memory maintenance updated") is NOT appended. + assert_eq!(updated_survivor.content, "rust memory maintenance"); let forgotten_duplicate = store .load(&duplicate.id) @@ -771,16 +765,9 @@ mod tests { .await .expect("failed to load survivor") .expect("survivor should exist"); - assert!( - refreshed_survivor - .content - .contains("durable rust maintenance note update A") - ); - assert!( - refreshed_survivor - .content - .contains("durable rust maintenance note update B") - ); + // Canonical merge keeps the winner's content (importance-winner = survivor at 0.9). + // The losers' text ("update A", "update B") is NOT appended. + assert_eq!(refreshed_survivor.content, "durable rust maintenance note"); for duplicate_id in [&duplicate_a.id, &duplicate_b.id] { let duplicate = store @@ -815,6 +802,27 @@ mod tests { ); } + #[test] + fn test_merged_content_keeps_winner_not_concatenation() { + let winner = "User prefers concise answers.".to_string(); + let loser = "The user likes short, concise replies."; + let merged = merged_memory_content(winner.clone(), loser); + // Near-duplicates (merge fires at 0.95 similarity) must NOT be glued together, + // and the importance-winner's content is kept as canonical. + assert!( + !merged.contains("\n\n"), + "must not concatenate near-duplicates" + ); + assert_eq!(merged, winner); + } + + #[test] + fn test_merged_content_substring_short_circuit_unchanged() { + let winner = "User prefers concise answers including examples.".to_string(); + let loser = "User prefers concise answers"; + assert_eq!(merged_memory_content(winner.clone(), loser), winner); + } + #[tokio::test] async fn run_maintenance_with_cancel_stops_when_cancel_requested() { let store = MemoryStore::connect_in_memory().await;