Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/publish-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ jobs:
working-directory: ${{ matrix.PKGPATH }}
run: jq '.private=false' package.json > tmppkg; mv tmppkg package.json

- name: Setup Java 21
uses: actions/setup-java@v4
with:
java-version: '21'
distribution: 'temurin'

- name: DEST
working-directory: ${{ matrix.PKGPATH }}
run: |
Expand Down
2 changes: 2 additions & 0 deletions packages/server/medplum.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"tokenUrl": "${FHIR_SERVER_BASE_URL}/oauth2/token",
"userInfoUrl": "${FHIR_SERVER_BASE_URL}/oauth2/userinfo",
"appBaseUrl": "https://localhost:41130/",
"binaryStorage": "${FHIR_BINARY_STORAGE}",
"storageBaseUrl": "${FHIR_SERVER_BASE_URL}/fhir-server/storage/",
"maxJsonSize": "${BINARY_UPLOAD_LIMIT_SIZE}",
"maxBatchSize": "${BINARY_UPLOAD_LIMIT_SIZE}",
"botLambdaRoleArn": "",
Expand Down
9 changes: 9 additions & 0 deletions packages/server/src/fhir/bulkdata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AsyncJob, BulkDataExport } from '@medplum/fhirtypes';
import { Request, Response, Router } from 'express';
import { asyncWrap } from '../async';
import { getAuthenticatedContext } from '../context';
import { getLogger } from '../logger';
import { rewriteAttachments, RewriteMode } from './rewrite';

// Bulk Data API
Expand Down Expand Up @@ -35,10 +36,17 @@ bulkDataRouter.get(
const ctx = getAuthenticatedContext();
const { id } = req.params;
const bulkDataExport = await getExportResource(id);
const log = getLogger();

log.info('Bulk export poll', { jobId: id, status: bulkDataExport.status });

if (bulkDataExport.status === 'cancelled') {
res.status(404).json(notFound);
return;
} else if (bulkDataExport.status === 'error') {
log.error('Bulk export job in error state', { jobId: id });
res.status(500).json({ error: 'Export failed', jobId: id });
return;
} else if (bulkDataExport.status !== 'completed') {
res.status(202).end();
return;
Expand All @@ -52,6 +60,7 @@ bulkDataRouter.get(
error: extractOutputParameters(bulkDataExport, 'error'),
deleted: extractOutputParameters(bulkDataExport, 'deleted'),
});
log.info('Bulk export job completed, returning output', { jobId: id });
res.status(200).type(ContentType.JSON).json(json);
})
);
Expand Down
49 changes: 44 additions & 5 deletions packages/server/src/fhir/operations/export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import {
singularize,
} from '@medplum/core';
import { FhirRequest, FhirResponse } from '@medplum/fhir-router';
import { Project, Resource, ResourceType } from '@medplum/fhirtypes';
import { AsyncJob, Project, Resource, ResourceType } from '@medplum/fhirtypes';
import { getConfig } from '../../config/loader';
import { getAuthenticatedContext } from '../../context';
import { globalLogger } from '../../logger';
import { getSystemRepo } from '../repo';
import { getPatientResourceTypes } from '../patient';
import { BulkExporter } from './utils/bulkexporter';

Expand Down Expand Up @@ -54,11 +56,41 @@ async function startExport(req: FhirRequest, exportType: string): Promise<FhirRe
const exporter = new BulkExporter(ctx.repo);
const bulkDataExport = await exporter.start(concatUrls(baseUrl, 'fhir/R4' + req.pathname));

globalLogger.info('Bulk export started', {
exportType,
jobId: bulkDataExport.id,
projectId: ctx.project.id,
types: types ?? 'all',
since: since ?? 'none',
});

exportResources(exporter, ctx.project, types, exportType, since)
.then(() => ctx.logger.info('Export completed', { exportType, id: ctx.project.id }))
.catch((err) => ctx.logger.error('Export failure', { exportType, id: ctx.project.id, error: err }));
.then(() => globalLogger.info('Bulk export completed', { exportType, jobId: bulkDataExport.id, projectId: ctx.project.id }))
.catch(async (err) => {
globalLogger.error('Bulk export failed', {
exportType,
jobId: bulkDataExport.id,
projectId: ctx.project.id,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined,
});
try {
const systemRepo = getSystemRepo();
await systemRepo.updateResource<AsyncJob>({
...bulkDataExport,
status: 'error',
transactionTime: new Date().toISOString(),
});
globalLogger.info('Bulk export job marked as error', { jobId: bulkDataExport.id });
} catch (updateErr) {
globalLogger.error('Failed to mark bulk export job as error', {
jobId: bulkDataExport.id,
error: updateErr instanceof Error ? updateErr.message : String(updateErr),
});
}
});

