Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions docs/job-queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Background Job Queue

FinMind uses a resilient background job queue system for asynchronous task execution with automatic retry and monitoring.

## Architecture

```
┌──────────────┐ ┌───────────┐ ┌──────────────┐
│ API / CLI │────>│ Redis │────>│ APScheduler │
│ (enqueue) │ │ (queue) │ │ (worker) │
└──────────────┘ └───────────┘ └──────┬───────┘
┌──────────────────────────┤
│ │
┌────▼─────┐ ┌───────▼──────┐
│PostgreSQL│
│ (status) │ │ Queue │
└──────────┘ └──────────────┘
```

## Features

- **Redis-backed queue** with priority support (lower number = higher priority)
- **Exponential backoff retry** with configurable max retries (default: 5)
- **Dead-letter queue** for permanently failed jobs
- **Job status tracking** in PostgreSQL
- **Distributed locking** to prevent duplicate execution
- **APScheduler integration** for automatic job processing
- **Prometheus metrics** for monitoring

## API Endpoints

All endpoints require JWT authentication.

### List Jobs
```
GET /api/jobs?status=pending&job_type=send_reminder&limit=50&offset=0
```

### Get Job Status
```
GET /api/jobs/<job_id>
```

### Cancel Job
```
POST /api/jobs/<job_id>/cancel
```

### Retry Dead Job
```
POST /api/jobs/<job_id>/retry
```

### Queue Statistics
```
GET /api/jobs/stats
```

### Enqueue Job
```
POST /api/jobs/enqueue
Content-Type: application/json

{
"job_type": "send_reminder",
"payload": {"reminder_id": 123},
"queue": "default",
"priority": 5,
"max_retries": 5
}
```

## Job Types

### `send_reminder`
Send a bill reminder notification (email or WhatsApp).

```json
{"reminder_id": 123}
```

### `import_expenses`
Import expenses from a CSV file.

```json
{"file_path": "/tmp/import.csv", "user_id": 42}
```

## Adding Custom Handlers

Register new job types using the `register_handler` decorator:

```python
from app.services.job_worker import register_handler

@register_handler("my_custom_job")
def handle_my_job(payload: dict) -> dict:
# Your logic here
return {"status": "done"}
```

## Retry Behavior

| Attempt | Delay |
|---------|-------|
| 1 | 5s |
| 2 | 10s |
| 3 | 20s |
| 4 | 40s |
| 5 | 80s |
| ... | ... |
| max | 1h |

After `max_retries` attempts, jobs are moved to the dead-letter queue and can be manually retried via the API.

## Prometheus Metrics

- `finmind_jobs_enqueued_total` — Total jobs enqueued
- `finmind_jobs_completed_total` — Total jobs completed
- `finmind_jobs_failed_total` — Total job failures (including retries)
- `finmind_jobs_dead_total` — Total jobs moved to dead-letter queue
- `finmind_job_duration_seconds` — Job execution duration histogram
- `finmind_queue_depth` — Current queue depth

## Configuration

Environment variables:

| Variable | Default | Description |
|----------|---------|-------------|
| `REDIS_URL` | `redis://localhost:6379` | Redis connection URL |
| `DATABASE_URL` | — | PostgreSQL connection URL |

## Testing

```bash
cd packages/backend
pytest tests/test_jobs.py -v
```
24 changes: 24 additions & 0 deletions packages/backend/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def create_app(settings: Settings | None = None) -> Flask:
# Blueprint routes
register_routes(app)

# Background job worker (APScheduler)
_start_job_worker(app)

# Backward-compatible schema patch for existing databases.
with app.app_context():
_ensure_schema_compatibility(app)
Expand Down Expand Up @@ -96,6 +99,27 @@ def init_db():
return app


def _start_job_worker(app: Flask) -> None:
    """Start APScheduler to poll the job queue periodically."""
    try:
        from apscheduler.schedulers.background import BackgroundScheduler

        from .services.job_worker import process_batch, retry_scheduled_jobs

        def _tick() -> None:
            """Run one poll cycle inside an application context."""
            with app.app_context():
                retry_scheduled_jobs()
                process_batch(batch_size=5)

        worker = BackgroundScheduler(daemon=True)
        worker.add_job(_tick, "interval", seconds=5, id="job_worker_tick")
        worker.start()
        app.logger.info("Job worker scheduler started (interval=5s)")
    except Exception:
        # Startup must not be blocked by a missing/misconfigured scheduler;
        # log the failure and continue serving requests.
        app.logger.exception("Failed to start job worker scheduler")


def _ensure_schema_compatibility(app: Flask) -> None:
"""Apply minimal compatibility ALTERs for existing deployments."""
if db.engine.dialect.name != "postgresql":
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .jobs import jobs_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(jobs_bp)
129 changes: 129 additions & 0 deletions packages/backend/app/routes/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""Job monitoring and management API endpoints.

