diff --git a/packages/core/CLAUDE.md b/packages/core/CLAUDE.md index 3c28da2a2..86914ff9d 100644 --- a/packages/core/CLAUDE.md +++ b/packages/core/CLAUDE.md @@ -212,6 +212,14 @@ packages/core/ - `integration-repository-mongo.js` - MongoDB implementation - `integration-repository-postgres.js` - PostgreSQL implementation - `integration-mapping-repository-*.js` - Mapping data persistence +- `process-repository-*.js` - Process (long-running job) persistence. + Implements `applyProcessUpdate(processId, ops)` — a race-safe alternative + to `update(id, patch)` that routes increments, sets, and bounded-array + pushes through each backend's native atomic primitive (PostgreSQL + `jsonb_set` / MongoDB `$inc`/`$set`/`$push`). Use this method any time + multiple queue workers may concurrently mutate the same Process row + (counters, flags, error history). The legacy `update(id, patch)` + remains available but is clobber-prone under concurrency. **Integration developers extend IntegrationBase**: diff --git a/packages/core/integrations/integration-router.js b/packages/core/integrations/integration-router.js index 78d219a66..c69b0dc7a 100644 --- a/packages/core/integrations/integration-router.js +++ b/packages/core/integrations/integration-router.js @@ -507,7 +507,8 @@ function setEntityRoutes(router, authenticateUser, useCases) { const params = checkRequiredParams(req.query, ['entityType']); const module = await getModuleInstanceFromType.execute( userId, - params.entityType + params.entityType, + { state: req.query.state } ); const areRequirementsValid = module.validateAuthorizationRequirements(); diff --git a/packages/core/integrations/repositories/process-repository-documentdb.js b/packages/core/integrations/repositories/process-repository-documentdb.js index e50384238..5175e7014 100644 --- a/packages/core/integrations/repositories/process-repository-documentdb.js +++ b/packages/core/integrations/repositories/process-repository-documentdb.js @@ -14,6 +14,7 @@ const { const { DocumentDBEncryptionService, } = require('../../database/documentdb-encryption-service'); +const { validateOps } = require('./process-update-ops-shared'); class ProcessRepositoryDocumentDB extends ProcessRepositoryInterface { constructor() { @@ -155,6 +156,73 @@ class ProcessRepositoryDocumentDB extends ProcessRepositoryInterface { return this._mapProcess(decryptedProcess); } + /** + * Atomic process update — race-safe counterpart to `update()`. + * + * Uses DocumentDB's native $inc / $set / $push operators (Mongo-wire + * compatible) via findAndModify so increments, sets, and pushes land + * in one server-side write. Contention on the same document + * serializes at the DB level. + * + * DocumentDB compatibility notes: + * - $inc: supported since v3.6. + * - $set with dot-path: supported. + * - $push with $each + negative $slice: supported since v4.0. + * Clusters still on v3.6 must upgrade before using pushSlice. + * + * Process documents have no encrypted fields today; if that changes, + * the set-by-path payload here MUST route through + * `encryptionService.encryptFields` for any affected paths. + * + * @param {string} processId + * @param {import('./process-repository-interface').ProcessUpdateOps} ops + * @returns {Promise} + */ + async applyProcessUpdate(processId, ops) { + const normalized = validateOps(ops); + const objectId = toObjectId(processId); + + const update = {}; + const $set = {}; + + if (Object.keys(normalized.increment).length > 0) { + update.$inc = { ...normalized.increment }; + } + for (const [path, value] of Object.entries(normalized.set)) { + $set[path] = value; + } + if (normalized.newState !== null) { + $set.state = normalized.newState; + } + $set.updatedAt = new Date(); + update.$set = $set; + + if (Object.keys(normalized.pushSlice).length > 0) { + update.$push = {}; + for (const [path, spec] of Object.entries(normalized.pushSlice)) { + update.$push[path] = { + $each: spec.values, + $slice: -spec.keepLast, + }; + } + } + + const result = await this.prisma.$runCommandRaw({ + findAndModify: 'Process', + query: { _id: objectId }, + update, + new: true, + }); + + const doc = result && result.value; + if (!doc) return null; + const decrypted = await this.encryptionService.decryptFields( + 'Process', + doc + ); + return this._mapProcess(decrypted); + } + async findByIntegrationAndType(integrationId, type) { const integrationObjectId = toObjectId(integrationId); const filter = { diff --git a/packages/core/integrations/repositories/process-repository-interface.js b/packages/core/integrations/repositories/process-repository-interface.js index c74a79807..d7dbf0d9e 100644 --- a/packages/core/integrations/repositories/process-repository-interface.js +++ b/packages/core/integrations/repositories/process-repository-interface.js @@ -47,6 +47,52 @@ class ProcessRepositoryInterface { throw new Error('Method update() must be implemented'); } + /** + * Apply atomic mutations to a process record. + * + * Race-safe counterpart to `update()`. Where `update()` takes full + * JSON blobs and does read-modify-write at the ORM layer (clobber- + * prone under concurrent writers), `applyProcessUpdate()` describes + * the intent declaratively and each backend uses its native atomic + * primitive: + * - PostgreSQL: `jsonb_set` chain inside a single UPDATE ... RETURNING + * - MongoDB: `$inc` / `$set` / `$push` via findAndModify + * - DocumentDB: same operator set as MongoDB (with version caveats) + * + * All paths are dot-delimited and rooted in `context` or `results` + * (e.g. `context.processedRecords`, + * `results.aggregateData.totalSynced`). Paths MUST match + * `^(context|results)(\\.[a-zA-Z_][a-zA-Z0-9_]*)+$` — validated by + * each adapter before any SQL/command generation. + * + * Intended primary callers: UpdateProcessMetrics and + * UpdateProcessState. Other callers can use this directly when they + * need race-free cumulative updates. + * + * @typedef {Object} ProcessUpdateOps + * @property {Object.} [increment] - Atomic numeric + * increments keyed by dot-path. e.g. + * `{ 'context.processedRecords': 1, 'results.aggregateData.totalSynced': 1 }` + * @property {Object.} [set] - Atomic whole-subtree set + * keyed by dot-path. Replaces the value at the path (NOT deep + * merge). e.g. `{ 'context.fetchDone': true }` + * @property {Object.} [pushSlice] + * Atomic array push with bounded retention (sliding window of the + * last `keepLast` items). Keys are dot-paths pointing to arrays. + * e.g. `{ 'results.aggregateData.errors': { values: [err], keepLast: 100 } }` + * @property {string} [newState] - Top-level `state` column update. + * Written alongside the JSON mutations in the same UPDATE so state + * + counters move together. + * + * @param {string} processId - Process ID to update + * @param {ProcessUpdateOps} ops - Atomic operations to apply + * @returns {Promise} Updated process record (post- + * mutation) or null if the process does not exist. + */ + async applyProcessUpdate(processId, ops) { + throw new Error('Method applyProcessUpdate() must be implemented'); + } + /** * Find processes by integration and type * @param {string} integrationId - Integration ID diff --git a/packages/core/integrations/repositories/process-repository-mongo.js b/packages/core/integrations/repositories/process-repository-mongo.js index 4e2925298..e387e2d1c 100644 --- a/packages/core/integrations/repositories/process-repository-mongo.js +++ b/packages/core/integrations/repositories/process-repository-mongo.js @@ -1,5 +1,6 @@ const { prisma } = require('../../database/prisma'); const { ProcessRepositoryInterface } = require('./process-repository-interface'); +const { validateOps } = require('./process-update-ops-shared'); /** * MongoDB Process Repository Adapter @@ -92,6 +93,77 @@ class ProcessRepositoryMongo extends ProcessRepositoryInterface { return this._toPlainObject(process); } + /** + * Atomic process update — race-safe counterpart to `update()`. + * + * Uses `findAndModify` via `$runCommandRaw` so increments, sets, and + * pushes land in one server-side write. Contention on the same + * document serializes at the MongoDB level; no Node-side read- + * modify-write. Returns the post-update document. + * + * @param {string} processId + * @param {import('./process-repository-interface').ProcessUpdateOps} ops + * @returns {Promise} + */ + async applyProcessUpdate(processId, ops) { + const normalized = validateOps(ops); + + const update = {}; + const $set = {}; + + if (Object.keys(normalized.increment).length > 0) { + update.$inc = { ...normalized.increment }; + } + for (const [path, value] of Object.entries(normalized.set)) { + $set[path] = value; + } + if (normalized.newState !== null) { + $set.state = normalized.newState; + } + $set.updatedAt = new Date(); + update.$set = $set; + + if (Object.keys(normalized.pushSlice).length > 0) { + update.$push = {}; + for (const [path, spec] of Object.entries(normalized.pushSlice)) { + update.$push[path] = { + $each: spec.values, + $slice: -spec.keepLast, + }; + } + } + + const result = await this.prisma.$runCommandRaw({ + findAndModify: 'Process', + query: { _id: { $oid: processId } }, + update, + new: true, + }); + + const doc = result && result.value; + if (!doc) return null; + return this._toPlainObject(this._hydrateRawMongoDoc(doc)); + } + + /** + * Shape a raw Mongo document (as returned by $runCommandRaw) to match + * Prisma's `findUnique` output so the existing `_toPlainObject` works + * without modification. EJSON round-trips give us `{$oid, $date}` wrappers + * that need unwrapping. + * @private + */ + _hydrateRawMongoDoc(doc) { + const hydrated = { ...doc }; + if (doc._id) hydrated.id = doc._id.$oid ?? doc._id; + for (const field of ['createdAt', 'updatedAt']) { + const raw = doc[field]; + if (raw && typeof raw === 'object' && raw.$date) { + hydrated[field] = new Date(raw.$date); + } + } + return hydrated; + } + /** * Find processes by integration and type * @param {string} integrationId - Integration ID diff --git a/packages/core/integrations/repositories/process-repository-postgres.js b/packages/core/integrations/repositories/process-repository-postgres.js index d41d030ee..c43a8a91c 100644 --- a/packages/core/integrations/repositories/process-repository-postgres.js +++ b/packages/core/integrations/repositories/process-repository-postgres.js @@ -2,6 +2,7 @@ const { prisma } = require('../../database/prisma'); const { ProcessRepositoryInterface, } = require('./process-repository-interface'); +const { validateOps, splitPath } = require('./process-update-ops-shared'); /** * PostgreSQL Process Repository Adapter @@ -108,6 +109,168 @@ class ProcessRepositoryPostgres extends ProcessRepositoryInterface { return this._toPlainObject(process); } + /** + * Atomic process update — race-safe counterpart to `update()`. + * + * Compiles the `ProcessUpdateOps` into ONE `UPDATE "Process" ... + * RETURNING *` statement with nested `jsonb_set` calls for every + * context/results mutation. Postgres applies row-level locking + * during UPDATE, so concurrent callers on the same row serialize at + * the DB without any read-modify-write in Node. + * + * Path segments have been regex-validated upstream (see + * process-update-ops-shared.js); they are embedded directly into + * the SQL string. All values go through positional parameters. + * + * @param {string} processId + * @param {ProcessUpdateOps} ops + * @returns {Promise} + */ + async applyProcessUpdate(processId, ops) { + const normalized = validateOps(ops); + const id = this._convertId(processId); + + // Build the SQL expression for each JSON column. We start each + // column's expression from the column itself and wrap it in + // jsonb_set(...) calls — one wrap per operation targeting that + // column. If no op targets a column, we omit that SET clause so + // we don't issue a pointless self-assignment. + const params = []; + /** @type {(v:unknown)=>string} positional placeholder, 1-indexed */ + const bind = (v) => { + params.push(v); + return `$${params.length}`; + }; + + const columnExpressions = this._buildColumnExpressions( + normalized, + bind + ); + const setClauses = []; + for (const [column, expr] of Object.entries(columnExpressions)) { + setClauses.push(`"${column}" = ${expr}`); + } + if (normalized.newState !== null) { + setClauses.push(`"state" = ${bind(normalized.newState)}`); + } + setClauses.push(`"updatedAt" = NOW()`); + + const idPlaceholder = bind(id); + const sql = ` + UPDATE "Process" + SET ${setClauses.join(', ')} + WHERE "id" = ${idPlaceholder} + RETURNING * + `; + + const rows = await this.prisma.$queryRawUnsafe(sql, ...params); + if (!rows || rows.length === 0) return null; + return this._toPlainObject(rows[0]); + } + + /** + * Returns a map of column → SQL expression with all jsonb_set wraps + * applied. Used only by applyProcessUpdate. + * @private + */ + _buildColumnExpressions(ops, bind) { + const byColumn = { context: null, results: null }; + + // Seed with the column itself (wrapped with COALESCE so that + // a NULL column doesn't break jsonb_set). + const seed = (col) => + byColumn[col] ?? + (byColumn[col] = `COALESCE("${col}", '{}'::jsonb)`); + + /** + * Postgres `jsonb_set(target, path, value, create_missing=true)` + * only creates the LEAF segment if missing — intermediate segments + * that don't exist as objects cause the call to return `target` + * unchanged (silent no-op). For a path like `context.a.b.c` on a + * doc where `context.a` is missing, we'd bail on the write. + * + * This helper wraps `prev` in a chain of `jsonb_set` calls that + * ensure each intermediate prefix path is an object, preserving + * its contents if it's already present: + * + * ensureParents(prev, ['a','b','c']) + * ⇒ jsonb_set( + * jsonb_set(prev, '{a}', COALESCE(prev#>'{a}', '{}'::jsonb), true), + * '{a,b}', COALESCE(${that}#>'{a,b}', '{}'::jsonb), true) + * + * The caller then wraps this result with its own `jsonb_set` for + * the leaf segment. Depth-1 paths skip this entirely (no parents + * to synthesize). + */ + const ensureParents = (prevExpr, segments) => { + let cur = prevExpr; + for (let i = 1; i < segments.length; i++) { + const parentPath = `'{${segments.slice(0, i).join(',')}}'`; + cur = `jsonb_set(${cur}, ${parentPath}, COALESCE(${cur} #> ${parentPath}, '{}'::jsonb), true)`; + } + return cur; + }; + + const wrapIncrement = (col, segments, delta) => { + const textPath = `'{${segments.join(',')}}'`; + const jsonbPath = `'{${segments.join(',')}}'`; + const prev = seed(col); + const guarded = ensureParents(prev, segments); + const nextValue = `to_jsonb(COALESCE((${guarded} #>> ${textPath})::numeric, 0) + ${bind(delta)})`; + byColumn[col] = `jsonb_set(${guarded}, ${jsonbPath}, ${nextValue}, true)`; + }; + + const wrapSet = (col, segments, value) => { + const jsonbPath = `'{${segments.join(',')}}'`; + const prev = seed(col); + const guarded = ensureParents(prev, segments); + // $n::jsonb — values are serialized to JSON by Prisma when + // passed as a parameter, then cast back into jsonb. + byColumn[col] = `jsonb_set(${guarded}, ${jsonbPath}, ${bind(JSON.stringify(value))}::jsonb, true)`; + }; + + const wrapPushSlice = (col, segments, spec) => { + const jsonbPath = `'{${segments.join(',')}}'`; + const prev = seed(col); + const guarded = ensureParents(prev, segments); + // Construct the sliced array in a CTE to evaluate `${newArr}` + // exactly ONCE (vs. the inline form that Postgres would still + // execute correctly but expand three times). Order is + // explicitly preserved by `jsonb_agg(... ORDER BY idx)`; + // without the ORDER BY, aggregate order is implementation- + // defined even with WITH ORDINALITY. + const sliced = `( + WITH combined AS ( + SELECT COALESCE((${guarded} #> ${jsonbPath}), '[]'::jsonb) || ${bind(JSON.stringify(spec.values))}::jsonb AS arr + ) + SELECT COALESCE(jsonb_agg(elem ORDER BY idx), '[]'::jsonb) + FROM combined, + jsonb_array_elements((SELECT arr FROM combined)) WITH ORDINALITY AS t(elem, idx) + WHERE idx > GREATEST(0, jsonb_array_length((SELECT arr FROM combined)) - ${bind(spec.keepLast)}) + )`; + byColumn[col] = `jsonb_set(${guarded}, ${jsonbPath}, ${sliced}, true)`; + }; + + for (const [path, delta] of Object.entries(ops.increment)) { + const { column, segments } = splitPath(path); + wrapIncrement(column, segments, delta); + } + for (const [path, value] of Object.entries(ops.set)) { + const { column, segments } = splitPath(path); + wrapSet(column, segments, value); + } + for (const [path, spec] of Object.entries(ops.pushSlice)) { + const { column, segments } = splitPath(path); + wrapPushSlice(column, segments, spec); + } + + const result = {}; + for (const [col, expr] of Object.entries(byColumn)) { + if (expr !== null) result[col] = expr; + } + return result; + } + /** * Find processes by integration and type * @param {string} integrationId - Integration ID diff --git a/packages/core/integrations/repositories/process-repository-postgres.test.js b/packages/core/integrations/repositories/process-repository-postgres.test.js new file mode 100644 index 000000000..5382dcf51 --- /dev/null +++ b/packages/core/integrations/repositories/process-repository-postgres.test.js @@ -0,0 +1,210 @@ +/** + * SQL-generation tests for ProcessRepositoryPostgres.applyProcessUpdate. + * + * Postgres's atomicity for `UPDATE "Process" SET ... WHERE id = ... + * RETURNING *` is a documented server-level guarantee — not something + * we need to re-prove per test run. What we DO need to guarantee is + * that the SQL we emit is well-formed under every combination of op + * kinds, that `jsonb_set` intermediate-path synthesis is in place for + * deep paths, and that `jsonb_agg` uses an explicit `ORDER BY idx` so + * `pushSlice` preserves insertion order reliably across Postgres + * versions. + * + * These tests stub `prisma.$queryRawUnsafe` to capture the SQL string + * and bound parameters, then assert on the captured output. + */ + +const { + ProcessRepositoryPostgres, +} = require('./process-repository-postgres'); + +function makeRepo() { + const repo = new ProcessRepositoryPostgres(); + const captured = []; + repo.prisma = { + $queryRawUnsafe: jest.fn(async (sql, ...params) => { + captured.push({ sql, params }); + return [ + { + id: 1, + userId: 1, + integrationId: 1, + name: 'p', + type: 'T', + state: 'X', + context: {}, + results: {}, + childProcesses: [], + parentProcessId: null, + createdAt: new Date(), + updatedAt: new Date(), + }, + ]; + }), + }; + return { repo, captured }; +} + +describe('ProcessRepositoryPostgres.applyProcessUpdate SQL generation', () => { + it('emits UPDATE ... SET state = $n when only newState is provided', async () => { + const { repo, captured } = makeRepo(); + await repo.applyProcessUpdate('7', { newState: 'COMPLETED' }); + + expect(captured).toHaveLength(1); + const { sql, params } = captured[0]; + expect(sql).toMatch(/UPDATE "Process"/); + expect(sql).toMatch(/"state" = \$1/); + expect(sql).toMatch(/"updatedAt" = NOW\(\)/); + expect(sql).toMatch(/WHERE "id" = \$2/); + expect(sql).toMatch(/RETURNING \*/); + expect(params).toEqual(['COMPLETED', 7]); + }); + + it('emits a single jsonb_set wrap for a depth-1 increment', async () => { + const { repo, captured } = makeRepo(); + await repo.applyProcessUpdate('1', { + increment: { 'context.processedRecords': 1 }, + }); + + const { sql } = captured[0]; + expect(sql).toMatch(/"context" = jsonb_set\(/); + // Depth-1 doesn't need intermediate-path synthesis — only ONE + // jsonb_set call wraps the column seed. + const setCount = (sql.match(/jsonb_set\(/g) || []).length; + expect(setCount).toBe(1); + }); + + it('synthesizes a missing intermediate for a 2-segment set', async () => { + const { repo, captured } = makeRepo(); + await repo.applyProcessUpdate('1', { + set: { 'context.pagination.cursor': 'abc' }, + }); + + const { sql } = captured[0]; + // ensureParents chains 1 wrap for {pagination} + 1 wrap for the + // {pagination,cursor} leaf = 2 jsonb_set calls total. + const setCount = (sql.match(/jsonb_set\(/g) || []).length; + expect(setCount).toBe(2); + // The intermediate carries COALESCE(... #> '{pagination}', '{}'::jsonb) + // so an existing pagination object is preserved. + expect(sql).toMatch(/COALESCE\(.*#>\s*'\{pagination\}',\s*'\{\}'::jsonb\)/); + }); + + it('synthesizes all intermediates for a 3-segment set', async () => { + const { repo, captured } = makeRepo(); + await repo.applyProcessUpdate('1', { + set: { 'context.a.b.c': 1 }, + }); + + const { sql } = captured[0]; + // ensureParents string-substitution: i=1 yields 1 jsonb_set; + // i=2 references the i=1 `cur` twice (once for target, once + // inside COALESCE for the read) + 1 new wrap = 3; wrapSet adds + // 1 more outer wrap = 4 jsonb_set calls. This is a string- + // duplication artifact, not a runtime duplication — Postgres + // still executes the whole expression as one UPDATE under a + // single row lock. If paths go deeper than ~5 segments the + // generated SQL size becomes impractical; validateOps should + // cap it in that case (currently no cap; none of our callers + // go beyond depth 3). + const setCount = (sql.match(/jsonb_set\(/g) || []).length; + expect(setCount).toBe(4); + expect(sql).toMatch(/'\{a\}'/); + expect(sql).toMatch(/'\{a,b\}'/); + expect(sql).toMatch(/'\{a,b,c\}'/); + }); + + it('pushSlice uses explicit ORDER BY idx inside jsonb_agg so insertion order is deterministic', async () => { + const { repo, captured } = makeRepo(); + await repo.applyProcessUpdate('1', { + pushSlice: { + 'results.aggregateData.errors': { + values: [{ e: 1 }, { e: 2 }], + keepLast: 100, + }, + }, + }); + + const { sql } = captured[0]; + expect(sql).toMatch(/jsonb_agg\(elem\s+ORDER BY idx\)/); + // The CTE binds the new-array expression once (vs. three times + // if inlined), so we should see exactly one `|| $n::jsonb` concat. + const concatCount = (sql.match(/\|\|\s*\$\d+::jsonb/g) || []).length; + expect(concatCount).toBe(1); + }); + + it('binds parameters in stable left-to-right order', async () => { + const { repo, captured } = makeRepo(); + await repo.applyProcessUpdate('42', { + increment: { + 'context.processedRecords': 3, + 'results.aggregateData.totalSynced': 5, + }, + set: { 'context.fetchDone': true }, + newState: 'PROCESSING_BATCHES', + }); + + const { params } = captured[0]; + // Params emitted in this order: increments (context then results), + // then set, then newState, then id. + expect(params).toEqual([3, 5, 'true', 'PROCESSING_BATCHES', 42]); + }); + + it('combines increment, set, and pushSlice on both columns in one UPDATE', async () => { + const { repo, captured } = makeRepo(); + await repo.applyProcessUpdate('1', { + increment: { + 'context.processedRecords': 1, + 'results.aggregateData.totalSynced': 1, + }, + set: { 'context.fetchDone': true }, + pushSlice: { + 'results.aggregateData.errors': { + values: [{ e: 'x' }], + keepLast: 10, + }, + }, + newState: 'PROCESSING_BATCHES', + }); + + const { sql } = captured[0]; + // Exactly one UPDATE statement, one SET clause per touched column, + // one for state, one for updatedAt. + expect((sql.match(/UPDATE "Process"/g) || []).length).toBe(1); + expect(sql).toMatch(/"context" =/); + expect(sql).toMatch(/"results" =/); + expect(sql).toMatch(/"state" =/); + expect(sql).toMatch(/"updatedAt" = NOW\(\)/); + }); + + it('returns null when UPDATE returns no rows (process does not exist)', async () => { + const repo = new ProcessRepositoryPostgres(); + repo.prisma = { $queryRawUnsafe: jest.fn(async () => []) }; + + const result = await repo.applyProcessUpdate('999', { + newState: 'COMPLETED', + }); + expect(result).toBeNull(); + }); + + it('rejects invalid paths before hitting the DB', async () => { + const { repo, captured } = makeRepo(); + await expect( + repo.applyProcessUpdate('1', { + set: { 'bad.root.path': 'x' }, + }) + ).rejects.toThrow('invalid path'); + expect(captured).toHaveLength(0); + }); + + it('does NOT synthesize intermediates for a depth-1 path (no wasted jsonb_set)', async () => { + const { repo, captured } = makeRepo(); + await repo.applyProcessUpdate('1', { + set: { 'context.fetchDone': true }, + }); + + const { sql } = captured[0]; + const setCount = (sql.match(/jsonb_set\(/g) || []).length; + expect(setCount).toBe(1); + }); +}); diff --git a/packages/core/integrations/repositories/process-update-ops-shared.js b/packages/core/integrations/repositories/process-update-ops-shared.js new file mode 100644 index 000000000..2ea704bac --- /dev/null +++ b/packages/core/integrations/repositories/process-update-ops-shared.js @@ -0,0 +1,112 @@ +/** + * Shared helpers for ProcessRepository.applyProcessUpdate() validation. + * + * These utilities are backend-agnostic: they enforce invariants on the + * `ProcessUpdateOps` shape BEFORE each adapter emits any SQL or database + * command. Keeping validation here means any bug we fix (e.g. tighter + * path regex, size cap) fixes all three adapters in one place. + * + * Imported by the Postgres, MongoDB, and DocumentDB adapters. + */ + +/** + * Allowed dot-path shape. Root must be `context` or `results`, and each + * segment after the first must be a JS-identifier-style token. Numeric + * segments (array indices) and bracket syntax are intentionally + * disallowed — array element mutation is exclusively handled via + * `pushSlice`, which targets a whole array at a path. + */ +const PATH_REGEX = /^(context|results)(\.[a-zA-Z_][a-zA-Z0-9_]*)+$/; + +/** + * Normalizes and validates a `ProcessUpdateOps` object. Returns a frozen + * copy with defaults applied and every key pre-validated. Throws synchronously + * on any shape error so adapters can fail fast before touching the DB. + * + * @param {Object} ops + * @returns {{ + * increment: Record, + * set: Record, + * pushSlice: Record, + * newState: string|null, + * }} + */ +function validateOps(ops) { + if (!ops || typeof ops !== 'object' || Array.isArray(ops)) { + throw new Error('applyProcessUpdate: ops must be an object'); + } + + const increment = ops.increment || {}; + const set = ops.set || {}; + const pushSlice = ops.pushSlice || {}; + const newState = ops.newState ?? null; + + for (const [path, delta] of Object.entries(increment)) { + assertPath(path, 'increment'); + if (typeof delta !== 'number' || !Number.isFinite(delta)) { + throw new Error( + `applyProcessUpdate: increment['${path}'] must be a finite number, got ${typeof delta}` + ); + } + } + + for (const path of Object.keys(set)) { + assertPath(path, 'set'); + } + + for (const [path, spec] of Object.entries(pushSlice)) { + assertPath(path, 'pushSlice'); + if ( + !spec || + typeof spec !== 'object' || + !Array.isArray(spec.values) || + typeof spec.keepLast !== 'number' || + !Number.isInteger(spec.keepLast) || + spec.keepLast <= 0 + ) { + throw new Error( + `applyProcessUpdate: pushSlice['${path}'] must be { values: [], keepLast: positive integer }` + ); + } + } + + if (newState !== null && typeof newState !== 'string') { + throw new Error('applyProcessUpdate: newState must be a string'); + } + + const hasAnyOp = + Object.keys(increment).length > 0 || + Object.keys(set).length > 0 || + Object.keys(pushSlice).length > 0 || + newState !== null; + if (!hasAnyOp) { + throw new Error( + 'applyProcessUpdate: at least one of increment/set/pushSlice/newState must be provided' + ); + } + + return Object.freeze({ increment, set, pushSlice, newState }); +} + +function assertPath(path, opName) { + if (!PATH_REGEX.test(path)) { + throw new Error( + `applyProcessUpdate: invalid path '${path}' in ${opName} (must match ${PATH_REGEX})` + ); + } +} + +/** + * Splits a validated path into `{ column, segments }`. + * `'context.pagination.pageCount'` → `{ column: 'context', segments: ['pagination', 'pageCount'] }`. + */ +function splitPath(path) { + const [column, ...segments] = path.split('.'); + return { column, segments }; +} + +module.exports = { + PATH_REGEX, + validateOps, + splitPath, +}; diff --git a/packages/core/integrations/repositories/process-update-ops-shared.test.js b/packages/core/integrations/repositories/process-update-ops-shared.test.js new file mode 100644 index 000000000..59e54b3d9 --- /dev/null +++ b/packages/core/integrations/repositories/process-update-ops-shared.test.js @@ -0,0 +1,125 @@ +const { validateOps, splitPath, PATH_REGEX } = require('./process-update-ops-shared'); + +describe('process-update-ops-shared', () => { + describe('PATH_REGEX', () => { + it.each([ + 'context.processedRecords', + 'context.pagination.cursor', + 'results.aggregateData.totalSynced', + 'results.aggregateData.errors', + ])('accepts %s', (path) => { + expect(PATH_REGEX.test(path)).toBe(true); + }); + + it.each([ + 'context', + 'results', + 'status', + 'context.', + '.context.x', + 'context..x', + 'context.1stItem', + 'context.a-b', + 'context[0]', + "context.'x'", + 'other.foo', + ])('rejects %s', (path) => { + expect(PATH_REGEX.test(path)).toBe(false); + }); + }); + + describe('validateOps', () => { + it('throws when ops is null/undefined/array', () => { + expect(() => validateOps(null)).toThrow('must be an object'); + expect(() => validateOps(undefined)).toThrow('must be an object'); + expect(() => validateOps([])).toThrow('must be an object'); + }); + + it('throws when no op kinds are provided', () => { + expect(() => validateOps({})).toThrow('at least one of'); + expect(() => + validateOps({ + increment: {}, + set: {}, + pushSlice: {}, + }) + ).toThrow('at least one of'); + }); + + it('accepts newState alone as a valid op', () => { + expect(() => validateOps({ newState: 'COMPLETED' })).not.toThrow(); + }); + + it('rejects non-finite increment values', () => { + expect(() => + validateOps({ increment: { 'context.x': NaN } }) + ).toThrow('finite number'); + expect(() => + validateOps({ increment: { 'context.x': Infinity } }) + ).toThrow('finite number'); + expect(() => + validateOps({ increment: { 'context.x': '1' } }) + ).toThrow('finite number'); + }); + + it('rejects invalid paths across all op kinds', () => { + expect(() => + validateOps({ increment: { 'bad.path': 1 } }) + ).toThrow('invalid path'); + expect(() => + validateOps({ set: { 'bad.path': 'x' } }) + ).toThrow('invalid path'); + expect(() => + validateOps({ + pushSlice: { + 'bad.path': { values: [], keepLast: 1 }, + }, + }) + ).toThrow('invalid path'); + }); + + it('rejects malformed pushSlice specs', () => { + expect(() => + validateOps({ pushSlice: { 'context.x': { keepLast: 10 } } }) + ).toThrow('pushSlice'); + expect(() => + validateOps({ + pushSlice: { + 'context.x': { values: [], keepLast: 0 }, + }, + }) + ).toThrow('pushSlice'); + expect(() => + validateOps({ + pushSlice: { + 'context.x': { values: [], keepLast: 1.5 }, + }, + }) + ).toThrow('pushSlice'); + }); + + it('returns a frozen normalized object with defaults', () => { + const result = validateOps({ newState: 'COMPLETED' }); + expect(result).toEqual({ + increment: {}, + set: {}, + pushSlice: {}, + newState: 'COMPLETED', + }); + expect(Object.isFrozen(result)).toBe(true); + }); + }); + + describe('splitPath', () => { + it('separates column from segments', () => { + expect(splitPath('context.pagination.cursor')).toEqual({ + column: 'context', + segments: ['pagination', 'cursor'], + }); + expect(splitPath('results.aggregateData.totalSynced')).toEqual({ + column: 'results', + segments: ['aggregateData', 'totalSynced'], + }); + }); + }); +}); diff --git a/packages/core/integrations/use-cases/update-process-metrics.js b/packages/core/integrations/use-cases/update-process-metrics.js index baa1291b8..97c303a4e 100644 --- a/packages/core/integrations/use-cases/update-process-metrics.js +++ b/packages/core/integrations/use-cases/update-process-metrics.js @@ -1,37 +1,28 @@ -/** - TODO: - This implementation contains a race condition in the `execute` method. When multiple concurrent processes call this method on the same process record, they'll each read the current state, modify it independently, and then save - potentially overwriting each other's changes. - -For example: -``` -Thread 1: reads process with totalSynced=100 -Thread 2: reads process with totalSynced=100 -Thread 1: adds 50 → writes totalSynced=150 -Thread 2: adds 30 → writes totalSynced=130 (overwrites Thread 1's update!) -``` - -Consider implementing one of these patterns: -1. Database transactions with row locking -2. Optimistic concurrency control with version numbers -3. Atomic update operations (e.g., `$inc` in MongoDB) -4. A FIFO queue for process updates (as described in the PROCESS_MANAGEMENT_QUEUE_SPEC.md) - -The current approach will lead to lost updates and inconsistent metrics during concurrent processing. - - */ - /** * UpdateProcessMetrics Use Case * - * Updates process metrics, calculates aggregates, and computes estimated completion time. - * Optionally broadcasts progress via WebSocket service if provided. + * Updates process metrics atomically via + * `processRepository.applyProcessUpdate`. This is the race-safe + * replacement for the original read-modify-write implementation — the + * long-standing TODO about lost updates under concurrent writers is + * now resolved. + * + * Split into two phases: * - * Design Philosophy: - * - Metrics are cumulative (add to existing counts) - * - Performance metrics calculated automatically (duration, records/sec) - * - ETA computed based on current progress - * - Error history limited to last 100 entries - * - WebSocket broadcasting is optional (DI pattern) + * 1. Atomic phase — counters and bounded error history. Uses + * $inc / $push+$slice (Mongo/DocumentDB) or jsonb_set with + * arithmetic expressions (Postgres) in a single UPDATE ... RETURNING + * so concurrent callers serialize at the DB layer. + * + * 2. Derived-fields phase — duration, recordsPerSecond, + * estimatedCompletion. Computed from the post-atomic snapshot and + * written via the legacy (non-atomic) `update()` method. + * Intentionally best-effort: under concurrent writers they reflect + * "whichever handler wrote last" — the same semantics they had + * before and all they've ever guaranteed. Preserved for backward + * compatibility with consumers (UI, WebSocket listeners). + * + * Optionally broadcasts progress via WebSocket service if provided. * * @example * const updateMetrics = new UpdateProcessMetrics({ processRepository, websocketService }); @@ -60,15 +51,14 @@ class UpdateProcessMetrics { * Execute the use case to update process metrics * @param {string} processId - Process ID to update * @param {Object} metricsUpdate - Metrics to add/update - * @param {number} [metricsUpdate.processed=0] - Number of records processed in this batch - * @param {number} [metricsUpdate.success=0] - Number of successful records - * @param {number} [metricsUpdate.errors=0] - Number of failed records + * @param {number} [metricsUpdate.processed=0] - Records processed in this batch + * @param {number} [metricsUpdate.success=0] - Successful records + * @param {number} [metricsUpdate.errors=0] - Failed records * @param {Array} [metricsUpdate.errorDetails=[]] - Error details array * @returns {Promise} Updated process record * @throws {Error} If process not found or update fails */ async execute(processId, metricsUpdate) { - // Validate inputs if (!processId || typeof processId !== 'string') { throw new Error('processId must be a non-empty string'); } @@ -76,87 +66,103 @@ class UpdateProcessMetrics { throw new Error('metricsUpdate must be an object'); } - // Retrieve current process - const process = await this.processRepository.findById(processId); - if (!process) { - throw new Error(`Process not found: ${processId}`); - } - - // Get current context and results - const context = process.context || {}; - const results = process.results || { aggregateData: {} }; - - // Initialize nested objects if not present - if (!results.aggregateData) { - results.aggregateData = {}; - } - - // Update context counters (cumulative) - context.processedRecords = - (context.processedRecords || 0) + (metricsUpdate.processed || 0); + // Phase 1: atomic increments + bounded error history. + const increment = {}; + const processed = metricsUpdate.processed || 0; + const success = metricsUpdate.success || 0; + const errors = metricsUpdate.errors || 0; + if (processed) increment['context.processedRecords'] = processed; + if (success) increment['results.aggregateData.totalSynced'] = success; + if (errors) increment['results.aggregateData.totalFailed'] = errors; - // Update results aggregates (cumulative) - results.aggregateData.totalSynced = - (results.aggregateData.totalSynced || 0) + - (metricsUpdate.success || 0); - results.aggregateData.totalFailed = - (results.aggregateData.totalFailed || 0) + - (metricsUpdate.errors || 0); - - // Append error details (limited to last 100) + const pushSlice = {}; if ( - metricsUpdate.errorDetails && + Array.isArray(metricsUpdate.errorDetails) && metricsUpdate.errorDetails.length > 0 ) { - results.aggregateData.errors = [ - ...(results.aggregateData.errors || []), - ...metricsUpdate.errorDetails, - ].slice(-100); // Keep only last 100 errors + pushSlice['results.aggregateData.errors'] = { + values: metricsUpdate.errorDetails, + keepLast: 100, + }; } - // Calculate performance metrics - const startTime = new Date(context.startTime || process.createdAt); - const elapsed = Date.now() - startTime.getTime(); - results.aggregateData.duration = elapsed; - - if (elapsed > 0 && context.processedRecords > 0) { - results.aggregateData.recordsPerSecond = - context.processedRecords / (elapsed / 1000); - } else { - results.aggregateData.recordsPerSecond = 0; - } + const hasAtomicWork = + Object.keys(increment).length > 0 || + Object.keys(pushSlice).length > 0; - // Calculate ETA if we know total - if (context.totalRecords > 0 && context.processedRecords > 0) { - const remaining = context.totalRecords - context.processedRecords; - if (results.aggregateData.recordsPerSecond > 0) { - const etaMs = - (remaining / results.aggregateData.recordsPerSecond) * 1000; - const eta = new Date(Date.now() + etaMs); - context.estimatedCompletion = eta.toISOString(); - } - } - - // Prepare updates - const updates = { - context, - results, - }; - - // Persist updates let updatedProcess; try { - updatedProcess = await this.processRepository.update( - processId, - updates - ); + if (hasAtomicWork) { + updatedProcess = await this.processRepository.applyProcessUpdate( + processId, + { increment, pushSlice } + ); + } else { + // All-zero update (e.g., empty batch) — nothing to persist; + // just read current state for the derived-fields pass. + updatedProcess = await this.processRepository.findById( + processId + ); + } } catch (error) { throw new Error( `Failed to update process metrics: ${error.message}` ); } - // Broadcast progress via WebSocket (if service provided) + if (!updatedProcess) { + throw new Error(`Process not found: ${processId}`); + } + + // Phase 2: derived metrics (non-atomic, best-effort). Preserved + // for backward compatibility — these were always stale under + // concurrent writers even before this refactor. + const context = updatedProcess.context || {}; + const results = updatedProcess.results || { aggregateData: {} }; + if (!results.aggregateData) results.aggregateData = {}; + + if (context.processedRecords > 0 || context.totalRecords > 0) { + const startTime = new Date( + context.startTime || updatedProcess.createdAt + ); + const elapsed = Date.now() - startTime.getTime(); + results.aggregateData.duration = elapsed; + + if (elapsed > 0 && context.processedRecords > 0) { + results.aggregateData.recordsPerSecond = + context.processedRecords / (elapsed / 1000); + } else { + results.aggregateData.recordsPerSecond = 0; + } + + if (context.totalRecords > 0 && context.processedRecords > 0) { + const remaining = + context.totalRecords - context.processedRecords; + if (results.aggregateData.recordsPerSecond > 0) { + const etaMs = + (remaining / results.aggregateData.recordsPerSecond) * + 1000; + const eta = new Date(Date.now() + etaMs); + context.estimatedCompletion = eta.toISOString(); + } + } + + try { + updatedProcess = await this.processRepository.update( + processId, + { context, results } + ); + } catch (error) { + // Derived-field write failures are NON-FATAL — atomic + // counters from phase 1 already landed. Log and return the + // post-atomic snapshot. + console.error( + '[UpdateProcessMetrics] derived-fields write failed (non-fatal):', + error.message + ); + } + } + if (this.websocketService) { await this._broadcastProgress(updatedProcess); } @@ -167,7 +173,6 @@ class UpdateProcessMetrics { /** * Broadcast progress update via WebSocket * @private - * @param {Object} process - Updated process record */ async _broadcastProgress(process) { try { @@ -192,7 +197,6 @@ class UpdateProcessMetrics { }, }); } catch (error) { - // Log but don't fail the update if WebSocket broadcast fails console.error('Failed to broadcast process progress:', error); } } diff --git a/packages/core/integrations/use-cases/update-process-metrics.test.js b/packages/core/integrations/use-cases/update-process-metrics.test.js index d1d3858fc..de3d90e5a 100644 --- a/packages/core/integrations/use-cases/update-process-metrics.test.js +++ b/packages/core/integrations/use-cases/update-process-metrics.test.js @@ -1,13 +1,15 @@ /** * UpdateProcessMetrics Use Case Tests - * - * Tests metrics updates, aggregate calculations, and ETA computation. + * + * Covers the atomic-path refactor: increments and errorDetails push go + * through applyProcessUpdate; derived fields are computed and written + * via the legacy update() as a non-fatal follow-up. */ const { UpdateProcessMetrics } = require('./update-process-metrics'); describe('UpdateProcessMetrics', () => { - let updateProcessMetricsUseCase; + let useCase; let mockProcessRepository; let mockWebsocketService; @@ -15,294 +17,350 @@ describe('UpdateProcessMetrics', () => { mockProcessRepository = { findById: jest.fn(), update: jest.fn(), + applyProcessUpdate: jest.fn(), }; - mockWebsocketService = { - broadcast: jest.fn(), - }; - updateProcessMetricsUseCase = new UpdateProcessMetrics({ + mockWebsocketService = { broadcast: jest.fn() }; + useCase = new UpdateProcessMetrics({ processRepository: mockProcessRepository, websocketService: mockWebsocketService, }); }); describe('constructor', () => { - it('should require processRepository', () => { - expect(() => new UpdateProcessMetrics({})).toThrow('processRepository is required'); + it('requires processRepository', () => { + expect(() => new UpdateProcessMetrics({})).toThrow( + 'processRepository is required' + ); }); - - it('should initialize with processRepository and optional websocketService', () => { - expect(updateProcessMetricsUseCase.processRepository).toBe(mockProcessRepository); - expect(updateProcessMetricsUseCase.websocketService).toBe(mockWebsocketService); + it('initializes with repository and optional websocket', () => { + expect(useCase.processRepository).toBe(mockProcessRepository); + expect(useCase.websocketService).toBe(mockWebsocketService); }); - - it('should work without websocketService', () => { - const useCase = new UpdateProcessMetrics({ + it('works without websocket', () => { + const uc = new UpdateProcessMetrics({ processRepository: mockProcessRepository, }); - expect(useCase.websocketService).toBeUndefined(); + expect(uc.websocketService).toBeUndefined(); }); }); - describe('execute', () => { + describe('execute — atomic phase', () => { const processId = 'process-123'; const baseTime = new Date('2024-01-01T10:00:00Z'); - - const mockProcess = { + + // Post-atomic snapshot returned by applyProcessUpdate. Counters + // have already been incremented server-side. + const atomicSnapshot = { id: processId, - userId: 'user-456', - integrationId: 'integration-789', - name: 'test-sync', - type: 'CRM_SYNC', state: 'PROCESSING_BATCHES', context: { syncType: 'INITIAL', totalRecords: 1000, - processedRecords: 100, + processedRecords: 150, // prior 100 + this batch's 50 startTime: baseTime.toISOString(), }, results: { aggregateData: { - totalSynced: 95, - totalFailed: 5, - duration: 30000, // 30 seconds - recordsPerSecond: 3.33, + totalSynced: 143, + totalFailed: 7, errors: [ - { contactId: 'contact-1', error: 'Missing email', timestamp: '2024-01-01T10:00:30Z' } + { + contactId: 'contact-2', + error: 'Invalid phone', + timestamp: '2024-01-01T10:00:45Z', + }, ], }, }, createdAt: baseTime, - updatedAt: baseTime, }; beforeEach(() => { - // Mock current time to be 45 seconds after start jest.useFakeTimers(); - jest.setSystemTime(new Date(baseTime.getTime() + 45000)); + jest.setSystemTime(new Date(baseTime.getTime() + 45000)); // +45s }); - afterEach(() => { jest.useRealTimers(); }); - it('should update metrics with new batch data', async () => { - const metricsUpdate = { + it('routes processed/success/errors to applyProcessUpdate.increment', async () => { + mockProcessRepository.applyProcessUpdate.mockResolvedValue( + atomicSnapshot + ); + mockProcessRepository.update.mockResolvedValue(atomicSnapshot); + + await useCase.execute(processId, { processed: 50, success: 48, errors: 2, - errorDetails: [ - { contactId: 'contact-2', error: 'Invalid phone', timestamp: '2024-01-01T10:00:45Z' } - ], - }; + }); - const expectedContext = { - ...mockProcess.context, - processedRecords: 150, // 100 + 50 - }; + expect(mockProcessRepository.applyProcessUpdate).toHaveBeenCalledWith( + processId, + { + increment: { + 'context.processedRecords': 50, + 'results.aggregateData.totalSynced': 48, + 'results.aggregateData.totalFailed': 2, + }, + pushSlice: {}, + } + ); + }); - const expectedResults = { - aggregateData: { - totalSynced: 143, // 95 + 48 - totalFailed: 7, // 5 + 2 - duration: 45000, // Current elapsed time - recordsPerSecond: 3.33, // 150 / 45 - errors: [ - { contactId: 'contact-1', error: 'Missing email', timestamp: '2024-01-01T10:00:30Z' }, - { contactId: 'contact-2', error: 'Invalid phone', timestamp: '2024-01-01T10:00:45Z' } - ], + it('routes errorDetails to pushSlice with keepLast 100', async () => { + mockProcessRepository.applyProcessUpdate.mockResolvedValue( + atomicSnapshot + ); + mockProcessRepository.update.mockResolvedValue(atomicSnapshot); + + const errorDetails = [ + { + contactId: 'contact-2', + error: 'Invalid phone', + timestamp: '2024-01-01T10:00:45Z', }, - }; - - const updatedProcess = { - ...mockProcess, - context: expectedContext, - results: expectedResults, - }; + ]; + await useCase.execute(processId, { + processed: 5, + errors: 5, + errorDetails, + }); - mockProcessRepository.findById.mockResolvedValue(mockProcess); - mockProcessRepository.update.mockResolvedValue(updatedProcess); + const [, ops] = + mockProcessRepository.applyProcessUpdate.mock.calls[0]; + expect(ops.pushSlice).toEqual({ + 'results.aggregateData.errors': { + values: errorDetails, + keepLast: 100, + }, + }); + }); - const result = await updateProcessMetricsUseCase.execute(processId, metricsUpdate); + it('omits zero-value counters from the increment map', async () => { + mockProcessRepository.applyProcessUpdate.mockResolvedValue( + atomicSnapshot + ); + mockProcessRepository.update.mockResolvedValue(atomicSnapshot); - expect(mockProcessRepository.findById).toHaveBeenCalledWith(processId); - expect(mockProcessRepository.update).toHaveBeenCalledWith(processId, { - context: expectedContext, - results: expectedResults, + await useCase.execute(processId, { + processed: 50, + success: 50, + errors: 0, }); - expect(result).toEqual(updatedProcess); - }); - it('should calculate ETA when total records known', async () => { - const metricsUpdate = { processed: 100, success: 100, errors: 0 }; - - // With 850 remaining records and 3.33 records/sec, ETA should be ~255 seconds - const expectedETA = new Date(Date.now() + (850 / 3.33 * 1000)); - - const updatedProcess = { - ...mockProcess, - context: { - ...mockProcess.context, - processedRecords: 200, - estimatedCompletion: expectedETA.toISOString(), - }, - results: { - aggregateData: { - totalSynced: 195, - totalFailed: 5, - duration: 45000, - recordsPerSecond: 4.44, // 200 / 45 - }, - }, - }; + const [, ops] = + mockProcessRepository.applyProcessUpdate.mock.calls[0]; + // toHaveProperty interprets dots as nested paths; use `in` + // against the object keyed by our literal dot-paths. + expect('results.aggregateData.totalFailed' in ops.increment).toBe( + false + ); + expect('context.processedRecords' in ops.increment).toBe(true); + expect('results.aggregateData.totalSynced' in ops.increment).toBe( + true + ); + }); - mockProcessRepository.findById.mockResolvedValue(mockProcess); - mockProcessRepository.update.mockResolvedValue(updatedProcess); + it('short-circuits on an all-zero update without hitting applyProcessUpdate', async () => { + mockProcessRepository.findById.mockResolvedValue(atomicSnapshot); + mockProcessRepository.update.mockResolvedValue(atomicSnapshot); - const result = await updateProcessMetricsUseCase.execute(processId, metricsUpdate); + await useCase.execute(processId, { + processed: 0, + success: 0, + errors: 0, + }); - const updateCall = mockProcessRepository.update.mock.calls[0][1]; - expect(updateCall.context.estimatedCompletion).toBeDefined(); - expect(new Date(updateCall.context.estimatedCompletion)).toBeInstanceOf(Date); + expect( + mockProcessRepository.applyProcessUpdate + ).not.toHaveBeenCalled(); + expect(mockProcessRepository.findById).toHaveBeenCalledWith( + processId + ); }); + }); - it('should limit error details to last 100', async () => { - // Create a process with 98 existing errors - const existingErrors = Array.from({ length: 98 }, (_, i) => ({ - contactId: `contact-${i}`, - error: `Error ${i}`, - timestamp: new Date().toISOString(), - })); - - const processWithManyErrors = { - ...mockProcess, - results: { - aggregateData: { - totalSynced: 95, - totalFailed: 5, - duration: 30000, - recordsPerSecond: 3.33, - errors: existingErrors, - }, - }, - }; + describe('execute — derived-fields phase', () => { + const processId = 'process-123'; + const baseTime = new Date('2024-01-01T10:00:00Z'); + const atomicSnapshot = { + id: processId, + context: { + totalRecords: 1000, + processedRecords: 150, + startTime: baseTime.toISOString(), + }, + results: { aggregateData: { totalSynced: 143, totalFailed: 7 } }, + createdAt: baseTime, + }; - const newErrors = Array.from({ length: 5 }, (_, i) => ({ - contactId: `new-contact-${i}`, - error: `New error ${i}`, - timestamp: new Date().toISOString(), - })); + beforeEach(() => { + jest.useFakeTimers(); + jest.setSystemTime(new Date(baseTime.getTime() + 45000)); + }); + afterEach(() => { + jest.useRealTimers(); + }); - const metricsUpdate = { - processed: 5, - success: 0, - errors: 5, - errorDetails: newErrors, - }; + it('writes duration, recordsPerSecond, and estimatedCompletion via update()', async () => { + mockProcessRepository.applyProcessUpdate.mockResolvedValue( + atomicSnapshot + ); + mockProcessRepository.update.mockResolvedValue(atomicSnapshot); + + await useCase.execute(processId, { processed: 50, success: 50 }); + + const [id, derived] = + mockProcessRepository.update.mock.calls[0]; + expect(id).toBe(processId); + expect(derived.results.aggregateData.duration).toBe(45000); + expect(derived.results.aggregateData.recordsPerSecond).toBeCloseTo( + 150 / 45, + 2 + ); + expect(derived.context.estimatedCompletion).toEqual( + expect.any(String) + ); + }); - mockProcessRepository.findById.mockResolvedValue(processWithManyErrors); - mockProcessRepository.update.mockResolvedValue({}); + it('continues returning the atomic snapshot when the derived-fields write fails (non-fatal)', async () => { + const consoleErr = jest.spyOn(console, 'error').mockImplementation(); + mockProcessRepository.applyProcessUpdate.mockResolvedValue( + atomicSnapshot + ); + mockProcessRepository.update.mockRejectedValue(new Error('disk full')); - await updateProcessMetricsUseCase.execute(processId, metricsUpdate); + const result = await useCase.execute(processId, { + processed: 50, + success: 50, + }); - const updateCall = mockProcessRepository.update.mock.calls[0][1]; - const errorCount = updateCall.results.aggregateData.errors.length; - expect(errorCount).toBe(100); // Should be limited to 100 - expect(updateCall.results.aggregateData.errors[0]).toEqual(existingErrors[3]); // First 3 old errors dropped + expect(result).toBe(atomicSnapshot); + expect(consoleErr).toHaveBeenCalledWith( + expect.stringContaining('derived-fields write failed'), + expect.stringContaining('disk full') + ); + consoleErr.mockRestore(); }); - it('should handle process with no existing context', async () => { - const processWithNoContext = { - ...mockProcess, - context: null, - results: null, + it('skips derived fields entirely for a never-processed process (both counters zero)', async () => { + const fresh = { + ...atomicSnapshot, + context: { totalRecords: 0, processedRecords: 0 }, }; + mockProcessRepository.findById.mockResolvedValue(fresh); - const metricsUpdate = { processed: 10, success: 8, errors: 2 }; - const updatedProcess = { ...processWithNoContext }; - - mockProcessRepository.findById.mockResolvedValue(processWithNoContext); - mockProcessRepository.update.mockResolvedValue(updatedProcess); + await useCase.execute(processId, { processed: 0 }); - const result = await updateProcessMetricsUseCase.execute(processId, metricsUpdate); + expect(mockProcessRepository.update).not.toHaveBeenCalled(); + }); + }); - const updateCall = mockProcessRepository.update.mock.calls[0][1]; - expect(updateCall.context.processedRecords).toBe(10); - expect(updateCall.results.aggregateData.totalSynced).toBe(8); - expect(updateCall.results.aggregateData.totalFailed).toBe(2); + describe('execute — validation', () => { + it('throws when processId is empty', async () => { + await expect(useCase.execute('', {})).rejects.toThrow( + 'processId must be a non-empty string' + ); + }); + it('throws when metricsUpdate is null', async () => { + await expect( + useCase.execute('p1', null) + ).rejects.toThrow('metricsUpdate must be an object'); }); + it('throws when the atomic update yields no process', async () => { + mockProcessRepository.applyProcessUpdate.mockResolvedValue(null); + await expect( + useCase.execute('p1', { processed: 1 }) + ).rejects.toThrow('Process not found: p1'); + }); + it('wraps applyProcessUpdate errors in "Failed to update process metrics"', async () => { + mockProcessRepository.applyProcessUpdate.mockRejectedValue( + new Error('db connection lost') + ); + await expect( + useCase.execute('p1', { processed: 1 }) + ).rejects.toThrow( + 'Failed to update process metrics: db connection lost' + ); + }); + }); - it('should broadcast progress via WebSocket', async () => { - const metricsUpdate = { processed: 50, success: 48, errors: 2 }; - const updatedProcess = { ...mockProcess }; + describe('execute — websocket broadcast', () => { + const atomicSnapshot = { + id: 'p1', + name: 'sync', + type: 'CRM_SYNC', + state: 'PROCESSING_BATCHES', + context: { totalRecords: 10, processedRecords: 3 }, + results: { aggregateData: { totalSynced: 3, totalFailed: 0 } }, + createdAt: new Date(), + }; - mockProcessRepository.findById.mockResolvedValue(mockProcess); - mockProcessRepository.update.mockResolvedValue(updatedProcess); + it('broadcasts progress after a successful update', async () => { + mockProcessRepository.applyProcessUpdate.mockResolvedValue( + atomicSnapshot + ); + mockProcessRepository.update.mockResolvedValue(atomicSnapshot); - await updateProcessMetricsUseCase.execute(processId, metricsUpdate); + await useCase.execute('p1', { processed: 3, success: 3 }); expect(mockWebsocketService.broadcast).toHaveBeenCalledWith({ type: 'PROCESS_PROGRESS', - data: { - processId, - processName: mockProcess.name, - processType: mockProcess.type, - state: mockProcess.state, - processed: 150, // 100 + 50 - total: 1000, - successCount: 143, // 95 + 48 - errorCount: 7, // 5 + 2 - recordsPerSecond: expect.any(Number), - estimatedCompletion: expect.any(String), - timestamp: expect.any(String), - }, + data: expect.objectContaining({ + processId: 'p1', + processed: 3, + total: 10, + successCount: 3, + errorCount: 0, + }), }); }); - it('should handle WebSocket broadcast errors gracefully', async () => { - const websocketError = new Error('WebSocket connection failed'); - mockWebsocketService.broadcast.mockRejectedValue(websocketError); - - const metricsUpdate = { processed: 10, success: 10, errors: 0 }; - const updatedProcess = { ...mockProcess }; - - mockProcessRepository.findById.mockResolvedValue(mockProcess); - mockProcessRepository.update.mockResolvedValue(updatedProcess); + it('swallows websocket errors without failing the operation', async () => { + const consoleErr = jest.spyOn(console, 'error').mockImplementation(); + mockProcessRepository.applyProcessUpdate.mockResolvedValue( + atomicSnapshot + ); + mockProcessRepository.update.mockResolvedValue(atomicSnapshot); + mockWebsocketService.broadcast.mockRejectedValue( + new Error('ws closed') + ); - // Should not throw error even if WebSocket fails - const result = await updateProcessMetricsUseCase.execute(processId, metricsUpdate); - - expect(result).toEqual(updatedProcess); - expect(mockWebsocketService.broadcast).toHaveBeenCalled(); - }); + const result = await useCase.execute('p1', { processed: 3 }); - it('should throw error if processId is missing', async () => { - await expect(updateProcessMetricsUseCase.execute('', {})) - .rejects.toThrow('processId must be a non-empty string'); + expect(result).toBe(atomicSnapshot); + consoleErr.mockRestore(); }); + }); - it('should throw error if processId is not a string', async () => { - await expect(updateProcessMetricsUseCase.execute(123, {})) - .rejects.toThrow('processId must be a non-empty string'); - }); - - it('should throw error if metricsUpdate is missing', async () => { - await expect(updateProcessMetricsUseCase.execute(processId, null)) - .rejects.toThrow('metricsUpdate must be an object'); - }); - - it('should throw error if process not found', async () => { - mockProcessRepository.findById.mockResolvedValue(null); - - await expect(updateProcessMetricsUseCase.execute(processId, {})) - .rejects.toThrow('Process not found: process-123'); - }); - - it('should handle repository errors', async () => { - const repositoryError = new Error('Database connection failed'); - mockProcessRepository.findById.mockRejectedValue(repositoryError); - - await expect(updateProcessMetricsUseCase.execute(processId, {})) - .rejects.toThrow('Failed to update process metrics: Database connection failed'); + describe('race simulation', () => { + it('N concurrent invocations produce N calls to applyProcessUpdate (DB is responsible for serializing)', async () => { + const snap = { + id: 'p1', + context: { processedRecords: 0, startTime: new Date().toISOString() }, + results: { aggregateData: {} }, + createdAt: new Date(), + }; + mockProcessRepository.applyProcessUpdate.mockResolvedValue(snap); + mockProcessRepository.update.mockResolvedValue(snap); + + const calls = Array.from({ length: 25 }, () => + useCase.execute('p1', { processed: 1, success: 1 }) + ); + await Promise.all(calls); + + // Every invocation lands an atomic increment at the repo. Any + // clobbering is the DB's problem (and it handles it) — this + // layer just forwards. + expect( + mockProcessRepository.applyProcessUpdate + ).toHaveBeenCalledTimes(25); + expect( + mockProcessRepository.applyProcessUpdate.mock.calls.every( + ([, ops]) => ops.increment['context.processedRecords'] === 1 + ) + ).toBe(true); }); }); }); diff --git a/packages/core/integrations/use-cases/update-process-state.js b/packages/core/integrations/use-cases/update-process-state.js index 0fece258b..9e2d7d2b3 100644 --- a/packages/core/integrations/use-cases/update-process-state.js +++ b/packages/core/integrations/use-cases/update-process-state.js @@ -54,30 +54,69 @@ class UpdateProcessState { throw new Error('contextUpdates must be an object'); } - // Retrieve current process - const process = await this.processRepository.findById(processId); - if (!process) { - throw new Error(`Process not found: ${processId}`); - } + // Route through the atomic path when the repo supports it AND we + // have context keys to set. The atomic path writes the state + // column + the context field-sets in one DB round trip without + // read-modify-write, so a concurrent counter bump from + // UpdateProcessMetrics can't clobber our flags (e.g. `fetchDone`) + // or vice versa. + // + // Each context update key becomes a `set` at path + // `context.` — matching the prior semantics of a shallow + // top-level merge (sub-objects were and still are replaced + // whole, not deep-merged). + const hasContextKeys = + contextUpdates && Object.keys(contextUpdates).length > 0; - // Prepare updates - const updates = { - state: newState, - }; - - // Merge context updates if provided - if (contextUpdates && Object.keys(contextUpdates).length > 0) { - updates.context = { - ...process.context, - ...contextUpdates, - }; + if ( + hasContextKeys && + typeof this.processRepository.applyProcessUpdate === 'function' + ) { + const set = {}; + for (const [key, value] of Object.entries(contextUpdates)) { + set[`context.${key}`] = value; + } + try { + const updated = await this.processRepository.applyProcessUpdate( + processId, + { set, newState } + ); + if (!updated) { + throw new Error(`Process not found: ${processId}`); + } + return updated; + } catch (error) { + throw new Error( + `Failed to update process state: ${error.message}` + ); + } } - // Persist updates + // Legacy path (no contextUpdates or repo lacks applyProcessUpdate): + // preserve the original read-merge-write semantics for backward + // compatibility with any custom repos. Wrap the full read+write + // in try/catch so a findById error surfaces under the same + // "Failed to update process state" message as a write failure. try { - const updatedProcess = await this.processRepository.update(processId, updates); - return updatedProcess; + const process = await this.processRepository.findById(processId); + if (!process) { + throw new Error(`Process not found: ${processId}`); + } + + const updates = { state: newState }; + if (hasContextKeys) { + updates.context = { + ...process.context, + ...contextUpdates, + }; + } + + return await this.processRepository.update(processId, updates); } catch (error) { + // Re-throw "Process not found" as-is; wrap other errors. + if (error.message && error.message.startsWith('Process not found')) { + throw error; + } throw new Error(`Failed to update process state: ${error.message}`); } } diff --git a/packages/core/modules/module.js b/packages/core/modules/module.js index 5677b48b2..d476102a6 100644 --- a/packages/core/modules/module.js +++ b/packages/core/modules/module.js @@ -20,8 +20,9 @@ class Module extends Delegate { * @param {Object} params.definition The definition of the Api Module * @param {string} params.userId The user id * @param {Object} params.entity The entity record from the database + * @param {string} [params.state] Optional OAuth state value forwarded to the API client (round-trips through the OAuth provider). */ - constructor({ definition, userId = null, entity: entityObj = null }) { + constructor({ definition, userId = null, entity: entityObj = null, state = null }) { super({ definition, userId, entity: entityObj }); this.validateDefinition(definition); @@ -46,6 +47,7 @@ class Module extends Delegate { const apiParams = { ...this.definition.env, delegate: this, + ...(state ? { state } : {}), ...(this.credential?.data ? this.apiParamsFromCredential(this.credential.data) : {}), // Handle case when credential is undefined diff --git a/packages/core/modules/requester/requester.js b/packages/core/modules/requester/requester.js index 214eb10b0..8ab86ebf0 100644 --- a/packages/core/modules/requester/requester.js +++ b/packages/core/modules/requester/requester.js @@ -3,6 +3,8 @@ const { Delegate } = require('../../core'); const { FetchError } = require('../../errors'); const { get } = require('../../assertions'); +const DEFAULT_REQUEST_TIMEOUT_MS = 60_000; + class Requester extends Delegate { constructor(params) { super(params); @@ -13,6 +15,30 @@ class Requester extends Delegate { this.delegateTypes.push(this.DLGT_INVALID_AUTH); this.agent = get(params, 'agent', null); + // Per-attempt HTTP timeout. Without this the framework called fetch() + // with no AbortController and no timeout — a silently-hung TCP + // connection (server accepts but never responds) blocked the calling + // promise forever, cascading into stalled batches, stalled syncs, + // and worker-lambda timeouts. + // + // Configuration precedence: + // 1. Instance param: new Requester({ requestTimeoutMs: 30_000 }) + // 2. Class static: static requestTimeoutMs = 30_000 + // 3. Default: DEFAULT_REQUEST_TIMEOUT_MS (60s) + // + // Pass 0 (or null) to disable the timeout entirely — reserved for + // test doubles and documented long-running endpoints. + // Intentionally NOT using `get(params, ...)` here — the Frigg + // `get` helper throws RequiredPropertyError if the key is missing + // and no default is provided, which would collide with the fall- + // through to the class-level static override. + const instanceTimeout = params?.requestTimeoutMs; + this.requestTimeoutMs = + instanceTimeout !== undefined && instanceTimeout !== null + ? instanceTimeout + : this.constructor.requestTimeoutMs ?? + DEFAULT_REQUEST_TIMEOUT_MS; + // Allow passing in the fetch function // Instance methods can use this.fetch without differentiating this.fetch = get(params, 'fetch', fetch); @@ -48,52 +74,134 @@ class Requester extends Delegate { if (this.agent) options.agent = this.agent; - let response; + // Per-attempt timeout — fresh AbortController per call so the retry + // recursion (with its own backoff sleeps) always gets a clean + // signal. Timer is cleared in the finally block regardless of + // outcome. + const timeoutMs = this.requestTimeoutMs; + const controller = timeoutMs > 0 ? new AbortController() : null; + const timeoutHandle = controller + ? setTimeout(() => controller.abort(), timeoutMs) + : null; + const fetchOptions = controller + ? { ...options, signal: controller.signal } + : options; + + // Timer must stay active through body consumption. node-fetch v2 + // resolves the fetch() promise when headers arrive, not when the + // body is fully read — so a server that sends headers and then + // stalls the body would still hang parsedBody() or + // FetchError.create()'s response.text() call. We clear the timer + // only after the body is fully consumed (success path) or + // deliberately before each recursive retry so the new attempt + // starts with its own fresh timer. + let timerCleared = false; + const clearRequestTimer = () => { + if (!timerCleared && timeoutHandle) { + clearTimeout(timeoutHandle); + timerCleared = true; + } + }; + try { - response = await this.fetch(encodedUrl, options); - } catch (e) { - if (e.code === 'ECONNRESET' && i < this.backOff.length) { + let response; + try { + response = await this.fetch(encodedUrl, fetchOptions); + } catch (e) { + // AbortController fires AbortError (name) / ETIMEDOUT-shaped + // errors (type on node-fetch) when we hit the timeout. No + // retry on timeout: a slow endpoint is a downstream problem, + // and each retry would wait another `timeoutMs` before giving + // up — amplifying the hang into a per-record multi-minute + // stall at batch scale. + const isTimeout = + e?.name === 'AbortError' || e?.type === 'aborted'; + if (e?.code === 'ECONNRESET' && i < this.backOff.length) { + clearRequestTimer(); + const delay = this.backOff[i] * 1000; + await new Promise((resolve) => + setTimeout(resolve, delay) + ); + return this._request(url, options, i + 1); + } + const fetchError = await FetchError.create({ + resource: encodedUrl, + init: options, + responseBody: isTimeout + ? `Request timed out after ${timeoutMs}ms` + : e, + }); + if (isTimeout) { + // Flag + machine-readable fields so callers can + // distinguish a timeout from a generic network error + // without parsing the message (which FetchError + // sanitizes outside of STAGE=dev). + fetchError.isTimeout = true; + fetchError.timeoutMs = timeoutMs; + } + throw fetchError; + } + + const { status } = response; + + // If the status is retriable and there are back off requests left, retry the request + if ((status === 429 || status >= 500) && i < this.backOff.length) { + clearRequestTimer(); const delay = this.backOff[i] * 1000; await new Promise((resolve) => setTimeout(resolve, delay)); return this._request(url, options, i + 1); - } - throw await FetchError.create({ - resource: encodedUrl, - init: options, - responseBody: e, - }); - } - const { status } = response; - - // If the status is retriable and there are back off requests left, retry the request - if ((status === 429 || status >= 500) && i < this.backOff.length) { - const delay = this.backOff[i] * 1000; - await new Promise((resolve) => setTimeout(resolve, delay)); - return this._request(url, options, i + 1); - } else if (status === 401) { - if (!this.isRefreshable || this.refreshCount > 0) { - await this.notify(this.DLGT_INVALID_AUTH); - } else { - this.refreshCount++; - const refreshSucceeded = await this.refreshAuth(); - if (refreshSucceeded) { - return this._request(url, options, i + 1); + } else if (status === 401) { + if (!this.isRefreshable || this.refreshCount > 0) { + await this.notify(this.DLGT_INVALID_AUTH); + } else { + this.refreshCount++; + const refreshSucceeded = await this.refreshAuth(); + if (refreshSucceeded) { + clearRequestTimer(); + return this._request(url, options, i + 1); + } } } - } - // If the error wasn't retried, throw. - if (status >= 400) { - throw await FetchError.create({ - resource: encodedUrl, - init: options, - response, - }); + // If the error wasn't retried, throw. FetchError.create reads + // the response body (response.text()) — timer must still be + // alive to catch a stalled body stream. + if (status >= 400) { + const fetchError = await FetchError.create({ + resource: encodedUrl, + init: options, + response, + }); + throw this._maybeFlagTimeoutDuringBodyRead( + fetchError, + timeoutMs + ); + } + + // parsedBody consumes the response body stream. If the server + // stalls mid-stream the timer (still armed) aborts it. + return options.returnFullRes + ? response + : await this.parsedBody(response); + } catch (e) { + // If the abort fired during body consumption, node-fetch emits + // the error as an AbortError on the body stream. Surface the + // same isTimeout flag callers use for header-phase timeouts. + throw this._maybeFlagTimeoutDuringBodyRead(e, timeoutMs); + } finally { + clearRequestTimer(); } + } - return options.returnFullRes - ? response - : await this.parsedBody(response); + _maybeFlagTimeoutDuringBodyRead(err, timeoutMs) { + if (!err || typeof err !== 'object') return err; + if (err.isTimeout) return err; + const isAbort = + err.name === 'AbortError' || err.type === 'aborted'; + if (!isAbort) return err; + err.isTimeout = true; + err.timeoutMs = timeoutMs; + return err; } async _get(options) { diff --git a/packages/core/modules/requester/requester.test.js b/packages/core/modules/requester/requester.test.js index 7d5bdd08a..5705fe375 100644 --- a/packages/core/modules/requester/requester.test.js +++ b/packages/core/modules/requester/requester.test.js @@ -1,28 +1,322 @@ const { Requester } = require('./requester'); -describe('429 and 5xx testing', () => { - let backOffArray = [1, 1, 1]; - let requester = new Requester({ backOff: backOffArray }); - let sum = backOffArray.reduce((a, b) => { - return a + b; - }, 0); - it.skip("should retry with 'exponential' back off due to 429", async () => { - let startTime = await Date.now(); - let res = await requester._get({ - url: 'https://70e18ff0-1967-4fb5-8f96-10477ab6bb9e.mock.pstmn.io//429', - }); - let endTime = await Date.now(); - let difference = endTime - startTime; - expect(difference).toBeGreaterThan(sum * 1000); +/** + * Requester is abstract: subclasses provide `addAuthHeaders`. For the + * timeout / retry tests we don't care about auth headers, so this + * subclass just passes them through. + */ +class TestRequester extends Requester { + async addAuthHeaders(headers) { + return headers; + } +} + +/** + * Fetch double that honors the AbortSignal: rejects with an AbortError when + * the signal fires, otherwise never resolves (simulating a hung upstream). + * Lets jest's fake-timer machinery drive the abort. + */ +function hangingFetch() { + return jest.fn((_url, options) => { + return new Promise((_resolve, reject) => { + if (!options?.signal) return; // disabled-timeout path + options.signal.addEventListener('abort', () => { + const err = new Error('The user aborted a request.'); + err.name = 'AbortError'; + reject(err); + }); + }); + }); +} + +function okFetch(body) { + const resolvedBody = body === undefined ? { ok: true } : body; + return jest.fn().mockResolvedValue({ + status: 200, + headers: { get: () => 'application/json' }, + json: async () => resolvedBody, + }); +} + +describe('Requester', () => { + describe('429 and 5xx testing', () => { + let backOffArray = [1, 1, 1]; + let requester = new TestRequester({ backOff: backOffArray }); + let sum = backOffArray.reduce((a, b) => { + return a + b; + }, 0); + it.skip("should retry with 'exponential' back off due to 429", async () => { + let startTime = await Date.now(); + let res = await requester._get({ + url: 'https://70e18ff0-1967-4fb5-8f96-10477ab6bb9e.mock.pstmn.io//429', + }); + let endTime = await Date.now(); + let difference = endTime - startTime; + expect(difference).toBeGreaterThan(sum * 1000); + }); + + it.skip("should retry with 'exponential' back off due to 500", async () => { + let startTime = await Date.now(); + let res = await requester._get({ + url: 'https://70e18ff0-1967-4fb5-8f96-10477ab6bb9e.mock.pstmn.io//5xx', + }); + let endTime = await Date.now(); + let difference = endTime - startTime; + expect(difference).toBeGreaterThan(sum * 1000); + }); + }); + + describe('requestTimeoutMs configuration', () => { + it('defaults to 60000ms', () => { + const requester = new TestRequester({}); + expect(requester.requestTimeoutMs).toBe(60_000); + }); + + it('honors an instance-level override', () => { + const requester = new TestRequester({ requestTimeoutMs: 30_000 }); + expect(requester.requestTimeoutMs).toBe(30_000); + }); + + it('honors a class-level static override', () => { + class TighterRequester extends Requester { + static requestTimeoutMs = 15_000; + } + const requester = new TighterRequester({}); + expect(requester.requestTimeoutMs).toBe(15_000); + }); + + it('prefers instance param over class static', () => { + class TighterRequester extends Requester { + static requestTimeoutMs = 15_000; + } + const requester = new TighterRequester({ requestTimeoutMs: 5_000 }); + expect(requester.requestTimeoutMs).toBe(5_000); + }); + + it('accepts 0 to disable the timeout', () => { + const requester = new TestRequester({ requestTimeoutMs: 0 }); + expect(requester.requestTimeoutMs).toBe(0); + }); + + it('accepts null to fall back to the class/default', () => { + class TighterRequester extends Requester { + static requestTimeoutMs = 15_000; + } + const requester = new TighterRequester({ requestTimeoutMs: null }); + expect(requester.requestTimeoutMs).toBe(15_000); + }); + }); + + describe('AbortController timeout behavior', () => { + it('aborts and throws a FetchError when fetch never resolves', async () => { + jest.useFakeTimers(); + try { + const fetchMock = hangingFetch(); + const requester = new TestRequester({ + requestTimeoutMs: 100, + fetch: fetchMock, + }); + const p = requester._get({ url: 'https://example.com/slow' }); + p.catch(() => {}); // prevent unhandled-rejection noise + await jest.advanceTimersByTimeAsync(101); + await expect(p).rejects.toMatchObject({ + isTimeout: true, + timeoutMs: 100, + }); + expect(fetchMock).toHaveBeenCalledTimes(1); + } finally { + jest.useRealTimers(); + } + }); + + it('does not retry on timeout (single attempt only)', async () => { + jest.useFakeTimers(); + try { + const fetchMock = hangingFetch(); + const requester = new TestRequester({ + requestTimeoutMs: 50, + backOff: [1, 1, 1], + fetch: fetchMock, + }); + const p = requester._get({ url: 'https://example.com/hang' }); + p.catch(() => {}); + await jest.advanceTimersByTimeAsync(60); + await expect(p).rejects.toMatchObject({ isTimeout: true }); + expect(fetchMock).toHaveBeenCalledTimes(1); + } finally { + jest.useRealTimers(); + } + }); + + it('fires the timeout when fetch hangs past requestTimeoutMs', async () => { + jest.useFakeTimers(); + try { + const requester = new TestRequester({ + requestTimeoutMs: 200, + fetch: hangingFetch(), + }); + const p = requester._get({ + url: 'https://example.com/slow', + }); + p.catch(() => {}); + // Advance just short of the timeout — still pending. + await jest.advanceTimersByTimeAsync(150); + // ...then past it — should now reject. + await jest.advanceTimersByTimeAsync(100); + await expect(p).rejects.toMatchObject({ + isTimeout: true, + timeoutMs: 200, + }); + } finally { + jest.useRealTimers(); + } + }); + + it('does not wrap fetch in a timeout when requestTimeoutMs is 0', async () => { + // When disabled, no AbortController is wired in — the fetch + // options should not carry a `signal`. + const fetchMock = jest.fn(async (_url, options) => { + expect(options.signal).toBeUndefined(); + return { + status: 200, + headers: { get: () => 'application/json' }, + json: async () => ({ ok: true }), + }; + }); + const requester = new TestRequester({ + requestTimeoutMs: 0, + fetch: fetchMock, + }); + const result = await requester._get({ + url: 'https://example.com/ok', + }); + expect(result).toEqual({ ok: true }); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('passes a signal to fetch when timeout is enabled', async () => { + const fetchMock = jest.fn(async (_url, options) => { + expect(options.signal).toBeDefined(); + expect(typeof options.signal.addEventListener).toBe('function'); + return { + status: 200, + headers: { get: () => 'application/json' }, + json: async () => ({ ok: true }), + }; + }); + const requester = new TestRequester({ + requestTimeoutMs: 30_000, + fetch: fetchMock, + }); + await requester._get({ url: 'https://example.com/ok' }); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('does not time out a fast, successful response', async () => { + const fetchMock = okFetch({ data: 'fresh' }); + const requester = new TestRequester({ + requestTimeoutMs: 5_000, + fetch: fetchMock, + }); + const result = await requester._get({ + url: 'https://example.com/fast', + }); + expect(result).toEqual({ data: 'fresh' }); + }); + + it('aborts when the server sends headers but stalls the body', async () => { + // node-fetch v2 resolves fetch() on headers received; body + // reads happen later via parsedBody / response.text(). The + // timer must stay active until the body is fully consumed. + jest.useFakeTimers(); + try { + let capturedSignal; + const fetchMock = jest.fn(async (_url, options) => { + capturedSignal = options.signal; + return { + status: 200, + headers: { get: () => 'application/json' }, + json: () => + new Promise((_resolve, reject) => { + capturedSignal.addEventListener( + 'abort', + () => { + const err = new Error( + 'aborted mid-body' + ); + err.name = 'AbortError'; + reject(err); + } + ); + }), + }; + }); + const requester = new TestRequester({ + requestTimeoutMs: 100, + fetch: fetchMock, + }); + const p = requester._get({ + url: 'https://example.com/stalled-body', + }); + p.catch(() => {}); + await jest.advanceTimersByTimeAsync(150); + await expect(p).rejects.toMatchObject({ + isTimeout: true, + timeoutMs: 100, + }); + } finally { + jest.useRealTimers(); + } + }); + + it('clears the timer once the fetch resolves so long-running processes do not leak timers', async () => { + const fetchMock = okFetch(); + const requester = new TestRequester({ + requestTimeoutMs: 5_000, + fetch: fetchMock, + }); + // If we did not clear the timer, the Node event loop would keep + // a reference and this assertion is a smoke test for that: the + // call should resolve in real time, not after 5s. + const start = Date.now(); + await requester._get({ url: 'https://example.com/ok' }); + expect(Date.now() - start).toBeLessThan(500); + }); }); - it.skip("should retry with 'exponential' back off due to 500", async () => { - let startTime = await Date.now(); - let res = await requester._get({ - url: 'https://70e18ff0-1967-4fb5-8f96-10477ab6bb9e.mock.pstmn.io//5xx', + describe('ECONNRESET retry (regression guard)', () => { + it('still retries on ECONNRESET following the backOff schedule', async () => { + jest.useFakeTimers(); + try { + let attempts = 0; + const fetchMock = jest.fn(async () => { + attempts++; + if (attempts <= 2) { + const err = new Error('socket hang up'); + err.code = 'ECONNRESET'; + throw err; + } + return { + status: 200, + headers: { get: () => 'application/json' }, + json: async () => ({ ok: true }), + }; + }); + const requester = new TestRequester({ + fetch: fetchMock, + backOff: [0, 0, 0], + requestTimeoutMs: 10_000, + }); + + const p = requester._get({ url: 'https://example.com/flaky' }); + // Backoffs are 0s, so just flush queued timeouts/microtasks. + await jest.runAllTimersAsync(); + const result = await p; + expect(result).toEqual({ ok: true }); + expect(fetchMock).toHaveBeenCalledTimes(3); + } finally { + jest.useRealTimers(); + } }); - let endTime = await Date.now(); - let difference = endTime - startTime; - expect(difference).toBeGreaterThan(sum * 1000); }); }); diff --git a/packages/core/modules/use-cases/get-module-instance-from-type.js b/packages/core/modules/use-cases/get-module-instance-from-type.js index 8f53deb42..d34e4a76d 100644 --- a/packages/core/modules/use-cases/get-module-instance-from-type.js +++ b/packages/core/modules/use-cases/get-module-instance-from-type.js @@ -13,8 +13,10 @@ class GetModuleInstanceFromType { * Retrieve a Module instance for a given user and entity/module type. * @param {string} userId * @param {string} type – human-readable module/entity type (e.g. "Hubspot") + * @param {Object} [options] + * @param {string} [options.state] – optional OAuth state value to be forwarded to the API client (round-trips through the OAuth provider). */ - async execute(userId, type) { + async execute(userId, type, options = {}) { const moduleDefinition = this.moduleDefinitions.find( (def) => def.getName() === type ); @@ -24,6 +26,7 @@ class GetModuleInstanceFromType { return new Module({ userId, definition: moduleDefinition, + state: options.state, }); } } diff --git a/packages/core/prisma-mongodb/schema.prisma b/packages/core/prisma-mongodb/schema.prisma index 02dd9bd98..373ff5aa7 100644 --- a/packages/core/prisma-mongodb/schema.prisma +++ b/packages/core/prisma-mongodb/schema.prisma @@ -6,7 +6,13 @@ generator client { provider = "prisma-client-js" output = "../generated/prisma-mongodb" binaryTargets = ["native", "rhel-openssl-3.0.x"] // native for local dev, rhel for Lambda deployment - engineType = "binary" // Use binary engines (smaller size) + // Library engine (default since Prisma 3.x): Rust query engine loads as a + // Node-API addon inside the same process. The binary engine forks a child + // query-engine subprocess and communicates over a local HTTP/IPC pipe with + // NO client-side timeout — a zombied child wedges the Node process until + // Lambda's 900s cap. Switching to library eliminates that entire class of + // silent hangs. See friggframework/frigg#580 for the investigation. + engineType = "library" } datasource db { diff --git a/packages/core/prisma-postgresql/migrations/20260422120000_add_entity_data_column/migration.sql b/packages/core/prisma-postgresql/migrations/20260422120000_add_entity_data_column/migration.sql new file mode 100644 index 000000000..ea2ce7ab7 --- /dev/null +++ b/packages/core/prisma-postgresql/migrations/20260422120000_add_entity_data_column/migration.sql @@ -0,0 +1,10 @@ +-- AlterTable +-- Backfill the Entity.data JSONB field declared in schema.prisma but never +-- migrated. ModuleRepositoryPostgres.createEntity / findEntity use this +-- column to persist & rehydrate identifiers/details fields that fall +-- outside the six named columns (id, userId, credentialId, name, +-- moduleName, externalId). Without the column, any integration whose +-- getEntityDetails returns an extra field (e.g. `firm_subdomain`) causes +-- prisma.entity.create to throw P2022 at runtime. +ALTER TABLE "Entity" + ADD COLUMN IF NOT EXISTS "data" JSONB NOT NULL DEFAULT '{}'; diff --git a/packages/core/prisma-postgresql/migrations/20260422120001_create_process_table/migration.sql b/packages/core/prisma-postgresql/migrations/20260422120001_create_process_table/migration.sql new file mode 100644 index 000000000..bd0011bf0 --- /dev/null +++ b/packages/core/prisma-postgresql/migrations/20260422120001_create_process_table/migration.sql @@ -0,0 +1,48 @@ +-- CreateTable +-- The Process model is declared in schema.prisma but was never created by +-- any prior migration. ProcessRepositoryPostgres (read/write) and +-- FriggProcessManager rely on this table for long-running job tracking — +-- e.g. fan-out sync progress, batch state machines. Without the table, +-- any `prisma.process.create` at runtime throws P2021. +CREATE TABLE "Process" ( + "id" SERIAL NOT NULL, + "userId" INTEGER NOT NULL, + "integrationId" INTEGER NOT NULL, + "name" TEXT NOT NULL, + "type" TEXT NOT NULL, + "state" TEXT NOT NULL, + "context" JSONB NOT NULL DEFAULT '{}', + "results" JSONB NOT NULL DEFAULT '{}', + "parentProcessId" INTEGER, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "Process_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "Process_userId_idx" ON "Process"("userId"); + +-- CreateIndex +CREATE INDEX "Process_integrationId_idx" ON "Process"("integrationId"); + +-- CreateIndex +CREATE INDEX "Process_type_idx" ON "Process"("type"); + +-- CreateIndex +CREATE INDEX "Process_state_idx" ON "Process"("state"); + +-- CreateIndex +CREATE INDEX "Process_name_idx" ON "Process"("name"); + +-- CreateIndex +CREATE INDEX "Process_parentProcessId_idx" ON "Process"("parentProcessId"); + +-- AddForeignKey +ALTER TABLE "Process" ADD CONSTRAINT "Process_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "Process" ADD CONSTRAINT "Process_integrationId_fkey" FOREIGN KEY ("integrationId") REFERENCES "Integration"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "Process" ADD CONSTRAINT "Process_parentProcessId_fkey" FOREIGN KEY ("parentProcessId") REFERENCES "Process"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/packages/core/prisma-postgresql/schema.prisma b/packages/core/prisma-postgresql/schema.prisma index c8d781e98..3cd764eeb 100644 --- a/packages/core/prisma-postgresql/schema.prisma +++ b/packages/core/prisma-postgresql/schema.prisma @@ -6,7 +6,13 @@ generator client { provider = "prisma-client-js" output = "../generated/prisma-postgresql" binaryTargets = ["native", "rhel-openssl-3.0.x"] // native for local dev, rhel for Lambda deployment - engineType = "binary" // Use binary engines (smaller size) + // Library engine (default since Prisma 3.x): Rust query engine loads as a + // Node-API addon inside the same process. The binary engine forks a child + // query-engine subprocess and communicates over a local HTTP/IPC pipe with + // NO client-side timeout — a zombied child wedges the Node process until + // Lambda's 900s cap. Switching to library eliminates that entire class of + // silent hangs. See friggframework/frigg#580 for the investigation. + engineType = "library" } datasource db { diff --git a/packages/devtools/infrastructure/domains/database/aurora-builder.js b/packages/devtools/infrastructure/domains/database/aurora-builder.js index 2dd19cf89..ed6057f40 100644 --- a/packages/devtools/infrastructure/domains/database/aurora-builder.js +++ b/packages/devtools/infrastructure/domains/database/aurora-builder.js @@ -21,6 +21,36 @@ const AuroraResourceResolver = require('./aurora-resolver'); const { createEmptyDiscoveryResult } = require('../shared/types/discovery-result'); const { ResourceOwnership } = require('../shared/types/resource-ownership'); +// Pool + timeout query params appended to DATABASE_URL for Lambda-to-Aurora +// connections. Chosen to make worker Lambdas fail loud and fast on any DB +// contention rather than silently hanging for Lambda's 900s timeout. +// +// connection_limit=2 — two pg connections per Lambda container. One is too +// tight: several core use cases (get-process.executeMany, +// field-encryption-service batches) issue in-handler +// Promise.all against Prisma, and would serialize +// behind a single slot. Two removes that cliff while +// still being safe against max_connections (at 4 ACU +// Aurora pg 15 allows ~400 connections; 200 concurrent +// Lambdas × 2 = 400, leaves cluster room for maint). +// pool_timeout=20 — wait up to 20s for a pool slot, then throw P2024. +// Still fail-fast relative to 900s Lambda cap; gives +// in-handler fan-outs headroom. +// connect_timeout=10 — bound TCP/TLS handshake. +// socket_timeout=60 — kill dead client sockets (server never responds). +// options=-c statement_timeout=30000 -c lock_timeout=10000 +// — Postgres-side hard caps. A query stuck >30s aborts +// with SQLSTATE 57014; a lock wait >10s aborts with +// SQLSTATE 55P03. URL encoding per libpq URI rules +// (space→%20, `=`→%3D inside the options value). +const LAMBDA_DATABASE_URL_QUERY_PARAMS = [ + 'connection_limit=2', + 'pool_timeout=20', + 'connect_timeout=10', + 'socket_timeout=60', + 'options=-c%20statement_timeout%3D30000%20-c%20lock_timeout%3D10000', +].join('&'); + class AuroraBuilder extends InfrastructureBuilder { constructor() { super(); @@ -415,9 +445,16 @@ class AuroraBuilder extends InfrastructureBuilder { ], // Note: PubliclyAccessible is NOT supported on Aurora clusters // It should only be set on DB instances (see FriggAuroraInstance below) + // MaxCapacity default bumped 1 → 4 ACU: at 0.5–1 ACU Aurora is + // CPU-starved under 20-way concurrent writes from a Lambda + // fan-out sync, which starves worker queries and compounds + // the tail-latency problem. 4 ACU is still cheap (scales to + // min when idle) and gives the DB enough headroom to + // absorb bursty sync traffic. Apps can still override both + // via app definition dbConfig. ServerlessV2ScalingConfiguration: { MinCapacity: dbConfig.minCapacity || 0.5, - MaxCapacity: dbConfig.maxCapacity || 1, + MaxCapacity: dbConfig.maxCapacity || 4, }, EnableHttpEndpoint: false, BackupRetentionPeriod: 7, @@ -494,6 +531,10 @@ class AuroraBuilder extends InfrastructureBuilder { result.environment.DATABASE_PORT = String(dbConfig.port || 5432); result.environment.DATABASE_NAME = dbConfig.database || 'frigg'; result.environment.DATABASE_USER = dbConfig.username || 'postgres'; + // Consumers that build DATABASE_URL from components at runtime MUST + // append `?${DATABASE_URL_PARAMS}` to get the same hang-prevention + // timeouts as the managed path. + result.environment.DATABASE_URL_PARAMS = LAMBDA_DATABASE_URL_QUERY_PARAMS; console.log(` ✅ Using existing cluster: ${dbConfig.endpoint}`); } @@ -724,13 +765,18 @@ exports.handler = async (event, context) => { result.environment.DATABASE_HOST = discoveredResources.auroraClusterEndpoint; result.environment.DATABASE_PORT = String(discoveredResources.auroraPort || 5432); result.environment.DATABASE_NAME = dbName; + // Consumers that build DATABASE_URL from components at runtime MUST + // append `?${DATABASE_URL_PARAMS}` to get the same hang-prevention + // timeouts as the managed path. + result.environment.DATABASE_URL_PARAMS = LAMBDA_DATABASE_URL_QUERY_PARAMS; // Note: DATABASE_URL is NOT set here to avoid Serverless variable resolution errors // The application (Frigg Core) should construct it at runtime from: - // DATABASE_HOST, DATABASE_PORT, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD + // DATABASE_HOST, DATABASE_PORT, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, DATABASE_URL_PARAMS console.log(' ℹ️ No Secrets Manager secret found - set DATABASE_USER and DATABASE_PASSWORD in Lambda environment'); console.log(' ℹ️ Application will construct DATABASE_URL at runtime from DATABASE_HOST, DATABASE_PORT, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD'); + console.log(' ℹ️ Append `?${DATABASE_URL_PARAMS}` to the constructed URL for pool/timeout safety.'); console.log(' ℹ️ Or enable autoCreateCredentials=true to automatically create and rotate credentials'); } @@ -790,9 +836,11 @@ exports.handler = async (event, context) => { return `{{resolve:secretsmanager:${secretRefValue}:SecretString:password}}`; }; + // Query params are defined at module scope (LAMBDA_DATABASE_URL_QUERY_PARAMS) + // so runtime-URL-construction paths can emit the same timeouts as an env var. return { 'Fn::Sub': [ - `postgresql://\${Username}:\${Password}@\${Host}:\${Port}/\${Database}`, + `postgresql://\${Username}:\${Password}@\${Host}:\${Port}/\${Database}?${LAMBDA_DATABASE_URL_QUERY_PARAMS}`, { Username: resolveSecretRef(secretRef), Password: resolveSecretPassword(secretRef), diff --git a/packages/devtools/infrastructure/domains/database/aurora-builder.test.js b/packages/devtools/infrastructure/domains/database/aurora-builder.test.js index 813d5c6f5..e07724c8e 100644 --- a/packages/devtools/infrastructure/domains/database/aurora-builder.test.js +++ b/packages/devtools/infrastructure/domains/database/aurora-builder.test.js @@ -556,7 +556,17 @@ describe('AuroraBuilder', () => { // Should use Fn::Sub with nested Fn::Sub to resolve the Ref expect(dbUrl['Fn::Sub']).toBeDefined(); - expect(dbUrl['Fn::Sub'][0]).toBe('postgresql://${Username}:${Password}@${Host}:${Port}/${Database}'); + // Template includes pool + timeout query params to prevent + // silent 15-min Lambda hangs on DB contention. + expect(dbUrl['Fn::Sub'][0]).toMatch( + /^postgresql:\/\/\$\{Username\}:\$\{Password\}@\$\{Host\}:\$\{Port\}\/\$\{Database\}\?/ + ); + expect(dbUrl['Fn::Sub'][0]).toContain('connection_limit=2'); + expect(dbUrl['Fn::Sub'][0]).toContain('pool_timeout=20'); + expect(dbUrl['Fn::Sub'][0]).toContain('connect_timeout=10'); + expect(dbUrl['Fn::Sub'][0]).toContain('socket_timeout=60'); + expect(dbUrl['Fn::Sub'][0]).toContain('statement_timeout%3D30000'); + expect(dbUrl['Fn::Sub'][0]).toContain('lock_timeout%3D10000'); // The Username and Password should use Fn::Sub to resolve the secret Ref, not literal "[object Object]" expect(dbUrl['Fn::Sub'][1].Username['Fn::Sub']).toBeDefined(); diff --git a/packages/serverless-plugin/index.js b/packages/serverless-plugin/index.js index d4f4f48a9..b2be0a74b 100644 --- a/packages/serverless-plugin/index.js +++ b/packages/serverless-plugin/index.js @@ -66,12 +66,37 @@ class FriggServerlessPlugin { } extractQueueDefinitions() { + // Each custom.*Queue entry is the resolved QueueName. The matching + // CloudFormation resource (under resources.Resources) has the same + // logical ID and carries the Properties block we want to mirror onto + // LocalStack (VisibilityTimeout, MessageRetentionPeriod, + // RedrivePolicy, …). Deployed AWS applies those via CloudFormation; + // locally they'd be silently dropped and LocalStack would fall back + // to AWS defaults — notably a 30s VisibilityTimeout which + // re-delivers in-flight messages while a long-running queue worker + // is still processing them. + const resources = + this.serverless.service.resources && + this.serverless.service.resources.Resources + ? this.serverless.service.resources.Resources + : {}; + return Object.keys(this.serverless.service.custom) .filter((key) => key.endsWith('Queue')) - .map((key) => ({ - key, - name: this.serverless.service.custom[key], - })); + .map((key) => { + const resource = resources[key]; + const properties = + resource && + resource.Type === 'AWS::SQS::Queue' && + resource.Properties + ? resource.Properties + : undefined; + return { + key, + name: this.serverless.service.custom[key], + ...(properties ? { properties } : {}), + }; + }); } createLocalStackSQSClient() { diff --git a/packages/serverless-plugin/index.test.js b/packages/serverless-plugin/index.test.js index 729ae90cd..1491555fa 100644 --- a/packages/serverless-plugin/index.test.js +++ b/packages/serverless-plugin/index.test.js @@ -183,6 +183,75 @@ describe('FriggServerlessPlugin', () => { expect(queues).toEqual([]); }); + + it('should attach Properties from matching CloudFormation resources', () => { + plugin = new FriggServerlessPlugin(mockServerless, mockOptions); + mockServerless.service.custom = { + HubspotQueue: 'svc--dev-HubspotQueue', + }; + mockServerless.service.resources = { + Resources: { + HubspotQueue: { + Type: 'AWS::SQS::Queue', + Properties: { + QueueName: 'svc--dev-HubspotQueue', + VisibilityTimeout: 1800, + MessageRetentionPeriod: 345600, + RedrivePolicy: { + maxReceiveCount: 3, + deadLetterTargetArn: 'arn:aws:sqs:us-east-1:x:dlq', + }, + }, + }, + }, + }; + + const queues = plugin.extractQueueDefinitions(); + + expect(queues).toHaveLength(1); + expect(queues[0]).toEqual({ + key: 'HubspotQueue', + name: 'svc--dev-HubspotQueue', + properties: { + QueueName: 'svc--dev-HubspotQueue', + VisibilityTimeout: 1800, + MessageRetentionPeriod: 345600, + RedrivePolicy: { + maxReceiveCount: 3, + deadLetterTargetArn: 'arn:aws:sqs:us-east-1:x:dlq', + }, + }, + }); + }); + + it('should omit properties when resources.Resources is absent', () => { + plugin = new FriggServerlessPlugin(mockServerless, mockOptions); + mockServerless.service.custom = { LegacyQueue: 'legacy-queue' }; + mockServerless.service.resources = undefined; + + const queues = plugin.extractQueueDefinitions(); + + expect(queues).toEqual([{ key: 'LegacyQueue', name: 'legacy-queue' }]); + }); + + it('should ignore non-SQS CloudFormation resources with matching logical IDs', () => { + plugin = new FriggServerlessPlugin(mockServerless, mockOptions); + mockServerless.service.custom = { MisnamedQueue: 'misnamed-queue' }; + mockServerless.service.resources = { + Resources: { + MisnamedQueue: { + Type: 'AWS::SNS::Topic', + Properties: { TopicName: 'not-an-sqs-queue' }, + }, + }, + }; + + const queues = plugin.extractQueueDefinitions(); + + expect(queues).toEqual([ + { key: 'MisnamedQueue', name: 'misnamed-queue' }, + ]); + }); }); describe('Hooks', () => { diff --git a/packages/serverless-plugin/lib/localstack-queue-service.js b/packages/serverless-plugin/lib/localstack-queue-service.js index 1ffc9da7b..a6fb9fdf3 100644 --- a/packages/serverless-plugin/lib/localstack-queue-service.js +++ b/packages/serverless-plugin/lib/localstack-queue-service.js @@ -2,19 +2,120 @@ * Infrastructure Service - LocalStack Queue Management * * Handles SQS queue creation in LocalStack for offline development. + * + * On deployed AWS, CloudFormation applies queue properties + * (VisibilityTimeout, MessageRetentionPeriod, RedrivePolicy) from the + * `Resources` block in serverless.yml. LocalStack gets those same + * properties here so local emulation matches production behavior — + * otherwise the queue defaults to a 30s VisibilityTimeout which re- + * delivers in-flight messages while a long-running queue worker is + * still processing them. */ class LocalStackQueueService { constructor(sqsClient) { this.sqs = sqsClient; } + /** + * Whitelist of CloudFormation `AWS::SQS::Queue` Properties that map + * onto SQS `CreateQueue` Attributes. Everything else (tags, inline + * refs, etc.) is dropped on the way to LocalStack. + * @private + */ + static PROPERTY_ATTRIBUTE_KEYS = [ + 'DelaySeconds', + 'MaximumMessageSize', + 'MessageRetentionPeriod', + 'ReceiveMessageWaitTimeSeconds', + 'VisibilityTimeout', + 'RedrivePolicy', + 'RedriveAllowPolicy', + 'KmsMasterKeyId', + 'KmsDataKeyReusePeriodSeconds', + 'SqsManagedSseEnabled', + 'FifoQueue', + 'ContentBasedDeduplication', + 'DeduplicationScope', + 'FifoThroughputLimit', + ]; + + /** + * Serialize a CloudFormation `Properties` object into the + * `Attributes` shape the SQS `CreateQueue` API accepts (string + * values only; object values like `RedrivePolicy` get JSON-encoded). + * + * Attributes whose value still contains an unresolved CloudFormation + * intrinsic (`Fn::GetAtt`, `Ref`, `Fn::Sub`, …) are DROPPED rather + * than stringified. Example: integration-builder.js emits + * `RedrivePolicy.deadLetterTargetArn: {'Fn::GetAtt': [...]}`, which + * CloudFormation resolves to a real ARN in AWS but is still a raw + * intrinsic object at local plugin-time. Forwarding that JSON blob + * to SQS `CreateQueue` would fail (`deadLetterTargetArn` must be a + * valid ARN string) or produce malformed config. Dropping the + * attribute gives local parity on every other queue property + * (notably `VisibilityTimeout`, which is the main reason this code + * exists) while leaving the DLQ association intentionally un-wired + * locally — matching the pre-PR behavior for that one attribute. + * @private + */ + _propertiesToAttributes(properties = {}) { + const attributes = {}; + for (const key of LocalStackQueueService.PROPERTY_ATTRIBUTE_KEYS) { + const value = properties[key]; + if (value === undefined || value === null) continue; + if (LocalStackQueueService._containsUnresolvedIntrinsic(value)) { + console.warn( + `[frigg-plugin] Skipping queue attribute "${key}" because it contains an unresolved CloudFormation intrinsic. ` + + `Deployed AWS will apply it via CloudFormation; local emulation will fall back to the AWS default for this attribute.` + ); + continue; + } + attributes[key] = + typeof value === 'object' ? JSON.stringify(value) : String(value); + } + return attributes; + } + + /** + * Recursively checks whether a value still contains a CloudFormation + * intrinsic function key (`Fn::*` or `Ref`). Such values are unsafe + * to pass through to SQS `CreateQueue` — AWS's runtime API doesn't + * understand CloudFormation intrinsics; they're only valid inside + * serverless.yml / CloudFormation templates. + * @private + */ + static _containsUnresolvedIntrinsic(value) { + if (value === null || value === undefined) return false; + if (typeof value !== 'object') return false; + if (Array.isArray(value)) { + return value.some((v) => + LocalStackQueueService._containsUnresolvedIntrinsic(v) + ); + } + for (const key of Object.keys(value)) { + if (key === 'Ref' || key.startsWith('Fn::')) return true; + if ( + LocalStackQueueService._containsUnresolvedIntrinsic(value[key]) + ) { + return true; + } + } + return false; + } + /** * @param {string} queueName - Name of queue to create + * @param {Object} [attributes] - SQS CreateQueue Attributes (already + * stringified); missing/empty attributes fall back to AWS defaults. * @returns {Promise} Queue URL */ - async createQueue(queueName) { + async createQueue(queueName, attributes) { return new Promise((resolve, reject) => { - this.sqs.createQueue({ QueueName: queueName }, (err, data) => { + const params = { QueueName: queueName }; + if (attributes && Object.keys(attributes).length > 0) { + params.Attributes = attributes; + } + this.sqs.createQueue(params, (err, data) => { if (err) { reject(new Error(`Failed to create queue ${queueName}: ${err.message}`)); } else { @@ -25,13 +126,17 @@ class LocalStackQueueService { } /** - * @param {Array<{key: string, name: string}>} queues - Queue definitions + * @param {Array<{key: string, name: string, properties?: Object}>} queues + * Queue definitions. `properties` mirrors the CloudFormation + * `AWS::SQS::Queue` Properties block; extracted by the plugin from + * `serverless.service.resources.Resources`. * @returns {Promise>} Created queues with URLs */ async createQueues(queues) { const results = await Promise.all( queues.map(async (queue) => { - const url = await this.createQueue(queue.name); + const attributes = this._propertiesToAttributes(queue.properties); + const url = await this.createQueue(queue.name, attributes); console.log(`Queue ${queue.name} created successfully. URL: ${url}`); return { key: queue.key, url }; }) diff --git a/packages/serverless-plugin/lib/localstack-queue-service.test.js b/packages/serverless-plugin/lib/localstack-queue-service.test.js index 13544aded..641523cf2 100644 --- a/packages/serverless-plugin/lib/localstack-queue-service.test.js +++ b/packages/serverless-plugin/lib/localstack-queue-service.test.js @@ -27,6 +27,41 @@ describe('LocalStackQueueService', () => { ); }); + it('should pass Attributes when provided', async () => { + mockSQS.createQueue.mockImplementation((params, callback) => { + callback(null, { QueueUrl: 'url' }); + }); + + await service.createQueue('test-queue', { + VisibilityTimeout: '1800', + MessageRetentionPeriod: '345600', + }); + + expect(mockSQS.createQueue).toHaveBeenCalledWith( + { + QueueName: 'test-queue', + Attributes: { + VisibilityTimeout: '1800', + MessageRetentionPeriod: '345600', + }, + }, + expect.any(Function) + ); + }); + + it('should omit Attributes when empty', async () => { + mockSQS.createQueue.mockImplementation((params, callback) => { + callback(null, { QueueUrl: 'url' }); + }); + + await service.createQueue('test-queue', {}); + + expect(mockSQS.createQueue).toHaveBeenCalledWith( + { QueueName: 'test-queue' }, + expect.any(Function) + ); + }); + it('should reject with error on failure', async () => { const error = new Error('SQS Error'); mockSQS.createQueue.mockImplementation((params, callback) => { @@ -39,6 +74,105 @@ describe('LocalStackQueueService', () => { }); }); + describe('_propertiesToAttributes', () => { + it('maps CloudFormation Properties onto SQS Attributes and stringifies', () => { + const attrs = service._propertiesToAttributes({ + VisibilityTimeout: 1800, + MessageRetentionPeriod: 345600, + RedrivePolicy: { + maxReceiveCount: 3, + deadLetterTargetArn: 'arn:aws:sqs:us-east-1:x:dlq', + }, + QueueName: 'should-be-dropped', + Tags: [{ Key: 'ignored', Value: 'yes' }], + }); + + expect(attrs).toEqual({ + VisibilityTimeout: '1800', + MessageRetentionPeriod: '345600', + RedrivePolicy: JSON.stringify({ + maxReceiveCount: 3, + deadLetterTargetArn: 'arn:aws:sqs:us-east-1:x:dlq', + }), + }); + }); + + it('skips undefined and null values', () => { + const attrs = service._propertiesToAttributes({ + VisibilityTimeout: 60, + MessageRetentionPeriod: undefined, + KmsMasterKeyId: null, + }); + expect(attrs).toEqual({ VisibilityTimeout: '60' }); + }); + + it('returns an empty object when properties are missing', () => { + expect(service._propertiesToAttributes()).toEqual({}); + }); + + it('drops attributes containing unresolved CloudFormation intrinsics', () => { + const warnSpy = jest.spyOn(console, 'warn').mockImplementation(); + const attrs = service._propertiesToAttributes({ + VisibilityTimeout: 1800, + RedrivePolicy: { + maxReceiveCount: 3, + deadLetterTargetArn: { + 'Fn::GetAtt': ['InternalErrorQueue', 'Arn'], + }, + }, + }); + + // VisibilityTimeout survives; RedrivePolicy is dropped because + // deadLetterTargetArn is still an unresolved Fn::GetAtt intrinsic + // (real AWS resolves it via CloudFormation; LocalStack cannot). + expect(attrs).toEqual({ VisibilityTimeout: '1800' }); + expect(attrs).not.toHaveProperty('RedrivePolicy'); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('Skipping queue attribute "RedrivePolicy"') + ); + warnSpy.mockRestore(); + }); + + it('drops attributes with Ref intrinsics', () => { + jest.spyOn(console, 'warn').mockImplementation(); + const attrs = service._propertiesToAttributes({ + VisibilityTimeout: 60, + KmsMasterKeyId: { Ref: 'MyKmsKey' }, + }); + expect(attrs).toEqual({ VisibilityTimeout: '60' }); + }); + + it('detects intrinsics nested deep inside objects and arrays', () => { + expect( + LocalStackQueueService._containsUnresolvedIntrinsic({ + a: { b: [{ c: { 'Fn::Sub': '${AWS::Region}' } }] }, + }) + ).toBe(true); + expect( + LocalStackQueueService._containsUnresolvedIntrinsic({ + a: { b: [{ c: 'hello' }] }, + }) + ).toBe(false); + expect(LocalStackQueueService._containsUnresolvedIntrinsic(null)).toBe(false); + expect(LocalStackQueueService._containsUnresolvedIntrinsic('str')).toBe(false); + }); + + it('retains resolved RedrivePolicy (ARN already a string)', () => { + const attrs = service._propertiesToAttributes({ + RedrivePolicy: { + maxReceiveCount: 3, + deadLetterTargetArn: 'arn:aws:sqs:us-east-1:x:dlq', + }, + }); + expect(attrs.RedrivePolicy).toBe( + JSON.stringify({ + maxReceiveCount: 3, + deadLetterTargetArn: 'arn:aws:sqs:us-east-1:x:dlq', + }) + ); + }); + }); + describe('createQueues', () => { it('should create multiple queues and return results', async () => { mockSQS.createQueue.mockImplementation((params, callback) => { @@ -69,6 +203,58 @@ describe('LocalStackQueueService', () => { consoleLogSpy.mockRestore(); }); + it('forwards CloudFormation Properties as SQS Attributes', async () => { + const captured = []; + mockSQS.createQueue.mockImplementation((params, callback) => { + captured.push(params); + callback(null, { QueueUrl: 'url' }); + }); + jest.spyOn(console, 'log').mockImplementation(); + + await service.createQueues([ + { + key: 'HubspotQueue', + name: 'my-service--dev-HubspotQueue', + properties: { + QueueName: 'my-service--dev-HubspotQueue', + VisibilityTimeout: 1800, + MessageRetentionPeriod: 345600, + RedrivePolicy: { + maxReceiveCount: 3, + deadLetterTargetArn: 'arn:aws:sqs:us-east-1:x:dlq', + }, + }, + }, + ]); + + expect(captured[0]).toEqual({ + QueueName: 'my-service--dev-HubspotQueue', + Attributes: { + VisibilityTimeout: '1800', + MessageRetentionPeriod: '345600', + RedrivePolicy: JSON.stringify({ + maxReceiveCount: 3, + deadLetterTargetArn: 'arn:aws:sqs:us-east-1:x:dlq', + }), + }, + }); + }); + + it('creates queues with defaults when properties are missing (back-compat)', async () => { + const captured = []; + mockSQS.createQueue.mockImplementation((params, callback) => { + captured.push(params); + callback(null, { QueueUrl: 'url' }); + }); + jest.spyOn(console, 'log').mockImplementation(); + + await service.createQueues([ + { key: 'LegacyQueue', name: 'legacy-queue' }, + ]); + + expect(captured[0]).toEqual({ QueueName: 'legacy-queue' }); + }); + it('should handle empty queue array', async () => { const results = await service.createQueues([]); expect(results).toEqual([]);