Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions migrations/20260623000001_ingestion_retry_budget.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Retry budget for ingestion: bound retries, back them off, and quarantine
-- files that keep failing so the poll loop stops re-processing them forever.
ALTER TABLE ingestion_files ADD COLUMN attempts INTEGER NOT NULL DEFAULT 0;
ALTER TABLE ingestion_files ADD COLUMN next_attempt_at TIMESTAMP;
-- status now also takes the terminal value 'quarantined' (no CHECK constraint
-- exists on this column, so no schema change is needed beyond documenting it).
186 changes: 163 additions & 23 deletions src/agent/ingestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,39 @@ use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;

/// Max ingestion attempts before a file is quarantined.
const MAX_INGEST_ATTEMPTS: i64 = 3;
/// Base backoff between failed ingestion attempts (seconds).
const INGEST_BACKOFF_BASE_SECS: i64 = 60;
/// Cap on the exponential backoff (seconds) — 1 hour.
const MAX_INGEST_BACKOFF_SECS: i64 = 3600;

/// Exponential backoff for the Nth failed attempt (1-based), capped.
fn backoff_secs(attempts: i64) -> i64 {
let exp = attempts.saturating_sub(1).min(20) as u32;
INGEST_BACKOFF_BASE_SECS
.saturating_mul(2_i64.saturating_pow(exp))
.min(MAX_INGEST_BACKOFF_SECS)
}

#[derive(Debug, PartialEq, Eq)]
enum RetryDecision {
Retry { backoff_secs: i64 },
Quarantine,
}

/// Decide what to do after a failed attempt. `attempts` is the count AFTER
/// incrementing for this failure.
fn retry_decision(attempts: i64) -> RetryDecision {
if attempts >= MAX_INGEST_ATTEMPTS {
RetryDecision::Quarantine
} else {
RetryDecision::Retry {
backoff_secs: backoff_secs(attempts),
}
}
}

/// Spawn the ingestion polling loop for an agent.
///
/// Runs until the returned JoinHandle is dropped or aborted. Scans the ingest
Expand Down Expand Up @@ -185,6 +218,18 @@ async fn process_file(
}

let hash = content_hash(&content);

if let Some(state) = load_retry_state(&deps.sqlite_pool, &hash).await? {
if state.status == "quarantined" {
tracing::debug!(file = %filename, "skipping quarantined ingest file");
return Ok(());
}
if state.not_due {
tracing::debug!(file = %filename, "ingest file still backing off, skipping this cycle");
return Ok(());
}
}

let file_size = content.len() as i64;
let chunks = chunk_text(&content, config.chunk_size);
let total_chunks = chunks.len();
Expand Down Expand Up @@ -263,10 +308,6 @@ async fn process_file(
}
}

// Mark file as completed (or failed if any chunk errored)
let final_status = if had_failure { "failed" } else { "completed" };
complete_ingestion_file(&deps.sqlite_pool, &hash, final_status).await?;

#[cfg(feature = "metrics")]
{
let result = if had_failure { "failure" } else { "success" };
Expand All @@ -277,26 +318,52 @@ async fn process_file(
}

if had_failure {
// Keep the source file and progress rows so the next poll cycle can
// resume from where it left off. Deleting on failure would cause data
// loss when a provider error interrupts mid-ingestion (fixes #48).
tracing::warn!(
file = %filename,
chunks = total_chunks,
"file ingestion had failures — keeping file and progress for retry"
);
let attempts: i64 = sqlx::query_scalar(
"UPDATE ingestion_files SET attempts = attempts + 1 WHERE content_hash = ? RETURNING attempts",
)
.bind(&hash)
.fetch_one(&deps.sqlite_pool)
.await
.context("failed to bump ingestion attempts")?;

match retry_decision(attempts) {
RetryDecision::Quarantine => {
complete_ingestion_file(&deps.sqlite_pool, &hash, "quarantined").await?;
tracing::error!(
file = %filename, attempts,
"ingest file quarantined after exhausting retry budget — use the UI retry action to re-arm"
);
}
RetryDecision::Retry { backoff_secs } => {
sqlx::query(
"UPDATE ingestion_files SET status = 'failed', \
next_attempt_at = datetime('now', '+' || ? || ' seconds'), \
completed_at = CURRENT_TIMESTAMP WHERE content_hash = ?",
)
.bind(backoff_secs)
.bind(&hash)
.execute(&deps.sqlite_pool)
.await
.context("failed to schedule ingestion retry")?;
tracing::warn!(
file = %filename, attempts, backoff_secs,
"ingest file had failures — scheduled for retry with backoff"
);
}
}
return Ok(());
}

// Full success: clean up progress rows and remove the source file.
delete_progress(&deps.sqlite_pool, &hash).await?;

// Full success: mark completed, remove the source file, then clear progress.
// Order matters: remove the file BEFORE clearing chunk progress. If remove_file
// fails, the progress rows must survive so the next poll skips already-ingested
// chunks instead of re-processing them (which would re-create their memories).
complete_ingestion_file(&deps.sqlite_pool, &hash, "completed").await?;
tokio::fs::remove_file(path)
.await
.with_context(|| format!("failed to delete ingested file: {}", path.display()))?;

tracing::info!(file = %filename, chunks = total_chunks, status = final_status, "file ingestion complete, file deleted");

delete_progress(&deps.sqlite_pool, &hash).await?;
tracing::info!(file = %filename, chunks = total_chunks, status = "completed", "file ingestion complete, file deleted");
Ok(())
}

