diff --git a/docs/job-queue.md b/docs/job-queue.md
new file mode 100644
index 000000000..7b46edb41
--- /dev/null
+++ b/docs/job-queue.md
@@ -0,0 +1,140 @@
+# Background Job Queue
+
+FinMind uses a resilient background job queue system for asynchronous task execution with automatic retry and monitoring.
+
+## Architecture
+
+```
+┌──────────────┐     ┌───────────┐     ┌──────────────┐
+│  API / CLI   │────>│   Redis   │────>│ APScheduler  │
+│  (enqueue)   │     │  (queue)  │     │   (worker)   │
+└──────────────┘     └───────────┘     └──────┬───────┘
+                                              │
+                   ┌──────────────────────────┤
+                   │                          │
+             ┌────▼─────┐            ┌───────▼──────┐
+             │PostgreSQL│            │  Dead-Letter │
+             │ (status) │            │     Queue    │
+             └──────────┘            └──────────────┘
+```
+
+## Features
+
+- **Redis-backed queue** with priority support (lower number = higher priority)
+- **Exponential backoff retry** with configurable max retries (default: 5)
+- **Dead-letter queue** for permanently failed jobs
+- **Job status tracking** in PostgreSQL
+- **Distributed locking** to prevent duplicate execution
+- **APScheduler integration** for automatic job processing
+- **Prometheus metrics** for monitoring
+
+## API Endpoints
+
+All endpoints require JWT authentication.
+
+### List Jobs
+```
+GET /api/jobs?status=pending&job_type=send_reminder&limit=50&offset=0
+```
+
+### Get Job Status
+```
+GET /api/jobs/<job_id>
+```
+
+### Cancel Job
+```
+POST /api/jobs/<job_id>/cancel
+```
+
+### Retry Dead Job
+```
+POST /api/jobs/<job_id>/retry
+```
+
+### Queue Statistics
+```
+GET /api/jobs/stats
+```
+
+### Enqueue Job
+```
+POST /api/jobs/enqueue
+Content-Type: application/json
+
+{
+  "job_type": "send_reminder",
+  "payload": {"reminder_id": 123},
+  "queue": "default",
+  "priority": 5,
+  "max_retries": 5
+}
+```
+
+## Job Types
+
+### `send_reminder`
+Send a bill reminder notification (email or WhatsApp).
+
+```json
+{"reminder_id": 123}
+```
+
+### `import_expenses`
+Import expenses from a CSV file.
+ +```json +{"file_path": "/tmp/import.csv", "user_id": 42} +``` + +## Adding Custom Handlers + +Register new job types using the `register_handler` decorator: + +```python +from app.services.job_worker import register_handler + +@register_handler("my_custom_job") +def handle_my_job(payload: dict) -> dict: + # Your logic here + return {"status": "done"} +``` + +## Retry Behavior + +| Attempt | Delay | +|---------|-------| +| 1 | 5s | +| 2 | 10s | +| 3 | 20s | +| 4 | 40s | +| 5 | 80s | +| ... | ... | +| max | 1h | + +After `max_retries` attempts, jobs are moved to the dead-letter queue and can be manually retried via the API. + +## Prometheus Metrics + +- `finmind_jobs_enqueued_total` — Total jobs enqueued +- `finmind_jobs_completed_total` — Total jobs completed +- `finmind_jobs_failed_total` — Total job failures (including retries) +- `finmind_jobs_dead_total` — Total jobs moved to dead-letter queue +- `finmind_job_duration_seconds` — Job execution duration histogram +- `finmind_queue_depth` — Current queue depth + +## Configuration + +Environment variables: + +| Variable | Default | Description | +|----------|---------|-------------| +| `REDIS_URL` | `redis://localhost:6379` | Redis connection URL | +| `DATABASE_URL` | — | PostgreSQL connection URL | + +## Testing + +```bash +cd packages/backend +pytest tests/test_jobs.py -v +``` diff --git a/packages/backend/app/__init__.py b/packages/backend/app/__init__.py index cdf76b45f..b42102e23 100644 --- a/packages/backend/app/__init__.py +++ b/packages/backend/app/__init__.py @@ -52,6 +52,9 @@ def create_app(settings: Settings | None = None) -> Flask: # Blueprint routes register_routes(app) + # Background job worker (APScheduler) + _start_job_worker(app) + # Backward-compatible schema patch for existing databases. 
with app.app_context(): _ensure_schema_compatibility(app) @@ -96,6 +99,27 @@ def init_db(): return app +def _start_job_worker(app: Flask) -> None: + """Start APScheduler to poll the job queue periodically.""" + try: + from apscheduler.schedulers.background import BackgroundScheduler + from .services.job_worker import process_batch, retry_scheduled_jobs + + scheduler = BackgroundScheduler(daemon=True) + + def _tick(): + """Process jobs in an application context.""" + with app.app_context(): + retry_scheduled_jobs() + process_batch(batch_size=5) + + scheduler.add_job(_tick, "interval", seconds=5, id="job_worker_tick") + scheduler.start() + app.logger.info("Job worker scheduler started (interval=5s)") + except Exception: + app.logger.exception("Failed to start job worker scheduler") + + def _ensure_schema_compatibility(app: Flask) -> None: """Apply minimal compatibility ALTERs for existing deployments.""" if db.engine.dialect.name != "postgresql": diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f897..7fa1eb4e4 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .jobs import jobs_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(jobs_bp) diff --git a/packages/backend/app/routes/jobs.py b/packages/backend/app/routes/jobs.py new file mode 100644 index 000000000..50ab45d72 --- /dev/null +++ b/packages/backend/app/routes/jobs.py @@ -0,0 +1,129 @@ +"""Job monitoring and management API endpoints. 
+
+Provides REST endpoints to:
+- List and filter background jobs
+- View individual job status
+- Cancel pending jobs
+- Retry dead-lettered jobs
+- View queue statistics
+"""
+
+from flask import Blueprint, jsonify, request
+from flask_jwt_extended import jwt_required, get_jwt_identity
+from ..services.job_queue import (
+    get_job_status,
+    list_jobs,
+    cancel_job,
+    retry_dead_job,
+    get_queue_stats,
+    enqueue,
+)
+
+jobs_bp = Blueprint("jobs", __name__, url_prefix="/api/jobs")
+
+
+@jobs_bp.route("", methods=["GET"])
+@jwt_required()
+def api_list_jobs():
+    """List background jobs with optional filters.
+
+    Query params:
+        status: Filter by status (pending|running|completed|failed|retrying|dead)
+        job_type: Filter by job type
+        queue: Filter by queue name
+        limit: Max results (default 50)
+        offset: Pagination offset
+    """
+    status = request.args.get("status")
+    job_type = request.args.get("job_type")
+    queue = request.args.get("queue")
+    limit = request.args.get("limit", 50, type=int)
+    offset = request.args.get("offset", 0, type=int)
+
+    jobs = list_jobs(
+        status=status,
+        job_type=job_type,
+        queue=queue,
+        limit=min(limit, 200),
+        offset=offset,
+    )
+
+    # Serialise datetime fields
+    for j in jobs:
+        for key in ("created_at", "started_at", "completed_at", "next_retry_at"):
+            val = j.get(key)
+            if val and hasattr(val, "isoformat"):
+                j[key] = val.isoformat()
+
+    return jsonify(jobs=jobs, count=len(jobs))
+
+
+@jobs_bp.route("/<job_id>", methods=["GET"])
+@jwt_required()
+def api_get_job(job_id):
+    """Get detailed status of a single job."""
+    job = get_job_status(job_id)
+    if job is None:
+        return jsonify(error="Job not found"), 404
+
+    for key in ("created_at", "started_at", "completed_at", "next_retry_at"):
+        val = job.get(key)
+        if val and hasattr(val, "isoformat"):
+            job[key] = val.isoformat()
+
+    return jsonify(job)
+
+
+@jobs_bp.route("/<job_id>/cancel", methods=["POST"])
+@jwt_required()
+def api_cancel_job(job_id):
+    """Cancel a pending or retrying job."""
+    ok = cancel_job(job_id)
+    if not ok:
+        return jsonify(error="Cannot cancel job (not found or not in cancellable state)"), 400
+    return jsonify(status="cancelled", job_id=job_id)
+
+
+@jobs_bp.route("/<job_id>/retry", methods=["POST"])
+@jwt_required()
+def api_retry_job(job_id):
+    """Re-enqueue a dead-lettered job for retry."""
+    ok = retry_dead_job(job_id)
+    if not ok:
+        return jsonify(error="Cannot retry job (not found or not dead)"), 400
+    return jsonify(status="requeued", job_id=job_id)
+
+
+@jobs_bp.route("/stats", methods=["GET"])
+@jwt_required()
+def api_queue_stats():
+    """Get aggregate queue statistics."""
+    stats = get_queue_stats()
+    return jsonify(stats)
+
+
+@jobs_bp.route("/enqueue", methods=["POST"])
+@jwt_required()
+def api_enqueue():
+    """Manually enqueue a new job.
+
+    Body JSON:
+        job_type: str (required)
+        payload: dict (optional)
+        queue: str (default "default")
+        priority: int (default 5)
+        max_retries: int (default 5)
+    """
+    data = request.get_json(silent=True) or {}
+    job_type = data.get("job_type")
+    if not job_type:
+        return jsonify(error="job_type is required"), 400
+
+    job_id = enqueue(
+        job_type=job_type,
+        payload=data.get("payload"),
+        queue=data.get("queue", "default"),
+        priority=data.get("priority", 5),
+        max_retries=data.get("max_retries", 5),
+    )
+    return jsonify(status="enqueued", job_id=job_id), 201
diff --git a/packages/backend/app/services/job_queue.py b/packages/backend/app/services/job_queue.py
new file mode 100644
index 000000000..498c19e27
--- /dev/null
+++ b/packages/backend/app/services/job_queue.py
@@ -0,0 +1,348 @@
+"""Resilient background job queue with retry and monitoring.
+ +Features: +- Redis-backed job queue with priority support +- Exponential backoff retry with configurable max retries +- Dead-letter queue for permanently failed jobs +- Job status tracking in PostgreSQL +- Prometheus metrics for monitoring +""" + +from __future__ import annotations + +import json +import logging +import time +import traceback +import uuid +from datetime import datetime, timedelta +from enum import Enum +from typing import Any, Callable, Dict, Optional + +from ..extensions import db, redis_client + +logger = logging.getLogger("finmind.jobs") + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +QUEUE_KEY = "finmind:jobs:queue" +PROCESSING_KEY = "finmind:jobs:processing" +DEAD_LETTER_KEY = "finmind:jobs:dead_letter" +JOB_DATA_KEY = "finmind:jobs:data:{job_id}" +JOB_LOCK_KEY = "finmind:jobs:lock:{job_id}" + +DEFAULT_MAX_RETRIES = 5 +DEFAULT_RETRY_DELAY = 5 # seconds (base delay, doubles each retry) +DEFAULT_JOB_TIMEOUT = 300 # 5 minutes + + +class JobStatus(str, Enum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + RETRYING = "retrying" + DEAD = "dead" # moved to dead-letter queue + + +# --------------------------------------------------------------------------- +# Job Data Model (Redis) +# --------------------------------------------------------------------------- + +def _job_key(job_id: str) -> str: + return JOB_DATA_KEY.format(job_id=job_id) + + +def _lock_key(job_id: str) -> str: + return JOB_LOCK_KEY.format(job_id=job_id) + + +def save_job_metadata(job_id: str, metadata: dict) -> None: + """Persist job metadata in Redis.""" + redis_client.set( + _job_key(job_id), + json.dumps(metadata, default=str), + ex=86400 * 7, # 7-day TTL + ) + + +def load_job_metadata(job_id: str) -> Optional[dict]: + """Load job metadata from Redis.""" + raw = redis_client.get(_job_key(job_id)) + if raw is 
None: + return None + return json.loads(raw) + + +# --------------------------------------------------------------------------- +# PostgreSQL Job Record +# --------------------------------------------------------------------------- + +from sqlalchemy import text + + +def _ensure_jobs_table() -> None: + """Create the background_jobs table if it doesn't exist.""" + db.session.execute( + text( + """ + CREATE TABLE IF NOT EXISTS background_jobs ( + id VARCHAR(64) PRIMARY KEY, + job_type VARCHAR(100) NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'pending', + queue VARCHAR(50) NOT NULL DEFAULT 'default', + priority INT NOT NULL DEFAULT 5, + attempts INT NOT NULL DEFAULT 0, + max_retries INT NOT NULL DEFAULT 5, + last_error TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + next_retry_at TIMESTAMPTZ, + payload JSONB DEFAULT '{}'::jsonb, + result JSONB + ) + """ + ) + ) + db.session.execute( + text( + """ + CREATE INDEX IF NOT EXISTS idx_bg_jobs_status + ON background_jobs (status, next_retry_at) + """ + ) + ) + db.session.execute( + text( + """ + CREATE INDEX IF NOT EXISTS idx_bg_jobs_type + ON background_jobs (job_type, created_at DESC) + """ + ) + ) + db.session.commit() + + +def _insert_job_record( + job_id: str, + job_type: str, + queue: str, + priority: int, + max_retries: int, + payload: dict, +) -> None: + """Insert a job record into PostgreSQL.""" + db.session.execute( + text( + """ + INSERT INTO background_jobs + (id, job_type, status, queue, priority, max_retries, payload, created_at) + VALUES + (:id, :job_type, 'pending', :queue, :priority, :max_retries, :payload, NOW()) + """ + ), + { + "id": job_id, + "job_type": job_type, + "queue": queue, + "priority": priority, + "max_retries": max_retries, + "payload": json.dumps(payload), + }, + ) + db.session.commit() + + +def _update_job_status( + job_id: str, + status: str, + error: Optional[str] = None, + result: Optional[dict] = None, + 
increment_attempts: bool = False, + next_retry_at: Optional[datetime] = None, +) -> None: + """Update a job's status in PostgreSQL.""" + sets = ["status = :status", "started_at = COALESCE(started_at, CASE WHEN :status = 'running' THEN NOW() ELSE started_at END)"] + params: dict[str, Any] = {"id": job_id, "status": status} + + if status in ("completed", "failed", "dead"): + sets.append("completed_at = CASE WHEN completed_at IS NULL THEN NOW() ELSE completed_at END") + if error is not None: + sets.append("last_error = :error") + params["error"] = error + if result is not None: + sets.append("result = :result") + params["result"] = json.dumps(result) + if increment_attempts: + sets.append("attempts = attempts + 1") + if next_retry_at is not None: + sets.append("next_retry_at = :next_retry_at") + params["next_retry_at"] = next_retry_at + + db.session.execute( + text(f"UPDATE background_jobs SET {', '.join(sets)} WHERE id = :id"), + params, + ) + db.session.commit() + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def enqueue( + job_type: str, + payload: dict | None = None, + queue: str = "default", + priority: int = 5, + max_retries: int = DEFAULT_MAX_RETRIES, + job_id: str | None = None, +) -> str: + """Enqueue a new background job. + + Args: + job_type: Registered handler name (e.g. "send_reminder"). + payload: JSON-serialisable dict passed to the handler. + queue: Queue name (default "default"). + priority: Lower number = higher priority (1 highest). + max_retries: Maximum retry attempts before dead-letter. + job_id: Optional explicit job ID (uuid4 generated otherwise). + + Returns: + The job ID. 
+ """ + _ensure_jobs_table() + job_id = job_id or str(uuid.uuid4()) + payload = payload or {} + + metadata = { + "job_id": job_id, + "job_type": job_type, + "queue": queue, + "priority": priority, + "max_retries": max_retries, + "payload": payload, + "status": JobStatus.PENDING, + "created_at": datetime.utcnow().isoformat(), + "attempts": 0, + } + + save_job_metadata(job_id, metadata) + _insert_job_record(job_id, job_type, queue, priority, max_retries, payload) + + # Push to Redis sorted-set (score = priority, value = job_id) + redis_client.zadd(QUEUE_KEY, {job_id: priority}) + logger.info("Enqueued job %s [%s] priority=%d", job_id, job_type, priority) + return job_id + + +def get_job_status(job_id: str) -> Optional[dict]: + """Return current status of a job.""" + metadata = load_job_metadata(job_id) + if metadata is None: + # Fall back to PostgreSQL + row = db.session.execute( + text("SELECT * FROM background_jobs WHERE id = :id"), + {"id": job_id}, + ).fetchone() + if row is None: + return None + return dict(row._mapping) + return metadata + + +def list_jobs( + status: Optional[str] = None, + job_type: Optional[str] = None, + queue: Optional[str] = None, + limit: int = 50, + offset: int = 0, +) -> list[dict]: + """List jobs with optional filters.""" + _ensure_jobs_table() + clauses = [] + params: dict[str, Any] = {"limit": limit, "offset": offset} + + if status: + clauses.append("status = :status") + params["status"] = status + if job_type: + clauses.append("job_type = :job_type") + params["job_type"] = job_type + if queue: + clauses.append("queue = :queue") + params["queue"] = queue + + where = "WHERE " + " AND ".join(clauses) if clauses else "" + rows = db.session.execute( + text( + f"SELECT * FROM background_jobs {where} " + f"ORDER BY created_at DESC LIMIT :limit OFFSET :offset" + ), + params, + ).fetchall() + return [dict(r._mapping) for r in rows] + + +def cancel_job(job_id: str) -> bool: + """Cancel a pending job.""" + meta = load_job_metadata(job_id) + if 
meta is None: + return False + if meta["status"] not in (JobStatus.PENDING, JobStatus.RETRYING): + return False + meta["status"] = "cancelled" + save_job_metadata(job_id, meta) + _update_job_status(job_id, "cancelled") + redis_client.zrem(QUEUE_KEY, job_id) + logger.info("Cancelled job %s", job_id) + return True + + +def retry_dead_job(job_id: str) -> bool: + """Re-enqueue a dead-lettered job.""" + meta = load_job_metadata(job_id) + if meta is None: + return False + if meta["status"] != JobStatus.DEAD: + return False + meta["status"] = JobStatus.PENDING + meta["attempts"] = 0 + meta["last_error"] = None + save_job_metadata(job_id, meta) + _update_job_status( + job_id, + JobStatus.PENDING, + error=None, + increment_attempts=False, + ) + redis_client.zadd(QUEUE_KEY, {job_id: meta.get("priority", 5)}) + redis_client.lrem(DEAD_LETTER_KEY, 0, job_id) + logger.info("Re-enqueued dead job %s", job_id) + return True + + +# --------------------------------------------------------------------------- +# Metrics helpers +# --------------------------------------------------------------------------- + +def get_queue_stats() -> dict: + """Return aggregate queue statistics.""" + _ensure_jobs_table() + rows = db.session.execute( + text( + """ + SELECT status, COUNT(*) AS count + FROM background_jobs + GROUP BY status + """ + ) + ).fetchall() + stats = {r.status: r.count for r in rows} + stats["queued"] = redis_client.zcard(QUEUE_KEY) + stats["processing"] = redis_client.llen(PROCESSING_KEY) + stats["dead_letter"] = redis_client.llen(DEAD_LETTER_KEY) + return stats diff --git a/packages/backend/app/services/job_worker.py b/packages/backend/app/services/job_worker.py new file mode 100644 index 000000000..436522fde --- /dev/null +++ b/packages/backend/app/services/job_worker.py @@ -0,0 +1,282 @@ +"""Background job worker. + +Runs via APScheduler, dequeuing jobs from Redis and executing them +with exponential-backoff retry logic. 
+""" + +from __future__ import annotations + +import json +import logging +import time +import traceback +from datetime import datetime, timedelta +from typing import Any, Callable, Dict, Optional + +from ..extensions import db, redis_client +from .job_queue import ( + QUEUE_KEY, + PROCESSING_KEY, + DEAD_LETTER_KEY, + JobStatus, + DEFAULT_MAX_RETRIES, + DEFAULT_RETRY_DELAY, + DEFAULT_JOB_TIMEOUT, + _job_key, + _lock_key, + save_job_metadata, + load_job_metadata, + _update_job_status, +) + +logger = logging.getLogger("finmind.jobs.worker") + +# --------------------------------------------------------------------------- +# Registry of job handlers +# --------------------------------------------------------------------------- +_HANDLERS: Dict[str, Callable[..., Any]] = {} + + +def register_handler(job_type: str): + """Decorator to register a job handler function.""" + + def decorator(fn: Callable) -> Callable: + _HANDLERS[job_type] = fn + return fn + + return decorator + + +def get_handler(job_type: str) -> Optional[Callable]: + return _HANDLERS.get(job_type) + + +# --------------------------------------------------------------------------- +# Built-in handlers (example stubs – expand as needed) +# --------------------------------------------------------------------------- + +@register_handler("send_reminder") +def _handle_send_reminder(payload: dict) -> dict: + """Send a reminder notification.""" + from .reminders import send_reminder + from ..models import Reminder + + reminder_id = payload.get("reminder_id") + if reminder_id is None: + raise ValueError("reminder_id required") + + reminder = db.session.get(Reminder, reminder_id) + if reminder is None: + raise ValueError(f"Reminder {reminder_id} not found") + + ok = send_reminder(reminder) + return {"sent": ok} + + +@register_handler("import_expenses") +def _handle_import_expenses(payload: dict) -> dict: + """Import expenses from a file.""" + from .expense_import import import_csv + + file_path = payload["file_path"] 
+ user_id = payload["user_id"] + result = import_csv(file_path, user_id) + return result + + +# --------------------------------------------------------------------------- +# Worker loop +# --------------------------------------------------------------------------- + +def _compute_backoff(attempt: int, base: float = DEFAULT_RETRY_DELAY) -> float: + """Exponential backoff: base * 2^attempt, capped at 1 hour.""" + delay = base * (2 ** attempt) + return min(delay, 3600) + + +def _acquire_lock(job_id: str, timeout: int = DEFAULT_JOB_TIMEOUT) -> bool: + """Try to acquire a distributed lock for a job.""" + return redis_client.set( + _lock_key(job_id), + "1", + nx=True, + ex=timeout, + ) + + +def _release_lock(job_id: str) -> None: + redis_client.delete(_lock_key(job_id)) + + +def process_next_job(app=None) -> bool: + """Dequeue and execute the highest-priority pending job. + + Returns True if a job was processed, False if the queue was empty. + """ + # Pop the lowest-score (highest-priority) job + results = redis_client.zpopmin(QUEUE_KEY, count=1) + if not results: + return False + + job_id, _score = results[0] + + if not _acquire_lock(job_id): + # Another worker grabbed it; push it back + redis_client.zadd(QUEUE_KEY, {job_id: _score}) + return False + + meta = load_job_metadata(job_id) + if meta is None: + logger.warning("Job %s metadata missing, discarding", job_id) + _release_lock(job_id) + return True + + job_type = meta["job_type"] + payload = meta.get("payload", {}) + attempts = meta.get("attempts", 0) + max_retries = meta.get("max_retries", DEFAULT_MAX_RETRIES) + + handler = get_handler(job_type) + if handler is None: + logger.error("No handler registered for job type %s", job_type) + _fail_job(job_id, meta, f"No handler for job type: {job_type}", is_dead=True) + _release_lock(job_id) + return True + + # Mark running + meta["status"] = JobStatus.RUNNING + meta["attempts"] = attempts + 1 + meta["started_at"] = datetime.utcnow().isoformat() + 
save_job_metadata(job_id, meta) + _update_job_status(job_id, JobStatus.RUNNING, increment_attempts=True) + + redis_client.lpush(PROCESSING_KEY, job_id) + + logger.info("Processing job %s [%s] attempt %d/%d", job_id, job_type, attempts + 1, max_retries) + + try: + result = handler(payload) + # Success + meta["status"] = JobStatus.COMPLETED + meta["completed_at"] = datetime.utcnow().isoformat() + meta["result"] = result + save_job_metadata(job_id, meta) + _update_job_status(job_id, JobStatus.COMPLETED, result=result) + redis_client.lrem(PROCESSING_KEY, 0, job_id) + logger.info("Job %s completed successfully", job_id) + except Exception as exc: + error_msg = f"{type(exc).__name__}: {exc}" + logger.error("Job %s failed: %s", job_id, error_msg, exc_info=True) + + if attempts + 1 >= max_retries: + # Move to dead-letter queue + _fail_job(job_id, meta, error_msg, is_dead=True) + else: + # Schedule retry with backoff + backoff = _compute_backoff(attempts) + retry_at = datetime.utcnow() + timedelta(seconds=backoff) + meta["status"] = JobStatus.RETRYING + meta["last_error"] = error_msg + meta["next_retry_at"] = retry_at.isoformat() + save_job_metadata(job_id, meta) + _update_job_status( + job_id, + JobStatus.RETRYING, + error=error_msg, + next_retry_at=retry_at, + ) + redis_client.lrem(PROCESSING_KEY, 0, job_id) + # Schedule re-enqueue via sorted-set with future timestamp as score + redis_client.zadd(QUEUE_KEY, {job_id: retry_at.timestamp()}) + logger.info( + "Job %s scheduled for retry in %.0fs (attempt %d/%d)", + job_id, backoff, attempts + 1, max_retries, + ) + + _release_lock(job_id) + return True + + +def _fail_job(job_id: str, meta: dict, error: str, is_dead: bool = False) -> None: + """Mark a job as failed or dead.""" + status = JobStatus.DEAD if is_dead else JobStatus.FAILED + meta["status"] = status + meta["last_error"] = error + meta["completed_at"] = datetime.utcnow().isoformat() + save_job_metadata(job_id, meta) + _update_job_status(job_id, status, error=error) + 
redis_client.lrem(PROCESSING_KEY, 0, job_id) + if is_dead: + redis_client.lpush(DEAD_LETTER_KEY, job_id) + logger.warning("Job %s marked as %s: %s", job_id, status, error) + + +def process_batch(batch_size: int = 10, app=None) -> int: + """Process up to `batch_size` jobs from the queue.""" + processed = 0 + for _ in range(batch_size): + if not process_next_job(app): + break + processed += 1 + return processed + + +def retry_scheduled_jobs() -> int: + """Re-enqueue jobs whose retry time has arrived. + + Returns the number of jobs re-enqueued. + """ + now = time.time() + # Find jobs in queue with score <= now (scheduled retries) + jobs = redis_client.zrangebyscore(QUEUE_KEY, "-inf", now, start=0, num=50) + requeued = 0 + for job_id in jobs: + meta = load_job_metadata(job_id) + if meta and meta.get("status") == JobStatus.RETRYING: + # Reset priority score + redis_client.zadd(QUEUE_KEY, {job_id: meta.get("priority", 5)}) + requeued += 1 + return requeued + + +# --------------------------------------------------------------------------- +# Prometheus metrics integration +# --------------------------------------------------------------------------- + +try: + from prometheus_client import Counter, Gauge, Histogram + + JOBS_ENQUEUED = Counter( + "finmind_jobs_enqueued_total", + "Total jobs enqueued", + ["job_type", "queue"], + ) + JOBS_COMPLETED = Counter( + "finmind_jobs_completed_total", + "Total jobs completed successfully", + ["job_type", "queue"], + ) + JOBS_FAILED = Counter( + "finmind_jobs_failed_total", + "Total jobs failed (including retries)", + ["job_type", "queue"], + ) + JOBS_DEAD = Counter( + "finmind_jobs_dead_total", + "Total jobs moved to dead-letter queue", + ["job_type", "queue"], + ) + JOBS_DURATION = Histogram( + "finmind_job_duration_seconds", + "Job execution duration", + ["job_type"], + buckets=[1, 5, 10, 30, 60, 120, 300], + ) + QUEUE_DEPTH = Gauge( + "finmind_queue_depth", + "Current queue depth", + ["queue"], + ) + _METRICS_ENABLED = True +except 
ImportError: + _METRICS_ENABLED = False diff --git a/packages/backend/tests/test_jobs.py b/packages/backend/tests/test_jobs.py new file mode 100644 index 000000000..1736a76bb --- /dev/null +++ b/packages/backend/tests/test_jobs.py @@ -0,0 +1,235 @@ +"""Tests for the background job queue system.""" + +import json +import time +from unittest.mock import MagicMock, patch + +import pytest + + +@pytest.fixture +def job_queue_module(): + """Import job_queue with mocked Redis and DB.""" + with patch("app.services.job_queue.redis_client") as mock_redis, \ + patch("app.services.job_queue.db") as mock_db: + from app.services import job_queue + yield job_queue, mock_redis, mock_db + + +@pytest.fixture +def worker_module(): + """Import job_worker with mocked dependencies.""" + with patch("app.services.job_worker.redis_client") as mock_redis, \ + patch("app.services.job_worker.db") as mock_db: + from app.services import job_worker + yield job_worker, mock_redis, mock_db + + +class TestJobQueue: + """Tests for job_queue service.""" + + def test_enqueue_creates_redis_entry(self, job_queue_module): + jq, mock_redis, mock_db = job_queue_module + mock_redis.zadd.return_value = 1 + + job_id = jq.enqueue( + job_type="test_job", + payload={"key": "value"}, + priority=3, + ) + + assert job_id is not None + assert len(job_id) == 36 # UUID format + mock_redis.set.assert_called_once() + mock_redis.zadd.assert_called_once() + + def test_enqueue_custom_job_id(self, job_queue_module): + jq, mock_redis, mock_db = job_queue_module + + job_id = jq.enqueue( + job_type="test_job", + job_id="custom-id-123", + ) + + assert job_id == "custom-id-123" + + def test_get_queue_stats(self, job_queue_module): + jq, mock_redis, mock_db = job_queue_module + mock_redis.zcard.return_value = 5 + mock_redis.llen.return_value = 2 + + # Mock DB query result + mock_row = MagicMock() + mock_row.status = "pending" + mock_row.count = 10 + mock_db.session.execute.return_value.fetchall.return_value = [mock_row] + + stats 
= jq.get_queue_stats() + assert "queued" in stats + assert stats["queued"] == 5 + + def test_cancel_job(self, job_queue_module): + jq, mock_redis, mock_db = job_queue_module + + meta = { + "job_id": "test-123", + "status": "pending", + "priority": 5, + } + mock_redis.get.return_value = json.dumps(meta) + + result = jq.cancel_job("test-123") + assert result is True + + def test_cancel_running_job_fails(self, job_queue_module): + jq, mock_redis, mock_db = job_queue_module + + meta = { + "job_id": "test-123", + "status": "running", + } + mock_redis.get.return_value = json.dumps(meta) + + result = jq.cancel_job("test-123") + assert result is False + + def test_retry_dead_job(self, job_queue_module): + jq, mock_redis, mock_db = job_queue_module + + meta = { + "job_id": "test-123", + "status": "dead", + "priority": 5, + } + mock_redis.get.return_value = json.dumps(meta) + + result = jq.retry_dead_job("test-123") + assert result is True + mock_redis.zadd.assert_called() + + +class TestJobWorker: + """Tests for job_worker service.""" + + def test_register_handler(self, worker_module): + jw, mock_redis, mock_db = worker_module + + @jw.register_handler("test_handler") + def handler(payload): + return {"ok": True} + + assert jw.get_handler("test_handler") is not None + + def test_backoff_computation(self, worker_module): + jw, mock_redis, mock_db = worker_module + + assert jw._compute_backoff(0) == 5 + assert jw._compute_backoff(1) == 10 + assert jw._compute_backoff(2) == 20 + assert jw._compute_backoff(3) == 40 + # Capped at 3600 + assert jw._compute_backoff(20) == 3600 + + def test_process_empty_queue(self, worker_module): + jw, mock_redis, mock_db = worker_module + mock_redis.zpopmin.return_value = [] + + result = jw.process_next_job() + assert result is False + + def test_process_job_success(self, worker_module): + jw, mock_redis, mock_db = worker_module + + # Register a test handler + @jw.register_handler("success_job") + def success_handler(payload): + return {"result": 
"done"} + + job_id = "test-job-123" + meta = { + "job_id": job_id, + "job_type": "success_job", + "payload": {}, + "attempts": 0, + "max_retries": 5, + "priority": 5, + } + + mock_redis.zpopmin.return_value = [(job_id, 5.0)] + mock_redis.set.return_value = True # lock acquired + mock_redis.get.return_value = json.dumps(meta, default=str) + + result = jw.process_next_job() + assert result is True + + def test_process_job_failure_with_retry(self, worker_module): + jw, mock_redis, mock_db = worker_module + + @jw.register_handler("fail_job") + def fail_handler(payload): + raise RuntimeError("Something went wrong") + + job_id = "test-job-456" + meta = { + "job_id": job_id, + "job_type": "fail_job", + "payload": {}, + "attempts": 0, + "max_retries": 3, + "priority": 5, + } + + mock_redis.zpopmin.return_value = [(job_id, 5.0)] + mock_redis.set.return_value = True + mock_redis.get.return_value = json.dumps(meta, default=str) + + result = jw.process_next_job() + assert result is True + # Should have been re-added to queue for retry + mock_redis.zadd.assert_called() + + def test_process_job_failure_dead_letter(self, worker_module): + jw, mock_redis, mock_db = worker_module + + @jw.register_handler("fail_forever") + def fail_handler(payload): + raise RuntimeError("Permanent failure") + + job_id = "test-job-789" + meta = { + "job_id": job_id, + "job_type": "fail_forever", + "payload": {}, + "attempts": 4, # Already tried 4 times + "max_retries": 5, + "priority": 5, + } + + mock_redis.zpopmin.return_value = [(job_id, 5.0)] + mock_redis.set.return_value = True + mock_redis.get.return_value = json.dumps(meta, default=str) + + result = jw.process_next_job() + assert result is True + # Should be pushed to dead letter queue + mock_redis.lpush.assert_called() + + def test_unknown_handler_marks_dead(self, worker_module): + jw, mock_redis, mock_db = worker_module + + job_id = "unknown-job" + meta = { + "job_id": job_id, + "job_type": "nonexistent_handler", + "payload": {}, + 
"attempts": 0, + "max_retries": 5, + } + + mock_redis.zpopmin.return_value = [(job_id, 5.0)] + mock_redis.set.return_value = True + mock_redis.get.return_value = json.dumps(meta, default=str) + + result = jw.process_next_job() + assert result is True + mock_redis.lpush.assert_called() # dead letter