Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
f8393ba
Reindex robustness: selective fields, cache fail-fast, stop actually …
harshach May 4, 2026
e626769
Address review: synchronize Redis state transitions, drop weak test
harshach May 4, 2026
e7b36b7
Address review: complete selective fields, preserve checkpoint, skip …
harshach May 4, 2026
54b7d21
Reindex bypasses Redis entity cache
harshach May 4, 2026
529b148
Address review: on-demand config reload, semaphore timeout test, test…
harshach May 4, 2026
5d35437
Address review: bypass guards on put*, volatile timeout
harshach May 4, 2026
f4edb0c
Reindex bypass: zero Redis traffic — guard every cache touchpoint
harshach May 4, 2026
126902d
Merge branch 'main' into harshach/indexing-perf
mohityadav766 May 4, 2026
ddcdcf1
Merge remote-tracking branch 'origin/main' into harshach/indexing-perf
harshach May 4, 2026
29c5999
Merge branch 'harshach/indexing-perf' of github.com:open-metadata/Ope…
harshach May 4, 2026
ade6439
Reindex: filter required fields against entity allowedFields
harshach May 4, 2026
5a39918
Add reindex perf-test tooling: 100k container generator + bootstrap
harshach May 4, 2026
a68c801
Merge branch 'main' into harshach/indexing-perf
harshach May 4, 2026
c67b9b0
PR #27876 review: address open Copilot/gitar-bot threads (batch 1)
harshach May 4, 2026
1186162
Merge branch 'harshach/indexing-perf' of github.com:open-metadata/Ope…
harshach May 4, 2026
67fae84
PR #27876 review: state-machine test + intent comments (batch 2)
harshach May 4, 2026
e46dc0d
PR #27876 review: expand ReindexingUtil parity coverage (batch 3)
harshach May 4, 2026
f877ae6
PR #27876 review 4222114380: failure context, claim release, ES test …
harshach May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.entity.applications.configuration.internal.CacheWarmupAppConfig;
import org.openmetadata.schema.system.EntityStats;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Stats;
Expand Down Expand Up @@ -101,7 +102,19 @@ public class CacheWarmupApp extends AbstractNativeApplication {
private static final Duration CHECKPOINT_TTL = Duration.ofDays(1);
private static final Duration CLAIM_TTL = Duration.ofMinutes(10);

// Bound how long we wait for a flapping cache before declaring the warmup partial. Each retry
// sleeps {@link #UNAVAILABLE_BACKOFF_MS}, so the total grace before bailing is roughly
// MAX_UNAVAILABLE_RETRIES * UNAVAILABLE_BACKOFF_MS / 1000 seconds. Old behaviour was a single
// {@code break} on first {@code !available}, which combined with a 300ms-timeout cache flipping
// unavailable on the very first hiccup left 84% of entities cold while the run reported SUCCESS.
private static final int MAX_UNAVAILABLE_RETRIES = 30;
private static final long UNAVAILABLE_BACKOFF_MS = 1_000L;

// Runtime state used for the AppRunRecord broadcast. We keep an EventPublisherJob here purely
// because that's what the AppRunRecord serialization expects in the success/failure contexts;
// it is NOT parsed from the user-supplied JSON. User configuration lives on {@link #appConfig}.
@Getter private EventPublisherJob jobData;
private CacheWarmupAppConfig appConfig;

private CacheProvider cacheProvider;
private CacheKeys keys;
Expand All @@ -111,16 +124,11 @@ public class CacheWarmupApp extends AbstractNativeApplication {
private String checkpointKeyPrefix;
private String claimKeyPrefix;
private final String instanceId = generateInstanceId();
// Read in initJobData from the user-supplied app config (cacheWarmupAppConfig schema). System
// properties remain a fallback for cases where the app record isn't editable (e.g. bootstrap).
private boolean warmBundles =
Boolean.parseBoolean(System.getProperty("om.cache.warmBundles", "true"));
private boolean enableDistributedClaim =
Boolean.parseBoolean(System.getProperty("om.cache.warmup.distributedClaim", "false"));

private JobExecutionContext jobExecutionContext;
private volatile boolean stopped = false;
private final Stats stats = new Stats().withEntityStats(new EntityStats());
private volatile boolean partiallyWarmed = false;
private volatile long lastWebSocketUpdate = 0;
private static final long WEBSOCKET_UPDATE_INTERVAL_MS = 2000;

Expand All @@ -131,8 +139,26 @@ public CacheWarmupApp(CollectionDAO collectionDAO, SearchRepository searchReposi
@Override
public void init(App app) {
super.init(app);
jobData = JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class);
readAppConfigFlags();
appConfig = parseAppConfig(app.getAppConfiguration());
jobData = newRuntimeJobData();
}

/**
 * Converts the raw app-configuration payload into a typed {@link CacheWarmupAppConfig}.
 * A {@code null} payload yields an empty config object, so callers never have to
 * null-check the result before reading flag getters.
 */
private CacheWarmupAppConfig parseAppConfig(Object raw) {
  return raw == null
      ? new CacheWarmupAppConfig()
      : JsonUtils.convertValue(raw, CacheWarmupAppConfig.class);
}

/**
 * Builds the runtime {@link EventPublisherJob} used for AppRunRecord reporting,
 * seeding it from the typed user config when one is present. When no config exists
 * a bare job instance is returned; never returns {@code null}.
 */
private EventPublisherJob newRuntimeJobData() {
  EventPublisherJob job = new EventPublisherJob();
  if (appConfig == null) {
    return job;
  }
  job.setEntities(appConfig.getEntities());
  // Only propagate batchSize when the user actually set one, so the job's own
  // defaulting logic can apply otherwise.
  if (appConfig.getBatchSize() != null) {
    job.setBatchSize(appConfig.getBatchSize());
  }
  return job;
}

@Override
Expand Down Expand Up @@ -170,56 +196,38 @@ private void initCacheComponents() {
checkpointKeyPrefix = ks + ":warmup:checkpoint:";
claimKeyPrefix = ks + ":warmup:claim:";
}
if (warmBundles && cacheProvider != null && keys != null) {
if (warmBundlesEnabled() && cacheProvider != null && keys != null) {
bundleBatcher = new BundleWarmupBatcher(collectionDAO, cacheProvider, keys);
Comment thread
harshach marked this conversation as resolved.
}
}

/**
* Pull the user-supplied warmBundles / enableDistributedClaim flags out of the raw app config
* map. The runtime parses the same payload as {@link EventPublisherJob}, which doesn't include
* these fields, so we read them from the original map rather than trying to extend the
* EventPublisherJob schema.
*/
private void readAppConfigFlags() {
if (getApp() == null || getApp().getAppConfiguration() == null) {
return;
}
try {
Map<?, ?> raw = JsonUtils.convertValue(getApp().getAppConfiguration(), Map.class);
if (raw == null) {
return;
}
// Accept both native Boolean and string forms — depending on how the config arrives
// (typed POJO, raw JSON, YAML env-var override, API string body) the same logical value
// can land here as Boolean.TRUE or "true". An instanceof Boolean check would silently
// ignore the string form and fall back to JVM system properties, which surprises
// operators who set the flag in the UI.
warmBundles = parseBooleanFlag(raw.get("warmBundles"), warmBundles);
enableDistributedClaim =
parseBooleanFlag(raw.get("enableDistributedClaim"), enableDistributedClaim);
} catch (Exception e) {
LOG.debug("Could not read warmBundles / enableDistributedClaim from app config", e);
/**
 * Resolves whether bundle warmup is enabled. The typed app config wins when the flag
 * is explicitly set there; otherwise the {@code om.cache.warmBundles} JVM system
 * property is consulted, defaulting to {@code true}.
 */
private boolean warmBundlesEnabled() {
  if (appConfig == null || appConfig.getWarmBundles() == null) {
    // No explicit user choice — fall back to the system-property override path.
    return Boolean.parseBoolean(System.getProperty("om.cache.warmBundles", "true"));
  }
  return appConfig.getWarmBundles();
}

private static boolean parseBooleanFlag(Object value, boolean fallback) {
if (value == null) {
return fallback;
private boolean distributedClaimEnabled() {
if (appConfig != null && appConfig.getEnableDistributedClaim() != null) {
return appConfig.getEnableDistributedClaim();
}
if (value instanceof Boolean) {
return (Boolean) value;
}
return Boolean.parseBoolean(String.valueOf(value));
return Boolean.parseBoolean(System.getProperty("om.cache.warmup.distributedClaim", "false"));
}

private void initJobData(JobExecutionContext ctx) {
if (jobData == null) {
jobData = loadJobData(ctx);
if (appConfig == null) {
appConfig = loadAppConfig(ctx);
jobData = newRuntimeJobData();
} else if (jobData == null) {
jobData = newRuntimeJobData();
}
if (ctx.getJobDetail().getKey().getName().equals(ON_DEMAND_JOB)) {
Comment thread
harshach marked this conversation as resolved.
Outdated
// Persist the (typed) user-supplied config back onto the App so subsequent runs see the
// same payload. We round-trip through a Map so AbstractNativeApplication's persistence
// layer doesn't have to know about CacheWarmupAppConfig directly.
Map<String, Object> asMap =
JsonUtils.convertValue(jobData, new TypeReference<Map<String, Object>>() {});
JsonUtils.convertValue(appConfig, new TypeReference<Map<String, Object>>() {});
getApp().setAppConfiguration(asMap);
Comment thread
harshach marked this conversation as resolved.
}
if (jobData.getBatchSize() == null) {
Expand All @@ -229,15 +237,15 @@ private void initJobData(JobExecutionContext ctx) {
jobData.setStats(stats);
}

private EventPublisherJob loadJobData(JobExecutionContext ctx) {
private CacheWarmupAppConfig loadAppConfig(JobExecutionContext ctx) {
String raw = (String) ctx.getJobDetail().getJobDataMap().get(APP_CONFIG);
if (raw != null) {
return JsonUtils.readValue(raw, EventPublisherJob.class);
return JsonUtils.readValue(raw, CacheWarmupAppConfig.class);
}
if (getApp() != null && getApp().getAppConfiguration() != null) {
return JsonUtils.convertValue(getApp().getAppConfiguration(), EventPublisherJob.class);
return parseAppConfig(getApp().getAppConfiguration());
}
throw new AppException("JobData is not initialized");
throw new AppException("CacheWarmup app configuration is not initialized");
}

private void runWarmup() {
Expand All @@ -255,12 +263,20 @@ private void runWarmup() {

int batchSize = jobData.getBatchSize();
Duration ttl = Duration.ofSeconds(cacheConfig.entityTtlSeconds);
partiallyWarmed = false;
for (String entityType : entityTypes) {
if (stopped) break;
warmupEntityType(entityType, batchSize, ttl);
}
if (stopped) {
jobData.setStatus(EventPublisherJob.Status.STOPPED);
} else if (partiallyWarmed) {
// Don't lie about success when one or more entity types bailed out because the cache went
Comment thread
harshach marked this conversation as resolved.
Outdated
// unavailable. Surface ACTIVE_ERROR so the AppRunRecord shows a non-success status and ops
// know to rerun. The 84%-cold incident happened because the app reported COMPLETED while
// most entities never made it to Redis.
jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
LOG.warn("Cache warmup completed with one or more entity types only partially warmed");
Comment thread
harshach marked this conversation as resolved.
Comment thread
harshach marked this conversation as resolved.
} else {
jobData.setStatus(EventPublisherJob.Status.COMPLETED);
CacheMetrics metrics = CacheMetrics.getInstance();
Expand All @@ -275,7 +291,7 @@ private void warmupEntityType(String entityType, int batchSize, Duration ttl) {
LOG.debug("Skipping user entity type — not cached by design");
return;
}
if (enableDistributedClaim && !claimEntityType(entityType)) {
if (distributedClaimEnabled() && !claimEntityType(entityType)) {
Comment thread
harshach marked this conversation as resolved.
Outdated
LOG.info("Skipping {} — claimed by another instance", entityType);
Comment thread
harshach marked this conversation as resolved.
Outdated
return;
Comment thread
harshach marked this conversation as resolved.
Outdated
}
Expand All @@ -298,16 +314,35 @@ private void warmupEntityType(String entityType, int batchSize, Duration ttl) {
int success = 0;
int bundlesWritten = 0;
int failed = 0;
int unavailableAttempts = 0;
long start = System.currentTimeMillis();
while (!stopped) {
if (!cacheProvider.available()) {
// A prior batch tripped the provider to unavailable. Iterating further would silently
// drop every subsequent pipelineSet/Hset (their guard `if (!available) return;` fires)
// while the accounting below still counted pages as success. Bail out — the health
// checker will flip the flag back if Redis recovers; rerun the app to resume warmup.
LOG.warn("Cache provider unavailable, aborting warmup for {}", entityType);
break;
// Cache flipped to unavailable mid-warmup. Old behaviour was an immediate {@code break}
// here, which combined with a hair-trigger availability flag (a single 300ms timeout
// marked the whole provider unavailable) routinely left 80%+ of entities cold while the
// run reported COMPLETED. Now we wait for the health-check to confirm recovery (with
// bounded retries) before declaring this entity type partially warmed.
if (++unavailableAttempts > MAX_UNAVAILABLE_RETRIES) {
LOG.warn(
"Cache provider unavailable for {} after {} retries (~{}s); marking warmup partial",
entityType,
unavailableAttempts,
(MAX_UNAVAILABLE_RETRIES * UNAVAILABLE_BACKOFF_MS) / 1000);
partiallyWarmed = true;
break;
Comment thread
harshach marked this conversation as resolved.
}
try {
Thread.sleep(UNAVAILABLE_BACKOFF_MS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Comment thread
harshach marked this conversation as resolved.
break;
}
continue;
}
// Reset retry counter once the provider is available again so a flaky cache that recovers
// doesn't accumulate retry budget across separate hiccups within the same entity type.
unavailableAttempts = 0;
List<String> page;
try {
page = dao.listAfterWithOffset(batchSize, offset);
Expand Down Expand Up @@ -396,7 +431,7 @@ private void warmupEntityType(String entityType, int batchSize, Duration ttl) {
reportCoverage(entityType, dao, success, bundlesWritten);
clearCheckpoint(entityType);
}
if (enableDistributedClaim) {
if (distributedClaimEnabled()) {
releaseClaim(entityType);
}
}
Expand All @@ -421,7 +456,7 @@ private boolean claimEntityType(String entityType) {
* does redundant work (Redis writes are idempotent).
*/
private void refreshClaim(String entityType) {
if (!enableDistributedClaim
if (!distributedClaimEnabled()
|| cacheProvider == null
|| !cacheProvider.available()
|| claimKeyPrefix == null) {
Expand Down Expand Up @@ -631,7 +666,7 @@ public void stop() {
@Override
protected void validateConfig(Map<String, Object> appConfig) {
try {
JsonUtils.convertValue(appConfig, EventPublisherJob.class);
JsonUtils.convertValue(appConfig, CacheWarmupAppConfig.class);
} catch (IllegalArgumentException e) {
throw AppException.byMessage(
jakarta.ws.rs.core.Response.Status.BAD_REQUEST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,18 +323,8 @@ static boolean isTransientError(SearchIndexException e) {
}

static List<String> getSearchIndexFields(String entityType) {
if (TIME_SERIES_ENTITIES.contains(entityType)) {
return List.of();
}
org.openmetadata.service.search.SearchRepository repo =
org.openmetadata.service.Entity.getSearchRepository();
if (repo == null || repo.getSearchIndexFactory() == null) {
// Fallback for environments where the search subsystem isn't bootstrapped (e.g. unit
// tests that exercise the reader without the full Entity registry). Behaves the same
// as the pre-selective-fields code path.
return List.of("*");
}
return new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType));
return org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSearchIndexFields(
entityType);
}

static int calculateNumberOfReaders(int totalEntityRecords, int batchSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,15 @@ public StepStats getProcessStats() {
}

public static class CustomBulkProcessor {
/**
* Cap on how long a flush will wait for a permit before declaring the bulk failed. With an
* unbounded {@code acquire()} a single leaked async future (no completion, no release) parks
* every subsequent caller permanently and the entire pipeline freezes at whatever record
* count was in flight at the time. 60s is conservative — well above any realistic OS bulk
* latency, well below "user gives up and bounces the pod".
*/
private static final long SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS = 60L;

private final OpenSearchAsyncClient asyncClient;
private final List<BulkOperation> buffer = new ArrayList<>();

Expand Down Expand Up @@ -1093,8 +1102,16 @@ private void flushInternal() {
int numberOfActions = toFlush.size();
LOG.debug("Executing bulk request {} with {} actions", executionId, numberOfActions);

// Bounded acquire: a leaked bulk future (callback never fires — e.g., the OpenSearch HC5
// I/O reactor died, PR #27698 territory) used to drain this semaphore and park every
// subsequent caller forever. With a timeout we surface the leak as a permanent failure
// so workers can keep moving and operators see an actual error instead of the pipeline
// silently freezing at a fixed record count.
boolean acquired;
try {
concurrentRequestSemaphore.acquire();
acquired =
concurrentRequestSemaphore.tryAcquire(
SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for semaphore", e);
Thread.currentThread().interrupt();
Expand All @@ -1104,6 +1121,19 @@ private void flushInternal() {
}
return;
}
if (!acquired) {
LOG.error(
"Bulk semaphore exhausted for {}s — recording {} ops as failed (active bulk requests={}). Likely a leaked async future.",
SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS,
numberOfActions,
activeBulkRequests.get());
recordPermanentFailure(
toFlush, numberOfActions, "Bulk semaphore timeout — likely future leak");
if (metrics != null) {
metrics.decrementPendingBulkRequests();
}
Comment thread
harshach marked this conversation as resolved.
return;
Comment thread
harshach marked this conversation as resolved.
}

activeBulkRequests.incrementAndGet();
executeBulkWithRetry(toFlush, executionId, numberOfActions, 0);
Expand Down
Loading
Loading