Expand Down Expand Up @@ -379,6 +446,32 @@ async fn delete_progress(pool: &SqlitePool, hash: &str) -> anyhow::Result<()> {

// -- File-level tracking queries ------------------------------------------------

struct IngestRetryState {
status: String,
not_due: bool,
}

/// Load the retry gate for a file: its status and whether it is still backing
/// off (`next_attempt_at` in the future). Returns None if no row exists yet.
async fn load_retry_state(
pool: &SqlitePool,
hash: &str,
) -> anyhow::Result<Option<IngestRetryState>> {
let row = sqlx::query_as::<_, (String, Option<i64>)>(
"SELECT status, CAST((julianday(next_attempt_at) - julianday('now')) * 86400 AS INTEGER) \
FROM ingestion_files WHERE content_hash = ?",
)
.bind(hash)
.fetch_optional(pool)
.await
.context("failed to load ingestion retry state")?;

Ok(row.map(|(status, secs_until_due)| IngestRetryState {
status,
not_due: secs_until_due.map(|s| s > 0).unwrap_or(false),
}))
}

/// Record that a file is now being processed. If a `queued` record already
/// exists (from the upload handler), update it with chunk info and flip to
/// `processing`. Otherwise insert a fresh `processing` record.
Expand Down Expand Up @@ -532,11 +625,20 @@ async fn process_chunk(
let result = hook.prompt_once(&agent, &mut history, &user_prompt).await;
classify_chunk_prompt_result(result, filename, chunk_number, total_chunks)?;

if !contract_state.has_terminal_outcome() {
return Err(anyhow::anyhow!(
"ingestion chunk {chunk_number}/{total_chunks} for {filename} completed without memory_persistence_complete signal"
));
}
// A chunk is complete when the LLM run returns Ok — that is a deterministic
// fact the harness owns. We do NOT gate completion on the LLM calling
// memory_persistence_complete: a weak model may not emit that signal, yet
// any memory_save calls it made are already committed. Gating on the signal
// turned "model forgot to call the tool" into a permanent chunk failure,
// which the poll loop then retried forever (re-saving duplicates). The
// contract state is kept only for observability.
let saved = contract_state.saved_memory_ids().len();
tracing::info!(
file = %filename,
chunk = %format!("{chunk_number}/{total_chunks}"),
saved_memories = saved,
"chunk processed"
);

Ok(())
}
Expand Down Expand Up @@ -692,4 +794,42 @@ mod tests {
"max turns must be treated as chunk failure for retry"
);
}

#[test]
fn test_chunk_success_does_not_require_terminal_contract_outcome() {
// After the fix, chunk success is decided by classify_chunk_prompt_result
// alone (the LLM run returning Ok), NOT by the memory_persistence contract.
// A run that returns Ok with no memory_persistence_complete signal is a
// legitimate success (the chunk may have had nothing worth saving, or the
// small model simply didn't emit the signal — memories it DID save are
// already committed).
let result =
classify_chunk_prompt_result(Ok("processed chunk".to_string()), "notes.txt", 1, 3);
assert!(
result.is_ok(),
"Ok prompt result must classify as chunk success"
);
}

#[test]
fn test_backoff_secs_is_exponential_and_capped() {
assert_eq!(backoff_secs(1), 60); // base 60s
assert_eq!(backoff_secs(2), 120);
assert_eq!(backoff_secs(3), 240);
assert_eq!(backoff_secs(20), MAX_INGEST_BACKOFF_SECS); // capped
}

#[test]
fn test_retry_decision_quarantines_at_cap() {
// attempts is the count AFTER this failure.
assert!(matches!(retry_decision(1), RetryDecision::Retry { .. }));
assert!(matches!(
retry_decision(MAX_INGEST_ATTEMPTS - 1),
RetryDecision::Retry { .. }
));
assert!(matches!(
retry_decision(MAX_INGEST_ATTEMPTS),
RetryDecision::Quarantine
));
}
}
102 changes: 98 additions & 4 deletions src/api/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,52 @@ pub(super) async fn upload_ingest_file(
Ok(Json(IngestUploadResponse { uploaded }))
}

