Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
f8393ba
Reindex robustness: selective fields, cache fail-fast, stop actually …
harshach May 4, 2026
e626769
Address review: synchronize Redis state transitions, drop weak test
harshach May 4, 2026
e7b36b7
Address review: complete selective fields, preserve checkpoint, skip …
harshach May 4, 2026
54b7d21
Reindex bypasses Redis entity cache
harshach May 4, 2026
529b148
Address review: on-demand config reload, semaphore timeout test, test…
harshach May 4, 2026
5d35437
Address review: bypass guards on put*, volatile timeout
harshach May 4, 2026
f4edb0c
Reindex bypass: zero Redis traffic — guard every cache touchpoint
harshach May 4, 2026
126902d
Merge branch 'main' into harshach/indexing-perf
mohityadav766 May 4, 2026
ddcdcf1
Merge remote-tracking branch 'origin/main' into harshach/indexing-perf
harshach May 4, 2026
29c5999
Merge branch 'harshach/indexing-perf' of github.com:open-metadata/Ope…
harshach May 4, 2026
ade6439
Reindex: filter required fields against entity allowedFields
harshach May 4, 2026
5a39918
Add reindex perf-test tooling: 100k container generator + bootstrap
harshach May 4, 2026
a68c801
Merge branch 'main' into harshach/indexing-perf
harshach May 4, 2026
c67b9b0
PR #27876 review: address open Copilot/gitar-bot threads (batch 1)
harshach May 4, 2026
1186162
Merge branch 'harshach/indexing-perf' of github.com:open-metadata/Ope…
harshach May 4, 2026
67fae84
PR #27876 review: state-machine test + intent comments (batch 2)
harshach May 4, 2026
e46dc0d
PR #27876 review: expand ReindexingUtil parity coverage (batch 3)
harshach May 4, 2026
f877ae6
PR #27876 review 4222114380: failure context, claim release, ES test …
harshach May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.entity.applications.configuration.internal.CacheWarmupAppConfig;
import org.openmetadata.schema.system.EntityStats;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Stats;
Expand Down Expand Up @@ -101,7 +102,19 @@ public class CacheWarmupApp extends AbstractNativeApplication {
private static final Duration CHECKPOINT_TTL = Duration.ofDays(1);
private static final Duration CLAIM_TTL = Duration.ofMinutes(10);

// Bound how long we wait for a flapping cache before declaring the warmup partial. Each retry
// sleeps {@link #UNAVAILABLE_BACKOFF_MS}, so the total grace before bailing is roughly
// MAX_UNAVAILABLE_RETRIES * UNAVAILABLE_BACKOFF_MS / 1000 seconds. Old behaviour was a single
// {@code break} on first {@code !available}, which combined with a 300ms-timeout cache flipping
// unavailable on the very first hiccup left 84% of entities cold while the run reported SUCCESS.
private static final int MAX_UNAVAILABLE_RETRIES = 30;
private static final long UNAVAILABLE_BACKOFF_MS = 1_000L;

// Runtime state used for the AppRunRecord broadcast. We keep an EventPublisherJob here purely
// because that's what the AppRunRecord serialization expects in the success/failure contexts;
// it is NOT parsed from the user-supplied JSON. User configuration lives on {@link #appConfig}.
@Getter private EventPublisherJob jobData;
private CacheWarmupAppConfig appConfig;

private CacheProvider cacheProvider;
private CacheKeys keys;
Expand All @@ -111,16 +124,11 @@ public class CacheWarmupApp extends AbstractNativeApplication {
private String checkpointKeyPrefix;
private String claimKeyPrefix;
private final String instanceId = generateInstanceId();
// Read in initJobData from the user-supplied app config (cacheWarmupAppConfig schema). System
// properties remain a fallback for cases where the app record isn't editable (e.g. bootstrap).
private boolean warmBundles =
Boolean.parseBoolean(System.getProperty("om.cache.warmBundles", "true"));
private boolean enableDistributedClaim =
Boolean.parseBoolean(System.getProperty("om.cache.warmup.distributedClaim", "false"));

private JobExecutionContext jobExecutionContext;
private volatile boolean stopped = false;
private final Stats stats = new Stats().withEntityStats(new EntityStats());
private volatile boolean partiallyWarmed = false;
private volatile long lastWebSocketUpdate = 0;
private static final long WEBSOCKET_UPDATE_INTERVAL_MS = 2000;

Expand All @@ -131,17 +139,39 @@ public CacheWarmupApp(CollectionDAO collectionDAO, SearchRepository searchReposi
/**
 * Cache the typed, user-supplied app config and derive the runtime {@code jobData} record.
 * {@code jobData} is only the broadcast payload for AppRunRecord success/failure contexts;
 * user configuration itself lives on {@code appConfig}.
 */
@Override
public void init(App app) {
  super.init(app);
  // Parse once here; on-demand runs may later reload overrides from the Quartz JobDataMap.
  appConfig = parseAppConfig(app.getAppConfiguration());
  jobData = newRuntimeJobData();
}

/**
 * Convert the raw app-configuration payload into a typed {@link CacheWarmupAppConfig},
 * falling back to an empty config when no payload was supplied.
 */
private CacheWarmupAppConfig parseAppConfig(Object raw) {
  return raw == null
      ? new CacheWarmupAppConfig()
      : JsonUtils.convertValue(raw, CacheWarmupAppConfig.class);
}

/**
 * Build the runtime {@link EventPublisherJob} used for AppRunRecord broadcasts, seeded from
 * the user config when present. Entities are copied as-is; batch size only when non-null so
 * a later default can apply.
 */
private EventPublisherJob newRuntimeJobData() {
  EventPublisherJob runtime = new EventPublisherJob();
  if (appConfig == null) {
    return runtime;
  }
  runtime.setEntities(appConfig.getEntities());
  var batchSize = appConfig.getBatchSize();
  if (batchSize != null) {
    runtime.setBatchSize(batchSize);
  }
  return runtime;
}

@Override
public void execute(JobExecutionContext jobExecutionContext) {
this.jobExecutionContext = jobExecutionContext;
this.stopped = false;
try {
initCacheComponents();
// Resolve the live config before constructing components. On-demand runs carry user
// overrides in the Quartz JobDataMap (entities, batchSize, warmBundles,
// enableDistributedClaim) that aren't in the persisted App config, and bundleBatcher in
// particular needs the right warmBundles flag at construction time.
initJobData(jobExecutionContext);
initCacheComponents();
if (cacheProvider == null || !cacheProvider.available()) {
// Surface this as FAILED — initJobData set status to RUNNING above, and the finally block
// will broadcast the terminal state. Leaving it RUNNING here would pin the job record in
Expand Down Expand Up @@ -170,56 +200,43 @@ private void initCacheComponents() {
checkpointKeyPrefix = ks + ":warmup:checkpoint:";
claimKeyPrefix = ks + ":warmup:claim:";
}
if (warmBundles && cacheProvider != null && keys != null) {
if (warmBundlesEnabled() && cacheProvider != null && keys != null) {
bundleBatcher = new BundleWarmupBatcher(collectionDAO, cacheProvider, keys);
Comment thread
harshach marked this conversation as resolved.
}
}

/**
* Pull the user-supplied warmBundles / enableDistributedClaim flags out of the raw app config
* map. The runtime parses the same payload as {@link EventPublisherJob}, which doesn't include
* these fields, so we read them from the original map rather than trying to extend the
* EventPublisherJob schema.
*/
private void readAppConfigFlags() {
if (getApp() == null || getApp().getAppConfiguration() == null) {
return;
}
try {
Map<?, ?> raw = JsonUtils.convertValue(getApp().getAppConfiguration(), Map.class);
if (raw == null) {
return;
}
// Accept both native Boolean and string forms — depending on how the config arrives
// (typed POJO, raw JSON, YAML env-var override, API string body) the same logical value
// can land here as Boolean.TRUE or "true". An instanceof Boolean check would silently
// ignore the string form and fall back to JVM system properties, which surprises
// operators who set the flag in the UI.
warmBundles = parseBooleanFlag(raw.get("warmBundles"), warmBundles);
enableDistributedClaim =
parseBooleanFlag(raw.get("enableDistributedClaim"), enableDistributedClaim);
} catch (Exception e) {
LOG.debug("Could not read warmBundles / enableDistributedClaim from app config", e);
/**
 * Whether bundle warmup is enabled. The user-supplied app config takes precedence; when the
 * flag is absent, fall back to the {@code om.cache.warmBundles} system property
 * (default {@code true}).
 */
private boolean warmBundlesEnabled() {
  Boolean fromConfig = appConfig == null ? null : appConfig.getWarmBundles();
  if (fromConfig != null) {
    return fromConfig;
  }
  return Boolean.parseBoolean(System.getProperty("om.cache.warmBundles", "true"));
}

private static boolean parseBooleanFlag(Object value, boolean fallback) {
if (value == null) {
return fallback;
}
if (value instanceof Boolean) {
return (Boolean) value;
/**
 * Whether distributed claiming of entity types is enabled. The user-supplied app config takes
 * precedence; when absent, fall back to the {@code om.cache.warmup.distributedClaim} system
 * property (default {@code false}).
 */
private boolean distributedClaimEnabled() {
  if (appConfig != null && appConfig.getEnableDistributedClaim() != null) {
    return appConfig.getEnableDistributedClaim();
  }
  return Boolean.parseBoolean(System.getProperty("om.cache.warmup.distributedClaim", "false"));
}

private void initJobData(JobExecutionContext ctx) {
if (jobData == null) {
jobData = loadJobData(ctx);
}
if (ctx.getJobDetail().getKey().getName().equals(ON_DEMAND_JOB)) {
boolean isOnDemand = ctx.getJobDetail().getKey().getName().equals(ON_DEMAND_JOB);
// For on-demand runs, OmAppJobListener places the user-supplied config (with overrides
// for entities / batchSize / warmBundles / enableDistributedClaim) into the Quartz
// JobDataMap[APP_CONFIG]. {@code init(App)} ran earlier and cached the persisted App
// config in {@code appConfig}; if we don't reload here, those manual overrides are
// silently ignored. Always reload for on-demand; for scheduled runs the persisted config
// is what we want.
if (appConfig == null || isOnDemand) {
appConfig = loadAppConfig(ctx);
}
jobData = newRuntimeJobData();
if (isOnDemand) {
// Persist the (typed) user-supplied config back onto the App so subsequent renders see
// the same payload. Round-trip through a Map so AbstractNativeApplication's persistence
// layer doesn't have to know about CacheWarmupAppConfig directly.
Map<String, Object> asMap =
JsonUtils.convertValue(jobData, new TypeReference<Map<String, Object>>() {});
JsonUtils.convertValue(appConfig, new TypeReference<Map<String, Object>>() {});
getApp().setAppConfiguration(asMap);
Comment thread
harshach marked this conversation as resolved.
}
if (jobData.getBatchSize() == null) {
Expand All @@ -229,15 +246,15 @@ private void initJobData(JobExecutionContext ctx) {
jobData.setStats(stats);
}

private EventPublisherJob loadJobData(JobExecutionContext ctx) {
private CacheWarmupAppConfig loadAppConfig(JobExecutionContext ctx) {
String raw = (String) ctx.getJobDetail().getJobDataMap().get(APP_CONFIG);
if (raw != null) {
return JsonUtils.readValue(raw, EventPublisherJob.class);
return JsonUtils.readValue(raw, CacheWarmupAppConfig.class);
}
if (getApp() != null && getApp().getAppConfiguration() != null) {
return JsonUtils.convertValue(getApp().getAppConfiguration(), EventPublisherJob.class);
return parseAppConfig(getApp().getAppConfiguration());
}
throw new AppException("JobData is not initialized");
throw new AppException("CacheWarmup app configuration is not initialized");
}

private void runWarmup() {
Expand All @@ -255,12 +272,16 @@ private void runWarmup() {

int batchSize = jobData.getBatchSize();
Duration ttl = Duration.ofSeconds(cacheConfig.entityTtlSeconds);
partiallyWarmed = false;
for (String entityType : entityTypes) {
if (stopped) break;
warmupEntityType(entityType, batchSize, ttl);
}
if (stopped) {
jobData.setStatus(EventPublisherJob.Status.STOPPED);
} else if (partiallyWarmed) {
jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
LOG.warn("Cache warmup completed with one or more entity types only partially warmed");
Comment thread
harshach marked this conversation as resolved.
Comment thread
harshach marked this conversation as resolved.
} else {
jobData.setStatus(EventPublisherJob.Status.COMPLETED);
CacheMetrics metrics = CacheMetrics.getInstance();
Expand All @@ -275,7 +296,7 @@ private void warmupEntityType(String entityType, int batchSize, Duration ttl) {
LOG.debug("Skipping user entity type — not cached by design");
return;
}
if (enableDistributedClaim && !claimEntityType(entityType)) {
if (distributedClaimEnabled() && !claimEntityType(entityType)) {
Comment thread
harshach marked this conversation as resolved.
Outdated
LOG.info("Skipping {} — claimed by another instance", entityType);
Comment thread
harshach marked this conversation as resolved.
Outdated
return;
Comment thread
harshach marked this conversation as resolved.
Outdated
}
Expand All @@ -298,16 +319,37 @@ private void warmupEntityType(String entityType, int batchSize, Duration ttl) {
int success = 0;
int bundlesWritten = 0;
int failed = 0;
int unavailableAttempts = 0;
boolean bailedOut = false;
long start = System.currentTimeMillis();
while (!stopped) {
if (!cacheProvider.available()) {
// A prior batch tripped the provider to unavailable. Iterating further would silently
// drop every subsequent pipelineSet/Hset (their guard `if (!available) return;` fires)
// while the accounting below still counted pages as success. Bail out — the health
// checker will flip the flag back if Redis recovers; rerun the app to resume warmup.
LOG.warn("Cache provider unavailable, aborting warmup for {}", entityType);
break;
// Cache flipped to unavailable mid-warmup. Old behaviour was an immediate {@code break}
// here, which combined with a hair-trigger availability flag (a single 300ms timeout
// marked the whole provider unavailable) routinely left 80%+ of entities cold while the
// run reported COMPLETED. Now we wait for the health-check to confirm recovery (with
// bounded retries) before declaring this entity type partially warmed.
if (++unavailableAttempts > MAX_UNAVAILABLE_RETRIES) {
LOG.warn(
"Cache provider unavailable for {} after {} retries (~{}s); marking warmup partial",
entityType,
unavailableAttempts,
(MAX_UNAVAILABLE_RETRIES * UNAVAILABLE_BACKOFF_MS) / 1000);
partiallyWarmed = true;
bailedOut = true;
break;
Comment thread
harshach marked this conversation as resolved.
}
try {
Thread.sleep(UNAVAILABLE_BACKOFF_MS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Comment thread
harshach marked this conversation as resolved.
break;
}
continue;
}
// Reset retry counter once the provider is available again so a flaky cache that recovers
// doesn't accumulate retry budget across separate hiccups within the same entity type.
unavailableAttempts = 0;
List<String> page;
try {
page = dao.listAfterWithOffset(batchSize, offset);
Expand Down Expand Up @@ -394,9 +436,15 @@ private void warmupEntityType(String entityType, int batchSize, Duration ttl) {
elapsed);
if (!stopped) {
reportCoverage(entityType, dao, success, bundlesWritten);
clearCheckpoint(entityType);
// Only clear the checkpoint when this entity type fully completed. If we bailed because
Comment thread
harshach marked this conversation as resolved.
// the cache went unavailable, the saved offset is the last successfully pipelined page
// and the next run should resume from there — clearing it would force a restart from
// offset 0 and re-warm everything we already wrote.
if (!bailedOut) {
Comment thread
harshach marked this conversation as resolved.
clearCheckpoint(entityType);
}
}
if (enableDistributedClaim) {
if (distributedClaimEnabled()) {
releaseClaim(entityType);
}
}
Expand All @@ -421,7 +469,7 @@ private boolean claimEntityType(String entityType) {
* does redundant work (Redis writes are idempotent).
*/
private void refreshClaim(String entityType) {
if (!enableDistributedClaim
if (!distributedClaimEnabled()
|| cacheProvider == null
|| !cacheProvider.available()
|| claimKeyPrefix == null) {
Expand Down Expand Up @@ -631,7 +679,7 @@ public void stop() {
@Override
protected void validateConfig(Map<String, Object> appConfig) {
try {
JsonUtils.convertValue(appConfig, EventPublisherJob.class);
JsonUtils.convertValue(appConfig, CacheWarmupAppConfig.class);
} catch (IllegalArgumentException e) {
throw AppException.byMessage(
jakarta.ws.rs.core.Response.Status.BAD_REQUEST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,25 @@ private void readKeysetBatches(
KeysetBatchReader batchReader,
Phaser phaser,
BatchCallback callback) {
// Bypass the Redis-backed entity cache on the reader thread for the same reasons as the
// distributed PartitionWorker: bulk reindex never re-reads entities, every relationship
// lookup pays a cache round-trip we don't need, and an unhealthy Redis turns each lookup
// into a 300ms timeout. See {@link org.openmetadata.service.cache.EntityCacheBypass}.
try (org.openmetadata.service.cache.EntityCacheBypass.Handle ignored =
org.openmetadata.service.cache.EntityCacheBypass.skip()) {
readKeysetBatchesInternal(
entityType, recordLimit, batchSize, startCursor, batchReader, phaser, callback);
}
}

private void readKeysetBatchesInternal(
String entityType,
int recordLimit,
int batchSize,
String startCursor,
KeysetBatchReader batchReader,
Phaser phaser,
BatchCallback callback) {
try {
String keysetCursor = startCursor;
int processed = 0;
Expand Down Expand Up @@ -323,18 +342,8 @@ static boolean isTransientError(SearchIndexException e) {
}

/**
 * Resolve the selective field list to fetch for {@code entityType} during reindex. Delegates
 * to ReindexingUtil so this reader and the rest of the reindex pipeline agree on the same
 * per-entity field resolution (time-series exclusions, allowedFields filtering, fallbacks).
 */
static List<String> getSearchIndexFields(String entityType) {
  return org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSearchIndexFields(
      entityType);
}

static int calculateNumberOfReaders(int totalEntityRecords, int batchSize) {
Expand Down
Loading
Loading