From 008a6e8cf13e2566cac0bd581d2a1c8398ab70c9 Mon Sep 17 00:00:00 2001 From: Daniele Dellafiore <66707+ildella@users.noreply.github.com> Date: Tue, 12 May 2026 15:29:09 +0200 Subject: [PATCH 1/4] breaking changes to support better common scenario in eager-iteration on async iterators, better onProgressm onError and onFailure signatures, all named paremeters. --- docs/guide.md | 16 +++- docs/migration.md | 167 +++++++++++++++++++++++++++++++++ docs/patterns.md | 37 +++++++- docs/reference.md | 58 +++++++----- package.json | 4 +- src/functional.js | 57 +++++++---- tests/error-strategies.test.js | 134 ++++++++++++++++++++++++-- tests/filter.test.js | 6 +- tests/onFailure.test.js | 36 +++++-- tests/series.test.js | 94 ++++++++++++++++++- 10 files changed, 526 insertions(+), 83 deletions(-) diff --git a/docs/guide.md b/docs/guide.md index 795bcc3..c2ced65 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -18,19 +18,20 @@ Pipelean provides four main tools, grouped by **data flow direction** (horizonta All iteration functions (`series`, `filter`, `scan`) support four error strategies: **failFast** (aliases: `fail`, `stopOnError`) -- Sets `failure: {item, error}` on first error -- Calls `onFailure({item, error})` immediately +- Sets `failure: {item, error, index}` on first error +- Calls `onError({item, error, index, total})`, then `onFailure({item, error, index})` immediately - Stops iteration; results array is empty on failure **throw** - Throws the error on first failure - Does NOT return a structured result on failure +- Does NOT call `onError` or `onFailure` - Useful for "let it crash" / fail-early patterns where the caller handles errors externally **failLate** - Collects all errors in `errors` array -- Sets `failure: true` after loop completes (only if `errors.length > 0`) -- Calls `onFailure(true)` if `failure` is truthy +- Sets `failure: {errors}` after loop completes (only if `errors.length > 0`) +- Calls `onFailure({errors})` if `failure` is truthy **collect** (default for `series` and `filter`) - Collects all errors in `errors` array @@ -62,6 +63,11 @@ All iteration functions (`series`, `filter`, `scan`) support four error strategi - Always returns a predictable object: `{ results, errors, failure }`. - Errors are treated as data, removing the need for consumer-side `try/catch` blocks. +* **Contextual Callbacks** + - `onProgress({item, result, index, total})` runs after each successful item. + - `onError({item, error, index, total})` runs for handled item errors. + - `total` is included only when Pipelean can know it cheaply, or when the caller passes `total`. + * **Order Guarantee** - Because execution is sequential, output order strictly matches input order (no race conditions). @@ -120,7 +126,7 @@ Pipelean also provides lightweight wrappers that add behavior to **individual fu ## Key Principles 1. **`onError` ≠ error strategy**: `onError` is a callback, not a strategy -2. **`failure` is truthy for**: `failFast` ({item, error}) and `failLate` (true) +2. **`failure` is truthy for**: `failFast` (`{item, error, index}`) and `failLate` (`{errors}`) 3. **`failure` is falsy for**: `collect`, `skip`, and `throw` on success 4. **`throw` does not return on error**: It propagates the error to the caller 5. **Strategy selection**: Choose based on whether failures are acceptable diff --git a/docs/migration.md b/docs/migration.md index b7647b2..4769dae 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -2,6 +2,173 @@ How to replace common imperative and error-prone patterns with pipelean equivalents. +## 0.7: `series()` callbacks receive context objects + +Pipelean 0.7 changes `series()` lifecycle callbacks from raw values to named context objects. This is a breaking change, but it makes app tasks easier to write because UI progress and error reporting get the item, index, result/error, and known total in one place. + +### `onProgress(result)` → `onProgress({result})` + +**Before:** +```js +await series(items, fn, { + onProgress: result => updateUi(result), +}) +``` + +**After:** +```js +await series(items, fn, { + onProgress: ({result}) => updateUi(result), +}) +``` + +The full progress payload is: + +```js +{item, result, index, total} +``` + +`total` is omitted when Pipelean cannot know it cheaply. Pass `total` explicitly when the planned count comes from your app: + +```js +await series(items, fn, { + total: items.length, + onProgress: ({index, total}) => updateProgress(index + 1, total), +}) +``` + +If `take` is set, callback `total` means planned execution total: + +```js +await series(items, fn, { + take: 10, + total: 100, + onProgress: ({total}) => { + // total is 10 + }, +}) +``` + +### `onError(error)` → `onError({error})` + +**Before:** +```js +await series(items, fn, { + strategy: collect, + onError: error => report(error), +}) +``` + +**After:** +```js +await series(items, fn, { + strategy: collect, + onError: ({item, error, index}) => report({item, error, index}), +}) +``` + +Collected errors now also include `index`: + +```js +const {errors} = await series(items, fn, {strategy: collect}) +// errors = [{item, error, index}] +``` + +### `failFast` failure includes `index` + +**Before:** +```js +const result = await series(items, fn, {strategy: failFast}) +// result.failure = {item, error} +``` + +**After:** +```js +const result = await series(items, fn, {strategy: failFast}) +// result.failure = {item, error, index} +``` + +`onFailure` receives the same shape: + +```js +await series(items, fn, { + strategy: failFast, + onFailure: ({item, error, index}) => showItemError(item, error, index), +}) +``` + +### `failLate` failure changes from `true` to `{errors}` + +**Before:** +```js +const result = await series(items, fn, {strategy: failLate}) + +if (result.failure === true) { + showToast('Some items failed') +} +``` + +**After:** +```js +const result = await series(items, fn, {strategy: failLate}) + +if (result.failure) { + showToast(`${result.failure.errors.length} items failed`) +} +``` + +`onFailure` receives `{errors}`: + +```js +await series(items, fn, { + strategy: failLate, + onFailure: ({errors}) => showToast(`${errors.length} items failed`), +}) +``` + +### `throw` does not call lifecycle callbacks + +In `series()`, `throw` now throws the original error immediately and does not call `onError` or `onFailure`. + +```js +await series(items, fn, { + strategy: throw_, + onError: () => { + // not called + }, + onFailure: () => { + // not called + }, +}) +``` + +### App task progress example + +```js +const albumsToSync = await getAlbumsByStatus({statusFilter}) + +const result = await series(albumsToSync, album => importAlbum({ + sourceId: album.sourceId, + libraryId: album.libraryId, + fast, +}), { + take: limit, + strategy: collect, + onProgress: ({index, total}) => { + operation.sync.total = total + operation.sync.current = index + 1 + }, + onError: ({item, error}) => { + reportAlbumImportError({ + sourceId: item.sourceId, + title: item.title, + name: error.name, + content: error.message, + }) + }, +}) +``` + ## for-loop with try/catch → series **Before:** diff --git a/docs/patterns.md b/docs/patterns.md index 0353e38..2201665 100644 --- a/docs/patterns.md +++ b/docs/patterns.md @@ -5,7 +5,7 @@ ```javascript await series(items, fn, { strategy: collect, - onError: (error) => logger.error(error) + onError: ({item, error}) => logger.error({item, error}) }) // Check failure manually: if (result.errors.length > 0) { ... } ``` @@ -15,7 +15,7 @@ await series(items, fn, { ```javascript await series(items, fn, { strategy: skip, - onError: (error) => metrics.increment('errors') + onError: ({item, error}) => metrics.increment('errors', {item, error}) }) // Result has no errors array, failure is false ``` @@ -38,7 +38,7 @@ await series(items, fn, { const withErrorHandling = (opts) => ({ ...opts, onFailure: (failure) => { - if (failure === true) { + if (failure.errors) { showToast('Some items failed') } else { showToast(`Error: ${failure.error.message}`) @@ -49,3 +49,34 @@ const withErrorHandling = (opts) => ({ await series(items, fn, withErrorHandling({strategy: failFast})) ``` + +### Pattern 5: App Task + UI Progress + +Use `series()` when an app task needs one loop, predictable errors, and progress updates. The query and UI state stay in the app; Pipelean owns the iteration, callback timing, and final outcome. + +```javascript +const albumsToSync = await getAlbumsByStatus({statusFilter}) + +const result = await series(albumsToSync, album => importAlbum({ + sourceId: album.sourceId, + libraryId: album.libraryId, + fast, +}), { + take: limit, + strategy: collect, + onProgress: ({index, total}) => { + operation.sync.total = total + operation.sync.current = index + 1 + }, + onError: ({item, error}) => { + reportAlbumImportError({ + sourceId: item.sourceId, + title: item.title, + name: error.name, + content: error.message, + }) + }, +}) +``` + +If `total` is unknown, Pipelean omits the key. Pass `total` explicitly when the planned count comes from a database query or another app-level source. diff --git a/docs/reference.md b/docs/reference.md index f760d7c..4bf3b11 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -52,11 +52,11 @@ **Use when**: Critical operations where failure means the entire pipeline is invalid. **Behavior**: -- Sets `failure: {item, error}` on first error -- Calls `onFailure({item, error})` immediately +- Sets `failure: {item, error, index}` on first error +- Calls `onError({item, error, index, total})`, then `onFailure({item, error, index})` immediately - Stops iteration -**Return Format**: `{results: [], errors: [], failure: {item, error}}` +**Return Format**: `{results: [], errors: [], failure: {item, error, index}}` **Example**: ```javascript @@ -67,7 +67,7 @@ const result = await series([1, 2, 3], async item => { return item * 2 }, {strategy: failFast}) -// result = {results: [], errors: [], failure: {item: 2, error: Error(...)}} +// result = {results: [], errors: [], failure: {item: 2, error: Error(...), index: 1}} ``` --- @@ -81,7 +81,7 @@ const result = await series([1, 2, 3], async item => { **Behavior**: - Throws the error on first failure - Does NOT call `onFailure` -- `onError` is still called if present (in `series`; `scan` calls it before throw) +- Does NOT call `onError` in `series` **Return Format**: On success: `{results, errors: [], failure: false}` @@ -112,7 +112,7 @@ try { - Sets `failure: false` - Does NOT call `onFailure` -**Return Format**: `{results, errors: [...], failure: false}` +**Return Format**: `{results, errors: [{item, error, index}], failure: false}` **Example**: ```javascript @@ -123,22 +123,22 @@ const result = await series([1, 2, 3], async item => { return item * 2 }, {strategy: collect}) -// result = {results: [2, 6], errors: [{item: 2, error: ...}, {item: 4, error: ...}], failure: false} +// result = {results: [2, 6], errors: [{item: 2, error: ..., index: 1}], failure: false} ``` --- ### failLate -**Purpose**: Error strategy identifier that collects all errors and returns `failure: true` at the end. +**Purpose**: Error strategy identifier that collects all errors and returns `failure: {errors}` at the end. **Use when**: Application-layer needs to detect if *any* error occurred. **Behavior**: - Collects all errors in `errors` array -- Sets `failure: true` after loop completes (only if `errors.length > 0`) -- Calls `onFailure(true)` if `failure` is truthy +- Sets `failure: {errors}` after loop completes (only if `errors.length > 0`) +- Calls `onFailure({errors})` if `failure` is truthy -**Return Format**: `{results, errors: [...], failure: true}` (if any errors occurred) +**Return Format**: `{results, errors: [{item, error, index}], failure: {errors}}` (if any errors occurred) **Example**: ```javascript @@ -149,7 +149,7 @@ const result = await series([1, 2, 3], async item => { return item * 2 }, {strategy: failLate}) -// result = {results: [2, 6], errors: [{item: 2, error: ...}, {item: 4, error: ...}], failure: true} +// result = {results: [2, 6], errors: [{item: 2, error: ..., index: 1}], failure: {errors}} ``` --- @@ -186,14 +186,15 @@ const result = await series([1, 2, 3], async item => { Optional callback for verification/telemetry (logging, metrics). -- Called for **every** error +- Called for every handled item error in `series` - Does NOT affect control flow +- Receives `{item, error, index, total}`; `total` is omitted when unknown - Use for: logging, metrics, external error reporting ```javascript await series(items, fn, { strategy: skip, - onError: (error) => console.error('Error:', error.message) // Called for each error + onError: ({item, error}) => console.error('Error:', item.id, error.message) }) ``` @@ -205,19 +206,17 @@ Optional callback for application-layer error handling (UI updates, notification - Called when `failure` is truthy - Depends on strategy: - - `failFast`: called with `{item, error}` - - `failLate`: called with `true` + - `failFast`: called with `{item, error, index}` + - `failLate`: called with `{errors}` - `collect` / `skip`: NOT called (failure is false) ```javascript await series(items, fn, { - strategy: failFast, + strategy: failLate, onFailure: (failure) => { - if (failure === true) { - // failLate: show general error notification + if (failure.errors) { showToast('Some items failed') } else { - // failFast: show specific error with item showToast(`Item ${failure.item} failed: ${failure.error.message}`) } } @@ -240,14 +239,15 @@ Curried: `(fn, opts?) => (items) => Promise` - `fn(item, index)`: The function to apply. Returns the mapped value, or throws, or returns `undefined` to drop the item. - `opts` (optional): - `strategy`: Error strategy (default: `collect`). `failFast`, `collect`, `failLate`, `skip`, `throw_`. - - `onProgress(result)`: Called after each successful item with the mapped result. NOT called for errors or `undefined` drops. - - `onError(error)`: Called for each error. Does not affect control flow. - - `onFailure(failure)`: Called when failure is truthy. Receives `{item, error}` for `failFast`, `true` for `failLate`. + - `onProgress({item, result, index, total})`: Called after each successful item. NOT called for errors or `undefined` drops. + - `onError({item, error, index, total})`: Called for each handled item error. Does not affect control flow. + - `onFailure(failure)`: Called when failure is truthy. Receives `{item, error, index}` for `failFast`, `{errors}` for `failLate`. - `take`: Limit items processed. + - `total`: Explicit planned input count for progress/error callbacks. If omitted, `series` uses a cheap known input size when available. If `take` is set, callback `total` is limited to `Math.min(take, knownTotal)`. If no total is known, the `total` key is omitted. - `pause`: Milliseconds between successful items. - `pauseOnErrors`: Whether to also pause after errors (default: `false`). -**Return**: `{results, errors, failure}` where `failure` is `false` on success. +**Return**: `{results, errors, failure}` where `failure` is `false` on success. Collected `errors` are `{item, error, index}`. **Usage Example**: ```javascript @@ -263,6 +263,12 @@ const result = await series(items, fn, { strategy: failFast }) // With pause for rate limiting const result = await series(endpoints, fn, { pause: 500 }) +// With UI progress +await series(items, saveItem, { + onProgress: ({index, total}) => updateProgress(index + 1, total), + onError: ({item, error}) => reportItemError(item.id, error), +}) + // Curried: create a reusable transform const double = series(x => x * 2) const { results } = await double([1, 2, 3]) @@ -327,11 +333,11 @@ const { results, errors } = await scan( - `items`: The iterable to filter (immediate mode) - `opts` (optional): Options passed through to `series` -**Options**: Same as `series` — `strategy`, `onError`, `onFailure`, `take`, `onProgress`, `pause`, `pauseOnErrors`. +**Options**: Same as `series` — `strategy`, `onError`, `onFailure`, `take`, `total`, `onProgress`, `pause`, `pauseOnErrors`. **Return Type**: `{ results, errors, failure }` — same shape as `series`: - `results`: Original items where the predicate returned truthy -- `failure`: `false` on success (no errors), `{item, error}` for `failFast`, `true` for `failLate` +- `failure`: `false` on success (no errors), `{item, error, index}` for `failFast`, `{errors}` for `failLate` **Key Characteristics**: - The predicate's return value is never placed into `results` — only truthiness is checked, and the original `item` is what gets kept or dropped. diff --git a/package.json b/package.json index 161ba3c..d14d409 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "pipelean", - "version": "0.6.0", + "version": "0.7.0-0", "description": "A pragmatic library for sequential async operations with first-class error handling.", "type": "module", "license": "MIT", @@ -52,5 +52,5 @@ "globals": "15.15.0", "vitest": "4.1.5" }, - "stableVersion": "0.5.0" + "stableVersion": "0.6.0" } diff --git a/src/functional.js b/src/functional.js index b7b5a1a..59c38fe 100644 --- a/src/functional.js +++ b/src/functional.js @@ -55,6 +55,24 @@ export const retry = (fn, {attempts = 3, delay: delayMs = 0} = {}) => export const where = pattern => item => Object.entries(pattern).every(([key, value]) => item[key] === value) +const getKnownTotal = (items, total) => + total !== undefined ? total : items.length + +const getPlannedTotal = ({items, take, total}) => { + const knownTotal = getKnownTotal(items, total) + + if (knownTotal === undefined) + return undefined + + if (take === undefined) + return knownTotal + + return Math.min(take, knownTotal) +} + +const withTotal = (payload, total) => + total === undefined ? payload : {...payload, total} + // eslint-disable-next-line max-lines-per-function export const series = (...args) => { const immediate = typeof args[0] !== 'function' @@ -63,25 +81,19 @@ export const series = (...args) => { // eslint-disable-next-line complexity, max-statements const run = async inputItems => { const { - strategy = collect, + strategy = collect, total, take, onProgress, onError, onFailure, pause, pauseOnErrors = false, } = opts const results = [] const errors = [] + const strategyName = strategy.name ?? strategy + const plannedTotal = getPlannedTotal({items: inputItems, take, total}) - // Wrap the function ONCE and handle onProgress/onError; rethrows so - // 'series' can handle the chosen strategy. const runFn = async (item, index) => { - try { - const result = await fn(item, index) - if (onProgress && result !== undefined) - await onProgress(result) - return result - } catch (error) { - if (onError) - await onError(error) - throw error - } + const result = await fn(item, index) + if (onProgress && result !== undefined) + await onProgress(withTotal({item, result, index}, plannedTotal)) + return result } let index = 0 @@ -104,17 +116,20 @@ export const series = (...args) => { await delay(pause) } } catch (error) { - const strategyName = strategy.name ?? strategy - if (strategyName === 'throw') { throw error } + const errorContext = {item, error, index} + + if (onError) + await onError(withTotal(errorContext, plannedTotal)) + if (strategyName === 'failFast') { if (onFailure) { - onFailure({item, error}) + onFailure(errorContext) } - return {results: [], errors, failure: {item, error}} + return {results: [], errors, failure: errorContext} } if (strategyName === 'skip') { @@ -125,7 +140,7 @@ export const series = (...args) => { continue } - errors.push({item, error}) + errors.push(errorContext) if (pause && pauseOnErrors) { await delay(pause) } @@ -133,10 +148,12 @@ export const series = (...args) => { index++ } - failure = !!(strategy.name === 'failLate' && errors.length > 0) + failure = strategyName === 'failLate' && errors.length > 0 + ? {errors} + : false if (failure && onFailure) { - onFailure(true) + onFailure(failure) } return {results, errors, failure} diff --git a/tests/error-strategies.test.js b/tests/error-strategies.test.js index f08e113..6dd7a60 100644 --- a/tests/error-strategies.test.js +++ b/tests/error-strategies.test.js @@ -1,9 +1,11 @@ -import {test, expect} from 'vitest' +/* eslint-disable max-lines */ +import {test, expect, vi} from 'vitest' import { failFast, collect, failLate, skip, + throw_, fail, stopOnError, series, @@ -51,7 +53,7 @@ test('strategies are distinct references', () => { expect(failLate).not.toBe(skip) }) -test('failLate: collects all errors and returns failure: true', async () => { +test('failLate: collects all errors and returns failure context', async () => { const items = [1, 2, 3, 4] const fn = item => { if (item === 2 || item === 4) @@ -63,9 +65,17 @@ test('failLate: collects all errors and returns failure: true', async () => { expect(result.results).toEqual([2, 6]) expect(result.errors).toHaveLength(2) - expect(result.errors[0]).toEqual({item: 2, error: new Error('Error at 2')}) - expect(result.errors[1]).toEqual({item: 4, error: new Error('Error at 4')}) - expect(result.failure).toBe(true) + expect(result.errors[0]).toEqual({ + item: 2, + error: new Error('Error at 2'), + index: 1, + }) + expect(result.errors[1]).toEqual({ + item: 4, + error: new Error('Error at 4'), + index: 3, + }) + expect(result.failure).toEqual({errors: result.errors}) }) test('failLate: no errors returns failure: false', async () => { @@ -110,7 +120,12 @@ test('skip: calls onError if present', async () => { }) expect(onErrorCalls).toHaveLength(1) - expect(onErrorCalls[0]).toEqual(new Error('Error at 2')) + expect(onErrorCalls[0]).toEqual({ + item: 2, + error: new Error('Error at 2'), + index: 1, + total: 3, + }) expect(result.results).toEqual([2, 6]) expect(result.errors).toHaveLength(0) expect(result.failure).toBe(false) @@ -131,6 +146,7 @@ test('fail alias works as failFast in series', async () => { expect(result.failure).toEqual({ item: 2, error: new Error('Error at 2'), + index: 1, }) }) @@ -149,6 +165,7 @@ test('stopOnError alias works as failFast in series', async () => { expect(result.failure).toEqual({ item: 2, error: new Error('Error at 2'), + index: 1, }) }) @@ -164,9 +181,17 @@ test('failLate works with filter', async () => { expect(result.results).toEqual([1, 3, 5]) expect(result.errors).toHaveLength(2) - expect(result.errors[0]).toEqual({item: 2, error: new Error('Error at 2')}) - expect(result.errors[1]).toEqual({item: 4, error: new Error('Error at 4')}) - expect(result.failure).toBe(true) + expect(result.errors[0]).toEqual({ + item: 2, + error: new Error('Error at 2'), + index: 1, + }) + expect(result.errors[1]).toEqual({ + item: 4, + error: new Error('Error at 4'), + index: 3, + }) + expect(result.failure).toEqual({errors: result.errors}) }) test('skip works with filter', async () => { @@ -197,7 +222,96 @@ test('failLate with series returns success before error', async () => { // All items processed: 1, 2, 4, 5 succeed, 3 fails expect(result.results).toEqual([2, 4, 8, 10]) expect(result.errors).toHaveLength(1) - expect(result.failure).toBe(true) + expect(result.failure).toEqual({errors: result.errors}) +}) + +test('collect onError strategy', async () => { + const onError = vi.fn() + const bang = new Error('bang') + const result = await series([1, 2, 3], item => { + if (item === 2) + throw bang + return item * 10 + }, {strategy: collect, onError}) + + expect(onError).toHaveBeenCalledWith({ + item: 2, + error: bang, + index: 1, + total: 3, + }) + expect(result.errors).toEqual([{item: 2, error: bang, index: 1}]) + expect(result.failure).toBe(false) +}) + +test('skip calls onError with context without storing errors', async () => { + const onError = vi.fn() + const bang = new Error('bang') + const result = await series([1, 2, 3], item => { + if (item === 2) + throw bang + return item * 10 + }, {strategy: skip, onError}) + + expect(onError).toHaveBeenCalledWith({ + item: 2, + error: bang, + index: 1, + total: 3, + }) + expect(result.errors).toEqual([]) + expect(result.failure).toBe(false) +}) + +test('failFast calls onError and onFailure with item context', async () => { + const onError = vi.fn() + const onFailure = vi.fn() + const bang = new Error('bang') + const result = await series([1, 2, 3], item => { + if (item === 2) + throw bang + return item * 10 + }, {strategy: failFast, onError, onFailure}) + + expect(onError).toHaveBeenCalledWith({ + item: 2, + error: bang, + index: 1, + total: 3, + }) + expect(onFailure).toHaveBeenCalledWith({item: 2, error: bang, index: 1}) + expect(result.failure).toEqual({item: 2, error: bang, index: 1}) +}) + +test('failLate calls onFailure with collected contextual errors', async () => { + const onFailure = vi.fn() + const result = await series([1, 2, 3], item => { + if (item > 1) + throw new Error(`Error at ${item}`) + return item * 10 + }, {strategy: failLate, onFailure}) + + expect(result.errors).toEqual([ + {item: 2, error: new Error('Error at 2'), index: 1}, + {item: 3, error: new Error('Error at 3'), index: 2}, + ]) + expect(onFailure).toHaveBeenCalledWith({errors: result.errors}) + expect(result.failure).toEqual({errors: result.errors}) +}) + +test('throw strategy throws without calling onError or onFailure', async () => { + const onError = vi.fn() + const onFailure = vi.fn() + const bang = new Error('bang') + + await expect(series([1, 2, 3], item => { + if (item === 2) + throw bang + return item * 10 + }, {strategy: throw_, onError, onFailure})).rejects.toThrow(bang) + + expect(onError).not.toHaveBeenCalled() + expect(onFailure).not.toHaveBeenCalled() }) test('collect strategy still works (failure: false)', async () => { diff --git a/tests/filter.test.js b/tests/filter.test.js index 7ed8a92..78028bd 100644 --- a/tests/filter.test.js +++ b/tests/filter.test.js @@ -19,7 +19,7 @@ test('predicate throws with failFast stops and populates failure', async () => { return true }, {strategy: 'failFast'}) expect(result.results).toEqual([]) - expect(result.failure).toEqual({item: 2, error: bang}) + expect(result.failure).toEqual({item: 2, error: bang, index: 1}) expect(result.errors).toEqual([]) }) @@ -33,8 +33,8 @@ test('predicate throws with default collect collects errors', async () => { expect(result.results).toEqual([1, 3]) expect(result.failure).toBe(false) expect(result.errors).toEqual([ - {item: 2, error: bang}, - {item: 4, error: bang}, + {item: 2, error: bang, index: 1}, + {item: 4, error: bang, index: 3}, ]) }) diff --git a/tests/onFailure.test.js b/tests/onFailure.test.js index 7dd7536..32d9896 100644 --- a/tests/onFailure.test.js +++ b/tests/onFailure.test.js @@ -33,7 +33,7 @@ const scannerThatFailsAt = failItem => (acc, item) => { return acc + item } -test('onFailure called for failFast with {item, error}', async () => { +test('onFailure called for failFast with item context', async () => { const onFailure = vi.fn() const items = [1, 2, 3] @@ -46,15 +46,17 @@ test('onFailure called for failFast with {item, error}', async () => { expect(onFailure).toHaveBeenCalledWith({ item: 2, error: new Error('Error at 2'), + index: 1, }) expect(result.failure).toEqual({ item: 2, error: new Error('Error at 2'), + index: 1, }) expect(result.results).toEqual([]) }) -test('onFailure called for failLate with true', async () => { +test('onFailure called for failLate with errors context', async () => { const onFailure = vi.fn() const items = [1, 2, 3, 4] @@ -64,10 +66,13 @@ test('onFailure called for failLate with true', async () => { }) expect(onFailure).toHaveBeenCalledTimes(1) - expect(onFailure).toHaveBeenCalledWith(true) - expect(result.failure).toBe(true) + expect(onFailure).toHaveBeenCalledWith({errors: result.errors}) + expect(result.failure).toEqual({errors: result.errors}) expect(result.results).toEqual([2, 6]) - expect(result.errors).toHaveLength(2) + expect(result.errors).toEqual([ + {item: 2, error: new Error('Error at 2'), index: 1}, + {item: 4, error: new Error('Error at 4'), index: 3}, + ]) }) test('onFailure NOT called for collect (failure: false)', async () => { @@ -109,6 +114,7 @@ test('onFailure is optional', async () => { expect(result.failure).toEqual({ item: 2, error: new Error('Error at 2'), + index: 1, }) }) @@ -125,10 +131,12 @@ test('onFailure works with filter', async () => { expect(onFailure).toHaveBeenCalledWith({ item: 3, error: new Error('Error at 3'), + index: 2, }) expect(result.failure).toEqual({ item: 3, error: new Error('Error at 3'), + index: 2, }) expect(result.results).toEqual([]) }) @@ -143,10 +151,13 @@ test('onFailure works with filter and failLate', async () => { }) expect(onFailure).toHaveBeenCalledTimes(1) - expect(onFailure).toHaveBeenCalledWith(true) - expect(result.failure).toBe(true) + expect(onFailure).toHaveBeenCalledWith({errors: result.errors}) + expect(result.failure).toEqual({errors: result.errors}) expect(result.results).toEqual([1, 3, 5]) - expect(result.errors).toHaveLength(2) + expect(result.errors).toEqual([ + {item: 2, error: new Error('Error at 2'), index: 1}, + {item: 4, error: new Error('Error at 4'), index: 3}, + ]) }) test('onFailure works with scan', async () => { @@ -192,10 +203,12 @@ test('Application-layer wrapper with default onFailure', async () => { expect(lastFailure).toEqual({ item: 2, error: new Error('Error at 2'), + index: 1, }) expect(result.failure).toEqual({ item: 2, error: new Error('Error at 2'), + index: 1, }) }) @@ -212,7 +225,12 @@ test('onFailure with skip strategy still allows onError', async () => { // onError should be called even with skip expect(onError).toHaveBeenCalledTimes(1) - expect(onError).toHaveBeenCalledWith(new Error('Error at 2')) + expect(onError).toHaveBeenCalledWith({ + item: 2, + error: new Error('Error at 2'), + index: 1, + total: 3, + }) // onFailure should NOT be called (failure is false) expect(onFailure).not.toHaveBeenCalled() diff --git a/tests/series.test.js b/tests/series.test.js index d176faa..6704da2 100644 --- a/tests/series.test.js +++ b/tests/series.test.js @@ -1,3 +1,4 @@ +/* eslint-disable max-lines */ import {test, expect} from 'vitest' import {series, collect} from '$src/functional' @@ -14,7 +15,7 @@ test('failFast stops on first error with no partial results', async () => { return x * 10 }, {strategy: 'failFast'}) expect(result.results).toEqual([]) - expect(result.failure).toEqual({item: 2, error: bang}) + expect(result.failure).toEqual({item: 2, error: bang, index: 1}) expect(result.errors).toEqual([]) }) @@ -26,7 +27,7 @@ test('collect continues past errors same as skip', async () => { return x * 10 }, {strategy: collect}) expect(result.results).toEqual([10, 30]) - expect(result.errors).toEqual([{item: 2, error: bang}]) + expect(result.errors).toEqual([{item: 2, error: bang, index: 1}]) expect(result.failure).toBe(false) }) @@ -165,7 +166,76 @@ test('calls onProgress after each successful item', async () => { onProgress: value => progress.push(value), }) expect(result.results).toEqual([10, 20, 30]) - expect(progress).toEqual([10, 20, 30]) + expect(progress).toEqual([ + { + item: 1, result: 10, index: 0, total: 3, + }, + { + item: 2, result: 20, index: 1, total: 3, + }, + { + item: 3, result: 30, index: 2, total: 3, + }, + ]) +}) + +test('onProgress uses take as planned total', async () => { + const progress = [] + const result = await series([1, 2, 3, 4, 5], x => x * 10, { + take: 2, + onProgress: value => progress.push(value), + }) + + expect(result.results).toEqual([10, 20]) + expect(progress).toEqual([ + { + item: 1, result: 10, index: 0, total: 2, + }, + { + item: 2, result: 20, index: 1, total: 2, + }, + ]) +}) + +test('onProgress uses explicit total override limited by take', async () => { + const progress = [] + const result = await series([1, 2, 3, 4, 5], x => x * 10, { + total: 10, + take: 3, + onProgress: value => progress.push(value), + }) + + expect(result.results).toEqual([10, 20, 30]) + expect(progress).toEqual([ + { + item: 1, result: 10, index: 0, total: 3, + }, + { + item: 2, result: 20, index: 1, total: 3, + }, + { + item: 3, result: 30, index: 2, total: 3, + }, + ]) +}) + +test('onProgress omits total for inputs without cheap size', async () => { + async function * numbers () { + yield 1 + yield 2 + } + + const progress = [] + const result = await series(numbers(), x => x * 10, { + onProgress: value => progress.push(value), + }) + + expect(result.results).toEqual([10, 20]) + expect(progress).toEqual([ + {item: 1, result: 10, index: 0}, + {item: 2, result: 20, index: 1}, + ]) + expect(Object.hasOwn(progress[0], 'total')).toBe(false) }) test('does not call onProgress for errored items', async () => { @@ -179,7 +249,14 @@ test('does not call onProgress for errored items', async () => { onProgress: value => progress.push(value), }) expect(result.results).toEqual([10, 30]) - expect(progress).toEqual([10, 30]) + expect(progress).toEqual([ + { + item: 1, result: 10, index: 0, total: 3, + }, + { + item: 3, result: 30, index: 2, total: 3, + }, + ]) }) test('does not call onProgress for undefined results', async () => { @@ -193,7 +270,14 @@ test('does not call onProgress for undefined results', async () => { }) expect(result.results).toEqual([10, 30]) // undefined items are dropped — onProgress should not be called for them - expect(progress).toEqual([10, 30]) + expect(progress).toEqual([ + { + item: 1, result: 10, index: 0, total: 3, + }, + { + item: 3, result: 30, index: 2, total: 3, + }, + ]) }) test('undefined result drops the item from results', async () => { From 90a12393e737782e77e8c207b9a9f8cd5c475b40 Mon Sep 17 00:00:00 2001 From: Daniele Dellafiore <66707+ildella@users.noreply.github.com> Date: Tue, 12 May 2026 17:45:25 +0200 Subject: [PATCH 2/4] scan updated to new signature for callback --- docs/reference.md | 2 +- src/functional.js | 33 +++++++++++++--------- tests/onFailure.test.js | 2 ++ tests/scan-reduce.test.js | 2 ++ tests/scan.test.js | 58 +++++++++++++++++++++++++++++++++++++-- 5 files changed, 81 insertions(+), 16 deletions(-) diff --git a/docs/reference.md b/docs/reference.md index 4bf3b11..b723e74 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -297,7 +297,7 @@ const { results } = await series(items, pipe( **Return Type**: A Promise that resolves to an object containing: - `results`: Array of intermediate results (or `[]` on failFast failure) - `errors`: Array of errors encountered (empty for failFast, skip, throw) -- `failure`: `false` on success; `{item, error}` for failFast; `true` for failLate +- `failure`: `false` on success; `{item, error, index}` for failFast; `{errors}` for failLate - `value`: Final accumulated value when `storePartialResults: false` (only on success) **Key Characteristics**: diff --git a/src/functional.js b/src/functional.js index 59c38fe..41360ac 100644 --- a/src/functional.js +++ b/src/functional.js @@ -201,7 +201,7 @@ export const filter = (...args) => { * strategy?: StrategyFn, onError?, onFailure?, storePartialResults?: boolean * }} opts */ -// eslint-disable-next-line complexity +// eslint-disable-next-line complexity, max-statements export const scan = async (iterable, scanner, initialValue, opts = {}) => { const { strategy = failFast, onError, onFailure, storePartialResults = true, @@ -209,45 +209,52 @@ export const scan = async (iterable, scanner, initialValue, opts = {}) => { const results = [] let acc = initialValue const errors = [] + const strategyName = strategy.name ?? strategy + const plannedTotal = getPlannedTotal({items: iterable}) + let index = 0 for await (const item of iterable) { try { - acc = await scanner(acc, item) + acc = await scanner(acc, item, index) if (storePartialResults) results.push(acc) } catch (error) { - const strategyName = strategy.name ?? strategy - - if (onError) { - await onError(error) - } + const errorContext = {item, error, index} if (strategyName === 'throw') { throw error } + if (onError) { + await onError(withTotal(errorContext, plannedTotal)) + } + if (strategyName === 'failFast') { if (onFailure) { - onFailure({item, error}) + onFailure(errorContext) } return storePartialResults - ? {results: [], errors, failure: {item, error}} - : {errors, failure: {item, error}} + ? {results: [], errors, failure: errorContext} + : {errors, failure: errorContext} } if (strategyName === 'skip') { + index++ continue } - errors.push({item, error}) + errors.push(errorContext) } + index++ } const failure = - strategy.name === 'failLate' && errors.length > 0 + strategyName === 'failLate' && errors.length > 0 + ? {errors} + : false if (failure && onFailure) { - onFailure(true) + onFailure(failure) } return storePartialResults diff --git a/tests/onFailure.test.js b/tests/onFailure.test.js index 32d9896..0685754 100644 --- a/tests/onFailure.test.js +++ b/tests/onFailure.test.js @@ -173,10 +173,12 @@ test('onFailure works with scan', async () => { expect(onFailure).toHaveBeenCalledWith({ item: 2, error: new Error('Error at 2'), + index: 1, }) expect(result.failure).toEqual({ item: 2, error: new Error('Error at 2'), + index: 1, }) expect(result.results).toEqual([]) }) diff --git a/tests/scan-reduce.test.js b/tests/scan-reduce.test.js index e72a8bb..bcace1f 100644 --- a/tests/scan-reduce.test.js +++ b/tests/scan-reduce.test.js @@ -40,6 +40,7 @@ test('failFast stops on error with storePartialResults false', async () => { expect(failure).toEqual({ item: {duration: 10}, error: new Error('Duration too long'), + index: 2, }) expect(errors).toHaveLength(0) }) @@ -65,6 +66,7 @@ test('failFast returns no value when accessing missing property', async () => { expect(value).toBeUndefined() expect(failure.item).toEqual({}) expect(failure.error.message).toBe('Missing duration') + expect(failure.index).toBe(1) expect(errors).toEqual([]) }) diff --git a/tests/scan.test.js b/tests/scan.test.js index 740ebd6..fe58acc 100644 --- a/tests/scan.test.js +++ b/tests/scan.test.js @@ -1,5 +1,5 @@ -import {test, expect} from 'vitest' -import {scan} from '$src/functional' +import {test, expect, vi} from 'vitest' +import {failLate, scan, throw_} from '$src/functional' test('threads accumulator through items', async () => { const {results} = await scan( @@ -30,3 +30,57 @@ test('works with async scanner', async () => { ) expect(results).toEqual([10, 30]) }) + +test('failFast calls onError and onFailure with item context', async () => { + const onError = vi.fn() + const onFailure = vi.fn() + const bang = new Error('bang') + + const result = await scan([1, 2, 3], (acc, item) => { + if (item === 2) + throw bang + return acc + item + }, 0, {onError, onFailure}) + + expect(onError).toHaveBeenCalledWith({ + item: 2, + error: bang, + index: 1, + total: 3, + }) + expect(onFailure).toHaveBeenCalledWith({item: 2, error: bang, index: 1}) + expect(result.failure).toEqual({item: 2, error: bang, index: 1}) + expect(result.results).toEqual([]) +}) + +test('failLate calls onFailure with collected contextual errors', async () => { + const onFailure = vi.fn() + + const result = await scan([1, 2, 3], (acc, item) => { + if (item > 1) + throw new Error(`Error at ${item}`) + return acc + item + }, 0, {strategy: failLate, onFailure}) + + expect(result.errors).toEqual([ + {item: 2, error: new Error('Error at 2'), index: 1}, + {item: 3, error: new Error('Error at 3'), index: 2}, + ]) + expect(onFailure).toHaveBeenCalledWith({errors: result.errors}) + expect(result.failure).toEqual({errors: result.errors}) +}) + +test('throw strategy throws without calling onError or onFailure', async () => { + const onError = vi.fn() + const onFailure = vi.fn() + const bang = new Error('bang') + + await expect(scan([1, 2, 3], (acc, item) => { + if (item === 2) + throw bang + return acc + item + }, 0, {strategy: throw_, onError, onFailure})).rejects.toThrow(bang) + + expect(onError).not.toHaveBeenCalled() + expect(onFailure).not.toHaveBeenCalled() +}) From d26d9bd6a62e473cb265219342a6d755d5733366 Mon Sep 17 00:00:00 2001 From: Daniele Dellafiore <66707+ildella@users.noreply.github.com> Date: Tue, 12 May 2026 17:55:53 +0200 Subject: [PATCH 3/4] updated docs for pipe() more consistent and clear --- docs/guide.md | 16 ++++++++-------- docs/reference.md | 29 +++++++++++++++-------------- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/docs/guide.md b/docs/guide.md index c2ced65..d2e6596 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -77,21 +77,21 @@ All iteration functions (`series`, `filter`, `scan`) support four error strategi #### Composition: pipe -Combine multiple filter predicates into a single reusable filter: +Compose multiple operations into a single reusable function for `series()`: ```js -import { filter, pipe } from 'pipelean' +import { series, pipe } from 'pipelean' -const isValid = pipe( - (user) => user.age >= 18, - (user) => user.email.includes('@'), - (user) => !user.blocked +const normalizeActiveUser = pipe( + user => user.active ? user : undefined, + user => user.email, + email => email.toLowerCase() ) -const adults = await filter(isValid, users) +const result = await series(users, normalizeActiveUser) ``` -**Undefined Short-Circuit**: When any step returns `undefined`, remaining steps are skipped and `undefined` propagates out. Combined with `series` (which drops items when the operation returns `undefined`), this merges transformation and selection in a single pass: +`pipe()` is Pipelean's operation composer. It chains functions left-to-right and preserves Pipelean's drop signal: when any step returns `undefined`, remaining steps are skipped and `undefined` propagates out. Combined with `series` (which drops items when the operation returns `undefined`), this merges transformation and selection in a single pass: ```js import { series, pipe } from 'pipelean' diff --git a/docs/reference.md b/docs/reference.md index b723e74..4a9c79d 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -360,12 +360,12 @@ const adults = await filter( ### pipe -**Purpose**: Vertical composition tool - chains functions left-to-right (Unix pipe pattern). +**Purpose**: Pipelean operation composer - chains functions left-to-right and preserves Pipelean's `undefined` drop signal. **Type**: `(...fns) => (input) => Promise>` **Parameters**: -- Variadic arguments: Any number of async functions to execute sequentially +- Variadic arguments: Any number of sync or async functions to execute sequentially - `input`: The initial value passed to the first function **Return Type**: A Promise that resolves to the final result. @@ -375,27 +375,28 @@ const adults = await filter( - Output of one function becomes input to the next - Supports both synchronous and asynchronous functions - Natural data flow from input through transformations -- **Undefined Short-Circuit**: If any step returns `undefined`, remaining steps are skipped and `undefined` is returned. This enables selection (filtering) within a composed pipe — see [series](#series) drop behavior. +- Designed for composing reusable operations passed to `series()` or used directly +- **Undefined Short-Circuit**: If any step returns `undefined`, remaining steps are skipped and `undefined` is returned. This enables selection within a composed operation — see [series](#series) drop behavior. **Usage Example**: ```javascript import { pipe } from './functional.js' -// Process user through validation, transformation, and storage -const userId = await pipe( - async (id) => validateUserId(id), // Step 1 - async (id) => fetchUser(id), // Step 2 - async (user, data) => saveUser(user, data), // Step 3 - userId // Starting value +const normalizeUser = pipe( + async id => validateUserId(id), + async id => fetchUser(id), + user => user.active ? user : undefined, + user => ({...user, email: user.email.toLowerCase()}), ) +const user = await normalizeUser(userId) + // Compose operations in a readable pipeline const result = await pipe( - async (data) => validate(data), - async (data) => transform(data), - async (data) => persist(data), - null // No initial data needed -) + async data => validate(data), + async data => transform(data), + async data => persist(data), +)(input) ``` **Best Practice**: Use `pipe()` when you need to chain operations that form a coherent data processing pipeline. From 05042870badb75764f9ff4f245c0345c73e52948 Mon Sep 17 00:00:00 2001 From: Daniele Dellafiore <66707+ildella@users.noreply.github.com> Date: Tue, 12 May 2026 17:59:35 +0200 Subject: [PATCH 4/4] removed first example was wrong and alos not necessary, we have one at the end --- README.md | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/README.md b/README.md index 49e83e8..553f0f2 100644 --- a/README.md +++ b/README.md @@ -8,15 +8,6 @@ Sequential async pipelines with **first-class retry, error boundaries, and smart Just plain JavaScript. Eager execution. Perfect stack traces. -```js -const result = await series([ - () => fetchUser(id), - user => validateAndEnrich(user), - final => saveToDatabase(final), - saved => sendWebhook(saved), -], { strategy: collect }); // or failFast, retry(3), custom... -``` - ## Why Pipelean? To stop writing the same try/catch and manual accumulation boilerplate. @@ -113,7 +104,7 @@ const downloadSomething = async () => {...} const transformSomething = () => {...} const writeToDatabase = async () => {...} -const pipeline = await pipe( +const pipeline = pipe( downloadSomething, transformSomething, writeToDatabase,