Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/server/medplum.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"tokenUrl": "${FHIR_SERVER_BASE_URL}/oauth2/token",
"userInfoUrl": "${FHIR_SERVER_BASE_URL}/oauth2/userinfo",
"appBaseUrl": "https://localhost:41130/",
"binaryStorage": "${FHIR_BINARY_STORAGE}",
"storageBaseUrl": "${FHIR_SERVER_BASE_URL}/fhir-server/storage/",
"maxJsonSize": "${BINARY_UPLOAD_LIMIT_SIZE}",
"maxBatchSize": "${BINARY_UPLOAD_LIMIT_SIZE}",
"botLambdaRoleArn": "",
Expand Down
9 changes: 9 additions & 0 deletions packages/server/src/fhir/bulkdata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AsyncJob, BulkDataExport } from '@medplum/fhirtypes';
import { Request, Response, Router } from 'express';
import { asyncWrap } from '../async';
import { getAuthenticatedContext } from '../context';
import { getLogger } from '../logger';
import { rewriteAttachments, RewriteMode } from './rewrite';

// Bulk Data API
Expand Down Expand Up @@ -35,10 +36,17 @@ bulkDataRouter.get(
const ctx = getAuthenticatedContext();
const { id } = req.params;
const bulkDataExport = await getExportResource(id);
const log = getLogger();

log.info('Bulk export poll', { jobId: id, status: bulkDataExport.status });

if (bulkDataExport.status === 'cancelled') {
res.status(404).json(notFound);
return;
} else if (bulkDataExport.status === 'error') {
log.error('Bulk export job in error state', { jobId: id });
res.status(500).json({ error: 'Export failed', jobId: id });
return;
} else if (bulkDataExport.status !== 'completed') {
res.status(202).end();
return;
Expand All @@ -52,6 +60,7 @@ bulkDataRouter.get(
error: extractOutputParameters(bulkDataExport, 'error'),
deleted: extractOutputParameters(bulkDataExport, 'deleted'),
});
log.info('Bulk export job completed, returning output', { jobId: id });
res.status(200).type(ContentType.JSON).json(json);
})
);
Expand Down
49 changes: 44 additions & 5 deletions packages/server/src/fhir/operations/export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import {
singularize,
} from '@medplum/core';
import { FhirRequest, FhirResponse } from '@medplum/fhir-router';
import { Project, Resource, ResourceType } from '@medplum/fhirtypes';
import { AsyncJob, Project, Resource, ResourceType } from '@medplum/fhirtypes';
import { getConfig } from '../../config/loader';
import { getAuthenticatedContext } from '../../context';
import { globalLogger } from '../../logger';
import { getSystemRepo } from '../repo';
import { getPatientResourceTypes } from '../patient';
import { BulkExporter } from './utils/bulkexporter';

Expand Down Expand Up @@ -54,11 +56,41 @@ async function startExport(req: FhirRequest, exportType: string): Promise<FhirRe
const exporter = new BulkExporter(ctx.repo);
const bulkDataExport = await exporter.start(concatUrls(baseUrl, 'fhir/R4' + req.pathname));

globalLogger.info('Bulk export started', {
exportType,
jobId: bulkDataExport.id,
projectId: ctx.project.id,
types: types ?? 'all',
since: since ?? 'none',
});

exportResources(exporter, ctx.project, types, exportType, since)
.then(() => ctx.logger.info('Export completed', { exportType, id: ctx.project.id }))
.catch((err) => ctx.logger.error('Export failure', { exportType, id: ctx.project.id, error: err }));
.then(() => globalLogger.info('Bulk export completed', { exportType, jobId: bulkDataExport.id, projectId: ctx.project.id }))
.catch(async (err) => {
globalLogger.error('Bulk export failed', {
exportType,
jobId: bulkDataExport.id,
projectId: ctx.project.id,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined,
});
try {
const systemRepo = getSystemRepo();
await systemRepo.updateResource<AsyncJob>({
...bulkDataExport,
status: 'error',
transactionTime: new Date().toISOString(),
});
globalLogger.info('Bulk export job marked as error', { jobId: bulkDataExport.id });
} catch (updateErr) {
globalLogger.error('Failed to mark bulk export job as error', {
jobId: bulkDataExport.id,
error: updateErr instanceof Error ? updateErr.message : String(updateErr),
});
}
});

