Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/core/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,14 @@ packages/core/
- `integration-repository-mongo.js` - MongoDB implementation
- `integration-repository-postgres.js` - PostgreSQL implementation
- `integration-mapping-repository-*.js` - Mapping data persistence
- `process-repository-*.js` - Process (long-running job) persistence.
Implements `applyProcessUpdate(processId, ops)` — a race-safe alternative
to `update(id, patch)` that routes increments, sets, and bounded-array
pushes through each backend's native atomic primitive (PostgreSQL
`jsonb_set` / MongoDB `$inc`/`$set`/`$push`). Use this method any time
multiple queue workers may concurrently mutate the same Process row
(counters, flags, error history). The legacy `update(id, patch)`
remains available but is clobber-prone under concurrency.

**Integration developers extend IntegrationBase**:

Expand Down
3 changes: 2 additions & 1 deletion packages/core/integrations/integration-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ function setEntityRoutes(router, authenticateUser, useCases) {
const params = checkRequiredParams(req.query, ['entityType']);
const module = await getModuleInstanceFromType.execute(
userId,
params.entityType
params.entityType,
{ state: req.query.state }
);
const areRequirementsValid =
module.validateAuthorizationRequirements();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
const {
DocumentDBEncryptionService,
} = require('../../database/documentdb-encryption-service');
const { validateOps } = require('./process-update-ops-shared');

class ProcessRepositoryDocumentDB extends ProcessRepositoryInterface {
constructor() {
Expand Down Expand Up @@ -155,6 +156,73 @@
return this._mapProcess(decryptedProcess);
}

/**
* Atomic process update — race-safe counterpart to `update()`.
*
* Uses DocumentDB's native $inc / $set / $push operators (Mongo-wire
* compatible) via findAndModify so increments, sets, and pushes land
* in one server-side write. Contention on the same document
* serializes at the DB level.
*
* DocumentDB compatibility notes:
* - $inc: supported since v3.6.
* - $set with dot-path: supported.
* - $push with $each + negative $slice: supported since v4.0.
* Clusters still on v3.6 must upgrade before using pushSlice.
*
* Process documents have no encrypted fields today; if that changes,
* the set-by-path payload here MUST route through
* `encryptionService.encryptFields` for any affected paths.
*
* @param {string} processId
* @param {import('./process-repository-interface').ProcessUpdateOps} ops
* @returns {Promise<Object|null>}
*/
async applyProcessUpdate(processId, ops) {
const normalized = validateOps(ops);
const objectId = toObjectId(processId);

const update = {};
const $set = {};

if (Object.keys(normalized.increment).length > 0) {
update.$inc = { ...normalized.increment };
}
for (const [path, value] of Object.entries(normalized.set)) {
$set[path] = value;
}
if (normalized.newState !== null) {
$set.state = normalized.newState;
}
$set.updatedAt = new Date();
update.$set = $set;

if (Object.keys(normalized.pushSlice).length > 0) {
update.$push = {};
for (const [path, spec] of Object.entries(normalized.pushSlice)) {
update.$push[path] = {
$each: spec.values,
$slice: -spec.keepLast,
};
}
}

const result = await this.prisma.$runCommandRaw({
findAndModify: 'Process',
query: { _id: objectId },
update,
new: true,
});

const doc = result && result.value;

Check warning on line 217 in packages/core/integrations/repositories/process-repository-documentdb.js

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Prefer using an optional chain expression instead, as it's more concise and easier to read.

See more on https://sonarcloud.io/project/issues?id=friggframework_frigg&issues=AZ3Q2LkUqgvhdYcr2IMl&open=AZ3Q2LkUqgvhdYcr2IMl&pullRequest=579
if (!doc) return null;
const decrypted = await this.encryptionService.decryptFields(
'Process',
doc
);
return this._mapProcess(decrypted);
}

async findByIntegrationAndType(integrationId, type) {
const integrationObjectId = toObjectId(integrationId);
const filter = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,52 @@ class ProcessRepositoryInterface {
throw new Error('Method update() must be implemented');
}

/**
* Apply atomic mutations to a process record.
*
* Race-safe counterpart to `update()`. Where `update()` takes full
* JSON blobs and does read-modify-write at the ORM layer (clobber-
* prone under concurrent writers), `applyProcessUpdate()` describes
* the intent declaratively and each backend uses its native atomic
* primitive:
* - PostgreSQL: `jsonb_set` chain inside a single UPDATE ... RETURNING
* - MongoDB: `$inc` / `$set` / `$push` via findAndModify
* - DocumentDB: same operator set as MongoDB (with version caveats)
*
* All paths are dot-delimited and rooted in `context` or `results`
* (e.g. `context.processedRecords`,
* `results.aggregateData.totalSynced`). Paths MUST match
* `^(context|results)(\\.[a-zA-Z_][a-zA-Z0-9_]*)+$` — validated by
* each adapter before any SQL/command generation.
*
* Intended primary callers: UpdateProcessMetrics and
* UpdateProcessState. Other callers can use this directly when they
* need race-free cumulative updates.
*
* @typedef {Object} ProcessUpdateOps
* @property {Object.<string, number>} [increment] - Atomic numeric
* increments keyed by dot-path. e.g.
* `{ 'context.processedRecords': 1, 'results.aggregateData.totalSynced': 1 }`
* @property {Object.<string, *>} [set] - Atomic whole-subtree set
* keyed by dot-path. Replaces the value at the path (NOT deep
* merge). e.g. `{ 'context.fetchDone': true }`
* @property {Object.<string, {values: Array, keepLast: number}>} [pushSlice]
* Atomic array push with bounded retention (sliding window of the
* last `keepLast` items). Keys are dot-paths pointing to arrays.
* e.g. `{ 'results.aggregateData.errors': { values: [err], keepLast: 100 } }`
* @property {string} [newState] - Top-level `state` column update.
* Written alongside the JSON mutations in the same UPDATE so state
* + counters move together.
*
* @param {string} processId - Process ID to update
* @param {ProcessUpdateOps} ops - Atomic operations to apply
* @returns {Promise<Object|null>} Updated process record (post-
* mutation) or null if the process does not exist.
*/
async applyProcessUpdate(processId, ops) {
throw new Error('Method applyProcessUpdate() must be implemented');
}

/**
* Find processes by integration and type
* @param {string} integrationId - Integration ID
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const { prisma } = require('../../database/prisma');
const { ProcessRepositoryInterface } = require('./process-repository-interface');
const { validateOps } = require('./process-update-ops-shared');

/**
* MongoDB Process Repository Adapter
Expand Down Expand Up @@ -92,6 +93,77 @@
return this._toPlainObject(process);
}

/**
* Atomic process update — race-safe counterpart to `update()`.
*
* Uses `findAndModify` via `$runCommandRaw` so increments, sets, and
* pushes land in one server-side write. Contention on the same
* document serializes at the MongoDB level; no Node-side read-
* modify-write. Returns the post-update document.
*
* @param {string} processId
* @param {import('./process-repository-interface').ProcessUpdateOps} ops
* @returns {Promise<Object|null>}
*/
async applyProcessUpdate(processId, ops) {
const normalized = validateOps(ops);

const update = {};
const $set = {};

if (Object.keys(normalized.increment).length > 0) {
update.$inc = { ...normalized.increment };
}
for (const [path, value] of Object.entries(normalized.set)) {
$set[path] = value;
}
if (normalized.newState !== null) {
$set.state = normalized.newState;
}
$set.updatedAt = new Date();
update.$set = $set;

if (Object.keys(normalized.pushSlice).length > 0) {
update.$push = {};
for (const [path, spec] of Object.entries(normalized.pushSlice)) {
update.$push[path] = {
$each: spec.values,
$slice: -spec.keepLast,
};
}
}

const result = await this.prisma.$runCommandRaw({
findAndModify: 'Process',
query: { _id: { $oid: processId } },
update,
new: true,
});

const doc = result && result.value;

Check warning on line 143 in packages/core/integrations/repositories/process-repository-mongo.js

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Prefer using an optional chain expression instead, as it's more concise and easier to read.

See more on https://sonarcloud.io/project/issues?id=friggframework_frigg&issues=AZ3Q2LkAqgvhdYcr2IMk&open=AZ3Q2LkAqgvhdYcr2IMk&pullRequest=579
if (!doc) return null;
return this._toPlainObject(this._hydrateRawMongoDoc(doc));
}

/**
* Shape a raw Mongo document (as returned by $runCommandRaw) to match
* Prisma's `findUnique` output so the existing `_toPlainObject` works
* without modification. EJSON round-trips give us `{$oid, $date}` wrappers
* that need unwrapping.
* @private
*/
_hydrateRawMongoDoc(doc) {
const hydrated = { ...doc };
if (doc._id) hydrated.id = doc._id.$oid ?? doc._id;
for (const field of ['createdAt', 'updatedAt']) {
const raw = doc[field];
if (raw && typeof raw === 'object' && raw.$date) {
hydrated[field] = new Date(raw.$date);
}
}
return hydrated;
}

/**
* Find processes by integration and type
* @param {string} integrationId - Integration ID
Expand Down
Loading
Loading