diff --git a/.gitignore b/.gitignore index 7e45a86eeb5..1ec48e0f2de 100644 --- a/.gitignore +++ b/.gitignore @@ -113,4 +113,5 @@ dev/tool/history.json /combined_dependencies .tmp ws-tests/docker-compose.override.yml -.cursor/* \ No newline at end of file +.cursor/* +CLAUDE.md \ No newline at end of file diff --git a/README.md b/README.md index 2e9c44176d3..408cf42f04d 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,12 @@ For detailed information about the platform architecture, services, and their in - [Docker](https://docs.docker.com/get-docker/) - [Docker Compose](https://docs.docker.com/compose/install/) +If you use `nvm`, run this after entering the repo to align your shell with the repository Node version: + +```bash +nvm use +``` + ## Verification To verify the installation, perform the following checks in your terminal: diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 448b34b8f08..f8e9400265c 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -40197,6 +40197,118 @@ importers: specifier: ^5.9.3 version: 5.9.3 + ../../services/notification/pod-events-processor: + dependencies: + '@hcengineering/account-client': + specifier: workspace:^0.7.25 + version: link:../../../foundations/core/packages/account-client + '@hcengineering/analytics': + specifier: workspace:^0.7.19 + version: link:../../../foundations/core/packages/analytics + '@hcengineering/analytics-service': + specifier: workspace:^0.7.19 + version: link:../../../foundations/core/packages/analytics-service + '@hcengineering/api-client': + specifier: workspace:^0.7.25 + version: link:../../../foundations/core/packages/api-client + '@hcengineering/contact': + specifier: workspace:^0.7.0 + version: link:../../../plugins/contact + '@hcengineering/core': + specifier: workspace:^0.7.26 + version: link:../../../foundations/core/packages/core + '@hcengineering/kafka': + specifier: workspace:^0.7.18 + version: link:../../../foundations/server/packages/kafka + '@hcengineering/model-time': + specifier: workspace:^0.7.0 + version: link:../../../models/time + '@hcengineering/notification': + specifier: workspace:^0.7.0 + version: link:../../../plugins/notification + '@hcengineering/platform': + specifier: workspace:^0.7.20 + version: link:../../../foundations/core/packages/platform + '@hcengineering/server-client': + specifier: workspace:^0.7.19 + version: link:../../../foundations/server/packages/client + '@hcengineering/server-core': + specifier: workspace:^0.7.19 + version: link:../../../foundations/server/packages/core + '@hcengineering/server-token': + specifier: workspace:^0.7.18 + version: link:../../../foundations/core/packages/token + '@hcengineering/text-core': + specifier: workspace:^0.7.19 + version: link:../../../foundations/core/packages/text-core + '@hcengineering/time': + specifier: workspace:^0.7.0 + version: link:../../../plugins/time + dotenv: + specifier: ^16.4.5 + version: 16.6.1 + lru-cache: + specifier: ^11.1.0 + version: 11.2.2 + devDependencies: + '@hcengineering/platform-rig': + specifier: workspace:^0.7.21 + version: link:../../../foundations/utils/packages/platform-rig + '@tsconfig/node16': + specifier: ^1.0.4 + version: 1.0.4 + '@types/jest': + specifier: ^29.5.5 + version: 29.5.14 + '@types/node': + specifier: ^22.18.1 + version: 22.19.0 + '@typescript-eslint/eslint-plugin': + specifier: ^6.21.0 + version: 6.21.0(@typescript-eslint/parser@6.21.0(eslint@8.57.1)(typescript@5.9.3))(eslint@8.57.1)(typescript@5.9.3) + '@typescript-eslint/parser': + specifier: ^6.21.0 + version: 6.21.0(eslint@8.57.1)(typescript@5.9.3) + cross-env: + specifier: ~7.0.3 + version: 7.0.3 + esbuild: + specifier: ^0.25.10 + version: 0.25.12 + eslint: + specifier: ^8.54.0 + version: 8.57.1 + eslint-config-standard-with-typescript: + specifier: ^40.0.0 + version: 40.0.0(@typescript-eslint/eslint-plugin@6.21.0(@typescript-eslint/parser@6.21.0(eslint@8.57.1)(typescript@5.9.3))(eslint@8.57.1)(typescript@5.9.3))(eslint-plugin-import@2.32.0(eslint@8.57.1))(eslint-plugin-n@15.7.0(eslint@8.57.1))(eslint-plugin-promise@6.6.0(eslint@8.57.1))(eslint@8.57.1)(typescript@5.9.3) + eslint-plugin-import: + specifier: ^2.26.0 + version: 2.32.0(eslint@8.57.1) + eslint-plugin-n: + specifier: ^15.4.0 + version: 15.7.0(eslint@8.57.1) + eslint-plugin-node: + specifier: ^11.1.0 + version: 11.1.0(eslint@8.57.1) + eslint-plugin-promise: + specifier: ^6.1.1 + version: 6.6.0(eslint@8.57.1) + jest: + specifier: ^29.7.0 + version: 29.7.0(@types/node@22.19.0)(ts-node@10.9.2(@types/node@22.19.0)(typescript@5.9.3)) + prettier: + specifier: ^3.6.2 + version: 3.6.2 + ts-jest: + specifier: ^29.1.1 + version: 29.4.5(@babel/core@7.28.5)(@jest/transform@29.7.0)(@jest/types@30.2.0)(babel-jest@29.7.0(@babel/core@7.28.5))(esbuild@0.25.12)(jest-util@30.2.0)(jest@29.7.0(@types/node@22.19.0)(ts-node@10.9.2(@types/node@22.19.0)(typescript@5.9.3)))(typescript@5.9.3) + ts-node: + specifier: ^10.9.2 + version: 10.9.2(@types/node@22.19.0)(typescript@5.9.3) + typescript: + specifier: ^5.9.3 + version: 5.9.3 + ../../services/notification/pod-notification: dependencies: '@hcengineering/analytics': diff --git a/common/scripts/docker.sh b/common/scripts/docker.sh index 3f6de5a8283..aa3326de40b 100755 --- a/common/scripts/docker.sh +++ b/common/scripts/docker.sh @@ -58,5 +58,6 @@ else --to @hcengineering/pod-process \ --to @hcengineering/pod-rating \ --to @hcengineering/pod-payment \ - --to @hcengineering/pod-worker + --to @hcengineering/pod-worker \ + --to @hcengineering/pod-events-processor fi diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 518614f92b1..890f78a0056 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -612,6 +612,24 @@ services: - QUEUE_CONFIG=${QUEUE_CONFIG} - QUEUE_REGION=cockroach restart: unless-stopped + events-processor: + image: hardcoreeng/events-processor + extra_hosts: + - 'huly.local:host-gateway' + depends_on: + redpanda: + condition: service_started + account: + condition: service_started + environment: + - SERVICE_ID=events-processor + - LOG_LEVEL=debug + - SECRET=secret + - ACCOUNTS_URL=http://huly.local:3000 + - QUEUE_CONFIG=${QUEUE_CONFIG} + - QUEUE_REGION=cockroach + - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318/v1/traces + restart: unless-stopped # translate: # image: hardcoreeng/translate # extra_hosts: diff --git a/foundations/core/packages/analytics-service/src/logging.ts b/foundations/core/packages/analytics-service/src/logging.ts index 2ccf0db338f..c9f45f5be9c 100644 --- a/foundations/core/packages/analytics-service/src/logging.ts +++ b/foundations/core/packages/analytics-service/src/logging.ts @@ -16,7 +16,7 @@ export class SplitLogger implements MeasureLogger { const rootDir = this.opts.root ?? 'logs' this.logger = winston.createLogger({ - level: 'info', + level: process.env.LOG_LEVEL === 'debug' ? 'debug' : 'info', exitOnError: false }) const errorPrinter = ({ message, stack, ...rest }: Error): object => ({ @@ -99,6 +99,13 @@ export class SplitLogger implements MeasureLogger { this.logger.warn({ message, ...obj }) } + debug (message: string, obj?: Record): void { + if (this.opts.parent !== undefined) { + this.opts.parent.debug({ message, ...obj }) + } + this.logger.debug({ message, ...obj }) + } + logOperation (operation: string, time: number, params: ParamsType): void { this.logger.info(operation, { time, ...params }) } diff --git a/foundations/core/packages/measurements-otlp/src/__tests__/telemetry.test.ts b/foundations/core/packages/measurements-otlp/src/__tests__/telemetry.test.ts index 660147942fa..41264582309 100644 --- a/foundations/core/packages/measurements-otlp/src/__tests__/telemetry.test.ts +++ b/foundations/core/packages/measurements-otlp/src/__tests__/telemetry.test.ts @@ -22,6 +22,7 @@ describe('telemetry', () => { info: jest.fn(), error: jest.fn(), warn: jest.fn(), + debug: jest.fn(), close: jest.fn(async () => {}), logOperation: jest.fn() } diff --git a/foundations/core/packages/measurements-otlp/src/telemetry.ts b/foundations/core/packages/measurements-otlp/src/telemetry.ts index c1770a01a3f..a3df6a21934 100644 --- a/foundations/core/packages/measurements-otlp/src/telemetry.ts +++ b/foundations/core/packages/measurements-otlp/src/telemetry.ts @@ -11,6 +11,7 @@ import { updateMeasure, type FullParamsType, type MeasureLogger, + type MeasureLogLevel, type Metrics, type ParamsType, type WithOptions @@ -97,7 +98,8 @@ export class OpenTelemetryMetricsContext implements MeasureContext { readonly logParams?: ParamsType, readonly otlpLogger?: Logger, - readonly meter?: MetricsContext + readonly meter?: MetricsContext, + readonly logLevel: MeasureLogLevel = 'info' ) { this.name = name this.params = params @@ -133,6 +135,7 @@ export class OpenTelemetryMetricsContext implements MeasureContext { logger?: MeasureLogger span?: WithOptions['span'] // By default true meta?: Record + logLevel?: MeasureLogLevel } ): MeasureContext { let _span: Span | undefined @@ -170,7 +173,8 @@ export class OpenTelemetryMetricsContext implements MeasureContext { this, this.logParams, this.otlpLogger, - this.meter + this.meter, + opt?.logLevel ?? this.logLevel ) result.id = this.id result.contextData = this.contextData @@ -309,6 +313,23 @@ export class OpenTelemetryMetricsContext implements MeasureContext { this.logger.warn(message, { ...this.params, ...args, ...(this.logParams ?? {}) }) } + debug (message: string, args?: Record): void { + if (this.logLevel !== 'debug') return + if (this.otlpLogger !== undefined) { + this.otlpLogger.emit({ + severityNumber: SeverityNumber.DEBUG, + severityText: 'debug', + context: this.context, + body: message, + attributes: { + 'service.name': sdkServiceName, + ...(args ?? {}) + } + }) + } + this.logger.debug(message, { ...this.params, ...args, ...(this.logParams ?? {}) }) + } + end (): void { this.done() } @@ -530,7 +551,8 @@ export function createOpenTelemetryMetricsContext ( ): MeasureContext { if (!initOpenTelemetrySDK(name, version ?? '')) { console.warn('OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is not set, OpenTelemetry metrics will not be sent') - return new MeasureMetricsContext(name, params, fullParams, metrics, logger) + const rootLogLevel: MeasureLogLevel = process.env.LOG_LEVEL === 'debug' ? 'debug' : 'info' + return new MeasureMetricsContext(name, params, fullParams, metrics, logger, undefined, undefined, rootLogLevel) } // Traces @@ -542,6 +564,8 @@ export function createOpenTelemetryMetricsContext ( const meter = otelMetrics.getMeter(name, version) + const rootLogLevel: MeasureLogLevel = process.env.LOG_LEVEL === 'debug' ? 'debug' : 'info' + const ctx = new OpenTelemetryMetricsContext( name, tracer, @@ -554,7 +578,8 @@ export function createOpenTelemetryMetricsContext ( undefined, undefined, otlpLogger, - new MetricsContext(meter) + new MetricsContext(meter), + rootLogLevel ) return ctx } diff --git a/foundations/core/packages/measurements/src/__tests__/context.test.ts b/foundations/core/packages/measurements/src/__tests__/context.test.ts index d59d0a49452..818ecbd6169 100644 --- a/foundations/core/packages/measurements/src/__tests__/context.test.ts +++ b/foundations/core/packages/measurements/src/__tests__/context.test.ts @@ -21,6 +21,7 @@ describe('context', () => { expect(typeof logger.info).toBe('function') expect(typeof logger.error).toBe('function') expect(typeof logger.warn).toBe('function') + expect(typeof logger.debug).toBe('function') expect(typeof logger.close).toBe('function') }) @@ -54,6 +55,16 @@ describe('context', () => { consoleSpy.mockRestore() }) + it('should log debug messages', () => { + const consoleSpy = jest.spyOn(console, 'debug').mockImplementation() + const logger = consoleLogger({ service: 'test' }) + + logger.debug('Debug message', { detail: 'x' }) + + expect(consoleSpy).toHaveBeenCalled() + consoleSpy.mockRestore() + }) + it('should handle errors in params', () => { const consoleSpy = jest.spyOn(console, 'error').mockImplementation() const logger = consoleLogger({}) @@ -145,6 +156,7 @@ describe('context', () => { info: jest.fn(), error: jest.fn(), warn: jest.fn(), + debug: jest.fn(), close: jest.fn(async () => {}), logOperation: jest.fn() } @@ -163,6 +175,7 @@ describe('context', () => { info: jest.fn(), error: jest.fn(), warn: jest.fn(), + debug: jest.fn(), close: jest.fn(async () => {}), logOperation: jest.fn() } @@ -181,6 +194,7 @@ describe('context', () => { info: jest.fn(), error: jest.fn(), warn: jest.fn(), + debug: jest.fn(), close: jest.fn(async () => {}), logOperation: jest.fn() } @@ -194,6 +208,47 @@ describe('context', () => { ) }) + it('should not log debug when log level is info', () => { + const mockLogger: MeasureLogger = { + info: jest.fn(), + error: jest.fn(), + warn: jest.fn(), + debug: jest.fn(), + close: jest.fn(async () => {}), + logOperation: jest.fn() + } + + const ctx = new MeasureMetricsContext('test', { op: 'test' }, {}, newMetrics(), mockLogger) + ctx.debug('Skip', { k: 1 }) + + expect(mockLogger.debug).not.toHaveBeenCalled() + }) + + it('should log debug when log level is debug', () => { + const mockLogger: MeasureLogger = { + info: jest.fn(), + error: jest.fn(), + warn: jest.fn(), + debug: jest.fn(), + close: jest.fn(async () => {}), + logOperation: jest.fn() + } + + const ctx = new MeasureMetricsContext( + 'test', + { op: 'test' }, + {}, + newMetrics(), + mockLogger, + undefined, + undefined, + 'debug' + ) + ctx.debug('D', { k: 1 }) + + expect(mockLogger.debug).toHaveBeenCalledWith('D', expect.objectContaining({ k: 1, op: 'test' })) + }) + it('should get params', () => { const ctx = new MeasureMetricsContext('test', { op: 'test', method: 'GET' }, {}, newMetrics(), logger) const params = ctx.getParams() @@ -440,6 +495,7 @@ describe('context', () => { info: jest.fn(), error: jest.fn(), warn: jest.fn(), + debug: jest.fn(), close: jest.fn(async () => {}), logOperation: jest.fn() } diff --git a/foundations/core/packages/measurements/src/context.ts b/foundations/core/packages/measurements/src/context.ts index 62dbb347cc8..9a8688635c8 100644 --- a/foundations/core/packages/measurements/src/context.ts +++ b/foundations/core/packages/measurements/src/context.ts @@ -6,6 +6,7 @@ import { type FullParamsType, type MeasureContext, type MeasureLogger, + type MeasureLogLevel, type Metrics, type ParamsType, type OperationLog, @@ -42,6 +43,14 @@ export const consoleLogger = (logParams: Record): MeasureLogger => warn: (msg, args) => { console.warn(msg, ...Object.entries(args ?? {}).map((it) => `${it[0]}=${JSON.stringify(replacer(it[1]))}`)) }, + debug: (msg, args) => { + console.debug( + msg, + ...Object.entries({ ...(args ?? {}), ...(logParams ?? {}) }).map( + (it) => `${it[0]}=${JSON.stringify(replacer(it[1]))}` + ) + ) + }, close: async () => {}, logOperation: (operation, time, params) => {} }) @@ -62,6 +71,8 @@ export class MeasureMetricsContext implements MeasureContext { metrics: Metrics id?: string + private readonly logLevel: MeasureLogLevel + st = platformNow() contextData: object = {} private done (value?: number, override?: boolean): void { @@ -75,12 +86,14 @@ export class MeasureMetricsContext implements MeasureContext { metrics: Metrics = newMetrics(), logger?: MeasureLogger, readonly parent?: MeasureContext, - readonly logParams?: ParamsType + readonly logParams?: ParamsType, + logLevel: MeasureLogLevel = 'info' ) { this.name = name this.params = params this.fullParams = fullParams this.metrics = metrics + this.logLevel = logLevel this.metrics.namedParams = this.metrics.namedParams ?? {} for (const [k, v] of Object.entries(params)) { if (this.metrics.namedParams[k] !== v) { @@ -94,7 +107,16 @@ export class MeasureMetricsContext implements MeasureContext { } measure (name: string, value: number, override?: boolean): void { - const c = new MeasureMetricsContext('#' + name, {}, {}, childMetrics(this.metrics, ['#' + name]), this.logger, this) + const c = new MeasureMetricsContext( + '#' + name, + {}, + {}, + childMetrics(this.metrics, ['#' + name]), + this.logger, + this, + undefined, + this.logLevel + ) c.contextData = this.contextData c.done(value, override) } @@ -106,6 +128,8 @@ export class MeasureMetricsContext implements MeasureContext { fullParams?: FullParamsType logger?: MeasureLogger span?: WithOptions['span'] // By default true + meta?: Record + logLevel?: MeasureLogLevel } ): MeasureContext { const result = new MeasureMetricsContext( @@ -115,7 +139,8 @@ export class MeasureMetricsContext implements MeasureContext { childMetrics(this.metrics, [name]), opt?.logger ?? this.logger, this, - this.logParams + this.logParams, + opt?.logLevel ?? this.logLevel ) result.id = this.id result.contextData = this.contextData @@ -187,6 +212,11 @@ export class MeasureMetricsContext implements MeasureContext { this.logger.warn(message, { ...this.params, ...args, ...(this.logParams ?? {}) }) } + debug (message: string, args?: Record): void { + if (this.logLevel !== 'debug') return + this.logger.debug(message, { ...this.params, ...args, ...(this.logParams ?? {}) }) + } + end (): void { this.done() } @@ -202,8 +232,11 @@ export class NoMetricsContext implements MeasureContext { contextData: object = {} - constructor (logger?: MeasureLogger) { + private readonly logLevel: MeasureLogLevel + + constructor (logger?: MeasureLogger, logLevel: MeasureLogLevel = 'info') { this.logger = logger ?? consoleLogger({}) + this.logLevel = logLevel } measure (name: string, value: number, override?: boolean): void {} @@ -211,10 +244,15 @@ export class NoMetricsContext implements MeasureContext { newChild ( name: string, params: ParamsType, - fullParams?: FullParamsType | (() => FullParamsType), - logger?: MeasureLogger + opt?: { + fullParams?: FullParamsType | (() => FullParamsType) + logger?: MeasureLogger + span?: WithOptions['span'] + meta?: Record + logLevel?: MeasureLogLevel + } ): MeasureContext { - const result = new NoMetricsContext(logger ?? this.logger) + const result = new NoMetricsContext(opt?.logger ?? this.logger, opt?.logLevel ?? this.logLevel) result.id = this.id result.contextData = this.contextData return result @@ -226,7 +264,7 @@ export class NoMetricsContext implements MeasureContext { op: (ctx: MeasureContext) => T | Promise, fullParams?: ParamsType | (() => FullParamsType) ): Promise { - const r = op(this.newChild(name, params, fullParams, this.logger)) + const r = op(this.newChild(name, params, { fullParams, logger: this.logger })) return r instanceof Promise ? r : Promise.resolve(r) } @@ -240,7 +278,7 @@ export class NoMetricsContext implements MeasureContext { op: (ctx: MeasureContext) => T, fullParams?: ParamsType | (() => FullParamsType) ): T { - const c = this.newChild(name, params, fullParams, this.logger) + const c = this.newChild(name, params, { fullParams, logger: this.logger }) return op(c) } @@ -250,7 +288,7 @@ export class NoMetricsContext implements MeasureContext { op: (ctx: MeasureContext) => T | Promise, fullParams?: ParamsType ): Promise { - const r = op(this.newChild(name, params, fullParams, this.logger)) + const r = op(this.newChild(name, params, { fullParams, logger: this.logger })) return r instanceof Promise ? r : Promise.resolve(r) } @@ -266,6 +304,11 @@ export class NoMetricsContext implements MeasureContext { this.logger.warn(message, { ...args }) } + debug (message: string, args?: Record): void { + if (this.logLevel !== 'debug') return + this.logger.debug(message, { ...args }) + } + end (): void {} getParams (): ParamsType { diff --git a/foundations/core/packages/measurements/src/types.ts b/foundations/core/packages/measurements/src/types.ts index d2d2e1366bc..31b5dc80b1f 100644 --- a/foundations/core/packages/measurements/src/types.ts +++ b/foundations/core/packages/measurements/src/types.ts @@ -49,6 +49,12 @@ export interface Metrics extends MetricsData { opLog?: Record } +/** + * Root log verbosity for {@link MeasureContext.debug} (child contexts inherit unless overridden in {@link MeasureContext.newChild}). + * @public + */ +export type MeasureLogLevel = 'info' | 'debug' + /** * @public */ @@ -58,6 +64,8 @@ export interface MeasureLogger { warn: (message: string, obj?: Record) => void + debug: (message: string, obj?: Record) => void + logOperation: (operation: string, time: number, params: ParamsType) => void childLogger?: (name: string, params: Record) => MeasureLogger @@ -94,6 +102,7 @@ export interface MeasureContext { logger?: MeasureLogger span?: WithOptions['span'] // By default true meta?: Record + logLevel?: MeasureLogLevel } ) => MeasureContext @@ -128,6 +137,7 @@ export interface MeasureContext { error: (message: string, obj?: Record) => void info: (message: string, obj?: Record) => void warn: (message: string, obj?: Record) => void + debug: (message: string, obj?: Record) => void // No-op unless this context was created with log level `debug`. // Mark current context as complete // If no value is passed, time difference will be used. diff --git a/foundations/server/packages/collaboration/src/__tests__/storage.test.ts b/foundations/server/packages/collaboration/src/__tests__/storage.test.ts index 4a272989132..dbde6841295 100644 --- a/foundations/server/packages/collaboration/src/__tests__/storage.test.ts +++ b/foundations/server/packages/collaboration/src/__tests__/storage.test.ts @@ -150,6 +150,7 @@ const mockContext: MeasureContext = { info: jest.fn(), warn: jest.fn(), error: jest.fn(), + debug: jest.fn(), with: jest.fn().mockImplementation((name, params, fn) => fn()), withSync: jest.fn().mockImplementation((name, params, fn) => fn()), measure: jest.fn(), diff --git a/foundations/server/packages/kafka/src/index.ts b/foundations/server/packages/kafka/src/index.ts index 5a88767930e..74430845fea 100644 --- a/foundations/server/packages/kafka/src/index.ts +++ b/foundations/server/packages/kafka/src/index.ts @@ -259,9 +259,14 @@ class PlatformQueueConsumerImpl implements ConsumerHandle { maxRetryDelay?: number // Maximum retry delay in seconds (default 10) } ) { + // Long handlers must call ConsumerControl.heartbeat(); these timeouts still help under broker/load jitter (e.g. Redpanda in Docker). + const sessionTimeout = parseInt(process.env.KAFKA_CONSUMER_SESSION_TIMEOUT_MS ?? '90000', 10) this.cc = this.kafka.consumer({ groupId: `${getKafkaTopicId(this.topic, this.config)}-${groupId}`, - allowAutoTopicCreation: true + allowAutoTopicCreation: true, + sessionTimeout, + rebalanceTimeout: Math.min(sessionTimeout * 2, 300000), + heartbeatInterval: 3000 }) void this.start().catch((err) => { diff --git a/models/time/src/index.ts b/models/time/src/index.ts index 4e64b04b083..2aa31803271 100644 --- a/models/time/src/index.ts +++ b/models/time/src/index.ts @@ -424,6 +424,41 @@ export function createModel (builder: Builder): void { enabledTypes: [time.ids.ToDoCreated] }) + builder.createDoc( + notification.class.NotificationType, + core.space.Model, + { + hidden: false, + generated: false, + allowedForAuthor: true, + label: time.string.ToDo, + group: time.ids.TimeNotificationGroup as Ref, + // Scheduled notifications are created by a worker, but provider/type settings still expect a tx class list. + txClasses: [core.class.TxCreateDoc], + objectClass: time.class.ToDo, + onlyOwn: true, + defaultEnabled: true, + templates: { + textTemplate: '{body}', + htmlTemplate: '

{body}

{link}

', + subjectTemplate: '{title}' + } + }, + time.ids.ToDoReminder + ) + + builder.createDoc(notification.class.NotificationProviderDefaults, core.space.Model, { + provider: notification.providers.InboxNotificationProvider, + ignoredTypes: [], + enabledTypes: [time.ids.ToDoReminder] + }) + + builder.createDoc(notification.class.NotificationProviderDefaults, core.space.Model, { + provider: notification.providers.PushNotificationProvider, + ignoredTypes: [], + enabledTypes: [time.ids.ToDoReminder] + }) + builder.createDoc>(core.class.ClassCollaborators, core.space.Model, { attachedTo: time.class.ToDo, fields: ['user'] diff --git a/models/time/src/plugin.ts b/models/time/src/plugin.ts index 2aee0c9e42b..a8e1e1d3d96 100644 --- a/models/time/src/plugin.ts +++ b/models/time/src/plugin.ts @@ -58,7 +58,8 @@ export default mergeIds(timeId, time, { ids: { ToDoCreated: '' as Ref, ModulePermissionGroup: '' as Ref, - ModulePermissionGroupReadOnlyGuest: '' as Ref + ModulePermissionGroupReadOnlyGuest: '' as Ref, + ToDoReminder: '' as Ref }, function: { ToDoTitleProvider: '' as Resource<(client: Client, ref: Ref, doc?: Doc) => Promise> diff --git a/plugins/time-resources/src/components/CreateToDoPopup.svelte b/plugins/time-resources/src/components/CreateToDoPopup.svelte index 036eb52e37a..76f664893be 100644 --- a/plugins/time-resources/src/components/CreateToDoPopup.svelte +++ b/plugins/time-resources/src/components/CreateToDoPopup.svelte @@ -15,7 +15,7 @@ - +
+ + {#if slots.length > 0} +
+ +
+ {/if} +
diff --git a/pods/server/src/rpc.ts b/pods/server/src/rpc.ts index 4c147a4c096..125a22df369 100644 --- a/pods/server/src/rpc.ts +++ b/pods/server/src/rpc.ts @@ -37,7 +37,7 @@ import { promisify } from 'util' import { gzip } from 'zlib' import { retrieveJson } from './utils' -import { unknownError } from '@hcengineering/platform' +import platform, { PlatformError, unknownError } from '@hcengineering/platform' export const COMMUNICATION_DOMAIN = 'communication' as OperationDomain interface RPCClientInfo { @@ -270,15 +270,26 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur void withSession(req, res, 'tx', async (ctx, session, rateLimit) => { const tx: any = (await retrieveJson(req)) ?? {} - if (tx._class === core.class.TxDomainEvent) { - const domainTx = tx as TxDomainEvent - const { result } = await session.domainRequestRaw(ctx, domainTx.domain, { - event: domainTx.event - }) - await sendJson(req, res, result.value, rateLimitToHeaders(rateLimit)) - } else { - const result = await session.txRaw(ctx, tx) - await sendJson(req, res, result.result, rateLimitToHeaders(rateLimit)) + try { + if (tx._class === core.class.TxDomainEvent) { + const domainTx = tx as TxDomainEvent + const { result } = await session.domainRequestRaw(ctx, domainTx.domain, { + event: domainTx.event + }) + await sendJson(req, res, result.value, rateLimitToHeaders(rateLimit)) + } else { + const result = await session.txRaw(ctx, tx) + await sendJson(req, res, result.result, rateLimitToHeaders(rateLimit)) + } + } catch (err: unknown) { + if (err instanceof PlatformError && err.status.code === platform.status.BadRequest) { + sendError(res, 400, { + message: 'Invalid tx', + error: err.status + }) + return + } + throw err } }) }) diff --git a/rush.json b/rush.json index 439e02a1b28..c04e21467cd 100644 --- a/rush.json +++ b/rush.json @@ -2221,6 +2221,11 @@ "projectFolder": "services/notification/pod-notification", "shouldPublish": false }, + { + "packageName": "@hcengineering/pod-events-processor", + "projectFolder": "services/notification/pod-events-processor", + "shouldPublish": false + }, { "packageName": "@hcengineering/pod-telegram", "projectFolder": "services/telegram/pod-telegram", diff --git a/server-plugins/notification-resources/src/push.ts b/server-plugins/notification-resources/src/push.ts index 7c522427330..a1d5890d1d7 100644 --- a/server-plugins/notification-resources/src/push.ts +++ b/server-plugins/notification-resources/src/push.ts @@ -15,6 +15,7 @@ import serverCore, { TriggerControl } from '@hcengineering/server-core' import serverNotification, { PUSH_NOTIFICATION_TITLE_SIZE } from '@hcengineering/server-notification' +import type { ReceiverInfo } from '@hcengineering/server-notification' import { AccountUuid, Class, @@ -49,6 +50,7 @@ import contact, { } from '@hcengineering/contact' import { AvailableProvidersCache, AvailableProvidersCacheKey, getTranslatedNotificationContent } from './index' import { getPerson } from '@hcengineering/server-contact' +import { getAllowedProviders, getNotificationProviderControl, getReceiversInfo } from './utils' async function createPushFromInbox ( control: TriggerControl, @@ -235,25 +237,51 @@ export async function PushNotificationsHandler ( ): Promise { const availableProviders: AvailableProvidersCache = control.contextCache.get(AvailableProvidersCacheKey) ?? new Map() - const all: InboxNotification[] = txes - .map((tx) => TxProcessor.createDoc2Doc(tx)) - .filter( - (it) => - availableProviders.get(it._id)?.find((p) => p === notification.providers.PushNotificationProvider) !== undefined - ) + const all: InboxNotification[] = txes.map((tx) => TxProcessor.createDoc2Doc(tx)) + + // First pass: use cache if present. + const pushEnabled: InboxNotification[] = all.filter( + (it) => + availableProviders.get(it._id)?.find((p) => p === notification.providers.PushNotificationProvider) !== undefined + ) + + // Fallback: if cache doesn't have the provider info (e.g. scheduled notifications created outside tx-trigger paths), + // compute allowed providers from notification type + user settings. + if (pushEnabled.length < all.length) { + const notificationControl = await getNotificationProviderControl(control.ctx, control) + const receivers: ReceiverInfo[] = await getReceiversInfo(control.ctx, [...new Set(all.map((n) => n.user))], control) + const receiverByAccount = new Map(receivers.map((r) => [r.account, r])) + + for (const n of all) { + if (availableProviders.get(n._id) !== undefined) continue + if (pushEnabled.includes(n)) continue + + const type = (n.types ?? [])[0] + if (type === undefined) continue + + const notificationType = control.modelDb.getObject(type) + const receiver = receiverByAccount.get(n.user) + if (receiver === undefined) continue + + const allowedProviders = getAllowedProviders(control, receiver.socialIds, notificationType, notificationControl) + if (allowedProviders.includes(notification.providers.PushNotificationProvider)) { + pushEnabled.push(n) + } + } + } - if (all.length === 0) { + if (pushEnabled.length === 0) { return [] } - const receivers = new Set(all.map((it) => it.user)) + const receivers = new Set(pushEnabled.map((it) => it.user)) const subscriptions = (await control.queryFind(control.ctx, notification.class.PushSubscription, {})).filter((it) => receivers.has(it.user) ) const res: Tx[] = [] - for (const inboxNotification of all) { + for (const inboxNotification of pushEnabled) { const { user } = inboxNotification const userSubscriptions = subscriptions.filter((it) => it.user === user) diff --git a/server-plugins/time-resources/src/index.ts b/server-plugins/time-resources/src/index.ts index 494a84a4b88..0e5fe6f0379 100644 --- a/server-plugins/time-resources/src/index.ts +++ b/server-plugins/time-resources/src/index.ts @@ -36,7 +36,7 @@ import core, { } from '@hcengineering/core' import notification, { CommonInboxNotification } from '@hcengineering/notification' import { getResource } from '@hcengineering/platform' -import type { TriggerControl } from '@hcengineering/server-core' +import { QueueTopic, type TriggerControl } from '@hcengineering/server-core' import { getSocialStrings } from '@hcengineering/server-contact' import { ReceiverInfo, SenderInfo } from '@hcengineering/server-notification' import { @@ -52,6 +52,131 @@ import { jsonToMarkup, nodeDoc, nodeParagraph, nodeText } from '@hcengineering/t import time, { ProjectToDo, ToDo, ToDoPriority, TodoAutomationHelper, WorkSlot } from '@hcengineering/time' import tracker, { Issue, IssueStatus, Project, TimeSpendReport } from '@hcengineering/tracker' +const scheduledNotificationTopic = 'scheduledNotification' + +interface ScheduledNotificationMessage { + kind: 'todoReminder' + id: string + workSlotId: Ref + todoId: Ref + shiftMs: number + targetDate: number +} + +type TimeMachineMessage = + | { + type: 'schedule' + id: string + targetDate: number + topic: string + data: ScheduledNotificationMessage + } + | { + type: 'cancel' + id: string + } + +type TimeMachineScheduleMessage = Extract + +function workSlotReminderPrefix (workSlotId: Ref): string { + return `todoReminder_${workSlotId}_` +} + +function workSlotReminderTimerId (workSlotId: Ref, shiftMs: number): string { + // Make sure this is stable and easy to cancel with `${prefix}%`. + return `${workSlotReminderPrefix(workSlotId)}${shiftMs}` +} + +async function cancelWorkSlotReminders (control: TriggerControl, workSlotId: Ref): Promise { + try { + const queue = control.queue + if (queue === undefined) return + const producer = queue.getProducer(control.ctx, QueueTopic.TimeMachine) + const cancelId = `${workSlotReminderPrefix(workSlotId)}%` + await producer.send(control.ctx, control.workspace.uuid, [{ type: 'cancel', id: cancelId }]) + control.ctx.info('Queued todo reminder cancel', { + queueTopic: QueueTopic.TimeMachine, + workSlotId, + timerIdPattern: cancelId + }) + } catch (err) { + control.ctx.error('Failed to cancel WorkSlot reminders', { err, workSlotId }) + } +} + +async function scheduleWorkSlotReminders (control: TriggerControl, workSlotId: Ref): Promise { + try { + const queue = control.queue + if (queue === undefined) return + + const workslot = (await control.findAll(control.ctx, time.class.WorkSlot, { _id: workSlotId }, { limit: 1 }))[0] + if (workslot === undefined) return + + // Reset existing timers for this WorkSlot on any relevant change. + await cancelWorkSlotReminders(control, workSlotId) + + const reminders = workslot.reminders ?? [] + if (reminders.length === 0) return + + // Skip scheduling for completed todos. + const todo = (await control.findAll(control.ctx, time.class.ToDo, { _id: workslot.attachedTo }, { limit: 1 }))[0] + if (todo === undefined) return + if (todo.doneOn != null) return + + const now = Date.now() + const msgs: TimeMachineScheduleMessage[] = [] + + for (const shiftMs of reminders) { + if (typeof shiftMs !== 'number' || Number.isNaN(shiftMs)) continue + // `shiftMs` is the positive offset before the event (in ms), matching + // the convention used by ReminderPopup and pod-calendar Google export. + const targetDate = workslot.date - shiftMs + if (targetDate <= now) continue + + const id = workSlotReminderTimerId(workSlotId, shiftMs) + const data: ScheduledNotificationMessage = { + kind: 'todoReminder', + id, + workSlotId, + todoId: todo._id, + shiftMs, + targetDate + } + msgs.push({ + type: 'schedule', + id, + targetDate, + topic: scheduledNotificationTopic, + data + }) + } + + if (msgs.length === 0) { + control.ctx.info('Skipped todo reminder scheduling', { + queueTopic: QueueTopic.TimeMachine, + workSlotId, + todoId: todo._id, + reminderCount: reminders.length, + reason: 'no-future-reminders' + }) + return + } + const producer = queue.getProducer(control.ctx, QueueTopic.TimeMachine) + await producer.send(control.ctx, control.workspace.uuid, msgs) + control.ctx.info('Queued todo reminders', { + queueTopic: QueueTopic.TimeMachine, + workSlotId, + todoId: todo._id, + reminderCount: reminders.length, + enqueuedCount: msgs.length, + timerIds: msgs.map((msg) => msg.id), + targetDates: msgs.map((msg) => msg.targetDate) + }) + } catch (err) { + control.ctx.error('Failed to schedule WorkSlot reminders', { err, workSlotId }) + } +} + /** * @public */ @@ -88,6 +213,8 @@ export async function OnWorkSlotUpdate (txes: Tx[], control: TriggerControl): Pr } const updTx = actualTx as TxUpdateDoc const visibility = updTx.operations.visibility + const date = updTx.operations.date + const reminders = updTx.operations.reminders if (visibility !== undefined) { const workslot = ( await control.findAll(control.ctx, time.class.WorkSlot, { _id: updTx.objectId }, { limit: 1 }) @@ -98,6 +225,10 @@ export async function OnWorkSlotUpdate (txes: Tx[], control: TriggerControl): Pr const todo = (await control.findAll(control.ctx, time.class.ToDo, { _id: workslot.attachedTo }))[0] result.push(control.txFactory.createTxUpdateDoc(todo._class, todo.space, todo._id, { visibility })) } + + if (date !== undefined || reminders !== undefined) { + await scheduleWorkSlotReminders(control, updTx.objectId) + } } return result } @@ -112,6 +243,10 @@ export async function OnWorkSlotCreate (txes: Tx[], control: TriggerControl): Pr continue } const workslot = TxProcessor.createDoc2Doc(actualTx as TxCreateDoc) + + // Ensure reminders are scheduled immediately on create. + await scheduleWorkSlotReminders(control, workslot._id) + const workslots = await control.findAll(control.ctx, time.class.WorkSlot, { attachedTo: workslot.attachedTo }) if (workslots.length > 1) { continue @@ -165,6 +300,20 @@ export async function OnWorkSlotCreate (txes: Tx[], control: TriggerControl): Pr return [] } +export async function OnWorkSlotRemove (txes: Tx[], control: TriggerControl): Promise { + for (const tx of txes) { + const actualTx = tx as TxCUD + if (!control.hierarchy.isDerived(actualTx.objectClass, time.class.WorkSlot)) { + continue + } + if (!control.hierarchy.isDerived(actualTx._class, core.class.TxRemoveDoc)) { + continue + } + await cancelWorkSlotReminders(control, actualTx.objectId) + } + return [] +} + export async function OnToDoRemove (txes: Tx[], control: TriggerControl): Promise { for (const tx of txes) { const actualTx = tx as TxCUD @@ -373,6 +522,9 @@ export async function OnToDoUpdate (txes: Tx[], control: TriggerControl): Promis const events = await control.findAll(control.ctx, time.class.WorkSlot, { attachedTo: updTx.objectId }) const resEvents: WorkSlot[] = [] for (const event of events) { + // Cancel any pending reminder timers as soon as the todo is completed. + await cancelWorkSlotReminders(control, event._id) + if (event.date > doneOn) { const innerTx = control.txFactory.createTxRemoveDoc(event._class, event.space, event._id) const outerTx = control.txFactory.createTxCollectionCUD( @@ -792,6 +944,7 @@ export default async () => ({ OnToDoRemove, OnToDoCreate, OnWorkSlotCreate, - OnWorkSlotUpdate + OnWorkSlotUpdate, + OnWorkSlotRemove } }) diff --git a/server-plugins/time-resources/src/reminderScheduling.test.ts b/server-plugins/time-resources/src/reminderScheduling.test.ts new file mode 100644 index 00000000000..67dbf10d285 --- /dev/null +++ b/server-plugins/time-resources/src/reminderScheduling.test.ts @@ -0,0 +1,244 @@ +// +// Copyright © 2026 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +/* eslint-disable @typescript-eslint/consistent-type-assertions */ + +import core, { generateId, type TxRemoveDoc, type TxUpdateDoc } from '@hcengineering/core' +import type { PlatformQueueProducer } from '@hcengineering/server-core' +import time, { type ToDo, type WorkSlot } from '@hcengineering/time' +import { OnToDoUpdate, OnWorkSlotRemove, OnWorkSlotUpdate } from './index' + +type AnyProducer = PlatformQueueProducer + +function makeQueueMock (): { queue: { getProducer: jest.Mock }, send: jest.Mock } { + const send = jest.fn(async () => {}) + const producer: AnyProducer = { send, close: async () => {}, getQueue: () => queue as any } as any + const queue = { + getProducer: jest.fn(() => producer) + } + return { queue, send } +} + +function makeControlBase (overrides: Partial = {}): { control: any, send: jest.Mock } { + const { queue, send } = makeQueueMock() + + const control: any = { + ctx: { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + contextData: { account: { uuid: generateId(), primarySocialId: core.account.System } } + }, + workspace: { uuid: generateId(), url: 'ws', dataId: 'ws' }, + hierarchy: { + isDerived: jest.fn(() => true), + classHierarchyMixin: jest.fn(() => undefined) + }, + modelDb: { findAll: jest.fn(), findAllSync: jest.fn(), getObject: jest.fn() }, + removedMap: new Map(), + userStatusMap: new Map(), + queue, + cache: new Map(), + contextCache: new Map(), + storageAdapter: {} as any, + serviceAdaptersManager: {} as any, + lowLevel: {} as any, + txFactory: { createTxUpdateDoc: jest.fn(), createTxRemoveDoc: jest.fn(), createTxCollectionCUD: jest.fn() } as any, + apply: jest.fn(async () => ({})), + domainRequest: jest.fn(async () => ({})), + queryFind: jest.fn(async () => []), + txes: [], + findAll: jest.fn(async () => []) + } + + Object.assign(control, overrides) + + return { control, send } +} + +describe('todo reminder scheduling (TimeMachine)', () => { + it('OnWorkSlotUpdate: schedules reminders when reminders change', async () => { + const workSlotId = generateId() + const todoId = generateId() + + const { control, send } = makeControlBase() + + // WorkSlot start one hour from now so a 5-minute "before" reminder is in the future. + const workSlotDate = Date.now() + 60 * 60_000 + const shiftMs = 5 * 60_000 + + // WorkSlot lookup inside scheduler. + ;(control.findAll as jest.Mock).mockImplementation(async (_ctx: any, _class: any, query: any) => { + if (_class === time.class.WorkSlot && query?._id === workSlotId) { + return [ + { + _id: workSlotId, + _class: time.class.WorkSlot, + space: core.space.Workspace, + attachedTo: todoId, + attachedToClass: time.class.ToDo, + date: workSlotDate, + reminders: [shiftMs], + dueDate: workSlotDate + 60_000 + } + ] + } + if (_class === time.class.ToDo && query?._id === todoId) { + return [ + { + _id: todoId, + _class: time.class.ToDo, + space: time.space.ToDos, + attachedTo: core.space.Workspace as any, + attachedToClass: core.class.Doc as any, + workslots: 1, + title: 't', + description: '', + priority: 0, + visibility: 'private', + doneOn: null, + user: generateId() as any, + rank: '' + } + ] + } + return [] + }) + + const tx = { + _id: generateId(), + _class: core.class.TxUpdateDoc, + objectId: workSlotId, + objectClass: time.class.WorkSlot, + objectSpace: core.space.Workspace, + space: core.space.Tx, + modifiedBy: core.account.System, + modifiedOn: Date.now(), + operations: { reminders: [shiftMs] } + } as unknown as TxUpdateDoc + + await OnWorkSlotUpdate([tx], control) + + // First call cancels `todoReminder__%`, second schedules the specific reminder. + expect(send).toHaveBeenCalled() + // Producer send signature: (ctx, workspace, msgs) + const msgs = send.mock.calls.map((c: any[]) => c[2]).flat() + expect(msgs.find((m: any) => m.type === 'cancel')).toBeDefined() + const scheduleMsg = msgs.find((m: any) => m.type === 'schedule' && m.topic === 'scheduledNotification') + expect(scheduleMsg).toBeDefined() + // Reminder must fire BEFORE the workslot starts, exactly `shiftMs` earlier. + expect(scheduleMsg.targetDate).toBe(workSlotDate - shiftMs) + expect(scheduleMsg.data.targetDate).toBe(workSlotDate - shiftMs) + expect(scheduleMsg.data.shiftMs).toBe(shiftMs) + expect(scheduleMsg.id).toBe(`todoReminder_${workSlotId}_${shiftMs}`) + }) + + it('OnWorkSlotRemove: cancels reminders', async () => { + const workSlotId = generateId() + const { control, send } = makeControlBase() + + const tx = { + _id: generateId(), + _class: core.class.TxRemoveDoc, + objectId: workSlotId, + objectClass: time.class.WorkSlot, + objectSpace: core.space.Workspace, + space: core.space.Tx, + modifiedBy: core.account.System, + modifiedOn: Date.now() + } as unknown as TxRemoveDoc + + await OnWorkSlotRemove([tx], control) + + const msgs = send.mock.calls.map((c: any[]) => c[2]).flat() + expect(msgs.find((m: any) => m.type === 'cancel')).toBeDefined() + }) + + it('OnToDoUpdate(doneOn): cancels reminders for all workslots', async () => { + const todoId = generateId() + const ws1 = generateId() + const ws2 = generateId() + + const { control, send } = makeControlBase() + + ;(control.findAll as jest.Mock).mockImplementation(async (_ctx: any, _class: any, query: any) => { + if (_class === time.class.ToDo && query?._id === todoId) { + return [ + { + _id: todoId, + _class: time.class.ToDo, + space: time.space.ToDos, + attachedTo: core.space.Workspace as any, + attachedToClass: core.class.Doc as any, + workslots: 2, + title: 't', + description: '', + priority: 0, + visibility: 'private', + doneOn: Date.now(), + user: generateId() as any, + rank: '' + } + ] + } + if (_class === core.class.TxUpdateDoc && query?.objectId === todoId) { + return [] + } + if (_class === time.class.WorkSlot && query?.attachedTo === todoId) { + return [ + { + _id: ws1, + _class: time.class.WorkSlot, + space: core.space.Workspace, + attachedTo: todoId, + attachedToClass: time.class.ToDo, + date: Date.now() + 60_000, + dueDate: Date.now() + 120_000, + reminders: [5 * 60_000] + }, + { + _id: ws2, + _class: time.class.WorkSlot, + space: core.space.Workspace, + attachedTo: todoId, + attachedToClass: time.class.ToDo, + date: Date.now() + 60_000, + dueDate: Date.now() + 120_000, + reminders: [5 * 60_000] + } + ] + } + return [] + }) + + const tx = { + _id: generateId(), + _class: core.class.TxUpdateDoc, + objectId: todoId, + objectClass: time.class.ToDo, + objectSpace: time.space.ToDos, + space: core.space.Tx, + modifiedBy: core.account.System, + modifiedOn: Date.now(), + operations: { doneOn: Date.now() } + } as unknown as TxUpdateDoc + + await OnToDoUpdate([tx], control) + + const msgs = send.mock.calls.map((c: any[]) => c[2]).flat() + const cancelMsgs = msgs.filter((m: any) => m.type === 'cancel') + expect(cancelMsgs.length).toBeGreaterThanOrEqual(2) + }) +}) diff --git a/services/gmail/pod-gmail/src/__tests__/attachments.test.ts b/services/gmail/pod-gmail/src/__tests__/attachments.test.ts index d7b7d70484e..a255b836b9d 100644 --- a/services/gmail/pod-gmail/src/__tests__/attachments.test.ts +++ b/services/gmail/pod-gmail/src/__tests__/attachments.test.ts @@ -32,6 +32,7 @@ describe('AttachmentHandler', () => { info: jest.fn(), error: jest.fn(), warn: jest.fn(), + debug: jest.fn(), logOperation: jest.fn(), childLogger: jest.fn(), close: jest.fn() @@ -40,6 +41,7 @@ describe('AttachmentHandler', () => { error: jest.fn(), info: jest.fn(), warn: jest.fn(), + debug: jest.fn(), end: jest.fn(), getParams: jest.fn() } diff --git a/services/notification/pod-events-processor/.eslintrc.js b/services/notification/pod-events-processor/.eslintrc.js new file mode 100644 index 00000000000..6ab3cb53db3 --- /dev/null +++ b/services/notification/pod-events-processor/.eslintrc.js @@ -0,0 +1,8 @@ +module.exports = { + extends: ['./node_modules/@hcengineering/platform-rig/profiles/default/eslint.config.json'], + parserOptions: { + tsconfigRootDir: __dirname, + project: './tsconfig.json' + } +} + diff --git a/services/notification/pod-events-processor/Dockerfile b/services/notification/pod-events-processor/Dockerfile new file mode 100644 index 00000000000..4f2f62f5d60 --- /dev/null +++ b/services/notification/pod-events-processor/Dockerfile @@ -0,0 +1,6 @@ +FROM hardcoreeng/base-slim:v20250916 +WORKDIR /usr/src/app + +COPY bundle/bundle.js ./ + +CMD [ "node", "bundle.js" ] diff --git a/services/notification/pod-events-processor/jest.config.js b/services/notification/pod-events-processor/jest.config.js new file mode 100644 index 00000000000..7929244e79d --- /dev/null +++ b/services/notification/pod-events-processor/jest.config.js @@ -0,0 +1,8 @@ +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', + testMatch: ['**/?(*.)+(spec|test).[jt]s?(x)'], + roots: ['./src'], + coverageReporters: ['text-summary', 'html'] +} + diff --git a/services/notification/pod-events-processor/package.json b/services/notification/pod-events-processor/package.json new file mode 100644 index 00000000000..a560f450bd3 --- /dev/null +++ b/services/notification/pod-events-processor/package.json @@ -0,0 +1,72 @@ +{ + "name": "@hcengineering/pod-events-processor", + "version": "0.7.0", + "main": "lib/index.js", + "svelte": "src/index.ts", + "types": "types/index.d.ts", + "files": [ + "lib/**/*", + "types/**/*", + "tsconfig.json" + ], + "author": "Hardcore Engineering Inc.", + "scripts": { + "build": "compile", + "build:watch": "compile", + "test": "jest --passWithNoTests --silent", + "_phase:bundle": "rushx bundle", + "_phase:docker-build": "rushx docker:build", + "_phase:docker-staging": "rushx docker:staging", + "bundle": "node ../../../common/scripts/esbuild.js --external=ws", + "docker:build": "../../../common/scripts/docker_build.sh hardcoreeng/events-processor", + "docker:staging": "../../../common/scripts/docker_tag.sh hardcoreeng/events-processor staging", + "docker:push": "../../../common/scripts/docker_tag.sh hardcoreeng/events-processor", + "run-local": "cross-env ts-node src/index.ts", + "format": "format src", + "_phase:build": "compile transpile src", + "_phase:test": "jest --passWithNoTests --silent", + "_phase:format": "format src", + "_phase:validate": "compile validate" + }, + "devDependencies": { + "@hcengineering/platform-rig": "workspace:^0.7.21", + "@tsconfig/node16": "^1.0.4", + "@types/node": "^22.18.1", + "@typescript-eslint/eslint-plugin": "^6.21.0", + "@types/jest": "^29.5.5", + "@typescript-eslint/parser": "^6.21.0", + "cross-env": "~7.0.3", + "esbuild": "^0.25.10", + "eslint": "^8.54.0", + "eslint-config-standard-with-typescript": "^40.0.0", + "eslint-plugin-import": "^2.26.0", + "eslint-plugin-n": "^15.4.0", + "eslint-plugin-node": "^11.1.0", + "eslint-plugin-promise": "^6.1.1", + "jest": "^29.7.0", + "prettier": "^3.6.2", + "ts-jest": "^29.1.1", + "ts-node": "^10.9.2", + "typescript": "^5.9.3" + }, + "dependencies": { + "@hcengineering/account-client": "workspace:^0.7.25", + "@hcengineering/analytics": "workspace:^0.7.19", + "@hcengineering/analytics-service": "workspace:^0.7.19", + "@hcengineering/api-client": "workspace:^0.7.25", + "@hcengineering/contact": "workspace:^0.7.0", + "@hcengineering/core": "workspace:^0.7.26", + "@hcengineering/kafka": "workspace:^0.7.18", + "@hcengineering/model-time": "workspace:^0.7.0", + "@hcengineering/notification": "workspace:^0.7.0", + "@hcengineering/platform": "workspace:^0.7.20", + "@hcengineering/server-client": "workspace:^0.7.19", + "@hcengineering/server-core": "workspace:^0.7.19", + "@hcengineering/server-token": "workspace:^0.7.18", + "@hcengineering/text-core": "workspace:^0.7.19", + "@hcengineering/time": "workspace:^0.7.0", + "dotenv": "^16.4.5", + "lru-cache": "^11.1.0" + } +} + diff --git a/services/notification/pod-events-processor/src/__tests__/clientCache.test.ts b/services/notification/pod-events-processor/src/__tests__/clientCache.test.ts new file mode 100644 index 00000000000..a0df257b911 --- /dev/null +++ b/services/notification/pod-events-processor/src/__tests__/clientCache.test.ts @@ -0,0 +1,77 @@ +// +// Copyright © 2026 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import type { PersonId, WorkspaceUuid } from '@hcengineering/core' +import type { ClientBundle } from '../client' +import { + clearClientCachesForTests, + clearInFlightClientCreation, + getCacheKey, + getCachedClient, + getInFlightClientCreation, + setCachedClient, + setInFlightClientCreation +} from '../clientCache' +import config from '../config' + +describe('clientCache', () => { + const workspace = 'workspace-1' as WorkspaceUuid + const serviceTag = 'events-processor' + const socialId = 'person-1' as PersonId + + const client = {} as unknown as ClientBundle['client'] + const accountClient = {} as unknown as ClientBundle['accountClient'] + const bundle: ClientBundle = { client, accountClient } + + beforeEach(() => { + clearClientCachesForTests() + }) + + afterEach(() => { + jest.restoreAllMocks() + }) + + it('uses system marker in cache key when socialId is undefined', () => { + expect(getCacheKey(workspace, undefined, serviceTag)).toBe('workspace-1:system:events-processor') + }) + + it('returns cached value before ttl expires', () => { + const now = 1_000 + jest.spyOn(Date, 'now').mockReturnValue(now) + + setCachedClient(workspace, socialId, serviceTag, bundle) + expect(getCachedClient(workspace, socialId, serviceTag)).toBe(bundle) + }) + + it('evicts cached value after ttl expires', () => { + const now = 1_000 + jest.spyOn(Date, 'now').mockReturnValue(now) + setCachedClient(workspace, socialId, serviceTag, bundle) + + jest.spyOn(Date, 'now').mockReturnValue(now + config.ClientCacheTtlMs + 1) + expect(getCachedClient(workspace, socialId, serviceTag)).toBeUndefined() + }) + + it('stores and clears in-flight client creation', () => { + const key = getCacheKey(workspace, socialId, serviceTag) + const creation = Promise.resolve(bundle) + + setInFlightClientCreation(key, creation) + expect(getInFlightClientCreation(key)).toBe(creation) + + clearInFlightClientCreation(key) + expect(getInFlightClientCreation(key)).toBeUndefined() + }) +}) diff --git a/services/notification/pod-events-processor/src/__tests__/worker.test.ts b/services/notification/pod-events-processor/src/__tests__/worker.test.ts new file mode 100644 index 00000000000..069bd0d2a0d --- /dev/null +++ b/services/notification/pod-events-processor/src/__tests__/worker.test.ts @@ -0,0 +1,167 @@ +// +// Copyright © 2026 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import contact from '@hcengineering/contact' +import type { MeasureContext, WorkspaceUuid } from '@hcengineering/core' +import modelTime from '@hcengineering/model-time' +import notification from '@hcengineering/notification' +import type { ConsumerControl } from '@hcengineering/server-core' +import time from '@hcengineering/time' +import { getClient } from '../client' +import type { ScheduledNotificationMessage } from '../types' +import { handleScheduledNotification } from '../worker' + +jest.mock('../client', () => ({ + getClient: jest.fn() +})) + +describe('handleScheduledNotification', () => { + const workspaceUuid = 'workspace-1' as WorkspaceUuid + const baseMessage: ScheduledNotificationMessage = { + kind: 'todoReminder', + id: 'timer-1', + workSlotId: 'workslot-1' as any, + todoId: 'todo-1' as any, + shiftMs: 1000, + targetDate: 1_000_000 + } + + const control = { + heartbeat: jest.fn(async () => {}) + } as unknown as ConsumerControl + + const ctx = { + info: jest.fn(), + error: jest.fn(), + warn: jest.fn() + } as unknown as MeasureContext + + const findOne = jest.fn() + const createDoc = jest.fn() + const client = { + findOne, + createDoc + } + + beforeEach(() => { + jest.clearAllMocks() + ;(getClient as jest.Mock).mockResolvedValue({ client }) + }) + + it('returns early for non-todo messages', async () => { + await handleScheduledNotification(ctx, workspaceUuid, { ...baseMessage, kind: 'other' as any }, control) + + expect(getClient).not.toHaveBeenCalled() + expect(control.heartbeat).not.toHaveBeenCalled() + }) + + it('skips creation when reminder was already created', async () => { + findOne.mockImplementation(async (klass: any, query: any) => { + if (klass === time.class.WorkSlot) return { _id: baseMessage.workSlotId } + if (klass === time.class.ToDo) { + return { + _id: baseMessage.todoId, + _class: 'time:class:ToDo', + space: 'space-1', + user: 'employee-1', + title: 'Todo title' + } + } + if (klass === contact.mixin.Employee) return { personUuid: 'person-1' } + if (klass === contact.class.PersonSpace) return { _id: 'person-space-1' } + if (klass === notification.class.DocNotifyContext) return { _id: 'doc-notify-1' } + if (klass === notification.class.CommonInboxNotification && query?.['intlParams.timerId'] === baseMessage.id) { return { _id: 'existing-notification' } } + return undefined + }) + + await handleScheduledNotification(ctx, workspaceUuid, baseMessage, control) + + expect(createDoc).not.toHaveBeenCalled() + expect(ctx.info).not.toHaveBeenCalled() + }) + + it('creates notify context and inbox notification when needed', async () => { + let docNotifyContextCreated = false + findOne.mockImplementation(async (klass: any, query: any) => { + if (klass === time.class.WorkSlot) return { _id: baseMessage.workSlotId } + if (klass === time.class.ToDo) { + return { + _id: baseMessage.todoId, + _class: 'time:class:ToDo', + space: 'space-1', + user: 'employee-1', + title: 'Todo title' + } + } + if (klass === contact.mixin.Employee) return { personUuid: 'person-1' } + if (klass === contact.class.PersonSpace) return { _id: 'person-space-1' } + if (klass === notification.class.DocNotifyContext && query?._id === 'doc-notify-created-id') { return { _id: 'doc-notify-created-id' } } + if (klass === notification.class.DocNotifyContext) { return docNotifyContextCreated ? { _id: 'doc-notify-created-id' } : undefined } + return undefined + }) + createDoc.mockImplementation(async (klass: any) => { + if (klass === notification.class.DocNotifyContext) { + docNotifyContextCreated = true + return 'doc-notify-created-id' + } + return 'notification-created-id' + }) + + await handleScheduledNotification(ctx, workspaceUuid, baseMessage, control) + + expect(createDoc).toHaveBeenNthCalledWith( + 1, + notification.class.DocNotifyContext, + 'person-space-1', + expect.objectContaining({ + objectId: baseMessage.todoId, + objectClass: 'time:class:ToDo', + objectSpace: 'space-1', + user: 'person-1', + isPinned: false, + hidden: false + }) + ) + expect(createDoc).toHaveBeenNthCalledWith( + 2, + notification.class.CommonInboxNotification, + 'person-space-1', + expect.objectContaining({ + user: 'person-1', + objectId: baseMessage.todoId, + objectClass: 'time:class:ToDo', + icon: time.icon.Planned, + header: time.string.ToDo, + message: time.string.ToDo, + intlParams: { timerId: baseMessage.id }, + types: [modelTime.ids.ToDoReminder], + isViewed: false, + archived: false, + docNotifyContext: 'doc-notify-created-id' + }) + ) + expect(ctx.info).toHaveBeenCalledWith( + 'Scheduled notification created', + expect.objectContaining({ + kind: 'todoReminder', + id: baseMessage.id, + workSlotId: baseMessage.workSlotId, + todoId: baseMessage.todoId, + user: 'person-1', + spaceId: 'person-space-1' + }) + ) + }) +}) diff --git a/services/notification/pod-events-processor/src/client.ts b/services/notification/pod-events-processor/src/client.ts new file mode 100644 index 00000000000..871a85caf46 --- /dev/null +++ b/services/notification/pod-events-processor/src/client.ts @@ -0,0 +1,78 @@ +// +// Copyright © 2026 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { getClient as getAccountClient, type AccountClient } from '@hcengineering/account-client' +import { createRestTxOperations } from '@hcengineering/api-client' +import core, { PersonId, systemAccountUuid, type TxOperations, type WorkspaceUuid } from '@hcengineering/core' +import { generateToken } from '@hcengineering/server-token' +import { + clearInFlightClientCreation, + getCacheKey, + getCachedClient, + getInFlightClientCreation, + setCachedClient, + setInFlightClientCreation +} from './clientCache' +import config from './config' + +export interface ClientBundle { + client: TxOperations + accountClient: AccountClient +} + +export async function getClient ( + workspaceUuid: WorkspaceUuid, + socialId?: PersonId, + serviceTag: string = config.ServiceId +): Promise { + const cached = getCachedClient(workspaceUuid, socialId, serviceTag) + if (cached !== undefined) return cached + + const cacheKey = getCacheKey(workspaceUuid, socialId, serviceTag) + const inFlight = getInFlightClientCreation(cacheKey) + if (inFlight !== undefined) return await inFlight + + const creation = (async () => { + const token = generateToken(systemAccountUuid, workspaceUuid, { service: serviceTag }) + let accountClient = getAccountClient(config.AccountsUrl, token) + + // If we want the notification author to be a specific user, we can obtain a workspace token for that person. + if (socialId !== undefined && socialId !== core.account.System) { + const personUuid = await accountClient.findPersonBySocialId(socialId, true) + if (personUuid === undefined) { + throw new Error(`Global person not found for social-id ${socialId}`) + } + const token = generateToken(personUuid, workspaceUuid, { service: serviceTag }) + accountClient = getAccountClient(config.AccountsUrl, token) + } + + const wsInfo = await accountClient.getLoginInfoByToken() + if (wsInfo == null || !('endpoint' in wsInfo)) { + throw new Error('Invalid login info') + } + const transactorUrl = wsInfo.endpoint.replace('ws://', 'http://').replace('wss://', 'https://') + const client = await createRestTxOperations(transactorUrl, wsInfo.workspace, wsInfo.token) + const bundle = { client, accountClient } + setCachedClient(workspaceUuid, socialId, serviceTag, bundle) + return bundle + })() + + setInFlightClientCreation(cacheKey, creation) + try { + return await creation + } finally { + clearInFlightClientCreation(cacheKey) + } +} diff --git a/services/notification/pod-events-processor/src/clientCache.ts b/services/notification/pod-events-processor/src/clientCache.ts new file mode 100644 index 00000000000..dac5cab3514 --- /dev/null +++ b/services/notification/pod-events-processor/src/clientCache.ts @@ -0,0 +1,65 @@ +// +// Copyright © 2026 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { type PersonId, type WorkspaceUuid } from '@hcengineering/core' +import { LRUCache } from 'lru-cache' +import type { ClientBundle } from './client' +import config from './config' + +const clientCache = new LRUCache({ + ttl: config.ClientCacheTtlMs, + ttlAutopurge: true +}) +const inFlightClientCreations = new Map>() + +export function getCacheKey (workspaceUuid: WorkspaceUuid, socialId: PersonId | undefined, serviceTag: string): string { + return `${workspaceUuid}:${socialId ?? 'system'}:${serviceTag}` +} + +export function getCachedClient ( + workspaceUuid: WorkspaceUuid, + socialId: PersonId | undefined, + serviceTag: string +): ClientBundle | undefined { + const key = getCacheKey(workspaceUuid, socialId, serviceTag) + return clientCache.get(key) +} + +export function setCachedClient ( + workspaceUuid: WorkspaceUuid, + socialId: PersonId | undefined, + serviceTag: string, + value: ClientBundle +): void { + const key = getCacheKey(workspaceUuid, socialId, serviceTag) + clientCache.set(key, value) +} + +export function getInFlightClientCreation (key: string): Promise | undefined { + return inFlightClientCreations.get(key) +} + +export function setInFlightClientCreation (key: string, creation: Promise): void { + inFlightClientCreations.set(key, creation) +} + +export function clearInFlightClientCreation (key: string): void { + inFlightClientCreations.delete(key) +} + +export function clearClientCachesForTests (): void { + clientCache.clear() + inFlightClientCreations.clear() +} diff --git a/services/notification/pod-events-processor/src/config.ts b/services/notification/pod-events-processor/src/config.ts new file mode 100644 index 00000000000..cea0aed7fee --- /dev/null +++ b/services/notification/pod-events-processor/src/config.ts @@ -0,0 +1,38 @@ +// +// Copyright © 2026 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { config as dotenvConfig } from 'dotenv' + +dotenvConfig() + +interface Config { + ServiceId: string + Secret: string + AccountsUrl: string + QueueRegion: string + LogLevel: 'info' | 'debug' + ClientCacheTtlMs: number +} + +const config: Config = { + ServiceId: process.env.SERVICE_ID ?? 'events-processor', + Secret: process.env.SECRET ?? 'secret', + AccountsUrl: process.env.ACCOUNTS_URL ?? 'http://localhost:3000', + QueueRegion: process.env.QUEUE_REGION ?? 'localhost', + LogLevel: process.env.LOG_LEVEL === 'debug' ? 'debug' : 'info', + ClientCacheTtlMs: Number(process.env.CLIENT_CACHE_TTL_MS ?? 60_000) +} + +export default config diff --git a/services/notification/pod-events-processor/src/index.ts b/services/notification/pod-events-processor/src/index.ts new file mode 100644 index 00000000000..90e41632d88 --- /dev/null +++ b/services/notification/pod-events-processor/src/index.ts @@ -0,0 +1,89 @@ +// +// Copyright © 2026 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { Analytics } from '@hcengineering/analytics' +import { configureAnalytics, createOpenTelemetryMetricsContext, SplitLogger } from '@hcengineering/analytics-service' +import { newMetrics } from '@hcengineering/core' +import { getPlatformQueue } from '@hcengineering/kafka' +import { setMetadata } from '@hcengineering/platform' +import serverClient from '@hcengineering/server-client' +import { initStatisticsContext, type ConsumerControl } from '@hcengineering/server-core' +import serverToken from '@hcengineering/server-token' +import { join } from 'path' +import config from './config' +import type { ScheduledNotificationMessage } from './types' +import { handleScheduledNotification } from './worker' + +const scheduledNotificationTopic = 'scheduledNotification' +const isDebugLoggingEnabled = config.LogLevel === 'debug' + +async function main (): Promise { + configureAnalytics(config.ServiceId, process.env.VERSION ?? '0.7.0') + const ctx = initStatisticsContext(config.ServiceId, { + factory: () => + createOpenTelemetryMetricsContext( + config.ServiceId, + {}, + {}, + newMetrics(), + new SplitLogger(config.ServiceId, { + root: join(process.cwd(), 'logs'), + enableConsole: (process.env.ENABLE_CONSOLE ?? 'true') === 'true' + }) + ) + }) + + Analytics.setTag('application', config.ServiceId) + setMetadata(serverToken.metadata.Secret, config.Secret) + setMetadata(serverToken.metadata.Service, config.ServiceId) + setMetadata(serverClient.metadata.Endpoint, config.AccountsUrl) + + const queue = getPlatformQueue(config.ServiceId, config.QueueRegion) + + const consumer = queue.createConsumer( + ctx, + scheduledNotificationTopic, + queue.getClientId(), + async (ctx, message, control: ConsumerControl) => { + if (isDebugLoggingEnabled) { + ctx.info('Received scheduled notification event', { + topic: scheduledNotificationTopic, + workspace: message.workspace, + kind: message.value?.kind, + id: message.value?.id + }) + } + await handleScheduledNotification(ctx, message.workspace, message.value, control) + } + ) + + const shutdown = (): void => { + void Promise.all([consumer.close()]).then(() => process.exit()) + } + + process.once('SIGINT', shutdown) + process.once('SIGTERM', shutdown) + process.on('uncaughtException', (error: any) => { + ctx.error('Uncaught exception', { error }) + }) + process.on('unhandledRejection', (error: any) => { + ctx.error('Unhandled rejection', { error }) + }) +} + +void main().catch((err) => { + // eslint-disable-next-line no-console + console.error(err) +}) diff --git a/services/notification/pod-events-processor/src/types.ts b/services/notification/pod-events-processor/src/types.ts new file mode 100644 index 00000000000..e029417a684 --- /dev/null +++ b/services/notification/pod-events-processor/src/types.ts @@ -0,0 +1,26 @@ +// +// Copyright © 2026 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import type { Ref } from '@hcengineering/core' +import type { WorkSlot, ToDo } from '@hcengineering/time' + +export interface ScheduledNotificationMessage { + kind: 'todoReminder' + id: string + workSlotId: Ref + todoId: Ref + shiftMs: number + targetDate: number +} diff --git a/services/notification/pod-events-processor/src/worker.ts b/services/notification/pod-events-processor/src/worker.ts new file mode 100644 index 00000000000..80a06ba1471 --- /dev/null +++ b/services/notification/pod-events-processor/src/worker.ts @@ -0,0 +1,111 @@ +// +// Copyright © 2026 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import contact from '@hcengineering/contact' +import { type MeasureContext, type WorkspaceUuid } from '@hcengineering/core' +import type { ConsumerControl } from '@hcengineering/server-core' +import notification from '@hcengineering/notification' +import modelTime from '@hcengineering/model-time' +import { jsonToMarkup, nodeDoc, nodeParagraph, nodeText } from '@hcengineering/text-core' +import time from '@hcengineering/time' +import { getClient } from './client' +import type { ScheduledNotificationMessage } from './types' + +export async function handleScheduledNotification ( + ctx: MeasureContext, + workspaceUuid: WorkspaceUuid, + msg: ScheduledNotificationMessage, + control: ConsumerControl +): Promise { + if (msg.kind !== 'todoReminder') return + + await control.heartbeat() + const { client } = await getClient(workspaceUuid) + await control.heartbeat() + + const workslot = await client.findOne(time.class.WorkSlot, { _id: msg.workSlotId }) + if (workslot === undefined) return + await control.heartbeat() + const todo = await client.findOne(time.class.ToDo, { _id: msg.todoId }) + if (todo === undefined) return + if (todo.doneOn != null) return + + const employee = await client.findOne(contact.mixin.Employee, { _id: todo.user, active: true }) + if (employee?.personUuid == null) return + const user = employee.personUuid + + const space = await client.findOne(contact.class.PersonSpace, { person: todo.user }, { projection: { _id: 1 } }) + if (space === undefined) return + await control.heartbeat() + + const objectId = todo._id + const objectClass = todo._class + const objectSpace = todo.space + + // Ensure doc notify context exists. + let docNotifyContext = await client.findOne(notification.class.DocNotifyContext, { objectId, user }) + if (docNotifyContext === undefined) { + const id = await client.createDoc(notification.class.DocNotifyContext, space._id, { + objectId, + objectClass, + objectSpace, + user, + isPinned: false, + hidden: false + }) + docNotifyContext = await client.findOne( + notification.class.DocNotifyContext, + { _id: id }, + { projection: { _id: 1 } } + ) + if (docNotifyContext === undefined) return + } + + // Idempotency: if this timer already created a notification, skip. + // We key by (docNotifyContext,user,msg.id) via intlParams; this avoids needing deterministic _id. + const existing = await client.findOne(notification.class.CommonInboxNotification, { + docNotifyContext: docNotifyContext._id, + user, + 'intlParams.timerId': msg.id + } as any) + if (existing !== undefined) return + + await control.heartbeat() + await client.createDoc(notification.class.CommonInboxNotification, space._id, { + user, + objectId, + objectClass, + icon: time.icon.Planned, + header: time.string.ToDo, + message: time.string.ToDo, + messageHtml: jsonToMarkup(nodeDoc(nodeParagraph(nodeText(todo.title)))), + intlParams: { + timerId: msg.id + }, + types: [modelTime.ids.ToDoReminder], + isViewed: false, + archived: false, + docNotifyContext: docNotifyContext._id + } as any) + + ctx.info('Scheduled notification created', { + kind: msg.kind, + id: msg.id, + workSlotId: msg.workSlotId, + todoId: msg.todoId, + user, + spaceId: space._id + }) +} diff --git a/services/notification/pod-events-processor/tsconfig.json b/services/notification/pod-events-processor/tsconfig.json new file mode 100644 index 00000000000..41f3dcae2c9 --- /dev/null +++ b/services/notification/pod-events-processor/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "./node_modules/@hcengineering/platform-rig/profiles/default/tsconfig.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "./lib", + "declarationDir": "./types", + "tsBuildInfoFile": ".build/build.tsbuildinfo" + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "lib", "dist", "types", "bundle"] +} + diff --git a/services/worker/README.md b/services/worker/README.md index 45dc4df86bd..102a09f1126 100644 --- a/services/worker/README.md +++ b/services/worker/README.md @@ -33,7 +33,8 @@ When a timer expires, the service relays the exact `data` payload to the target | `DB_URL` | `postgres://localhost:5432/huly` | Connection string for the PostgreSQL database. | | `POLL_INTERVAL` | `5000` | Polling interval for expired events in milliseconds. | | `QUEUE_CONFIG` | - | Kafka bootstrap servers configuration. | -| `QUEUE_REGION` | `cockroach` | Platform region configuration. | +| `QUEUE_REGION` | (empty) | Kafka topic prefix; must match transactor `REGION` / other services `QUEUE_REGION`. | +| `LOG_LEVEL` | `info` | Set to `debug` for verbose diagnostic logs (`ctx.debug`) on TimeMachine consumes and poll batches. | ## Database Schema diff --git a/services/worker/src/config.ts b/services/worker/src/config.ts index 3c6ae330a26..3a76a81c511 100644 --- a/services/worker/src/config.ts +++ b/services/worker/src/config.ts @@ -21,13 +21,15 @@ export interface Config { PollInterval: number QueueRegion: string QueueConfig: string + LogLevel: 'info' | 'debug' } const config: Config = { DbUrl: process.env.DB_URL ?? 'postgres://localhost:5432/huly', PollInterval: process.env.POLL_INTERVAL != null ? Number(process.env.POLL_INTERVAL) : 20000, QueueRegion: process.env.QUEUE_REGION ?? '', - QueueConfig: process.env.QUEUE_CONFIG ?? '' + QueueConfig: process.env.QUEUE_CONFIG ?? '', + LogLevel: process.env.LOG_LEVEL === 'debug' ? 'debug' : 'info' } export default config diff --git a/services/worker/src/worker.ts b/services/worker/src/worker.ts index 5d5c339d0b8..50289894cb4 100644 --- a/services/worker/src/worker.ts +++ b/services/worker/src/worker.ts @@ -13,7 +13,7 @@ // limitations under the License. // -import { MeasureMetricsContext } from '@hcengineering/core' +import { MeasureMetricsContext, newMetrics } from '@hcengineering/core' import { getPlatformQueue } from '@hcengineering/kafka' import { QueueTopic } from '@hcengineering/server-core' import { TimeMachineMessage } from '@hcengineering/server-process' @@ -25,13 +25,29 @@ export async function runWorker (): Promise { const SERVICE_NAME = 'time-machine' const db = await TimeMachineDB.init(config.DbUrl) - const ctx = new MeasureMetricsContext(SERVICE_NAME, {}) + const ctx = new MeasureMetricsContext( + SERVICE_NAME, + {}, + {}, + newMetrics(), + undefined, + undefined, + undefined, + config.LogLevel + ) const queue = getPlatformQueue(SERVICE_NAME, config.QueueRegion) // 1. Kafka Consumer for commands queue.createConsumer(ctx, QueueTopic.TimeMachine, SERVICE_NAME, async (ctx, msg) => { const { type, id, targetDate, topic, data } = msg.value if (type === 'schedule' && targetDate != null && topic != null && data !== undefined) { + ctx.debug('TimeMachine consume schedule', { + workspace: msg.workspace, + id, + targetDate, + targetDateIso: new Date(targetDate).toISOString(), + outTopic: topic + }) await db.upsertEvent({ id, workspace: msg.workspace, @@ -40,6 +56,7 @@ export async function runWorker (): Promise { data }) } else if (type === 'cancel') { + ctx.debug('TimeMachine consume cancel', { workspace: msg.workspace, idPattern: id }) await db.removeEvents(msg.workspace, id) } }) @@ -49,13 +66,24 @@ export async function runWorker (): Promise { try { const expiredEvents = await db.getExpiredEvents() if (expiredEvents.length > 0) { + ctx.debug('TimeMachine poll expired', { + count: expiredEvents.length, + ids: expiredEvents.map((e) => e.id) + }) for (const event of expiredEvents) { + ctx.debug('TimeMachine relay', { + workspace: event.workspace, + id: event.id, + outTopic: event.topic, + targetDate: event.target_date, + targetDateIso: new Date(event.target_date).toISOString() + }) await SendTimeEvent(ctx, event.workspace, event.topic, event.data) } await db.deleteEvents(expiredEvents) } } catch (err) { - ctx.error('Error in Time Machine polling loop:') + ctx.error('Error in Time Machine polling loop', { err }) } finally { setTimeout(() => { void poll()