return [accepted(`${baseUrl}fhir/R4/bulkdata/export/${bulkDataExport.id}`)];
return [accepted(concatUrls(baseUrl, `fhir/R4/bulkdata/export/${bulkDataExport.id}`))];
}

export async function exportResources(
Expand All @@ -79,11 +111,18 @@ export async function exportResources(
) {
continue;
}
globalLogger.info('Bulk export: exporting resource type', { resourceType, projectId: project.id });
await exportResourceType(exporter, resourceType, pageSize, since);
globalLogger.info('Bulk export: finished resource type', {
resourceType,
projectId: project.id,
count: exporter.resourceSet.size,
});
}

// Close the exporter
globalLogger.info('Bulk export: closing exporter', { projectId: project.id, totalResources: exporter.resourceSet.size });
await exporter.close(project);
globalLogger.info('Bulk export: exporter closed successfully', { projectId: project.id });
}

export async function exportResourceType<T extends Resource>(
Expand Down
19 changes: 0 additions & 19 deletions packages/server/src/fhir/operations/utils/asyncjobexecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,30 +126,11 @@ export class AsyncJobExecutor {
throw new Error('Cannot failJob since AsyncJob is not specified');
}

// A job throwing `DelayedError` means the job has been delayed/re-queued,
// so the job should not fail. Instead re-throw the error for BullMQ
// to handle.
if (err) {
throw ''; // DelayedError is removed
}

const failedJob: AsyncJob = {
...this.resource,
status: 'error',
transactionTime: new Date().toISOString(),
};
// if (err) {
// failedJob.output = {
// resourceType: 'Parameters',
// parameter:
// err instanceof OperationOutcomeError
// ? [{ name: 'outcome', resource: err.outcome }]
// : [
// { name: 'error', valueString: err.message },
// { name: 'stack', valueString: err.stack },
// ],
// };
// }
return repo.updateResource<AsyncJob>(failedJob);
}

Expand Down
17 changes: 15 additions & 2 deletions packages/server/src/fhir/operations/utils/bulkexporter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { getReferenceString, WithId } from '@medplum/core';
import { AsyncJob, Binary, Bundle, Parameters, Project, Resource } from '@medplum/fhirtypes';
import { PassThrough } from 'node:stream';
import { globalLogger } from '../../../logger';
import { getBinaryStorage } from '../../../storage/loader';
import { getSystemRepo, Repository } from '../../repo';

Expand Down Expand Up @@ -86,15 +87,24 @@ export class BulkExporter {
throw new Error('Export muse be started before calling close()');
}

for (const writer of Object.values(this.writers)) {
const writerCount = Object.keys(this.writers).length;
globalLogger.info('BulkExporter: closing writers', { jobId: this.resource.id, writerCount });

for (const [resourceType, writer] of Object.entries(this.writers)) {
globalLogger.info('BulkExporter: closing writer', { jobId: this.resource.id, resourceType });
await writer.close();
globalLogger.info('BulkExporter: writer closed', { jobId: this.resource.id, resourceType });
}

globalLogger.info('BulkExporter: all writers closed, updating AsyncJob to completed', { jobId: this.resource.id });

// Update the AsyncJob
const systemRepo = getSystemRepo();
const asyncJob = await systemRepo.readResource<AsyncJob>('AsyncJob', this.resource.id);
globalLogger.info('BulkExporter: current AsyncJob status before update', { jobId: this.resource.id, status: asyncJob.status });

if (asyncJob.status !== 'cancelled') {
return systemRepo.updateResource<AsyncJob>({
const updated = await systemRepo.updateResource<AsyncJob>({
...this.resource,
meta: {
project: project.id,
Expand All @@ -103,7 +113,10 @@ export class BulkExporter {
transactionTime: new Date().toISOString(),
output: this.formatOutput(),
});
globalLogger.info('BulkExporter: AsyncJob updated to completed', { jobId: this.resource.id });
return updated;
}
globalLogger.warn('BulkExporter: AsyncJob was cancelled, skipping completed update', { jobId: this.resource.id });
return this.resource;
}

Expand Down
Loading