Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@ Sequential async pipelines with **first-class retry, error boundaries, and smart

Just plain JavaScript. Eager execution. Perfect stack traces.

```js
const result = await series([
() => fetchUser(id),
user => validateAndEnrich(user),
final => saveToDatabase(final),
saved => sendWebhook(saved),
], { strategy: collect }); // or failFast, retry(3), custom...
```

## Why Pipelean?

To stop writing the same try/catch and manual accumulation boilerplate.
Expand Down Expand Up @@ -113,7 +104,7 @@ const downloadSomething = async () => {...}
const transformSomething = () => {...}
const writeToDatabase = async () => {...}

const pipeline = await pipe(
const pipeline = pipe(
downloadSomething,
transformSomething,
writeToDatabase,
Expand Down
32 changes: 19 additions & 13 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ Pipelean provides four main tools, grouped by **data flow direction** (horizonta
All iteration functions (`series`, `filter`, `scan`) support four error strategies:

**failFast** (aliases: `fail`, `stopOnError`)
- Sets `failure: {item, error}` on first error
- Calls `onFailure({item, error})` immediately
- Sets `failure: {item, error, index}` on first error
- Calls `onError({item, error, index, total})`, then `onFailure({item, error, index})` immediately
- Stops iteration; results array is empty on failure

**throw**
- Throws the error on first failure
- Does NOT return a structured result on failure
- Does NOT call `onError` or `onFailure`
- Useful for "let it crash" / fail-early patterns where the caller handles errors externally

**failLate**
- Collects all errors in `errors` array
- Sets `failure: true` after loop completes (only if `errors.length > 0`)
- Calls `onFailure(true)` if `failure` is truthy
- Sets `failure: {errors}` after loop completes (only if `errors.length > 0`)
- Calls `onFailure({errors})` if `failure` is truthy

**collect** (default for `series` and `filter`)
- Collects all errors in `errors` array
Expand Down Expand Up @@ -62,6 +63,11 @@ All iteration functions (`series`, `filter`, `scan`) support four error strategi
- Always returns a predictable object: `{ results, errors, failure }`.
- Errors are treated as data, removing the need for consumer-side `try/catch` blocks.

* **Contextual Callbacks**
- `onProgress({item, result, index, total})` runs after each successful item.
- `onError({item, error, index, total})` runs for handled item errors.
- `total` is included only when Pipelean can know it cheaply, or when the caller passes `total`.

* **Order Guarantee**
- Because execution is sequential, output order strictly matches input order (no race conditions).

Expand All @@ -71,21 +77,21 @@ All iteration functions (`series`, `filter`, `scan`) support four error strategi

#### Composition: pipe

Combine multiple filter predicates into a single reusable filter:
Compose multiple operations into a single reusable function for `series()`:

```js
import { filter, pipe } from 'pipelean'
import { series, pipe } from 'pipelean'

const isValid = pipe(
(user) => user.age >= 18,
(user) => user.email.includes('@'),
(user) => !user.blocked
const normalizeActiveUser = pipe(
user => user.active ? user : undefined,
user => user.email,
email => email.toLowerCase()
)

const adults = await filter(isValid, users)
const result = await series(users, normalizeActiveUser)
```

**Undefined Short-Circuit**: When any step returns `undefined`, remaining steps are skipped and `undefined` propagates out. Combined with `series` (which drops items when the operation returns `undefined`), this merges transformation and selection in a single pass:
`pipe()` is Pipelean's operation composer. It chains functions left-to-right and preserves Pipelean's drop signal: when any step returns `undefined`, remaining steps are skipped and `undefined` propagates out. Combined with `series` (which drops items when the operation returns `undefined`), this merges transformation and selection in a single pass:

```js
import { series, pipe } from 'pipelean'
Expand Down Expand Up @@ -120,7 +126,7 @@ Pipelean also provides lightweight wrappers that add behavior to **individual fu
## Key Principles

1. **`onError` ≠ error strategy**: `onError` is a callback, not a strategy
2. **`failure` is truthy for**: `failFast` ({item, error}) and `failLate` (true)
2. **`failure` is truthy for**: `failFast` (`{item, error, index}`) and `failLate` (`{errors}`)
3. **`failure` is falsy for**: `collect`, `skip`, and `throw` on success
4. **`throw` does not return on error**: It propagates the error to the caller
5. **Strategy selection**: Choose based on whether failures are acceptable
Expand Down
167 changes: 167 additions & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,173 @@

How to replace common imperative and error-prone patterns with pipelean equivalents.

## 0.7: `series()` callbacks receive context objects

Pipelean 0.7 changes `series()` lifecycle callbacks from raw values to named context objects. This is a breaking change, but it makes app tasks easier to write because UI progress and error reporting get the item, index, result/error, and known total in one place.

### `onProgress(result)` → `onProgress({result})`

**Before:**
```js
await series(items, fn, {
onProgress: result => updateUi(result),
})
```

**After:**
```js
await series(items, fn, {
onProgress: ({result}) => updateUi(result),
})
```

The full progress payload is:

```js
{item, result, index, total}
```

`total` is omitted when Pipelean cannot know it cheaply. Pass `total` explicitly when the planned count comes from your app:

```js
await series(items, fn, {
total: items.length,
onProgress: ({index, total}) => updateProgress(index + 1, total),
})
```

If `take` is set, callback `total` means planned execution total:

```js
await series(items, fn, {
take: 10,
total: 100,
onProgress: ({total}) => {
// total is 10
},
})
```

### `onError(error)` → `onError({error})`

**Before:**
```js
await series(items, fn, {
strategy: collect,
onError: error => report(error),
})
```

**After:**
```js
await series(items, fn, {
strategy: collect,
onError: ({item, error, index}) => report({item, error, index}),
})
```

Collected errors now also include `index`:

```js
const {errors} = await series(items, fn, {strategy: collect})
// errors = [{item, error, index}]
```

### `failFast` failure includes `index`

**Before:**
```js
const result = await series(items, fn, {strategy: failFast})
// result.failure = {item, error}
```

**After:**
```js
const result = await series(items, fn, {strategy: failFast})
// result.failure = {item, error, index}
```

`onFailure` receives the same shape:

```js
await series(items, fn, {
strategy: failFast,
onFailure: ({item, error, index}) => showItemError(item, error, index),
})
```

### `failLate` failure changes from `true` to `{errors}`

**Before:**
```js
const result = await series(items, fn, {strategy: failLate})

if (result.failure === true) {
showToast('Some items failed')
}
```

**After:**
```js
const result = await series(items, fn, {strategy: failLate})

if (result.failure) {
showToast(`${result.failure.errors.length} items failed`)
}
```

`onFailure` receives `{errors}`:

```js
await series(items, fn, {
strategy: failLate,
onFailure: ({errors}) => showToast(`${errors.length} items failed`),
})
```

### `throw` does not call lifecycle callbacks

In `series()`, `throw` now throws the original error immediately and does not call `onError` or `onFailure`.

```js
await series(items, fn, {
strategy: throw_,
onError: () => {
// not called
},
onFailure: () => {
// not called
},
})
```

### App task progress example

```js
const albumsToSync = await getAlbumsByStatus({statusFilter})

const result = await series(albumsToSync, album => importAlbum({
sourceId: album.sourceId,
libraryId: album.libraryId,
fast,
}), {
take: limit,
strategy: collect,
onProgress: ({index, total}) => {
operation.sync.total = total
operation.sync.current = index + 1
},
onError: ({item, error}) => {
reportAlbumImportError({
sourceId: item.sourceId,
title: item.title,
name: error.name,
content: error.message,
})
},
})
```

## for-loop with try/catch → series

**Before:**
Expand Down
37 changes: 34 additions & 3 deletions docs/patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
```javascript
await series(items, fn, {
strategy: collect,
onError: (error) => logger.error(error)
onError: ({item, error}) => logger.error({item, error})
})
// Check failure manually: if (result.errors.length > 0) { ... }
```
Expand All @@ -15,7 +15,7 @@ await series(items, fn, {
```javascript
await series(items, fn, {
strategy: skip,
onError: (error) => metrics.increment('errors')
onError: ({item, error}) => metrics.increment('errors', {item, error})
})
// Result has no errors array, failure is false
```
Expand All @@ -38,7 +38,7 @@ await series(items, fn, {
const withErrorHandling = (opts) => ({
...opts,
onFailure: (failure) => {
if (failure === true) {
if (failure.errors) {
showToast('Some items failed')
} else {
showToast(`Error: ${failure.error.message}`)
Expand All @@ -49,3 +49,34 @@ const withErrorHandling = (opts) => ({

await series(items, fn, withErrorHandling({strategy: failFast}))
```

### Pattern 5: App Task + UI Progress

Use `series()` when an app task needs one loop, predictable errors, and progress updates. The query and UI state stay in the app; Pipelean owns the iteration, callback timing, and final outcome.

```javascript
const albumsToSync = await getAlbumsByStatus({statusFilter})

const result = await series(albumsToSync, album => importAlbum({
sourceId: album.sourceId,
libraryId: album.libraryId,
fast,
}), {
take: limit,
strategy: collect,
onProgress: ({index, total}) => {
operation.sync.total = total
operation.sync.current = index + 1
},
onError: ({item, error}) => {
reportAlbumImportError({
sourceId: item.sourceId,
title: item.title,
name: error.name,
content: error.message,
})
},
})
```

If `total` is unknown, Pipelean omits the key. Pass `total` explicitly when the planned count comes from a database query or another app-level source.
Loading