From 32d6b72ff30ea8da34c71ba9d3cf7927d18b75d0 Mon Sep 17 00:00:00 2001 From: Afreen Sikandara Date: Tue, 3 Mar 2026 15:02:34 +0800 Subject: [PATCH 1/2] Fix bulk export --- packages/server/medplum.config.json | 2 + packages/server/src/fhir/bulkdata.ts | 9 ++++ packages/server/src/fhir/operations/export.ts | 49 +++++++++++++++++-- .../fhir/operations/utils/asyncjobexecutor.ts | 19 ------- .../src/fhir/operations/utils/bulkexporter.ts | 17 ++++++- 5 files changed, 70 insertions(+), 26 deletions(-) diff --git a/packages/server/medplum.config.json b/packages/server/medplum.config.json index 09c2ed0ce26..c4a09f1e128 100644 --- a/packages/server/medplum.config.json +++ b/packages/server/medplum.config.json @@ -8,6 +8,8 @@ "tokenUrl": "${FHIR_SERVER_BASE_URL}/oauth2/token", "userInfoUrl": "${FHIR_SERVER_BASE_URL}/oauth2/userinfo", "appBaseUrl": "https://localhost:41130/", + "binaryStorage": "${FHIR_BINARY_STORAGE}", + "storageBaseUrl": "${FHIR_SERVER_BASE_URL}/fhir-server/storage/", "maxJsonSize": "${BINARY_UPLOAD_LIMIT_SIZE}", "maxBatchSize": "${BINARY_UPLOAD_LIMIT_SIZE}", "botLambdaRoleArn": "", diff --git a/packages/server/src/fhir/bulkdata.ts b/packages/server/src/fhir/bulkdata.ts index cf9b7951760..7b7557e6d5a 100644 --- a/packages/server/src/fhir/bulkdata.ts +++ b/packages/server/src/fhir/bulkdata.ts @@ -3,6 +3,7 @@ import { AsyncJob, BulkDataExport } from '@medplum/fhirtypes'; import { Request, Response, Router } from 'express'; import { asyncWrap } from '../async'; import { getAuthenticatedContext } from '../context'; +import { getLogger } from '../logger'; import { rewriteAttachments, RewriteMode } from './rewrite'; // Bulk Data API @@ -35,10 +36,17 @@ bulkDataRouter.get( const ctx = getAuthenticatedContext(); const { id } = req.params; const bulkDataExport = await getExportResource(id); + const log = getLogger(); + + log.info('Bulk export poll', { jobId: id, status: bulkDataExport.status }); if (bulkDataExport.status === 'cancelled') { res.status(404).json(notFound); return; + } else if (bulkDataExport.status === 'error') { + log.error('Bulk export job in error state', { jobId: id }); + res.status(500).json({ error: 'Export failed', jobId: id }); + return; } else if (bulkDataExport.status !== 'completed') { res.status(202).end(); return; @@ -52,6 +60,7 @@ bulkDataRouter.get( error: extractOutputParameters(bulkDataExport, 'error'), deleted: extractOutputParameters(bulkDataExport, 'deleted'), }); + log.info('Bulk export job completed, returning output', { jobId: id }); res.status(200).type(ContentType.JSON).json(json); }) ); diff --git a/packages/server/src/fhir/operations/export.ts b/packages/server/src/fhir/operations/export.ts index 5ef868a1074..27c80a9e357 100644 --- a/packages/server/src/fhir/operations/export.ts +++ b/packages/server/src/fhir/operations/export.ts @@ -9,9 +9,11 @@ import { singularize, } from '@medplum/core'; import { FhirRequest, FhirResponse } from '@medplum/fhir-router'; -import { Project, Resource, ResourceType } from '@medplum/fhirtypes'; +import { AsyncJob, Project, Resource, ResourceType } from '@medplum/fhirtypes'; import { getConfig } from '../../config/loader'; import { getAuthenticatedContext } from '../../context'; +import { globalLogger } from '../../logger'; +import { getSystemRepo } from '../repo'; import { getPatientResourceTypes } from '../patient'; import { BulkExporter } from './utils/bulkexporter'; @@ -54,11 +56,41 @@ async function startExport(req: FhirRequest, exportType: string): Promise ctx.logger.info('Export completed', { exportType, id: ctx.project.id })) - .catch((err) => ctx.logger.error('Export failure', { exportType, id: ctx.project.id, error: err })); + .then(() => globalLogger.info('Bulk export completed', { exportType, jobId: bulkDataExport.id, projectId: ctx.project.id })) + .catch(async (err) => { + globalLogger.error('Bulk export failed', { + exportType, + jobId: bulkDataExport.id, + projectId: ctx.project.id, + error: err instanceof Error ? err.message : String(err), + stack: err instanceof Error ? err.stack : undefined, + }); + try { + const systemRepo = getSystemRepo(); + await systemRepo.updateResource({ + ...bulkDataExport, + status: 'error', + transactionTime: new Date().toISOString(), + }); + globalLogger.info('Bulk export job marked as error', { jobId: bulkDataExport.id }); + } catch (updateErr) { + globalLogger.error('Failed to mark bulk export job as error', { + jobId: bulkDataExport.id, + error: updateErr instanceof Error ? updateErr.message : String(updateErr), + }); + } + }); - return [accepted(`${baseUrl}fhir/R4/bulkdata/export/${bulkDataExport.id}`)]; + return [accepted(concatUrls(baseUrl, `fhir/R4/bulkdata/export/${bulkDataExport.id}`))]; } export async function exportResources( @@ -79,11 +111,18 @@ export async function exportResources( ) { continue; } + globalLogger.info('Bulk export: exporting resource type', { resourceType, projectId: project.id }); await exportResourceType(exporter, resourceType, pageSize, since); + globalLogger.info('Bulk export: finished resource type', { + resourceType, + projectId: project.id, + count: exporter.resourceSet.size, + }); } - // Close the exporter + globalLogger.info('Bulk export: closing exporter', { projectId: project.id, totalResources: exporter.resourceSet.size }); await exporter.close(project); + globalLogger.info('Bulk export: exporter closed successfully', { projectId: project.id }); } export async function exportResourceType( diff --git a/packages/server/src/fhir/operations/utils/asyncjobexecutor.ts b/packages/server/src/fhir/operations/utils/asyncjobexecutor.ts index 91baef971c8..daaa883ff99 100644 --- a/packages/server/src/fhir/operations/utils/asyncjobexecutor.ts +++ b/packages/server/src/fhir/operations/utils/asyncjobexecutor.ts @@ -126,30 +126,11 @@ export class AsyncJobExecutor { throw new Error('Cannot failJob since AsyncJob is not specified'); } - // A job throwing `DelayedError` means the job has been delayed/re-queued, - // so the job should not fail. Instead re-throw the error for BullMQ - // to handle. - if (err) { - throw ''; // DelayedError is removed - } - const failedJob: AsyncJob = { ...this.resource, status: 'error', transactionTime: new Date().toISOString(), }; - // if (err) { - // failedJob.output = { - // resourceType: 'Parameters', - // parameter: - // err instanceof OperationOutcomeError - // ? [{ name: 'outcome', resource: err.outcome }] - // : [ - // { name: 'error', valueString: err.message }, - // { name: 'stack', valueString: err.stack }, - // ], - // }; - // } return repo.updateResource(failedJob); } diff --git a/packages/server/src/fhir/operations/utils/bulkexporter.ts b/packages/server/src/fhir/operations/utils/bulkexporter.ts index 805b8e75a8f..cb7adca20e2 100644 --- a/packages/server/src/fhir/operations/utils/bulkexporter.ts +++ b/packages/server/src/fhir/operations/utils/bulkexporter.ts @@ -1,6 +1,7 @@ import { getReferenceString, WithId } from '@medplum/core'; import { AsyncJob, Binary, Bundle, Parameters, Project, Resource } from '@medplum/fhirtypes'; import { PassThrough } from 'node:stream'; +import { globalLogger } from '../../../logger'; import { getBinaryStorage } from '../../../storage/loader'; import { getSystemRepo, Repository } from '../../repo'; @@ -86,15 +87,24 @@ export class BulkExporter { throw new Error('Export muse be started before calling close()'); } - for (const writer of Object.values(this.writers)) { + const writerCount = Object.keys(this.writers).length; + globalLogger.info('BulkExporter: closing writers', { jobId: this.resource.id, writerCount }); + + for (const [resourceType, writer] of Object.entries(this.writers)) { + globalLogger.info('BulkExporter: closing writer', { jobId: this.resource.id, resourceType }); await writer.close(); + globalLogger.info('BulkExporter: writer closed', { jobId: this.resource.id, resourceType }); } + globalLogger.info('BulkExporter: all writers closed, updating AsyncJob to completed', { jobId: this.resource.id }); + // Update the AsyncJob const systemRepo = getSystemRepo(); const asyncJob = await systemRepo.readResource('AsyncJob', this.resource.id); + globalLogger.info('BulkExporter: current AsyncJob status before update', { jobId: this.resource.id, status: asyncJob.status }); + if (asyncJob.status !== 'cancelled') { - return systemRepo.updateResource({ + const updated = await systemRepo.updateResource({ ...this.resource, meta: { project: project.id, @@ -103,7 +113,10 @@ export class BulkExporter { transactionTime: new Date().toISOString(), output: this.formatOutput(), }); + globalLogger.info('BulkExporter: AsyncJob updated to completed', { jobId: this.resource.id }); + return updated; } + globalLogger.warn('BulkExporter: AsyncJob was cancelled, skipping completed update', { jobId: this.resource.id }); return this.resource; } From 2fce0aad9ca87643ae9e90a1e9b559940b3cd4c9 Mon Sep 17 00:00:00 2001 From: Afreen Sikandara Date: Thu, 12 Mar 2026 16:59:18 +0800 Subject: [PATCH 2/2] fix java version error --- .github/workflows/publish-server.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/publish-server.yml b/.github/workflows/publish-server.yml index 2c6c5a986c9..f8d274ae69c 100644 --- a/.github/workflows/publish-server.yml +++ b/.github/workflows/publish-server.yml @@ -116,6 +116,12 @@ jobs: working-directory: ${{ matrix.PKGPATH }} run: jq '.private=false' package.json > tmppkg; mv tmppkg package.json + - name: Setup Java 21 + uses: actions/setup-java@v4 + with: + java-version: '21' + distribution: 'temurin' + - name: DEST working-directory: ${{ matrix.PKGPATH }} run: |