Reindex robustness: selective fields, cache fail-fast, stop actually stops (#27876)
Three independent fixes that all surfaced from the same incident: a 580k-
container reindex that froze for hours, then refused to actually stop when
the user clicked Stop.
Selective fields in the distributed reader path. PartitionWorker was
hardcoding List.of("*"), triggering every fieldFetcher in setFieldsInBulk —
including fetchAndSetOwns on Team/User where every owned entity becomes a
getEntityReferenceById round-trip. PR #27723 fixed this for EntityReader
(single-server) but the distributed pipeline never picked it up. Lifted the
field resolution into ReindexingUtil so both paths share one source of
truth.
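A minimal sketch of the shared-resolution idea, with a hypothetical lookup table standing in for the real probe; only the contract that both reader paths call one helper, and that the helper never returns the wildcard, comes from this PR:

```java
import java.util.List;
import java.util.Map;

// Sketch only: the real ReindexingUtil.getSearchIndexFields probes the entity's
// search-index mapping; this table is a made-up stand-in for illustration.
final class SharedFieldResolutionSketch {
  static List<String> getSearchIndexFields(String entityType) {
    return Map.of(
            "table", List.of("owners", "tags", "columns"),
            "team", List.of("users", "owners"))
        .getOrDefault(entityType, List.of()); // never List.of("*")
  }

  public static void main(String[] args) {
    // Both EntityReader (single-server) and PartitionWorker (distributed) would
    // resolve fields here instead of one of them hardcoding the wildcard.
    System.out.println(getSearchIndexFields("team"));
  }
}
```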
Cache layer no longer flaps on a single Redis hiccup. RedisCacheProvider
used to flip the whole provider unavailable on the first 300 ms timeout and
flip back on the next PING success — which combined with a 1 s health-check
made the indexer pay one timeout per cycle indefinitely. Replaced with a
sliding-window failure detector (5 failures in 30 s to trip, 3 consecutive
successes to recover), modelled on the BulkCircuitBreaker pattern.
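A compact sketch of the detector contract under the thresholds stated above; the method names, single-lock granularity, and deque representation are assumptions, not the production RedisCacheProvider code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class SlidingWindowDetector {
  private static final int FAILURE_THRESHOLD = 5;   // failures inside the window to trip
  private static final long WINDOW_MS = 30_000;     // sliding-window width
  private static final int RECOVERY_SUCCESSES = 3;  // consecutive successes to recover

  private final Deque<Long> failureTimestamps = new ArrayDeque<>();
  private int consecutiveSuccesses;
  private volatile boolean available = true;

  synchronized void recordFailure(long nowMillis) {
    consecutiveSuccesses = 0; // any failure restarts the recovery streak
    failureTimestamps.addLast(nowMillis);
    failureTimestamps.removeIf(ts -> nowMillis - ts > WINDOW_MS);
    if (failureTimestamps.size() >= FAILURE_THRESHOLD) {
      available = false; // trip only on sustained trouble, never a single hiccup
    }
  }

  synchronized void recordSuccess() {
    if (!available && ++consecutiveSuccesses >= RECOVERY_SUCCESSES) {
      available = true; // recover on sustained health, not one lucky PING
      consecutiveSuccesses = 0;
      failureTimestamps.clear();
    }
  }

  boolean available() {
    return available;
  }
}
```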
CacheWarmupApp parsed user config as EventPublisherJob (the SearchIndex
schema), which broke the Configuration page once cacheWarmupAppConfig.json
gained a type discriminator. Switched to CacheWarmupAppConfig in all four
parse sites and decoupled runtime status/stats from the parsed config.
Removed the readAppConfigFlags() workaround that read warmBundles /
enableDistributedClaim out of a raw map. Bails with ACTIVE_ERROR (not
COMPLETED) when an entity type is only partially warmed; retries on
transient cache unavailability instead of giving up on the first miss.
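A sketch of the parse fix using plain Jackson; CacheWarmupAppConfig below is a hypothetical stand-in record for the class generated from cacheWarmupAppConfig.json, and the codebase's actual JSON utility may differ:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

final class WarmupConfigParseSketch {
  // Stand-in for the generated CacheWarmupAppConfig; field names match the flags
  // mentioned above, the rest of the real schema is omitted.
  record CacheWarmupAppConfig(String type, boolean warmBundles, boolean enableDistributedClaim) {}

  public static void main(String[] args) {
    Map<String, Object> raw =
        Map.of("type", "CacheWarmup", "warmBundles", true, "enableDistributedClaim", false);
    // Before: raw was forced through EventPublisherJob, which chokes on the "type"
    // discriminator, and readAppConfigFlags() fished booleans back out of the map.
    CacheWarmupAppConfig cfg = new ObjectMapper().convertValue(raw, CacheWarmupAppConfig.class);
    System.out.println(cfg.warmBundles() + " / " + cfg.enableDistributedClaim());
  }
}
```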
Stop actually stops. Three pieces:
- DistributedJobStatsAggregator skips the WebSocket status broadcast while
the job is STOPPING so it doesn't overwrite the AppRunRecord.STOPPED that
AppScheduler.updateAndBroadcastStoppedStatus pushed. Self-stops after a
30 s grace if the executor never gets to call stop() on it.
- DistributedSearchIndexExecutor.stop() now calls workerExecutor.shutdownNow()
after flagging workers, so threads parked inside the bulk-sink semaphore,
initializeKeysetCursor, or waitForSinkOperations (5-min deadline) get
interrupted instead of grinding for minutes.
- OpenSearchBulkSink replaces concurrentRequestSemaphore.acquire() with a
60-second tryAcquire, recording permanent failure on timeout. A leaked
bulk future (callback never fires) can no longer permanently freeze every
subsequent flush at a fixed record count.
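A sketch of the bounded-acquire shape from the last bullet; the semaphore name is lifted from the text above, the surrounding structure is assumed, and the real sink releases the permit in the async bulk callback rather than inline:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class BoundedFlushSketch {
  private final Semaphore concurrentRequestSemaphore = new Semaphore(2);

  void flush(Runnable sendBulkAsync) throws InterruptedException {
    // Bounded wait: a permit leaked by a callback that never fires now surfaces
    // here after 60 s as a recorded failure instead of blocking acquire() forever.
    if (!concurrentRequestSemaphore.tryAcquire(60, TimeUnit.SECONDS)) {
      recordPermanentFailure("bulk permit not acquired within 60s");
      return; // no release(): we never took a permit
    }
    sendBulkAsync.run(); // the real sink releases the permit when the bulk completes
  }

  private void recordPermanentFailure(String reason) {
    System.err.println(reason); // stand-in for the sink's failure accounting
  }
}
```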
Tests: 8 new regression tests covering schema-parse, aggregator
STOPPING-skip and self-stop, executor shutdownNow. All 109 directly
relevant tests pass (PartitionWorker, Aggregator, Executor, BulkSink),
plus the broader 677-test reindex/cache suite stays green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
This PR aims to harden reindexing and cache-warmup behavior after a large reindex incident by reducing unnecessary field loading, making Redis cache availability less brittle, and improving stop responsiveness in the distributed reindex flow.
Changes:
- Centralizes selective reindex field resolution and applies it to the distributed PartitionWorker path.
- Adds sliding-window health detection for RedisCacheProvider, cache-warmup config parsing fixes, and partial-warmup handling in CacheWarmupApp.
- Improves distributed stop behavior via aggregator STOPPING handling, executor shutdownNow(), and bounded semaphore acquisition in OpenSearchBulkSink.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedSearchIndexExecutorTest.java | Adds regression tests for executor stop/shutdown behavior. |
| openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregatorTest.java | Adds tests for STOPPING broadcast suppression and aggregator self-stop behavior. |
| openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/cache/CacheWarmupAppConfigParseTest.java | Adds config parsing regression tests for cache warmup app schema. |
| openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java | Introduces shared helper for resolving reindex field lists. |
| openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java | Replaces fail-fast availability toggling with sliding-window failure/recovery tracking. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java | Switches distributed reader path to selective field resolution. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedSearchIndexExecutor.java | Forces worker interruption on stop via shutdownNow(). |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregator.java | Skips STOPPING websocket broadcasts and self-stops after a grace period. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java | Bounds semaphore acquisition time to avoid indefinite bulk flush hangs. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReader.java | Reuses the new shared field-resolution helper. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/cache/CacheWarmupApp.java | Parses typed cache warmup config, retries on cache unavailability, and reports partial warmups as errors. |
Three follow-ups to gitar-bot review on PR #27876:
- recordSuccess / recordFailure / pruneOldFailures now share a single stateLock. The methods were doing non-atomic check-then-act on the available / consecutiveSuccesses / failureTimestamps trio; under concurrent ops a success-recovery transition could overlap a failure and partially defeat the sliding window. The lock cost is negligible versus the Redis round-trip we already paid.
- pruneOldFailures no longer breaks on the first non-stale entry. Concurrent recordFailure calls aren't strictly time-ordered (the currentTimeMillis sample and the addLast happen as separate steps even under the lock), so an early break could occasionally leave stale entries behind. Iterate the full deque instead — bounded at FAILURE_THRESHOLD=5 so the cost is trivial.
- Removed CacheWarmupAppConfigParseTest#rejectsUnknownFields. It asserted `t == null || t.getMessage() != null`, which is trivially true and provided no regression guard. The other two tests in the file already cover the type-discriminator parse regression that matters.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
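A before/after illustration of the prune change, under the same assumed deque shape as the detector sketch earlier:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

final class PruneSketch {
  static void pruneOldFailures(Deque<Long> failureTimestamps, long now, long windowMs) {
    // Before: poll from the head and break at the first non-stale entry. Concurrent
    // recorders aren't strictly time-ordered, so a stale entry can hide behind a
    // fresh head. After: scan the whole deque; it is bounded at FAILURE_THRESHOLD
    // entries, so the full pass costs nothing.
    failureTimestamps.removeIf(ts -> now - ts > windowMs);
  }

  public static void main(String[] args) {
    Deque<Long> d = new ArrayDeque<>(List.of(100L, 5L, 90L)); // out-of-order timestamps
    pruneOldFailures(d, 120L, 30L);
    System.out.println(d); // [100, 90]: the stale 5 is gone despite the fresh head
  }
}
```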
…listener during STOPPING
Three Copilot review findings on PR #27876:
- SearchIndexExecutor.getSearchIndexFields still returned List.of("*"), so the single-server reindex path didn't get the selective-field optimization the rest of the PR claimed to restore. Delegated to ReindexingUtil.getSearchIndexFields so single-server and distributed paths share one source of truth.
- CacheWarmupApp.warmupEntityType cleared the resume checkpoint unconditionally after the loop — including after bailing out with partiallyWarmed=true. Next run would restart from offset 0 and re-warm everything we already wrote. Now tracks per-entity bailedOut and skips clearCheckpoint when bailed so the next run resumes from the last successfully pipelined page.
- DistributedJobStatsAggregator was skipping broadcastStats during STOPPING but still calling notifyProgressListener every poll. QuartzProgressListener.onProgressUpdate flips status back to RUNNING when pendingErrors > 0 and broadcasts a fresh AppRunRecord, so the user-set STOPPED could still be overwritten through that path. Skip both broadcastStats and notifyProgressListener during STOPPING; the listener's terminal callbacks (onJobStopped / onJobCompleted / onJobFailed) only fire on transition into a terminal state, not on STOPPING, so we don't lose any callbacks.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
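A sketch of the widened guard from the third bullet; the status enum and method stubs are assumptions built around the names the commit gives:

```java
final class AggregatorPollSketch {
  enum Status { RUNNING, STOPPING, STOPPED }

  private volatile Status status = Status.RUNNING;

  void pollOnce() {
    if (status == Status.STOPPING) {
      // Suppress BOTH outbound paths: the stats broadcast, and the progress
      // listener whose onProgressUpdate can flip the run back to RUNNING.
      // Terminal callbacks fire on the transition into STOPPED, not here.
      return;
    }
    broadcastStats();
    notifyProgressListener();
  }

  private void broadcastStats() { /* WebSocket push */ }

  private void notifyProgressListener() { /* QuartzProgressListener.onProgressUpdate */ }
}
```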
ReindexingUtil.getSearchIndexFields could return fields that aren't
declared on the target entity's JSON schema (most service types lack
reviewers/votes/extension/certification, ingestionPipeline lacks
dataProducts, user/team lack most COMMON_REINDEX_FIELDS). The downstream
PaginatedEntitiesSource path runs them through Entity.getFields →
EntityUtil.Fields, which validates against the entity's allowedFields
and throws IllegalArgumentException("Invalid field name <x>") on the
first batch — terminating that entity-type's partition.
The fix intersects the probed required-fields set with the entity's
allowedFields before returning, so the helper is safe for every
registered entity type. Falls back to the unfiltered set if the
EntityRepository can't be resolved (test boot, plugin not loaded).
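A sketch of the intersection guard in set terms; the real helper resolves allowedFields from the EntityRepository, so the shapes here are assumptions:

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class FieldIntersectionSketch {
  static Set<String> safeFields(Set<String> probed, Set<String> allowedFields) {
    if (allowedFields == null) {
      // Repository unresolvable (test boot, plugin not loaded): fall back unfiltered.
      return probed;
    }
    Set<String> result = new LinkedHashSet<>(probed);
    result.retainAll(allowedFields); // clamp to what the entity actually declares
    return result;
  }

  public static void main(String[] args) {
    Set<String> probed = Set.of("owners", "tags", "votes");   // from the search mapping
    Set<String> allowed = Set.of("owners", "tags", "domain"); // entity's allowedFields
    System.out.println(safeFields(probed, allowed)); // "votes" dropped, no IAE downstream
  }
}
```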
Tests:
- ReindexingUtilTest: filter mechanic + parametrized parity test
asserting getSearchIndexFields ⊆ allowedFields for ~30 representative
entity types using the real SearchIndexFactory probe path.
- SearchIndexingFieldsParityIT: triggers the bundled SearchIndexingApplication,
  waits for completion, and asserts entityStats reports zero failedRecords for
  every fields-missing entity type. Catches the regression end-to-end against
  a live OM stack.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
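A hypothetical shape for the parametrized parity assertion described in the Tests list above; the JUnit 5 annotations are real, but the probed/allowed tables below are toys standing in for the SearchIndexFactory probe and the repository lookup:

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class FieldsParitySketch {
  private static final Map<String, Set<String>> PROBED =
      Map.of("team", Set.of("users"), "table", Set.of("owners", "tags"));
  private static final Map<String, Set<String>> ALLOWED =
      Map.of("team", Set.of("users", "owners"), "table", Set.of("owners", "tags", "domain"));

  @ParameterizedTest
  @ValueSource(strings = {"team", "table"})
  void searchIndexFieldsAreSubsetOfAllowedFields(String entityType) {
    Set<String> extras = new HashSet<>(PROBED.get(entityType));
    extras.removeAll(ALLOWED.get(entityType));
    assertTrue(extras.isEmpty(), () -> entityType + " requests undeclared fields: " + extras);
  }
}
```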
scripts/ingest_100k_containers.py generates a tree of ~100k Container entities (depth 1-10) under a Storage service, with column-level tags on the dataModel. Used to reproduce and validate the reindex slowness fix this PR addresses.
scripts/reindex-perf-bootstrap.sh tears down any local OM stack, brings up a fresh PostgreSQL-backed dev stack, waits for health, fetches the ingestion-bot JWT (with admin-JWT fallback for the dev profile), and runs the generator against it. Knobs: CONTAINERS, WORKERS, BATCH_SIZE, SKIP_BUILD, SKIP_DOCKER, SKIP_INGEST.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Real fixes:
- ElasticSearchBulkSink: bounded semaphore acquire (60s) so a leaked async future can't park every flush forever. Mirrors OpenSearchBulkSink. Resolves the asymmetry called out in thread 3181975057.
- DistributedJobStatsAggregator: split self-stop from inline stop() so the aggregator's polling task can return before scheduler.awaitTermination blocks on it. Adds an idempotent schedulerShutDown CAS so executor + self-stop racing each other shut the scheduler down at most once. Resolves thread 3181975148.
- CacheWarmupApp: set partiallyWarmed on bulk-fetch failure, pipelined-write failure, and persistent bundle-warmup failure so the run reports ACTIVE_ERROR / preserves the checkpoint instead of clearing it. Wires the schema-exposed `force` flag into the distributed-claim skip so an operator override actually bypasses the claim. Resolves threads 3179658333, 3181975115, 3182937847, 3182938005.
- PartitionWorkerTest: regression test that pins the selective-fields contract — readEntitiesKeyset must hand ReindexingUtil.getSearchIndexFields output through to PaginatedEntitiesSource, never wildcard. Resolves thread 3181975204.
- SearchIndexingFieldsParityIT: capture pre-trigger startTime so we can reject the previous run record (thread 3182937874); switch the parity assertion to successRecords > 0 so a stale/early-aborted run can't satisfy it (thread 3182937901); use JUnit's assertNotEquals (thread 3182801474).
- ingest_100k_containers.py: scale_depth_counts honors --containers exactly via floor + largest-remainder, so --containers=1 produces 1 container, not 10 (thread 3182937961). Document seed determinism as best-effort across workers (thread 3182937981).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
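A sketch of the CAS-guarded shutdown from the aggregator bullet; the field name comes from the commit, the rest of the structure is assumed:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

final class SelfStopSketch {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicBoolean schedulerShutDown = new AtomicBoolean(false);

  // Called by the executor's stop() AND by the aggregator's 30 s self-stop. The CAS
  // guarantees the scheduler is torn down at most once, and because the self-stop
  // path is split out of the inline stop() rather than run from the polling task,
  // awaitTermination cannot block on the very task that invoked it.
  void shutDownSchedulerOnce() {
    if (schedulerShutDown.compareAndSet(false, true)) {
      scheduler.shutdownNow(); // the losing racer is a no-op
    }
  }
}
```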
…nMetadata into harshach/indexing-perf
- RedisCacheProviderStateMachineTest: 8 focused tests pinning the sliding-window detector contract — single failure stays available, threshold flips unavailable, recovery requires consecutive successes (not cumulative), out-of-order timestamp prune doesn't leave stale entries. Resolves the "no direct test" concern in thread 3181975175.
- RedisCacheProvider: package-private no-arg constructor for the state-machine test path (skips Redis IO; only the in-memory deque + atomic fields are needed for the contract). Production constructor unchanged.
- RedisCacheProvider: document the L1-staleness tradeoff that lives between the first transient failure and the FAILURE_THRESHOLD flip. Resolves thread 3182937799 — the design intent stays, the rationale is now visible alongside the code.
- CacheWarmupApp.initJobData: clarify that the on-demand setAppConfiguration is intentionally per-execution, not persisted via AppRepository. Resolves thread 3179658321.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
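For concreteness, a tiny main that exercises the SlidingWindowDetector sketch shown earlier in this thread against two of the pinned contracts; the real tests target RedisCacheProvider itself:

```java
final class DetectorContractCheck {
  public static void main(String[] args) {
    SlidingWindowDetector d = new SlidingWindowDetector(); // the sketch class from above

    d.recordFailure(0);
    check(d.available(), "a single failure must stay available");

    for (long t = 1; t <= 4; t++) d.recordFailure(t); // now five failures in-window
    check(!d.available(), "five failures in 30 s must flip unavailable");

    d.recordSuccess();
    d.recordSuccess();
    d.recordFailure(10); // restarts the streak
    d.recordSuccess();
    check(!d.available(), "recovery needs consecutive successes, not cumulative ones");
  }

  static void check(boolean ok, String contract) {
    if (!ok) throw new AssertionError(contract);
  }
}
```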
Add the entity types Copilot called out in thread 3182937931 to the parametrized parity test: API collection/endpoint, dashboard data model, drive/file/spreadsheet/worksheet/directory, test case/suite, and the AI/LLM/MCP types. The integration test (SearchIndexingFieldsParityIT) was already exercising "all" entity types end-to-end; the unit-level test now spans the full SearchIndexFactory mapping at a smaller scope. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
Copilot reviewed 29 out of 29 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/cache/CacheWarmupApp.java:180
- This path now marks the run FAILED but never populates
jobData.failure, so the AppRunRecord is persisted without a failureContext. Operators will see a terminal failure with no indication that the warmup aborted because Redis was unavailable.
if (cacheProvider == null || !cacheProvider.available()) {
// Surface this as FAILED — initJobData set status to RUNNING above, and the finally block
// will broadcast the terminal state. Leaving it RUNNING here would pin the job record in
// an active state indefinitely.
LOG.warn("Cache not available, skipping warmup");
jobData.setStatus(EventPublisherJob.Status.FAILED);
…(batch 4)
Address all 5 actionable Copilot comments + the suppressed one in review 4222114380:
- C1 (3183313794): partial-warm runs (ACTIVE_ERROR) now populate jobData.failure with per-entity-type bail-out reasons. UI now shows which entity types and offsets bailed via AppRunRecord.failureContext, instead of an opaque ACTIVE_ERROR. Reasons recorded for cache-unavailable retry exhaust, bulk-fetch failure, pipelined-write failure, and bundle-warmup failure. Same pattern applied to the FAILED-from-start branch (suppressed comment on CacheWarmupApp:180).
- C2 (3183313828): releaseClaim no longer skips when cacheProvider.available() is false — that's exactly the case where we MOST need to drop the claim, because a partial-warm bailed because the provider went unavailable. Best-effort DEL with exception fallback to CLAIM_TTL cleanup. Stops the 10-min skip-window for follow-up runs after a flap.
- C3 (3183313853): bundle-failure and pipelined-write-failure paths now set bailedOut=true alongside partiallyWarmed=true. The end-of-warmEntity block already guards clearCheckpoint behind !bailedOut, so this prevents later success pages from clobbering the saved offset and the next retry resumes at the failed page.
- C4 (3183313871): added semaphoreTimeoutRecordsPermanentFailure... test in ElasticSearchBulkSinkSimpleTest, mirroring the OpenSearch counterpart. Pins the same contract for ES: timed-out tryAcquire records the bulk as permanent failure, leaves activeBulkRequests at zero, doesn't release a permit it never took, drains the buffer.
- C5 (3183313896): removed unused create_one_container helper from ingest_100k_containers.py — the active path uses the inline make_batch request construction. One source of truth.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
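A sketch of the C1 reason-collection idea; every name here is assumed, and the real change attaches the collected reasons to jobData.failure so they surface through AppRunRecord.failureContext:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BailReasonSketch {
  private final Map<String, String> bailReasons = new LinkedHashMap<>();

  void recordBail(String entityType, long offset, String reason) {
    // e.g. "cache-unavailable retry exhaust", "bulk-fetch failure", ...
    bailReasons.put(entityType, "offset=" + offset + ": " + reason);
  }

  // Null means a clean run; otherwise this is what an operator sees instead of
  // an opaque ACTIVE_ERROR.
  String buildFailureContext() {
    return bailReasons.isEmpty() ? null : bailReasons.toString();
  }
}
```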
Code Review ✅ Approved · 7 resolved / 7 findings
Improves reindexing robustness by implementing selective field filtering, a sliding-window failure detector for Redis, and forceful executor termination for stop operations. All identified issues, including race conditions and configuration parsing errors, have been addressed and verified with comprehensive regression tests.
✅ 7 resolved
✅ Bug: pruneOldFailures early-break skips out-of-order timestamps
✅ Bug: recordSuccess/recordFailure race can cause premature recovery
✅ Quality: CacheWarmupAppConfigParseTest test asserts nothing meaningful
✅ Quality: CachedEntityDao.put* methods lack bypass guards (inconsistent)
✅ Bug: semaphoreAcquireTimeoutSeconds should be volatile
...and 2 more resolved from earlier reviews
fixes https://github.com/open-metadata/openmetadata-collate/issues/3948
fixes https://github.com/open-metadata/openmetadata-collate/issues/3947
fixes https://github.com/open-metadata/openmetadata-collate/issues/3949
Describe your changes:
Three independent fixes that all surfaced from the same incident — a 580k-container reindex that froze for hours, then refused to actually stop when the user clicked Stop. Together they remove the cache layer as a single point of fragility for indexing, restore the selective-field optimisation in the distributed pipeline, and make Stop reflect in the UI within seconds instead of minutes.
Selective fields in the distributed reader path. `PartitionWorker` was hardcoding `List.of("*")`, triggering every fieldFetcher in `setFieldsInBulk` — including `fetchAndSetOwns` on Team/User where every owned entity becomes a `getEntityReferenceById` round-trip. PR #27723 fixed this for `EntityReader` (single-server) but the distributed path never picked it up. Lifted the resolution into `ReindexingUtil.getSearchIndexFields()` so both paths share one source of truth.
Cache layer no longer flaps on a single Redis hiccup. `RedisCacheProvider` used to flip the whole provider unavailable on the first 300 ms timeout and flip back on the next PING success. Combined with a 1 s health-check this made the indexer pay one timeout per cycle indefinitely. Replaced with a sliding-window failure detector (5 failures in 30 s to trip, 3 consecutive successes to recover), modelled on `BulkCircuitBreaker`.
`CacheWarmupApp` parses its own schema. It used to parse user config as `EventPublisherJob` (the SearchIndex schema), which broke the Configuration page once `cacheWarmupAppConfig.json` gained a `type` discriminator. Switched to `CacheWarmupAppConfig` in all four parse sites and decoupled runtime status/stats from the parsed config; removed the `readAppConfigFlags()` workaround. Now reports `ACTIVE_ERROR` (not `COMPLETED`) when an entity type bails, and retries on transient cache unavailability instead of aborting on the first miss.
Stop actually stops. Three pieces: `DistributedJobStatsAggregator` skips the WebSocket status broadcast while `STOPPING` so it doesn't overwrite the `AppRunRecord.STOPPED` that `AppScheduler.updateAndBroadcastStoppedStatus` already pushed, and self-stops after a 30 s grace if the executor never gets to call `stop()`. `DistributedSearchIndexExecutor.stop()` now calls `workerExecutor.shutdownNow()` after flagging workers, so threads parked inside the bulk-sink semaphore, `initializeKeysetCursor`, or `waitForSinkOperations` (5-min deadline) get interrupted instead of grinding for minutes. `OpenSearchBulkSink` replaces `concurrentRequestSemaphore.acquire()` with a 60 s `tryAcquire`, recording permanent failure on timeout — a leaked async future can no longer permanently freeze every subsequent flush at a fixed record count.
Tested via 8 new regression tests covering schema-parse, aggregator STOPPING-skip and self-stop, executor `shutdownNow`. All 109 directly relevant tests pass (PartitionWorker, Aggregator, Executor, BulkSink), plus the broader 677-test reindex/cache suite stays green.
🤖 Generated with Claude Code
Summary by Gitar
- scripts/ingest_100k_containers.py to generate high-volume container entities for stress testing indexing pipelines.
- scripts/reindex-perf-bootstrap.sh to automate stack bring-up, ingestion-bot token retrieval, and data population.
- ReindexingUtilTest coverage to include AI/LLM/MCP entities, Data Assets, and service types to ensure parity in field filtering.