Provides REST endpoints to:
- List and filter background jobs
- View individual job status
- Cancel pending jobs
- Retry dead-lettered jobs
- View queue statistics
"""

from flask import Blueprint, jsonify, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from ..services.job_queue import (
get_job_status,
list_jobs,
cancel_job,
retry_dead_job,
get_queue_stats,
enqueue,
)

# All job endpoints are mounted under /api/jobs; every view below also
# requires a valid JWT via @jwt_required().
jobs_bp = Blueprint("jobs", __name__, url_prefix="/api/jobs")


@jobs_bp.route("", methods=["GET"])
@jwt_required()
def api_list_jobs():
    """List background jobs with optional filters.

    Query params:
        status: Filter by status (pending|running|completed|failed|retrying|dead)
        job_type: Filter by job type
        queue: Filter by queue name
        limit: Max results (default 50, capped at 200)
        offset: Pagination offset
    """
    args = request.args
    requested_limit = args.get("limit", 50, type=int)

    records = list_jobs(
        status=args.get("status"),
        job_type=args.get("job_type"),
        queue=args.get("queue"),
        limit=min(requested_limit, 200),  # hard cap to keep responses bounded
        offset=args.get("offset", 0, type=int),
    )

    # Convert datetime columns to ISO-8601 strings so the dicts are
    # JSON-serialisable.
    datetime_keys = ("created_at", "started_at", "completed_at", "next_retry_at")
    for record in records:
        for key in datetime_keys:
            value = record.get(key)
            if value and hasattr(value, "isoformat"):
                record[key] = value.isoformat()

    return jsonify(jobs=records, count=len(records))


@jobs_bp.route("/<job_id>", methods=["GET"])
@jwt_required()
def api_get_job(job_id):
    """Get detailed status of a single job."""
    job = get_job_status(job_id)
    if job is None:
        return jsonify(error="Job not found"), 404

    # ISO-format any datetime fields so the payload is JSON-serialisable.
    for field in ("created_at", "started_at", "completed_at", "next_retry_at"):
        stamp = job.get(field)
        if stamp and hasattr(stamp, "isoformat"):
            job[field] = stamp.isoformat()

    return jsonify(job)


@jobs_bp.route("/<job_id>/cancel", methods=["POST"])
@jwt_required()
def api_cancel_job(job_id):
    """Cancel a pending or retrying job."""
    if cancel_job(job_id):
        return jsonify(status="cancelled", job_id=job_id)
    # cancel_job returns falsy when the job is missing or already running/done.
    return jsonify(error="Cannot cancel job (not found or not in cancellable state)"), 400


@jobs_bp.route("/<job_id>/retry", methods=["POST"])
@jwt_required()
def api_retry_job(job_id):
    """Re-enqueue a dead-lettered job for retry."""
    if retry_dead_job(job_id):
        return jsonify(status="requeued", job_id=job_id)
    # Only jobs in the dead-letter queue are eligible for manual retry.
    return jsonify(error="Cannot retry job (not found or not dead)"), 400


@jobs_bp.route("/stats", methods=["GET"])
@jwt_required()
def api_queue_stats():
    """Get aggregate queue statistics."""
    return jsonify(get_queue_stats())


@jobs_bp.route("/enqueue", methods=["POST"])
@jwt_required()
def api_enqueue():
    """Manually enqueue a new job.

    Body JSON:
        job_type: str (required)
        payload: dict (optional)
        queue: str (default "default")
        priority: int (default 5)
        max_retries: int (default 5)

    Returns 201 with the new job id, or 400 on invalid input.
    """
    data = request.get_json(silent=True) or {}

    job_type = data.get("job_type")
    if not job_type or not isinstance(job_type, str):
        return jsonify(error="job_type is required"), 400

    payload = data.get("payload")
    if payload is not None and not isinstance(payload, dict):
        return jsonify(error="payload must be an object"), 400

    priority = data.get("priority", 5)
    max_retries = data.get("max_retries", 5)
    # Reject non-integer priority/max_retries at the API boundary instead of
    # letting them fail opaquely inside the queue backend. Note bool is an
    # int subclass, so it is excluded explicitly.
    if not isinstance(priority, int) or isinstance(priority, bool):
        return jsonify(error="priority must be an integer"), 400
    if not isinstance(max_retries, int) or isinstance(max_retries, bool):
        return jsonify(error="max_retries must be an integer"), 400

    job_id = enqueue(
        job_type=job_type,
        payload=payload,
        queue=data.get("queue", "default"),
        priority=priority,
        max_retries=max_retries,
    )
    return jsonify(status="enqueued", job_id=job_id), 201
Loading