Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ coverage
npm-debug.log
yarn-error.log

# OS specific
.DS_Store

# Editors specific
.fleet
.idea
Expand Down
103 changes: 89 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ npm install @boringnode/queue
- **Priority Queues**: Process high-priority jobs first
- **Bulk Dispatch**: Efficiently dispatch thousands of jobs at once
- **Job Grouping**: Organize related jobs for monitoring
- **Job Deduplication**: Prevent duplicate jobs with custom IDs
- **Retry with Backoff**: Exponential, linear, or fixed backoff strategies
- **Job Timeout**: Fail or retry jobs that exceed a time limit
- **Job History**: Retain completed/failed jobs for debugging
Expand Down Expand Up @@ -131,6 +132,80 @@ await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025')

The `groupId` is stored with job data and accessible via `job.data.groupId`.

## Job Deduplication

Prevent the same job from being pushed multiple times. Four modes, all via `.dedup()`:

### Simple (skip while job exists)

```typescript
// First dispatch - job is created
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()

// Second dispatch with same dedup ID - silently skipped
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
```

### Throttle (skip within TTL window)

```typescript
// Within 5s, duplicates are skipped. After 5s, a new job is created.
await SendEmailJob.dispatch({ to: 'user@example.com' })
.dedup({ id: 'welcome-123', ttl: '5s' })
.run()
```

### Extend (reset TTL on duplicate)

```typescript
// Each duplicate push resets the TTL timer.
await RateLimitJob.dispatch({ userId: 42 }).dedup({ id: 'rate-42', ttl: '1m', extend: true }).run()
```

### Debounce (replace payload + reset TTL)

```typescript
// Within the 2s window, the latest payload overwrites the previous pending job.
await SaveDraftJob.dispatch({ content: 'latest draft' })
.dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true })
.run()
```

### Inspecting the outcome

`DispatchResult` tells you what happened:

```typescript
const { jobId, deduped } = await SaveDraftJob.dispatch({ content: '...' })
.dedup({ id: 'draft-42', ttl: '2s', replace: true })
.run()

// deduped: 'added' | 'skipped' | 'replaced' | 'extended'
// jobId: the UUID of the job (the existing one when deduped)
```

### How it works

- The dedup ID is automatically prefixed with the job name (`SendInvoiceJob::order-123`), so different job types can reuse the same key.
- `ttl` accepts a Duration (`'5s'`, `'1m'`) or milliseconds.
- `extend` and `replace` **require** `ttl` — calling them without `ttl` throws.
- `replace` only applies to jobs in `pending` or `delayed` state. Active (executing) jobs are left alone; the dispatch returns `{ deduped: 'skipped' }`.
- `replace` swaps the **payload only** — priority, queue, delay, and groupId of the existing job are retained. To change those, use a different dedup id or wait for the TTL to expire.
- `retryJob` does not touch the dedup entry — a retried job continues to occupy the dedup slot. TTL runs on wall-clock time, so long-running retries may outlive the TTL window. Use a generous TTL or no TTL if retries must stay deduped.
- Atomic and race-free:
- **Redis**: `SET + PEXPIRE` under a Lua script with `HSETNX`-style guards.
- **Knex**: transactional `SELECT ... FOR UPDATE` + insert/update inside a transaction.
- **SyncAdapter**: executes inline, no dedup support.

### Caveats

- Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated.
- `.dedup()` is only available on single dispatch. `dispatchMany` / `pushManyOn` reject jobs with a `dedup` field.
- Scheduled jobs (`.schedule()`) do not support dedup — each cron/interval fire is an independent dispatch.
- With no `ttl`, dedup persists until the job is removed (completed/failed without retention). When retention keeps the record, re-dispatch stays blocked until the record is pruned.
- With `ttl`, dedup expires after the window — a new job (new UUID) is created. The old job still runs.
- Knex concurrent race: two `pushOn` calls with the same dedup id firing at the exact same instant on Postgres (READ COMMITTED) can both succeed (rare). Serialize at the app layer if strict guarantees are required, or use Redis.