/// Remove an ingest file from the source of truth (disk) and purge its tracking
/// rows. The disk file is the loop's input; deleting only the DB row lets the
/// next poll cycle re-discover the file and re-create the row ("reappears").
pub(super) async fn purge_ingest_file(
pool: &sqlx::SqlitePool,
ingest_dir: &Path,
content_hash: &str,
) -> anyhow::Result<()> {
// Look up the on-disk filename for this hash, then remove the file.
if let Some(filename) = sqlx::query_scalar::<_, String>(
"SELECT filename FROM ingestion_files WHERE content_hash = ?",
)
.bind(content_hash)
.fetch_optional(pool)
.await?
{
// Defense-in-depth: the filename comes from the DB and is sanitized on
// write, but this is a destructive delete — reject any non-normal path
// component (`..`, absolute paths) before joining so we cannot escape
// ingest_dir.
let filename_path = Path::new(&filename);
if filename_path
.components()
.any(|component| !matches!(component, std::path::Component::Normal(_)))
{
anyhow::bail!("refusing to delete ingest file with unsafe path: {filename}");
}
let path = ingest_dir.join(filename_path);
match tokio::fs::remove_file(&path).await {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this deletes a disk path derived from ingestion_files.filename, consider rejecting any non-normal path components (e.g. .. / absolute paths) before join as defense-in-depth.

Suggested change
match tokio::fs::remove_file(&path).await {
let filename_path = Path::new(&filename);
if filename_path
.components()
.any(|c| !matches!(c, std::path::Component::Normal(_)))
{
anyhow::bail!("invalid ingest filename: {filename}");
}
let path = ingest_dir.join(filename_path);

Ok(()) => {}
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
Err(error) => return Err(error.into()),
}
}

sqlx::query("DELETE FROM ingestion_progress WHERE content_hash = ?")
.bind(content_hash)
.execute(pool)
.await?;
sqlx::query("DELETE FROM ingestion_files WHERE content_hash = ?")
.bind(content_hash)
.execute(pool)
.await?;
Ok(())
}

/// Delete a completed ingestion file record from history.
#[utoipa::path(
delete,
Expand All @@ -242,15 +288,63 @@ pub(super) async fn delete_ingest_file(
) -> Result<Json<IngestDeleteResponse>, StatusCode> {
let pools = state.agent_pools.load();
let pool = pools.get(&query.agent_id).ok_or(StatusCode::NOT_FOUND)?;
let workspaces = state.agent_workspaces.load();
let workspace = workspaces
.get(&query.agent_id)
.ok_or(StatusCode::NOT_FOUND)?;
let ingest_dir = workspace.join("ingest");

sqlx::query("DELETE FROM ingestion_files WHERE content_hash = ?")
.bind(&query.content_hash)
.execute(pool)
purge_ingest_file(pool, &ingest_dir, &query.content_hash)
.await
.map_err(|error| {
tracing::warn!(%error, "failed to delete ingest file record");
tracing::warn!(%error, "failed to purge ingest file");
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok(Json(IngestDeleteResponse { success: true }))
}

#[cfg(test)]
mod tests {
use super::*;
use sqlx::sqlite::SqlitePoolOptions;

#[tokio::test]
async fn test_purge_removes_disk_file_and_rows() {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite::memory:")
.await
.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();

let dir = tempfile::tempdir().unwrap();
let ingest_dir = dir.path().to_path_buf();
let file = ingest_dir.join("notes.txt");
tokio::fs::write(&file, b"hello").await.unwrap();
let hash = crate::agent::ingestion::content_hash("hello");

sqlx::query("INSERT INTO ingestion_files (content_hash, filename, file_size, total_chunks, status) VALUES (?, 'notes.txt', 5, 1, 'failed')")
.bind(&hash).execute(&pool).await.unwrap();
sqlx::query("INSERT INTO ingestion_progress (content_hash, chunk_index, total_chunks, filename) VALUES (?, 0, 1, 'notes.txt')")
.bind(&hash).execute(&pool).await.unwrap();

purge_ingest_file(&pool, &ingest_dir, &hash).await.unwrap();

assert!(!file.exists(), "disk file must be removed");
let files: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM ingestion_files WHERE content_hash = ?")
.bind(&hash)
.fetch_one(&pool)
.await
.unwrap();
let prog: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM ingestion_progress WHERE content_hash = ?")
.bind(&hash)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(files, 0, "ingestion_files row must be deleted");
assert_eq!(prog, 0, "ingestion_progress rows must be deleted");
}
}
Loading