return [accepted(`${baseUrl}fhir/R4/bulkdata/export/${bulkDataExport.id}`)];
return [accepted(concatUrls(baseUrl, `fhir/R4/bulkdata/export/${bulkDataExport.id}`))];
}

export async function exportResources(
Expand All @@ -79,11 +111,18 @@ export async function exportResources(
) {
continue;
}
globalLogger.info('Bulk export: exporting resource type', { resourceType, projectId: project.id });
await exportResourceType(exporter, resourceType, pageSize, since);
globalLogger.info('Bulk export: finished resource type', {
resourceType,
projectId: project.id,
count: exporter.resourceSet.size,
});
}

// Close the exporter
globalLogger.info('Bulk export: closing exporter', { projectId: project.id, totalResources: exporter.resourceSet.size });
await exporter.close(project);
globalLogger.info('Bulk export: exporter closed successfully', { projectId: project.id });
}

export async function exportResourceType<T extends Resource>(
Expand Down
19 changes: 0 additions & 19 deletions packages/server/src/fhir/operations/utils/asyncjobexecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,30 +126,11 @@ export class AsyncJobExecutor {
throw new Error('Cannot failJob since AsyncJob is not specified');
}

// A job throwing `DelayedError` means the job has been delayed/re-queued,
// so the job should not fail. Instead re-throw the error for BullMQ
// to handle.
if (err) {
throw ''; // DelayedError is removed
}

const failedJob: AsyncJob = {
...this.resource,
status: 'error',
transactionTime: new Date().toISOString(),
};
// if (err) {
// failedJob.output = {
// resourceType: 'Parameters',
// parameter:
// err instanceof OperationOutcomeError
// ? [{ name: 'outcome', resource: err.outcome }]
// : [
// { name: 'error', valueString: err.message },
// { name: 'stack', valueString: err.stack },
// ],
// };
// }
return repo.updateResource<AsyncJob>(failedJob);
}

Expand Down
17 changes: 15 additions & 2 deletions packages/server/src/fhir/operations/utils/bulkexporter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { getReferenceString, WithId } from '@medplum/core';
import { AsyncJob, Binary, Bundle, Parameters, Project, Resource } from '@medplum/fhirtypes';
import { PassThrough } from 'node:stream';
import { globalLogger } from '../../../logger';
import { getBinaryStorage } from '../../../storage/loader';
import { getSystemRepo, Repository } from '../../repo';

Expand Down Expand Up @@ -86,15 +87,24 @@ export class BulkExporter {
throw new Error('Export muse be started before calling close()');
}

for (const writer of Object.values(this.writers)) {
const writerCount = Object.keys(this.writers).length;
globalLogger.info('BulkExporter: closing writers', { jobId: this.resource.id, writerCount });

for (const [resourceType, writer] of Object.entries(this.writers)) {
globalLogger.info('BulkExporter: closing writer', { jobId: this.resource.id, resourceType });
await writer.close();
globalLogger.info('BulkExporter: writer closed', { jobId: this.resource.id, resourceType });
}

globalLogger.info('BulkExporter: all writers closed, updating AsyncJob to completed', { jobId: this.resource.id });

// Update the AsyncJob
const systemRepo = getSystemRepo();
const asyncJob = await systemRepo.readResource<AsyncJob>('AsyncJob', this.resource.id);
globalLogger.info('BulkExporter: current AsyncJob status before update', { jobId: this.resource.id, status: asyncJob.status });

if (asyncJob.status !== 'cancelled') {
return systemRepo.updateResource<AsyncJob>({
const updated = await systemRepo.updateResource<AsyncJob>({
...this.resource,
meta: {
project: project.id,
Expand All @@ -103,7 +113,10 @@ export class BulkExporter {
transactionTime: new Date().toISOString(),
output: this.formatOutput(),
});
globalLogger.info('BulkExporter: AsyncJob updated to completed', { jobId: this.resource.id });
return updated;
}
globalLogger.warn('BulkExporter: AsyncJob was cancelled, skipping completed update', { jobId: this.resource.id });
return this.resource;
}

Expand Down
Loading