## Job History & Retention

Keep completed and failed jobs for debugging:
Expand Down Expand Up @@ -536,7 +611,7 @@ import * as boringqueue from '@boringnode/queue'

const instrumentation = new QueueInstrumentation({
messagingSystem: 'boringqueue', // default
executionSpanLinkMode: 'link', // or 'parent'
executionSpanLinkMode: 'link', // or 'parent'
})

instrumentation.enable()
Expand All @@ -549,19 +624,19 @@ The instrumentation patches `QueueManager.init()` to automatically inject its wr

The instrumentation uses standard [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) where they map cleanly, plus a few queue-specific custom attributes.

| Attribute | Kind | Description |
| ------------------------------- | ------- | ------------------------------------------ |
| `messaging.system` | Semconv | `'boringqueue'` (configurable) |
| `messaging.operation.name` | Semconv | `'publish'` or `'process'` |
| `messaging.destination.name` | Semconv | Queue name |
| `messaging.message.id` | Semconv | Job ID for single-message spans |
| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch |
| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt |
| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) |
| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` |
| `messaging.job.group_id` | Custom | Queue-specific group identifier |
| `messaging.job.priority` | Custom | Queue-specific job priority |
| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |
| Attribute | Kind | Description |
| ------------------------------- | ------- | --------------------------------------------- |
| `messaging.system` | Semconv | `'boringqueue'` (configurable) |
| `messaging.operation.name` | Semconv | `'publish'` or `'process'` |
| `messaging.destination.name` | Semconv | Queue name |
| `messaging.message.id` | Semconv | Job ID for single-message spans |
| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch |
| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt |
| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) |
| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` |
| `messaging.job.group_id` | Custom | Queue-specific group identifier |
| `messaging.job.priority` | Custom | Queue-specific job priority |
| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |
| `messaging.job.queue_time_ms` | Custom | Time spent waiting in queue before processing |

### Trace Context Propagation
Expand Down
24 changes: 20 additions & 4 deletions src/contracts/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type {
DedupOutcome,
JobData,
JobRecord,
JobRetention,
Expand All @@ -7,6 +8,17 @@ import type {
ScheduleListOptions,
} from '../types/main.js'

/**
* Result of a push operation when dedup was involved.
* `outcome` tells the dispatcher what happened; `jobId` is the ID of the
* existing job when deduped (skipped/replaced/extended).
*/
export interface PushResult {
outcome: DedupOutcome
/** ID of the existing job when a duplicate was detected, otherwise the newly added job's id. */
jobId: string
}

/**
* A job that has been acquired by a worker for processing.
* Extends JobData with the timestamp when the job was acquired.
Expand Down Expand Up @@ -119,33 +131,37 @@ export interface Adapter {
* Push a job to the default queue for immediate processing.
*
* @param jobData - The job data to push
* @returns PushResult if jobData.dedup is set, otherwise void
*/
push(jobData: JobData): Promise<void>
push(jobData: JobData): Promise<PushResult | void>

/**
* Push a job to a specific queue for immediate processing.
*
* @param queue - The queue name to push to
* @param jobData - The job data to push
* @returns PushResult if jobData.dedup is set, otherwise void
*/
pushOn(queue: string, jobData: JobData): Promise<void>
pushOn(queue: string, jobData: JobData): Promise<PushResult | void>

/**
* Push a job to the default queue with a delay.
*
* @param jobData - The job data to push
* @param delay - Delay in milliseconds before the job becomes available
* @returns PushResult if jobData.dedup is set, otherwise void
*/
pushLater(jobData: JobData, delay: number): Promise<void>
pushLater(jobData: JobData, delay: number): Promise<PushResult | void>

/**
* Push a job to a specific queue with a delay.
*
* @param queue - The queue name to push to
* @param jobData - The job data to push
* @param delay - Delay in milliseconds before the job becomes available
* @returns PushResult if jobData.dedup is set, otherwise void
*/
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void>
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<PushResult | void>

/**
* Push multiple jobs to the default queue for immediate processing.
Expand Down
Loading