diff --git a/core/src/abstractions/persistence-provider.ts b/core/src/abstractions/persistence-provider.ts index 35b3fde..71faac4 100644 --- a/core/src/abstractions/persistence-provider.ts +++ b/core/src/abstractions/persistence-provider.ts @@ -8,7 +8,7 @@ export interface IPersistenceProvider { getRunnableInstances(): Promise>; createEventSubscription(subscription: EventSubscription): Promise; - getSubscriptions(eventName: string, eventKey: string, asOf: Date): Promise>; + getSubscriptions(eventName: string, eventKey: string, asOf?: Date): Promise>; terminateSubscription(id: string): Promise; createEvent(event: Event): Promise; diff --git a/core/src/abstractions/workflow-host.ts b/core/src/abstractions/workflow-host.ts index 84c0653..1a7df45 100644 --- a/core/src/abstractions/workflow-host.ts +++ b/core/src/abstractions/workflow-host.ts @@ -6,7 +6,7 @@ export interface IWorkflowHost { stop(); startWorkflow(id: string, version: number, data: any): Promise; registerWorkflow(workflow: new () => WorkflowBase); - publishEvent(eventName: string, eventKey: string, eventData: any, eventTime: Date): Promise; + publishEvent(eventName: string, eventKey: string, eventData: any, eventTime?: Date): Promise; suspendWorkflow(id: string): Promise; resumeWorkflow(id: string): Promise; terminateWorkflow(id: string): Promise; diff --git a/core/src/models/event.ts b/core/src/models/event.ts index 864bf04..5ba24ec 100644 --- a/core/src/models/event.ts +++ b/core/src/models/event.ts @@ -3,6 +3,6 @@ export class Event { public eventName: string; public eventKey: string; public eventData: any; - public eventTime: Date; + public eventTime?: Date; public isProcessed: boolean; } \ No newline at end of file diff --git a/core/src/services/event-queue-worker.ts b/core/src/services/event-queue-worker.ts index ea3dd6e..8daa529 100644 --- a/core/src/services/event-queue-worker.ts +++ b/core/src/services/event-queue-worker.ts @@ -57,7 +57,7 @@ export class EventQueueWorker implements IBackgroundWorker { if (gotLock) { try { let evt = await self.persistence.getEvent(eventId); - if (evt.eventTime <= new Date()) + if (evt.eventTime === undefined || evt.eventTime <= new Date()) { let subs = await self.persistence.getSubscriptions(evt.eventName, evt.eventKey, evt.eventTime); let success = true; diff --git a/core/src/services/memory-persistence-provider.ts b/core/src/services/memory-persistence-provider.ts index 988eb84..0f6cc4c 100644 --- a/core/src/services/memory-persistence-provider.ts +++ b/core/src/services/memory-persistence-provider.ts @@ -40,8 +40,8 @@ export class MemoryPersistenceProvider implements IPersistenceProvider { wfes_subscriptions.push(subscription); } - public async getSubscriptions(eventName: string, eventKey: string, asOf: Date): Promise> { - return wfes_subscriptions.filter(x => x.eventName == eventName && x.eventKey == eventKey && x.subscribeAsOf <= asOf); + public async getSubscriptions(eventName: string, eventKey: string, asOf?: Date): Promise> { + return wfes_subscriptions.filter(x => x.eventName == eventName && x.eventKey == eventKey && (asOf ? x.subscribeAsOf <= asOf : true)); } public async terminateSubscription(id: string): Promise { diff --git a/core/src/services/workflow-host.ts b/core/src/services/workflow-host.ts index dff2fea..1347433 100644 --- a/core/src/services/workflow-host.ts +++ b/core/src/services/workflow-host.ts @@ -75,7 +75,7 @@ export class WorkflowHost implements IWorkflowHost { this.registry.registerWorkflow(new workflow()); } - public async publishEvent(eventName: string, eventKey: string, eventData: any, eventTime: Date): Promise { + public async publishEvent(eventName: string, eventKey: string, eventData: any, eventTime?: Date): Promise { //todo: check host status this.logger.info("Publishing event %s %s", eventName, eventKey); diff --git a/providers/workflow-es-mongodb/src/mongodb-provider.ts b/providers/workflow-es-mongodb/src/mongodb-provider.ts index 00dce32..62f3828 100644 --- a/providers/workflow-es-mongodb/src/mongodb-provider.ts +++ b/providers/workflow-es-mongodb/src/mongodb-provider.ts @@ -98,10 +98,11 @@ export class MongoDBPersistence implements IPersistenceProvider { return deferred; } - public async getSubscriptions(eventName: string, eventKey: string, asOf: Date): Promise> { + public async getSubscriptions(eventName: string, eventKey: string, asOf?: Date): Promise> { var self = this; var deferred = new Promise>((resolve, reject) => { - self.subscriptionCollection.find({ eventName: eventName, eventKey: eventKey, subscribeAsOf: { $lt: asOf } }) + if (asOf === undefined) { + self.subscriptionCollection.find({ eventName: eventName, eventKey: eventKey }) .toArray((err, data) => { if (err) reject(err); @@ -109,6 +110,16 @@ export class MongoDBPersistence implements IPersistenceProvider { item.id = item["_id"].toString(); resolve(data); }); + } else { + self.subscriptionCollection.find({ eventName: eventName, eventKey: eventKey, subscribeAsOf: { $lt: asOf } }) + .toArray((err, data) => { + if (err) + reject(err); + for (let item of data) + item.id = item["_id"].toString(); + resolve(data); + }); + } }); return deferred; } diff --git a/providers/workflow-es-mysql/src/mysql-provider.ts b/providers/workflow-es-mysql/src/mysql-provider.ts index 6422585..c4f76ef 100644 --- a/providers/workflow-es-mysql/src/mysql-provider.ts +++ b/providers/workflow-es-mysql/src/mysql-provider.ts @@ -102,11 +102,17 @@ export class MySqlPersistence implements IPersistenceProvider { }); return deferred; } - public async getSubscriptions(eventName: string, eventKey: string, asOf: Date): Promise> { + public async getSubscriptions(eventName: string, eventKey: string, asOf?: Date): Promise> { var deferred = new Promise>( async (resolve, reject) => { try { - var instances = await subscriptionCollection.findAll({ + var instances = asOf === undefined ? await subscriptionCollection.findAll({ + where: { + eventName: eventName, + eventKey: eventKey + }, + include: [Workflow] + }) : await subscriptionCollection.findAll({ where: { eventName: eventName, eventKey: eventKey,