Add FHIR R4 Healthcare Intelligence connector example #573
Adds a new AI-powered connector under all_things_ai/tutorials that syncs clinical data from a FHIR R4 server and enriches it with Databricks ai_query() using the Hybrid (Discovery + Debate) pattern for population health risk stratification and per-patient intervention recommendations. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
David.Millman seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
Pull request overview
Adds a new Databricks AI tutorial connector example that syncs FHIR R4 clinical resources and optionally enriches them using a Hybrid (Discovery + Debate) ai_query() workflow, plus updates the repo root README to list the new tutorial.
Changes:
- Added a new connector implementation (`connector.py`) for fetching FHIR Patient/Condition/Observation/MedicationRequest and producing AI enrichment tables.
- Added tutorial docs (`README.md`) and a template `configuration.json` for the new connector.
- Updated the repository `README.md` to include the new tutorial link.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| all_things_ai/tutorials/databricks-fm-fhir-healthcare-intelligence/connector.py | Implements FHIR fetch, Databricks ai_query() enrichment, and optional Genie Space creation. |
| all_things_ai/tutorials/databricks-fm-fhir-healthcare-intelligence/configuration.json | Adds a configuration template for FHIR + Databricks settings. |
| all_things_ai/tutorials/databricks-fm-fhir-healthcare-intelligence/README.md | Documents connector features, configuration, and produced tables. |
| README.md | Adds an entry pointing to the new tutorial connector. |
| session = requests.Session() | ||
| session.headers.update({"Accept": __FHIR_ACCEPT_HEADER}) | ||
| return session |
create_session() sets a global Accept: application/fhir+json header on the session, but the same session is later used for Databricks calls (which may not honor/accept that media type). To avoid unexpected 406/negotiation issues, either use a separate session for Databricks or explicitly set an appropriate Accept header (e.g., application/json) on Databricks requests.
Fixed in commit be739142 — call_ai_query() now adds "Accept": "application/json" directly to its own headers dict for both the initial POST and all poll GETs. The create_session() still sets Accept: application/fhir+json globally for FHIR calls; the Databricks calls override it locally so the two don't conflict.
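For readers following the thread, the override can be sketched as below. This is an illustration only, not the connector's code: `databricks_headers` and `effective_headers` are hypothetical helpers, and the merge function only imitates the way `requests` layers per-request headers on top of session headers.

```python
FHIR_ACCEPT = "application/fhir+json"


def databricks_headers(token: str) -> dict:
    """Per-request headers for a Databricks call (hypothetical helper)."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        # Set locally so the session-wide FHIR media type is not sent to Databricks.
        "Accept": "application/json",
    }


def effective_headers(session_headers: dict, request_headers: dict) -> dict:
    """Imitate the merge: request-level headers win over session-level ones.

    Header names are compared case-insensitively, as in HTTP.
    """
    merged = {name.lower(): value for name, value in session_headers.items()}
    merged.update({name.lower(): value for name, value in request_headers.items()})
    return merged


session_defaults = {"Accept": FHIR_ACCEPT}
sent = effective_headers(session_defaults, databricks_headers("example-token"))
```

With this layering, FHIR requests that pass no headers of their own keep the session default, while Databricks requests send `application/json`.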
| record = {"patient_id": patient_id, "assessment_type": assessment_type} | ||
| record.update(flatten_dict(assessment)) | ||
|
|
||
| # The 'upsert' operation is used to insert or update data in the destination table. | ||
| # The first argument is the name of the destination table. | ||
| # The second argument is a dictionary containing the record to be upserted. | ||
| op.upsert(table=table_name, data=record) | ||
| return True |
upsert_assessment() builds record with patient_id/assessment_type and then does record.update(flatten_dict(assessment)), which allows LLM-returned keys (e.g., "patient_id" or "assessment_type") to overwrite the stable identifiers. To keep primary key fields safe, merge in the opposite order (flatten first, then set patient_id/assessment_type) or drop/rename conflicting keys.
Fixed in commit be739142 — upsert_assessment() now flattens the assessment first (record = flatten_dict(assessment) if assessment else {}), then stamps patient_id and assessment_type on top. LLM-returned keys can no longer overwrite the stable identifiers. Regression test: TestUpsertAssessment.test_patient_id_not_overwritten_by_llm_key.
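A minimal sketch of the merge order described above. The `flatten_dict` shown here is a stand-in written for this example (the connector's own implementation is not shown in this thread), and `build_assessment_record` is a hypothetical name for the record-building step inside `upsert_assessment()`.

```python
def flatten_dict(nested: dict, parent_key: str = "", sep: str = "_") -> dict:
    """Stand-in flattener: {"risk": {"level": "high"}} -> {"risk_level": "high"}."""
    flat = {}
    for key, value in nested.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, dict):
            flat.update(flatten_dict(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


def build_assessment_record(patient_id: str, assessment_type: str, assessment: dict) -> dict:
    """Flatten the LLM output first, then stamp the stable identifiers on top."""
    record = flatten_dict(assessment) if assessment else {}
    # Assigned last, so an LLM-returned "patient_id" or "assessment_type" cannot win.
    record["patient_id"] = patient_id
    record["assessment_type"] = assessment_type
    return record


record = build_assessment_record(
    "patient-123",
    "clinical",
    {"patient_id": "spoofed", "risk": {"level": "high"}},
)
```

The conflicting key from the model output is silently replaced here; dropping or renaming it, as the review also suggested, would be an alternative if the original value needs to be kept for auditing.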
|
|
||
| - Fetches Patient, Condition, Observation, and MedicationRequest resources from any FHIR R4-compliant server | ||
| - Supports optional ICD-10 code prefix filtering to target a specific patient cohort (e.g., `E11` for diabetes) | ||
| - Supports incremental sync using FHIR `_lastUpdated` filtering based on the previous sync timestamp |
The README claims the connector “supports incremental sync using FHIR _lastUpdated filtering based on the previous sync timestamp”, but the connector code only sorts by -_lastUpdated and never uses state["last_sync"] (or any _lastUpdated filter). Either implement incremental filtering in the FHIR queries using the stored state timestamp, or adjust the README to reflect the current behavior (full/limited sample sync).
| - Supports incremental sync using FHIR `_lastUpdated` filtering based on the previous sync timestamp | |
| - Retrieves the most recently updated FHIR resources ordered by `_lastUpdated`; the current sample sync does not apply previous-state incremental `_lastUpdated` filtering |
Fixed in commit be739142 — both parts addressed: (1) run_move_phase() now actually reads state.get("last_sync") and adds _lastUpdated=gt{...} to the FHIR patient query; (2) the README Features section already stated incremental sync support, which now accurately reflects the implementation.
|
|
||
| FHIR R4 servers return resources as paginated Bundle resources. The connector follows `Bundle.link` entries with `relation=next` to retrieve subsequent pages until no next link is present or the configured `max_patients` limit is reached. The next-page URL is used directly as provided by the server; query parameters are only passed on the initial request. | ||
|
|
||
| Databricks SQL Statement API results may be paginated via `next_chunk_internal_link`. The connector follows these links to retrieve all rows from large `ai_query()` results. |
The README states the connector follows Databricks SQL Statement API pagination via next_chunk_internal_link, but call_ai_query() only reads result.data_array[0][0] and does not handle chunk pagination. Please either add chunk-pagination support or remove/adjust this statement to avoid misleading users.
| Databricks SQL Statement API results may be paginated via `next_chunk_internal_link`. The connector follows these links to retrieve all rows from large `ai_query()` results. | |
| Databricks SQL Statement API responses can be paginated via `next_chunk_internal_link`, but this tutorial connector currently reads the immediate `ai_query()` result only and does not follow chunk-pagination links. The included AI queries are therefore expected to return a single result value rather than a large multi-row result set. |
Fixed in commit be739142 — README Pagination section updated to: 'Databricks SQL Statement API responses can be paginated via next_chunk_internal_link, but this tutorial connector reads the immediate ai_query() result only and does not follow chunk-pagination links. The included AI queries return a single result value rather than a large multi-row result set.'
|
|
||
| ## Additional considerations | ||
|
|
||
| This connector was built by David Millman (david.millman@fivetran.com) during a working session with Kelly Kohlleffel. It follows the Hybrid (Discovery + Debate) pattern established by the NOAA Weather Risk Intelligence connector (PR #570) and the FDA FAERS Pharmacovigilance Intelligence connector (PR #571). The HAPI FHIR public test server (`https://hapi.fhir.org/baseR4`) is used as the default data source and contains synthetic clinical data suitable for demonstration purposes. |
README includes a personal employee email address. Repos like this typically avoid publishing personal contact info; please remove the email or replace it with a generic support alias/link (e.g., Support team / docs link).
| This connector was built by David Millman (david.millman@fivetran.com) during a working session with Kelly Kohlleffel. It follows the Hybrid (Discovery + Debate) pattern established by the NOAA Weather Risk Intelligence connector (PR #570) and the FDA FAERS Pharmacovigilance Intelligence connector (PR #571). The HAPI FHIR public test server (`https://hapi.fhir.org/baseR4`) is used as the default data source and contains synthetic clinical data suitable for demonstration purposes. | |
| This connector was built by David Millman during a working session with Kelly Kohlleffel. It follows the Hybrid (Discovery + Debate) pattern established by the NOAA Weather Risk Intelligence connector (PR #570) and the FDA FAERS Pharmacovigilance Intelligence connector (PR #571). The HAPI FHIR public test server (`https://hapi.fhir.org/baseR4`) is used as the default data source and contains synthetic clinical data suitable for demonstration purposes. |
Fixed in commit be739142 — personal email address removed from README Additional considerations. Attribution now reads: 'This connector was built by David Millman during a working session with Kelly Kohlleffel.'
| "enable_enrichment": "<TRUE_OR_FALSE>", | ||
| "enable_discovery": "<TRUE_OR_FALSE>", | ||
| "enable_genie_space": "<TRUE_OR_FALSE>", | ||
| "genie_table_identifier": "<CATALOG.SCHEMA.TABLE>", |
configuration.json uses very generic placeholders for booleans ("<TRUE_OR_FALSE>") and other fields. The repo’s configuration templates typically use descriptive placeholders that communicate intent and defaults (e.g., "<TRUE_OR_FALSE_DEFAULT_TRUE>" / "<TRUE_OR_FALSE_DEFAULT_FALSE>") to reduce misconfiguration.
| "enable_enrichment": "<TRUE_OR_FALSE>", | |
| "enable_discovery": "<TRUE_OR_FALSE>", | |
| "enable_genie_space": "<TRUE_OR_FALSE>", | |
| "genie_table_identifier": "<CATALOG.SCHEMA.TABLE>", | |
| "enable_enrichment": "<TRUE_OR_FALSE_DEFAULT_FALSE>", | |
| "enable_discovery": "<TRUE_OR_FALSE_DEFAULT_FALSE>", | |
| "enable_genie_space": "<TRUE_OR_FALSE_DEFAULT_FALSE>", | |
| "genie_table_identifier": "<YOUR_DATABRICKS_GENIE_TABLE_IDENTIFIER_CATALOG_SCHEMA_TABLE>", |
Fixed in commit be739142 — configuration.json boolean placeholders now communicate defaults: <TRUE_OR_FALSE_DEFAULT_TRUE> for enable_enrichment and enable_discovery, <TRUE_OR_FALSE_DEFAULT_FALSE> for enable_genie_space. Defaults are also documented in the README configuration table.
| resources = [] | ||
| next_url = url | ||
| current_params = params | ||
|
|
||
| while next_url: | ||
| try: | ||
| data = fetch_data_with_retry(session, next_url, params=current_params) | ||
| except RuntimeError as e: | ||
| log.warning(f"Failed to fetch FHIR bundle page: {e}") | ||
| break | ||
|
|
||
| # Only pass params on the first request; subsequent requests use the full next URL | ||
| current_params = None | ||
|
|
||
| entries = data.get("entry", []) | ||
| for entry in entries: | ||
| resource = entry.get("resource", {}) | ||
| if resource: | ||
| resources.append(resource) | ||
|
|
fetch_fhir_bundle() accumulates all resources from all pages into an in-memory list. In run_move_phase(), this is used for Conditions/Observations/MedicationRequests without any max_results cap, so a single patient with large history can cause unbounded memory growth. Consider streaming page-by-page (upsert as you go) and only retaining a small bounded sample per patient for prompt construction (e.g., latest N conditions/observations/meds).
Fixed in commit be739142 — added constant __MAX_RESOURCES_PER_PATIENT = 100 and passed max_results=__MAX_RESOURCES_PER_PATIENT to each of the three per-patient fetch_fhir_bundle() calls (Condition, Observation, MedicationRequest). fetch_fhir_bundle() already supports max_results and truncates after pagination, so this cap is enforced end-to-end.
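One way to picture the cap is the sketch below. It is not the connector's `fetch_fhir_bundle()`: the reply says that function truncates after pagination, whereas this variant stops requesting pages once the cap is reached, which also bounds memory while paging. `fetch_page` is a hypothetical callable standing in for `fetch_data_with_retry(session, ...)`.

```python
MAX_RESOURCES_PER_PATIENT = 100


def fetch_fhir_bundle_capped(fetch_page, url, params=None, max_results=MAX_RESOURCES_PER_PATIENT):
    """Follow Bundle.link[relation=next] until exhausted or max_results is reached."""
    resources = []
    next_url, current_params = url, params
    while next_url and len(resources) < max_results:
        bundle = fetch_page(next_url, current_params)
        # Query params only apply to the first request; next links are complete URLs.
        current_params = None
        for entry in bundle.get("entry", []):
            resource = entry.get("resource")
            if resource:
                resources.append(resource)
                if len(resources) >= max_results:
                    return resources
        next_url = next(
            (link.get("url") for link in bundle.get("link", []) if link.get("relation") == "next"),
            None,
        )
    return resources


# Two fake pages of three resources each, keyed by URL.
pages = {
    "page1": {
        "entry": [{"resource": {"id": str(i)}} for i in range(3)],
        "link": [{"relation": "next", "url": "page2"}],
    },
    "page2": {"entry": [{"resource": {"id": str(i)}} for i in range(3, 6)]},
}
capped = fetch_fhir_bundle_capped(lambda u, p: pages[u], "page1", max_results=4)
```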
| # Validate Databricks credentials when enrichment is enabled | ||
| is_enrichment = _parse_bool(configuration.get("enable_enrichment"), default=True) | ||
| is_discovery = _parse_bool(configuration.get("enable_discovery"), default=True) | ||
| is_genie = _parse_bool(configuration.get("enable_genie_space"), default=False) | ||
|
|
||
| if is_enrichment or is_discovery or is_genie: | ||
| for key in [ | ||
| "databricks_workspace_url", | ||
| "databricks_token", | ||
| "databricks_warehouse_id", | ||
| ]: | ||
| if _is_placeholder(configuration.get(key)): | ||
| raise ValueError(f"Missing required Databricks config: {key}") | ||
|
|
validate_configuration() currently requires Databricks credentials when enable_discovery is true, even if enable_enrichment is false (in update(), discovery is only run when both are true). This makes configs like enable_enrichment=false, enable_discovery=true fail validation unnecessarily. Consider gating Databricks credential requirements on the actual execution conditions (e.g., require creds when enable_enrichment is true, or when enable_genie_space is true).
Fixed in commit be739142 — validate_configuration() condition changed from if is_enrichment or is_discovery or is_genie: to if is_enrichment or is_genie:. Discovery-only mode (enable_enrichment=false, enable_discovery=true) no longer requires Databricks credentials, matching the actual runtime behavior in update() where discovery only executes when both enrichment AND discovery are enabled. Regression test: TestValidateConfiguration.test_databricks_creds_not_required_when_enrichment_disabled.
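The revised gating can be sketched as follows. The helpers `parse_bool` and `is_placeholder` are simplified stand-ins for the connector's `_parse_bool` and `_is_placeholder`, whose exact behavior is not shown in this thread; treating angle-bracket values as "unset" is an assumption made for the example.

```python
REQUIRED_DATABRICKS_KEYS = (
    "databricks_workspace_url",
    "databricks_token",
    "databricks_warehouse_id",
)


def is_placeholder(value) -> bool:
    """Assumed rule: missing, empty, or <ANGLE_BRACKET> values count as unset."""
    if value is None:
        return True
    text = str(value).strip()
    return text == "" or (text.startswith("<") and text.endswith(">"))


def parse_bool(value, default: bool) -> bool:
    if isinstance(value, bool):
        return value
    if is_placeholder(value):
        return default
    return str(value).strip().lower() == "true"


def validate_databricks_config(configuration: dict) -> None:
    is_enrichment = parse_bool(configuration.get("enable_enrichment"), default=True)
    is_genie = parse_bool(configuration.get("enable_genie_space"), default=False)
    # enable_discovery is deliberately absent: discovery only runs when
    # enrichment is also enabled, so it never needs credentials on its own.
    if is_enrichment or is_genie:
        for key in REQUIRED_DATABRICKS_KEYS:
            if is_placeholder(configuration.get(key)):
                raise ValueError(f"Missing required Databricks config: {key}")


# Discovery-only configuration: passes without any Databricks credentials.
validate_databricks_config({"enable_enrichment": "false", "enable_discovery": "true"})
```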
| try: | ||
| response = session.post(url, headers=headers, json=payload, timeout=timeout_seconds) | ||
| response.raise_for_status() | ||
|
|
call_ai_query() does not implement retry/backoff for transient Databricks API failures (e.g., 429/5xx), so intermittent issues will silently skip enrichment for patients. Since this is an external API call, add bounded retries with exponential backoff (and fail fast on auth/4xx) similar to fetch_data_with_retry().
Fixed in commit be739142 — call_ai_query() now wraps the initial POST in a retry loop (up to __MAX_RETRIES=3 attempts, exponential backoff for status codes in __RETRYABLE_STATUS_CODES). Auth errors (401/403) still fail fast. The same retry constants used by fetch_data_with_retry() are reused for consistency. README Error handling section updated to document the new behavior.
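A rough sketch of the retry shape described above, kept independent of any HTTP library. The status-code set, the `HttpError` class, and the `post_with_retry` name are assumptions for illustration; the actual values of `__RETRYABLE_STATUS_CODES` are not shown in this thread.

```python
import time

MAX_RETRIES = 3
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}  # assumed set


class HttpError(Exception):
    """Hypothetical error type carrying the HTTP status code."""

    def __init__(self, status_code: int):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def post_with_retry(send, max_retries=MAX_RETRIES, base_delay=1.0, sleep=time.sleep):
    """Call send() up to max_retries times, backing off 1s, 2s, 4s, ...

    Non-retryable statuses (for example 401/403) are re-raised immediately.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return send()
        except HttpError as err:
            retryable = err.status_code in RETRYABLE_STATUS_CODES
            if not retryable or attempt == max_retries:
                raise
            sleep(base_delay * 2 ** (attempt - 1))


# Usage: a sender that fails twice with 503, then succeeds.
attempts = []


def flaky_send():
    attempts.append(1)
    if len(attempts) < 3:
        raise HttpError(503)
    return "ok"


delays = []
result = post_with_retry(flaky_send, sleep=delays.append)
```

Injecting `sleep` keeps the sketch testable without real waiting; in the connector the same role would be played by `time.sleep`.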
fivetran-anushkaparashar left a comment
I’ve left a few comments. Please also review Copilot’s suggestions and address them if they’re valid.
| op.checkpoint(state=state) | ||
|
|
||
| # Final checkpoint | ||
| state["last_sync"] = datetime.now(timezone.utc).isoformat() |
last_sync is written but never read back, and that's the missing piece for incremental sync.
Fixed in commit be739142 — run_move_phase() now reads state.get("last_sync") and adds _lastUpdated=gt{last_sync} to the FHIR patient query params when a prior sync timestamp is present. On the first sync (no last_sync in state) no filter is added, so all patients are fetched. A regression test in tests/test_connector.py (category 5: incremental sync) verifies both paths.
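As a sketch of the cursor logic, assuming the query parameters are built as a plain dict: `build_patient_params` and the `page_size` argument are hypothetical names, while `_sort`, `_count`, and the `gt` prefix on `_lastUpdated` are standard FHIR search syntax.

```python
from urllib.parse import urlencode


def build_patient_params(state: dict, page_size: int = 50) -> dict:
    """Build FHIR Patient search params, adding a cursor filter when one exists."""
    params = {"_sort": "-_lastUpdated", "_count": str(page_size)}
    last_sync = state.get("last_sync")
    if last_sync:
        # Only resources updated strictly after the previous sync.
        params["_lastUpdated"] = f"gt{last_sync}"
    return params


first_sync = build_patient_params({})
later_sync = build_patient_params({"last_sync": "2024-01-01T00:00:00+00:00"})
# The "+" in a UTC offset must be percent-encoded in the query string.
query_string = urlencode(later_sync)
```

Because `datetime.isoformat()` on a timezone-aware value ends in `+00:00`, the timestamp has to be URL-encoded; passing the dict through a `params=` argument or `urlencode` takes care of that, whereas concatenating it into the URL by hand would not.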
| return | ||
|
|
||
| condition_label = (condition_filter or "all_conditions").replace(" ", "_") | ||
| insight_id = f"insight_{condition_label}_{len(patient_records)}" |
If the patient count changes between syncs (e.g., 15 → 20), a new row is inserted instead of updating the existing one. Shouldn't an insight represent one row per condition configuration, not one row per patient count?
Fixed in commit be739142 — insight_id is now f"insight_{condition_label}" (e.g., insight_E11 or insight_all_conditions). The patient count suffix has been removed, so the key is stable across syncs and upserts update the same row rather than creating new ones.
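The effect of the stable key can be shown with a small sketch. `build_insight_id` is a hypothetical wrapper around the two lines quoted in the diff, and the dict below merely stands in for the destination table's upsert-by-primary-key behavior.

```python
def build_insight_id(condition_filter):
    """One key per condition configuration, independent of patient count."""
    condition_label = (condition_filter or "all_conditions").replace(" ", "_")
    return f"insight_{condition_label}"


# Simulate two syncs with different patient counts: same key, so one row.
table = {}
for patient_count in (15, 20):
    table[build_insight_id("E11")] = {"patients_analyzed": patient_count}
```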
| ' "winner_rationale": "...",\n' | ||
| ' "agreement_areas": ["..."],\n' | ||
| ' "disagreement_areas": ["..."],\n' | ||
| ' "disagreement_flag": true,\n' |
The prompt has disagreement_flag hardcoded to true, which biases the LLM to always report disagreement. Is it intentional?
Fixed in commit be739142 — build_consensus_prompt() prompt template now reads 'disagreement_flag': true|false instead of true. The LLM will now evaluate disagreement independently rather than being nudged toward always returning true.
Fixes all 12 unanswered reviewer threads from Apr 24 (fivetran-anushkaparashar
+ Copilot) plus Level 3 retrofit patterns from NOAA/FDA/FAERS/CPSC/SEC EDGAR:
connector.py:
- Fix 1: Add _lastUpdated=gt{last_sync} incremental filter to FHIR patient query
- Fix 2: Stable insight_id (drop patient count suffix to prevent PK churn)
- Fix 3: Add Accept: application/json to Databricks call_ai_query() headers
- Fix 4: Reverse upsert_assessment() merge order — flatten first, then set
patient_id/assessment_type so LLM keys cannot overwrite stable identifiers
- Fix 5: Cap per-patient FHIR resource fetches at __MAX_RESOURCES_PER_PATIENT=100
- Fix 6: validate_configuration() only requires Databricks creds for enrichment
or Genie — discovery-only mode no longer triggers credential requirement
- Fix 7: Retry initial ai_query() POST up to 3× with exponential backoff
- Fix 8: disagreement_flag prompt bias removed (true → true|false)
- Fix 9: Genie API serialization — question/content were lists, must be strings
- Fix 12: statement_id guard before polling loop (FDA Drug Label lesson)
- Fix 14: Genie Space timeout uses databricks_timeout config, not hardcoded 60
README.md:
- Pagination section: correct ai_query() chunk-pagination claim (not followed)
- Error handling section: document new retry behavior for ai_query() POST
- Additional considerations: remove personal email address
- Configuration file section: add standard requirements.txt note
configuration.json:
- Boolean placeholders now communicate defaults
(TRUE_OR_FALSE_DEFAULT_TRUE / TRUE_OR_FALSE_DEFAULT_FALSE)
New files:
- requirements.txt: fivetran_connector_sdk>=2.0.0
- requirements-test.txt: pytest>=7.0.0
- tests/test_connector.py: 60 tests — 5 mandatory categories (config validation,
record builders, helper functions, phase logic, incremental sync regression)
All 24 submission gate checks pass. 60/60 pytest green. Black + flake8 clean.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
PR #573 — All 12 reviewer threads addressed (commit
|
| Issue | Thread | Fix |
|---|---|---|
| `last_sync` never read back in FHIR query | 3135048225 | `run_move_phase()` now adds `_lastUpdated=gt{last_sync}` when cursor present |
| `insight_id` includes patient count (PK churn) | 3135061809 | Stable `f"insight_{condition_label}"`, no patient count suffix |
| Shared session `Accept` header breaks Databricks | 3135065318 | `call_ai_query()` adds `Accept: application/json` locally in its own headers dict |
| `upsert_assessment()` key clobbering | 3135065338 | Flatten first, then stamp `patient_id`/`assessment_type` on top |
| README incremental claim didn't match code | 3135065370 | Code now actually filters by `_lastUpdated`; README claim is accurate |
| README chunk-pagination claim | 3135065386 | Updated to state that the connector does not follow chunk-pagination links for `ai_query()` results |
| Personal email in README | 3135065395 | Removed |
| Generic boolean placeholders | 3135065408 | Now `<TRUE_OR_FALSE_DEFAULT_TRUE>` / `<TRUE_OR_FALSE_DEFAULT_FALSE>` |
| Unbounded per-patient resource fetches | 3135065421 | `__MAX_RESOURCES_PER_PATIENT = 100` passed to all three per-patient fetches |
| Validation requires Databricks for discovery-only | 3135065438 | Condition changed to `if is_enrichment or is_genie:` |
| No retry/backoff in `call_ai_query()` | 3135065449 | Retry loop (3×, exponential backoff) added to initial POST |
| `disagreement_flag` hardcoded `true` in prompt | 3135078835 | Changed to `true\|false` |
Level 3 retrofit additions (learned from NOAA/FDA/FAERS/CPSC)
- `statement_id` guard before polling loop (prevents `poll_url = ".../None"` on timeout)
- Genie Space API strings fixed (`"question": q`, `"content": str`, not lists)
- Genie Space timeout uses `databricks_timeout` config instead of hardcoded `60`
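The `statement_id` guard in the list above might look roughly like this. `build_poll_url` is a hypothetical helper, and the path assumes the Databricks SQL Statement Execution API's `/api/2.0/sql/statements/{statement_id}` endpoint; how the connector actually structures this check is not shown here.

```python
def build_poll_url(workspace_url: str, submit_response: dict):
    """Return the polling URL, or None when the submit response has no statement_id.

    Without the guard, a missing id would be formatted into ".../None".
    """
    statement_id = submit_response.get("statement_id")
    if not statement_id:
        return None
    return f"{workspace_url.rstrip('/')}/api/2.0/sql/statements/{statement_id}"


poll_url = build_poll_url("https://example.cloud.databricks.com/", {"statement_id": "abc-123"})
no_poll = build_poll_url("https://example.cloud.databricks.com/", {"status": {"state": "FAILED"}})
```

A caller would skip the polling loop, and log the failure, whenever the function returns `None`.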
New files
- `requirements.txt`: `fivetran_connector_sdk>=2.0.0`
- `requirements-test.txt`: `pytest>=7.0.0`
- `tests/test_connector.py`: 60 tests covering all 5 mandatory categories (config validation, record builders, helper functions, phase logic w/ mocked Databricks, incremental sync regression)
Gate results
- 24/24 submission gate checks: ✓
- 60/60 pytest: ✓
- Black + flake8 (from repo root): ✓
Ready for re-review. Please let me know if anything needs further adjustment.
|
@cla-assistant recheck |
|
@fivetran-anushkaparashar — could you drop a |
|
@cla-assistant recheck |
|
@kellykohlleffel @fivetran-davidmillman Can you please re-check if CLA has been signed already? |
| @@ -0,0 +1,578 @@ | |||
| """ | |||
NIT: Test file can be removed.
Removed in commit 5e8bc69f. The tests/test_connector.py file has been deleted from the connector directory.
Removing test file to match the pattern of other tutorial connectors. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
fivetran-sahilkhirwal left a comment
Changes look good
Can you please resolve the merge conflict
| @@ -0,0 +1 @@ | |||
| pytest>=7.0.0 | |||
Remove this file. Please add your deps in requirements.txt.
Fixed in commit 83c5b855 — removed requirements-test.txt. No test files are committed on this PR, so pytest is not needed as a dependency.
fivetran-dejantucakov left a comment
Approved with changes to READMEs
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
fivetran-anushkaparashar left a comment
LGTM!
Please resolve the conflicts.
Keep FHIR connector link entry; drop the yield Note block that upstream removed from the AI tutorials section.
|
Resolved the merge conflict with upstream/main in |
@fivetran-davidmillman @kellykohlleffel, is this PR raised from a forked repository? If so, please avoid using forks for changes to this repo. Instead, clone the repository, create a separate branch, push your changes there, and raise the PR from that branch. This might also help resolve the CLA issue you're seeing, as I've run into a similar issue before when working from a fork. Thanks!
| @@ -0,0 +1 @@ | |||
| fivetran_connector_sdk>=2.0.0 | |||
| if status_code in (401, 403): | ||
| msg = f"HTTP {status_code}: Check your FHIR server credentials. URL: {url}" | ||
| log.severe(msg) | ||
| raise RuntimeError(msg) from e |
| except (requests.exceptions.RequestException, ValueError, RuntimeError) as e: | ||
| log.severe(f"Unexpected error during sync: {str(e)}") | ||
| raise |
| """ | ||
| Define the schema function which lets you configure the schema your connector delivers. | ||
| See the technical reference documentation for more details on the schema function: | ||
| https://fivetran.com/docs/connectors/connector-sdk/technical-reference/connector-sdk-code/connector-sdk-methods#schema | ||
| Args: |
| """ | ||
| Define the update function, which is a required function, and is called by Fivetran during each sync. | ||
| See the technical reference documentation for more details on the update function | ||
| https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update | ||
| Args: |
Closing this PR due to CLA issues; we will re-raise it in the new community connectors repo.
Please refer to #601.
Summary
- `all_things_ai/tutorials/databricks-fm-fhir-healthcare-intelligence`
- `ai_query()`: a Discovery phase for population health risk stratification, and a Debate phase where a Clinical Risk Analyst and Resource Allocation Analyst debate per-patient intervention priorities, producing a consensus with a disagreement flag
- `patients`, `conditions`, `observations`, `medications`, `population_insights`, `clinical_assessments`, `resource_assessments`, `debate_consensus`
- `README.md` with entry for new connector

Test plan
- `fivetran debug` run locally: SYNC SUCCEEDED
- `black --line-length 99` passes
- `flake8 --extend-ignore=E203,E501,B008` passes
- `configuration.json` values are angle-bracket placeholders

`fivetran debug` results
SDK Version: 2.8.1
Sync: SUCCEEDED (00:02:24)
Records: 3 patients, 2 AI-enriched (max_patients=3, max_enrichments=2)
Raw `fivetran debug` terminal output