diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java index 64076bf5c09b..eaded403c79e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java @@ -563,6 +563,11 @@ public static Fields getFields(String entityType, List fields) { return entityRepository.getFields(String.join(",", fields)); } + public static Fields getOnlySupportedFields(String entityType, List fields) { + EntityRepository entityRepository = Entity.getEntityRepository(entityType); + return entityRepository.getOnlySupportedFields(String.join(",", fields)); + } + public static T getEntity(EntityReference ref, String fields, Include include) { if (ref == null) { return null; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReader.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReader.java index 215e5f6687c8..6ae3400d235f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReader.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReader.java @@ -3,6 +3,7 @@ import static org.openmetadata.service.Entity.QUERY_COST_RECORD; import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS; import static org.openmetadata.service.Entity.TEST_CASE_RESULT; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSearchIndexFields; import java.util.ArrayList; import java.util.List; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingOrchestrator.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingOrchestrator.java index 299a1ac5eabc..05c9e1f4ae1c 100644 --- 
a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingOrchestrator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingOrchestrator.java @@ -24,6 +24,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.apps.bundles.searchIndex.listeners.LoggingProgressListener; import org.openmetadata.service.apps.bundles.searchIndex.listeners.SlackProgressListener; +import org.openmetadata.service.apps.scheduler.OmAppJobListener; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.SystemRepository; import org.openmetadata.service.search.SearchRepository; @@ -109,7 +110,7 @@ public void stop() { AppRunRecord appRecord = context.getJobRecord(); appRecord.setStatus(AppRunRecord.Status.STOPPED); - appRecord.setEndTime(System.currentTimeMillis()); + OmAppJobListener.fillTerminalTimings(appRecord); context.storeRunRecord(JsonUtils.pojoToJson(appRecord)); context.pushStatusUpdate(appRecord, true); sendUpdates(); @@ -368,6 +369,7 @@ private void finalizeJobExecution() { if (stopped) { AppRunRecord appRecord = context.getJobRecord(); appRecord.setStatus(AppRunRecord.Status.STOPPED); + OmAppJobListener.fillTerminalTimings(appRecord); context.storeRunRecord(JsonUtils.pojoToJson(appRecord)); } } @@ -383,6 +385,7 @@ private void sendUpdates() { private void updateRecordToDbAndNotify() { AppRunRecord appRecord = context.getJobRecord(); appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value())); + OmAppJobListener.fillTerminalTimings(appRecord); if (jobData.getFailure() != null) { appRecord.setFailureContext( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregator.java index 8cee00a30e6b..20edf4c45daf 
100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregator.java @@ -35,6 +35,7 @@ import org.openmetadata.service.apps.bundles.searchIndex.BulkSink; import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext; import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener; +import org.openmetadata.service.apps.scheduler.OmAppJobListener; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.socket.WebSocketManager; @@ -562,6 +563,7 @@ private AppRunRecord convertToAppRunRecord( appRecord.setStartTime(appStartTime != null ? appStartTime : job.getStartedAt()); appRecord.setEndTime(job.getCompletedAt()); appRecord.setTimestamp(job.getUpdatedAt()); + OmAppJobListener.fillTerminalTimings(appRecord); // Add stats as success context SuccessContext successContext = new SuccessContext(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java index bc1844c5344f..2bc1458327e4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java @@ -506,15 +506,16 @@ private BatchResult processBatch( ResultList resultList = readEntitiesKeyset(entityType, keysetCursor, batchSize); long readDurationNanos = System.nanoTime() - readStartNanos; - if (resultList == null || resultList.getData() == null || resultList.getData().isEmpty()) { - LOG.debug("{} read={}ms returned empty", entityType, readDurationNanos / 1_000_000L); - 
return new BatchResult(0, 0, 0, null); - } - - String nextCursor = resultList.getPaging() != null ? resultList.getPaging().getAfter() : null; - int readSuccessCount = listOrEmpty(resultList.getData()).size(); - int readErrorCount = listOrEmpty(resultList.getErrors()).size(); - int warningsCount = resultList.getWarningsCount() != null ? resultList.getWarningsCount() : 0; + int readSuccessCount = resultList != null ? listOrEmpty(resultList.getData()).size() : 0; + int readErrorCount = resultList != null ? listOrEmpty(resultList.getErrors()).size() : 0; + int warningsCount = + (resultList != null && resultList.getWarningsCount() != null) + ? resultList.getWarningsCount() + : 0; + String nextCursor = + (resultList != null && resultList.getPaging() != null) + ? resultList.getPaging().getAfter() + : null; if (statsTracker != null) { // Reader timing = wall-clock time of the keyset DB read (listAfter + setFieldsInBulk @@ -523,28 +524,16 @@ private BatchResult processBatch( readSuccessCount, readErrorCount, warningsCount, readDurationNanos); } - if (failureRecorder != null && readErrorCount > 0) { - for (EntityError entityError : listOrEmpty(resultList.getErrors())) { - Object rawEntity = entityError.getEntity(); - String entityId = null; - if (rawEntity instanceof EntityInterface) { - UUID id = ((EntityInterface) rawEntity).getId(); - if (id != null) { - entityId = id.toString(); - } - } else if (rawEntity != null) { - entityId = rawEntity.toString(); - } - if (entityId == null) { - LOG.warn( - "Skipping reader failure record for entityType={}: entityId is null, message={}", - entityType, - entityError.getMessage()); - continue; - } - failureRecorder.recordReaderEntityFailure( - entityType, entityId, null, entityError.getMessage()); - } + recordReaderFailures(entityType, resultList, readErrorCount); + + if (readSuccessCount == 0) { + LOG.debug( + "{} read={}ms returned no indexable rows (warnings={}, errors={})", + entityType, + readDurationNanos / 1_000_000L, + 
warningsCount, + readErrorCount); + return new BatchResult(0, readErrorCount, warningsCount, nextCursor); } Map contextData = createContextData(entityType, statsTracker); @@ -572,6 +561,44 @@ private BatchResult processBatch( } } + /** + * Persist per-entity reader failures so that downstream tooling (e.g. the failures dashboard) + * can show which specific records the reader could not hydrate. Runs whether or not the batch + * has any successful rows — losing failure diagnostics for "all-error" batches would defeat + * the point of the recorder. + */ + private void recordReaderFailures( + String entityType, ResultList resultList, int readErrorCount) { + if (failureRecorder == null || readErrorCount == 0 || resultList == null) { + return; + } + for (EntityError entityError : listOrEmpty(resultList.getErrors())) { + Object rawEntity = entityError.getEntity(); + String entityId = null; + if (rawEntity instanceof EntityInterface) { + UUID id = ((EntityInterface) rawEntity).getId(); + if (id != null) { + entityId = id.toString(); + } + } else if (rawEntity != null) { + entityId = rawEntity.toString(); + } + if (entityId == null) { + // Time-series readers (EntityTimeSeriesRepository) build EntityError without an id — + // they only have access to the JSON row, not the entity reference. Per-entity recording + // requires an id, so log at DEBUG (not WARN) to avoid spamming logs for every error in + // large time-series batches. + LOG.debug( + "No entityId on reader failure for entityType={} — skipping per-entity record. message={}", + entityType, + entityError.getMessage()); + continue; + } + failureRecorder.recordReaderEntityFailure( + entityType, entityId, null, entityError.getMessage()); + } + } + /** * Read entities from the database. 
* diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/listeners/QuartzProgressListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/listeners/QuartzProgressListener.java index e9e665461cf2..4a4a0a96e1c5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/listeners/QuartzProgressListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/listeners/QuartzProgressListener.java @@ -23,6 +23,7 @@ import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext; import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener; import org.openmetadata.service.apps.bundles.searchIndex.distributed.DistributedJobContext; +import org.openmetadata.service.apps.scheduler.OmAppJobListener; import org.openmetadata.service.socket.WebSocketManager; import org.quartz.JobExecutionContext; @@ -228,6 +229,7 @@ private void broadcastViaWebSocket(AppRunRecord appRecord) { private AppRunRecord getUpdatedAppRunRecord() { AppRunRecord appRecord = readExistingRecord(); appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value())); + OmAppJobListener.fillTerminalTimings(appRecord); if (jobData.getStats() != null) { SuccessContext ctx = appRecord.getSuccessContext(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index 35320b536248..45c47d97a4e1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -465,7 +465,7 @@ private void updateAndBroadcastStoppedStatus(JobExecutionContext context) { if (runRecord != null) { // Update status to STOPPED 
runRecord.withStatus(AppRunRecord.Status.STOPPED); - runRecord.withEndTime(System.currentTimeMillis()); + OmAppJobListener.fillTerminalTimings(runRecord); // Get WebSocket channel name String webSocketChannelName = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java index 53280d87c664..b5d5380762f8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java @@ -47,6 +47,45 @@ protected OmAppJobListener() { this.repository = new AppRepository(); } + /** + * Populate {@code endTime} and {@code executionTime} on a terminal-state run record. Each field + * is filled independently and only if currently null: + * + *
<ul> + *
<li>{@code endTime} defaults to {@code System.currentTimeMillis()} if absent. + *
<li>{@code executionTime} is computed from {@code endTime - startTime} if absent and both + * endpoints are available — this means callers that pre-populated {@code endTime} (e.g. + * from {@code job.getCompletedAt()}) still get an accurate {@code executionTime}. + *
</ul> + * + *
<p>
The method is a no-op for non-terminal statuses, so it is safe to call from progress + * listeners that may persist before {@link #jobWasExecuted} runs. Without this, mid-flight + * writes by progress listeners (e.g. {@code QuartzProgressListener} firing {@code onJobFailed}) + * would persist a terminal status to the DB without timings; if the job dies before {@code + * jobWasExecuted} fires, polling consumers would see {@code status=FAILED} with no + * {@code endTime} / {@code executionTime}. + */ + public static void fillTerminalTimings(AppRunRecord record) { + if (record == null || record.getStatus() == null || !isTerminalStatus(record.getStatus())) { + return; + } + if (record.getEndTime() == null) { + record.withEndTime(System.currentTimeMillis()); + } + if (record.getExecutionTime() == null + && record.getStartTime() != null + && record.getEndTime() != null) { + record.setExecutionTime(record.getEndTime() - record.getStartTime()); + } + } + + private static boolean isTerminalStatus(AppRunRecord.Status status) { + return switch (status) { + case SUCCESS, FAILED, ACTIVE_ERROR, STOPPED, COMPLETED -> true; + default -> false; + }; + } + @Override public String getName() { return JOB_LISTENER_NAME; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index 34e648cb5362..a02f786c1573 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -6607,6 +6607,13 @@ public final Fields getFields(String fields) { return new Fields(allowedFields, fields); } + public final Fields getOnlySupportedFields(String fields) { + if ("*".equals(fields)) { + return new Fields(allowedFields, String.join(",", allowedFields), true); + } + return new Fields(allowedFields, fields, true); + } + protected final Fields 
getFields(Set fields) { return new Fields(allowedFields, fields); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java index 9760feeb256b..5291543c51a9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java @@ -67,6 +67,7 @@ public interface SearchIndex { "followers", "votes", "extension", + "tags", "certification", "dataProducts"); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java index 0b585474e951..1dd664fbb4db 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java @@ -532,6 +532,39 @@ public Fields(Set allowedFields, String fieldsParam) { } } + public Fields(Set allowedFields, String fieldsParam, boolean ignoreExtra) { + if (nullOrEmpty(fieldsParam)) { + this.fieldList = new HashSet<>(); + return; + } + + Set parsedFields = parseFields(fieldsParam); + this.fieldList = validateFields(parsedFields, allowedFields, ignoreExtra); + } + + private Set validateFields( + Set inputFields, Set allowedFields, boolean ignoreExtra) { + + Set result = new HashSet<>(); + + for (String field : inputFields) { + if (allowedFields.contains(field)) { + result.add(field); + } else if (!ignoreExtra) { + throw new IllegalArgumentException(CatalogExceptionMessage.invalidField(field)); + } + } + + return result; + } + + private Set parseFields(String fieldsParam) { + return Arrays.stream(fieldsParam.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toSet()); + } + public Fields(Set allowedFields, Set fieldsParam) { 
if (nullOrEmpty(fieldsParam)) { fieldList = new HashSet<>(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index f59a1e37d121..66de553430f7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -15,6 +15,7 @@ import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isStaleReferenceError; import java.util.ArrayList; import java.util.List; @@ -140,7 +141,7 @@ private ResultList read(String cursor) throws SearchI batchSize, cursor, true, - Entity.getFields(entityType, fields), + Entity.getOnlySupportedFields(entityType, fields), null); // Filter out EntityNotFoundExceptions from errors - these are expected when relationships @@ -150,7 +151,7 @@ private ResultList read(String cursor) throws SearchI List warningErrors = new ArrayList<>(); for (EntityError error : result.getErrors()) { - if (isEntityNotFoundError(error)) { + if (isStaleReferenceError(error)) { warningErrors.add(error); } else { realErrors.add(error); @@ -240,7 +241,7 @@ public ResultList readWithCursor(String currentCursor batchSize, currentCursor, true, - Entity.getFields(entityType, fields), + Entity.getOnlySupportedFields(entityType, fields), null); // Filter out EntityNotFoundExceptions from errors - same as in read() method @@ -249,7 +250,7 @@ public ResultList readWithCursor(String currentCursor if (!result.getErrors().isEmpty()) { List realErrors = new ArrayList<>(); for (EntityError error : result.getErrors()) { - if 
(isEntityNotFoundError(error)) { + if (isStaleReferenceError(error)) { warningsCount++; LOG.debug("Skipping entity due to missing relationship: {}", error.getMessage()); } else { @@ -298,13 +299,13 @@ public ResultList readNextKeyset(String keysetCursor) keysetCursor, cachedTotalCount, true, - Entity.getFields(entityType, fields)); + Entity.getOnlySupportedFields(entityType, fields)); int warningsCount = 0; if (result.getErrors() != null && !result.getErrors().isEmpty()) { List realErrors = new ArrayList<>(); for (EntityError error : result.getErrors()) { - if (isEntityNotFoundError(error)) { + if (isStaleReferenceError(error)) { warningsCount++; LOG.debug("Skipping entity due to missing relationship: {}", error.getMessage()); } else { @@ -378,15 +379,4 @@ public void updateStats(int currentSuccess, int currentFailed) { public void updateStats(int currentSuccess, int currentFailed, int currentWarnings) { getUpdatedStats(stats, currentSuccess, currentFailed, currentWarnings); } - - private boolean isEntityNotFoundError(EntityError error) { - if (error == null || error.getMessage() == null) { - return false; - } - String message = error.getMessage().toLowerCase(); - return message.contains("not found") - || message.contains("instance for") - || message.contains("does not exist") - || message.contains("entitynotfoundexception"); - } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java index afc3e0cd2f30..af086bedd9e8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java @@ -2,6 +2,7 @@ import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; import 
static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.partitionErrors; import java.util.ArrayList; import java.util.List; @@ -11,6 +12,7 @@ import lombok.extern.slf4j.Slf4j; import org.glassfish.jersey.internal.util.ExceptionUtils; import org.openmetadata.schema.EntityTimeSeriesInterface; +import org.openmetadata.schema.system.EntityError; import org.openmetadata.schema.system.IndexingError; import org.openmetadata.schema.system.StepStats; import org.openmetadata.schema.type.Include; @@ -28,6 +30,9 @@ @Getter public class PaginatedEntityTimeSeriesSource implements Source> { + /** Cap on per-error detail messages emitted to logs to avoid flooding under large batches. */ + private static final int MAX_ERROR_DETAILS_LOGGED = 5; + private final int batchSize; private final String entityType; private final List fields; @@ -117,9 +122,14 @@ public ResultList readWithCursor(String cur } else { result = repository.listWithOffset(currentCursor, filter, batchSize, true); } + int warningsCount = filterStaleRelationshipErrors(result); LOG.debug( - "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}", - batchSize, result.getData().size(), result.getErrors().size()); + "[PaginatedEntityTimeSeriesSource] Batch Stats :- Submitted: {} Success: {} Failed: {} Warnings: {}", + batchSize, + result.getData().size(), + result.getErrors().size(), + warningsCount); + updateStats(result.getData().size(), result.getErrors().size(), warningsCount); } catch (Exception e) { IndexingError indexingError = new IndexingError() @@ -149,18 +159,38 @@ private ResultList read(String cursor) result = repository.listWithOffset(cursor, filter, batchSize, true); } + int warningsCount = filterStaleRelationshipErrors(result); + if (!result.getErrors().isEmpty()) { + int errorCount = result.getErrors().size(); + LOG.warn( + "[PaginatedEntityTimeSeriesSource] {} real 
reader error(s) for entityType={}; " + + "first up to {} shown at DEBUG", + errorCount, + entityType, + MAX_ERROR_DETAILS_LOGGED); + if (LOG.isDebugEnabled()) { + result.getErrors().stream() + .limit(MAX_ERROR_DETAILS_LOGGED) + .forEach(error -> LOG.debug("Reader error: {}", error.getMessage())); + } lastFailedCursor = this.cursor.get(); if (result.getPaging().getAfter() == null) { + this.cursor.set(null); isDone.set(true); } else { this.cursor.set(result.getPaging().getAfter()); } + updateStats(result.getData().size(), result.getErrors().size(), warningsCount); return result; } LOG.debug( - "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}", - batchSize, result.getData().size(), result.getErrors().size()); + "[PaginatedEntityTimeSeriesSource] Batch Stats :- Submitted: {} Success: {} Failed: {} Warnings: {}", + batchSize, + result.getData().size(), + result.getErrors().size(), + warningsCount); + updateStats(result.getData().size(), 0, warningsCount); } catch (Exception e) { lastFailedCursor = this.cursor.get(); int remainingRecords = @@ -214,11 +244,15 @@ public ResultList readNextKeyset(String key cachedTotal, true); + int warningsCount = filterStaleRelationshipErrors(result); + int failedCount = result.getErrors() != null ? result.getErrors().size() : 0; LOG.debug( - "[PaginatedEntityTimeSeriesSource] Keyset batch stats — Submitted: {} Success: {} Failed: {}", + "[PaginatedEntityTimeSeriesSource] Keyset batch stats — Submitted: {} Success: {} Failed: {} Warnings: {}", batchSize, result.getData().size(), - result.getErrors() != null ? 
result.getErrors().size() : 0); + failedCount, + warningsCount); + updateStats(result.getData().size(), failedCount, warningsCount); return result; } catch (Exception e) { LOG.error( @@ -269,6 +303,47 @@ public void updateStats(int currentSuccess, int currentFailed) { getUpdatedStats(stats, currentSuccess, currentFailed); } + public void updateStats(int currentSuccess, int currentFailed, int currentWarnings) { + getUpdatedStats(stats, currentSuccess, currentFailed, currentWarnings); + } + + /** + * Splits the errors on {@code result} into real failures and stale-relationship warnings, + * mutating {@code result} so its errors list contains only real failures and its warnings count + * reflects the skipped stale relationships. Returns the warnings count for callers that want to + * include it in their own logging or stats updates. + * + *
<p>
Stale relationships happen for time-series records (testCaseResolutionStatus, + * testCaseResult, ...) whose parent entity was hard-deleted out-of-band, or whose parentOf + * entity_relationship row was lost during a past migration. Such records cannot be indexed but + * should not fail the entire batch. + */ + private int filterStaleRelationshipErrors( + ResultList result) { + if (result == null) { + return 0; + } + // EntityTimeSeriesRepository.getResultList(...) leaves errors=null on the success path. + // Normalize so downstream callers (logging, stats) can rely on a non-null list. + if (result.getErrors() == null) { + result.setErrors(new ArrayList<>()); + } + if (result.getErrors().isEmpty()) { + return 0; + } + List warnings = new ArrayList<>(); + List realErrors = partitionErrors(result.getErrors(), warnings); + if (!warnings.isEmpty()) { + LOG.debug( + "[PaginatedEntityTimeSeriesSource] {} stale-relationship warnings for entity type {}", + warnings.size(), + entityType); + } + result.setErrors(realErrors); + result.setWarningsCount(warnings.size()); + return warnings.size(); + } + public ListFilter getFilter() { ListFilter filter = new ListFilter(Include.ALL); if (ReindexingUtil.isDataInsightIndex(entityType)) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index f4f9ab32b872..34d49476a614 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; import lombok.SneakyThrows; @@ -65,6 +66,59 @@ public static void getUpdatedStats( (stats.getWarningRecords() != 
null ? stats.getWarningRecords() : 0) + currentWarnings); } + /** + * Returns true when an EntityError represents a stale reference — either a missing entity + * (canonical {@code EntityNotFoundException}) or a missing entity_relationship row (raised by + * {@code EntityRepository.ensureSingleRelationship} as "does not have expected relationship + * ..."). Both are expected during reindexing of long-lived records: e.g. a + * {@code testCaseResolutionStatus} migrated without a corresponding {@code parentOf} row, or + * an entity hard-deleted out-of-band leaving its relationship rows behind. Such records + * cannot be meaningfully indexed and are reported as warnings rather than failing the entire + * batch. + * + *
<p>
The patterns are deliberately specific so we do not misclassify unrelated errors that + * happen to contain {@code "not found"} (e.g. {@code "Column 'foo' not found in result set"} + * or {@code "SSL certificate not found"}). They cover every {@code EntityNotFoundException} + * factory message ({@code byId}, {@code byName}, {@code byFilter}, {@code byVersion}, + * {@code byParserSchema}) plus the legacy {@code CatalogExceptionMessage.entityNotFound} + * format and the relationship-not-found shape. + */ + public static boolean isStaleReferenceError(EntityError error) { + if (error == null || error.getMessage() == null) { + return false; + } + String message = error.getMessage().toLowerCase(java.util.Locale.ROOT); + return message.contains("instance for") + || message.contains("entity not found") + || message.contains("entity with id") + || message.contains("entity with name") + || message.contains("parser schema not found") + || message.contains("does not exist") + || message.contains("entitynotfoundexception") + || message.contains("expected relationship"); + } + + /** + * Splits {@code errors} into stale-relationship warnings (appended to {@code warningsOut}) and + * real failures (returned). Both lists must be mutable; {@code warningsOut} must be non-null. 
+ */ + public static List partitionErrors( + List errors, List warningsOut) { + Objects.requireNonNull(warningsOut, "warningsOut must not be null"); + if (CommonUtil.nullOrEmpty(errors)) { + return new ArrayList<>(); + } + List realErrors = new ArrayList<>(errors.size()); + for (EntityError error : errors) { + if (isStaleReferenceError(error)) { + warningsOut.add(error); + } else { + realErrors.add(error); + } + } + return realErrors; + } + public static boolean isDataInsightIndex(String entityType) { return Entity.getSearchRepository().getDataInsightReports().contains(entityType); } @@ -173,71 +227,44 @@ public static List findReferenceInElasticSearchAcrossAllIndexes return entities; } - public static String escapeDoubleQuotes(String str) { - return str.replace("\"", "\\\""); - } - - /** - * Resolve the minimal field set the reindex path must request from {@code - * EntityRepository.setFields}. Time-series entities don't go through the entity-fields machinery, - * so they get an empty list. For everything else, ask the index class via {@link - * org.openmetadata.service.search.SearchIndexFactory#getReindexFieldsFor(String)} for exactly - * the fields its document needs, then intersect with the entity's {@code allowedFields} so we - * never request a field the JSON schema doesn't declare. Single source of truth shared by - * {@code EntityReader} (single-server pipeline) and {@code PartitionWorker} (distributed - * pipeline). - * - *
<p>
Why the allowedFields intersection matters. {@link - * org.openmetadata.service.search.indexes.SearchIndex#COMMON_REINDEX_FIELDS} is the union of - * relationship/enrichment fields that could appear on any entity ({@code owners}, - * {@code domains}, {@code reviewers}, {@code followers}, {@code votes}, {@code extension}, - * {@code certification}, {@code dataProducts}). Many entity schemas omit one or more of these: - * a {@code storageService} has no {@code reviewers}/{@code votes}/{@code extension}/{@code - * certification}; an {@code ingestionPipeline} has no {@code reviewers}/{@code dataProducts}; - * a {@code user}/{@code team} omits most of them. Without filtering, {@link - * org.openmetadata.service.Entity#getFields(String, java.util.List)} routes through {@link - * org.openmetadata.service.util.EntityUtil.Fields#Fields(java.util.Set, String)} which throws - * {@code IllegalArgumentException("Invalid field name ")} on the first unknown field, killing - * the whole batch. Filtering here keeps the helper safe to call for any registered entity type. - */ public static List getSearchIndexFields(String entityType) { if (TIME_SERIES_ENTITIES.contains(entityType)) { return List.of(); } - org.openmetadata.service.search.SearchRepository repo = Entity.getSearchRepository(); + org.openmetadata.service.search.SearchRepository repo = + org.openmetadata.service.Entity.getSearchRepository(); if (repo == null || repo.getSearchIndexFactory() == null) { - // Fallback for environments without a bootstrapped search subsystem (unit tests) — keep - // pre-selective-fields behaviour. + // Search subsystem isn't bootstrapped (e.g. unit tests that exercise the reader without the + // full Entity registry). Behaves the same as the pre-selective-fields code path. 
return List.of("*"); } - Set required = repo.getSearchIndexFactory().getReindexFieldsFor(entityType); - Set allowed = lookupAllowedFields(entityType); - if (allowed == null) { - return new ArrayList<>(required); - } - List filtered = new ArrayList<>(required.size()); - for (String field : required) { - if (allowed.contains(field)) { - filtered.add(field); - } else { - LOG.debug( - "Dropping reindex field '{}' for entityType '{}': not in allowedFields", - field, - entityType); - } + List allFields; + try { + allFields = new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType)); + } catch (Exception e) { + LOG.error( + "Failed to look up reindex fields for {}: {}; falling back to all-fields wildcard", + entityType, + e.getMessage()); + return List.of("*"); } - return filtered; - } - - /** Returns the entity's {@code allowedFields}, or {@code null} if the repository isn't - * registered (test boot, plugin not loaded). Callers fall back to the unfiltered set. */ - private static Set lookupAllowedFields(String entityType) { try { - EntityRepository repository = Entity.getEntityRepository(entityType); - return repository == null ? null : repository.getAllowedFieldsCopy(); + return new ArrayList<>(Entity.getOnlySupportedFields(entityType, allFields).getFieldList()); } catch (Exception e) { - LOG.debug("Failed to resolve allowedFields for {}: {}", entityType, e.getMessage()); - return null; + // Filtering failed (typically because the EntityRepository isn't registered yet — + // happens during boot or in tests). Fall back to the unfiltered required set rather than + // "*": this keeps the per-entity intent intact and lets PaginatedEntitiesSource surface + // any drift loudly instead of silently sending every field. 
+ LOG.warn( + "Could not filter reindex fields for {} against EntityRepository.allowedFields ({}); " + + "returning unfiltered required set", + entityType, + e.getMessage()); + return allFields; } } + + public static String escapeDoubleQuotes(String str) { + return str.replace("\"", "\\\""); + } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReaderLifecycleTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReaderLifecycleTest.java index 23a828caa6ce..eb9699104b69 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReaderLifecycleTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReaderLifecycleTest.java @@ -29,6 +29,7 @@ import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource; import org.openmetadata.service.workflows.searchIndex.PaginatedEntityTimeSeriesSource; +import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; class EntityReaderLifecycleTest { @@ -213,8 +214,8 @@ void readEntitySwallowsInterruptedCallbacksAndDeregistersReader() throws Excepti void helperMethodsRespectTimeSeriesAndMinimumReaderRules() { assertEquals( List.of(), - EntityReader.getSearchIndexFields(ReportData.ReportDataType.ENTITY_REPORT_DATA.value())); - assertEquals(List.of("*"), EntityReader.getSearchIndexFields("table")); + ReindexingUtil.getSearchIndexFields(ReportData.ReportDataType.ENTITY_REPORT_DATA.value())); + assertEquals(List.of("*"), ReindexingUtil.getSearchIndexFields("table")); assertEquals(1, EntityReader.calculateNumberOfReaders(10, 0)); assertEquals(3, EntityReader.calculateNumberOfReaders(11, 5)); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchIndexReindexFieldsParityTest.java 
b/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchIndexReindexFieldsParityTest.java index 98c5364b9e27..e917821ddf43 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchIndexReindexFieldsParityTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchIndexReindexFieldsParityTest.java @@ -163,6 +163,7 @@ void commonReindexFieldsMatchDocumentedSet() { org.junit.jupiter.api.Assertions.assertEquals( Set.of( "owners", + "tags", "domains", "reviewers", "followers", diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java new file mode 100644 index 000000000000..107dc4fa83de --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java @@ -0,0 +1,204 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.openmetadata.service.workflows.searchIndex; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.openmetadata.schema.EntityTimeSeriesInterface; +import org.openmetadata.schema.system.EntityError; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatus; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatusTypes; +import org.openmetadata.schema.utils.ResultList; +import org.openmetadata.service.Entity; +import org.openmetadata.service.exception.SearchIndexException; +import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.search.SearchRepository; + +/** + * Validates that {@link PaginatedEntityTimeSeriesSource} treats stale-relationship errors raised by + * {@code EntityRepository.ensureSingleRelationship} (the message from {@code + * CatalogExceptionMessage.entityRelationshipNotFound}) as warnings rather than fatal failures. + * + *

This is the production scenario from issue #27417: orphaned {@code testCaseResolutionStatus} + * rows whose parentOf {@code entity_relationship} row is missing should not fail an entire reindex + * batch. + */ +class PaginatedEntityTimeSeriesSourceStaleRelationshipTest { + + private static final String ENTITY_TYPE = Entity.TEST_CASE_RESOLUTION_STATUS; + private static final int BATCH_SIZE = 5; + + private static final String STALE_RELATIONSHIP_MESSAGE = + "Entity type testCaseResolutionStatus 7c5c3c4d-3a82-4d8c-9c4a-3e2c9b9b0d5b " + + "does not have expected relationship parentOf to/from entity type testCase"; + + private static final String REAL_DB_ERROR_MESSAGE = + "JsonProcessingException: Unrecognized field 'foo' (class TestCaseResolutionStatus)"; + + @Test + void readClassifiesStaleRelationshipErrorsAsWarnings() throws Exception { + EntityTimeSeriesRepository repository = mockRepository(); + ResultList mockedResult = + resultWith( + List.of(makeRecord("ok-1"), makeRecord("ok-2")), + List.of(error("orphan-1", STALE_RELATIONSHIP_MESSAGE))); + + when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean())) + .thenReturn(mockedResult); + + try (MockedStatic entityMock = + mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) { + stubEntityRepositoryLookups(entityMock, repository); + + PaginatedEntityTimeSeriesSource source = + new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 3); + + ResultList result = source.readNext(null); + + assertNotNull(result); + assertEquals(2, result.getData().size()); + assertTrue( + result.getErrors().isEmpty(), + () -> "stale-relationship errors should be filtered out, got " + result.getErrors()); + assertEquals(1, result.getWarningsCount()); + assertEquals(2, source.getStats().getSuccessRecords()); + assertEquals(0, source.getStats().getFailedRecords()); + assertEquals(1, source.getStats().getWarningRecords()); + } + } + + @Test + void 
readKeepsRealErrorsAsFailuresEvenWhenWarningsArePresent() throws Exception { + EntityTimeSeriesRepository repository = mockRepository(); + ResultList mockedResult = + resultWith( + List.of(makeRecord("ok-1")), + List.of( + error("orphan-1", STALE_RELATIONSHIP_MESSAGE), + error("broken-1", REAL_DB_ERROR_MESSAGE), + error("orphan-2", STALE_RELATIONSHIP_MESSAGE))); + + when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean())) + .thenReturn(mockedResult); + + try (MockedStatic entityMock = + mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) { + stubEntityRepositoryLookups(entityMock, repository); + + PaginatedEntityTimeSeriesSource source = + new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 4); + + ResultList result = source.readNext(null); + + assertNotNull(result); + assertEquals(1, result.getData().size()); + assertEquals(1, result.getErrors().size()); + assertEquals("broken-1", result.getErrors().get(0).getEntity()); + assertEquals(2, result.getWarningsCount()); + assertEquals(1, source.getStats().getSuccessRecords()); + assertEquals(1, source.getStats().getFailedRecords()); + assertEquals(2, source.getStats().getWarningRecords()); + } + } + + @Test + void readWithCursorFiltersStaleRelationshipErrors() throws Exception { + EntityTimeSeriesRepository repository = mockRepository(); + ResultList mockedResult = + resultWith( + List.of(makeRecord("ok-1")), List.of(error("orphan-1", STALE_RELATIONSHIP_MESSAGE))); + + when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean())) + .thenReturn(mockedResult); + + try (MockedStatic entityMock = + mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) { + stubEntityRepositoryLookups(entityMock, repository); + + PaginatedEntityTimeSeriesSource source = + new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 2); + + ResultList result = source.readWithCursor("0"); + + assertNotNull(result); + assertEquals(1, 
result.getData().size()); + assertTrue(result.getErrors().isEmpty()); + assertEquals(1, result.getWarningsCount()); + assertEquals(1, source.getStats().getSuccessRecords()); + assertEquals(0, source.getStats().getFailedRecords()); + assertEquals(1, source.getStats().getWarningRecords()); + } + } + + @Test + void readPropagatesNonReaderExceptionsAsSearchIndexException() { + EntityTimeSeriesRepository repository = mockRepository(); + when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean())) + .thenThrow(new RuntimeException("connection refused")); + + try (MockedStatic entityMock = + mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) { + stubEntityRepositoryLookups(entityMock, repository); + + PaginatedEntityTimeSeriesSource source = + new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 1); + + org.junit.jupiter.api.Assertions.assertThrows( + SearchIndexException.class, () -> source.readNext(null)); + } + } + + @SuppressWarnings("unchecked") + private EntityTimeSeriesRepository mockRepository() { + return (EntityTimeSeriesRepository) + mock(EntityTimeSeriesRepository.class); + } + + private void stubEntityRepositoryLookups( + MockedStatic entityMock, + EntityTimeSeriesRepository repository) { + SearchRepository searchRepository = mock(SearchRepository.class); + when(searchRepository.getDataInsightReports()).thenReturn(List.of()); + entityMock.when(Entity::getSearchRepository).thenReturn(searchRepository); + entityMock + .when(() -> Entity.getEntityTimeSeriesRepository(ENTITY_TYPE)) + .thenReturn((EntityTimeSeriesRepository) repository); + } + + private static TestCaseResolutionStatus makeRecord(String name) { + return new TestCaseResolutionStatus() + .withId(UUID.randomUUID()) + .withTestCaseResolutionStatusType(TestCaseResolutionStatusTypes.New) + .withStateId(UUID.randomUUID()) + .withTimestamp(System.currentTimeMillis()); + } + + private static EntityError error(String entity, String message) { + 
return new EntityError().withEntity(entity).withMessage(message); + } + + private static ResultList resultWith( + List data, List errors) { + return new ResultList<>( + new ArrayList<>(data), new ArrayList<>(errors), null, null, data.size()); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java new file mode 100644 index 000000000000..a1a26a51e33c --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java @@ -0,0 +1,129 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.openmetadata.service.workflows.searchIndex; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.openmetadata.schema.system.EntityError; + +/** + * Validates the stale-relationship classification used by reindex readers. The matcher must + * recognise the {@code ensureSingleRelationship} message ("does not have expected relationship + * parentOf to/from entity type ...") that surfaces during indexing of orphaned time-series records + * (e.g. {@code testCaseResolutionStatus} rows whose parentOf row was lost in the 1.4.0 migration). 
+ */ +class ReindexingUtilStaleRelationshipTest { + + private static final String RELATIONSHIP_NOT_FOUND_MESSAGE = + "Entity type testCaseResolutionStatus 7c5c3c4d-3a82-4d8c-9c4a-3e2c9b9b0d5b " + + "does not have expected relationship parentOf to/from entity type testCase"; + + private static final String ENTITY_NOT_FOUND_MESSAGE = + "EntityNotFoundException: Instance for testCase with id abc not found"; + + private static final String REAL_ERROR_MESSAGE = + "JsonProcessingException: Unexpected character at line 12"; + + @Test + void isStaleReferenceError_recognisesRelationshipNotFoundMessage() { + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage(RELATIONSHIP_NOT_FOUND_MESSAGE))); + } + + @Test + void isStaleReferenceError_recognisesEntityNotFoundException() { + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage(ENTITY_NOT_FOUND_MESSAGE))); + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage("Instance for testCase with id ... "))); + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage("Resource does not exist anymore"))); + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage("Entity not found for query params [name=foo]."))); + } + + @Test + void isStaleReferenceError_recognisesEveryEntityNotFoundExceptionFactory() { + // Mirrors the exact message constants in EntityNotFoundException — every byX(...) factory + // must be classified as a stale-reference warning, not a real failure. 
+ assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage("Entity with id [abc-123] not found."))); + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage("Entity with name [my-table] not found."))); + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError() + .withMessage("Entity with id [abc-123] and version [0.2] not found."))); + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError() + .withMessage("Parser schema not found for entity with id [abc-123]."))); + } + + @Test + void isStaleReferenceError_doesNotMatchBareNotFoundOrUnrelatedMessages() { + assertFalse( + ReindexingUtil.isStaleReferenceError(new EntityError().withMessage(REAL_ERROR_MESSAGE))); + assertFalse( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage("Database connection refused"))); + assertFalse( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage("Column 'status' not found in result set"))); + assertFalse( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage("SSL certificate not found"))); + assertFalse(ReindexingUtil.isStaleReferenceError(null)); + assertFalse(ReindexingUtil.isStaleReferenceError(new EntityError())); + } + + @Test + void partitionErrors_throwsOnNullWarningsOut() { + org.junit.jupiter.api.Assertions.assertThrows( + NullPointerException.class, + () -> ReindexingUtil.partitionErrors(List.of(new EntityError().withMessage("x")), null)); + } + + @Test + void partitionErrors_separatesStaleRelationshipsFromRealErrors() { + List errors = + List.of( + new EntityError().withMessage(RELATIONSHIP_NOT_FOUND_MESSAGE).withEntity("tcrs-1"), + new EntityError().withMessage(ENTITY_NOT_FOUND_MESSAGE).withEntity("tcrs-2"), + new EntityError().withMessage(REAL_ERROR_MESSAGE).withEntity("tcrs-3")); + + List warnings = new ArrayList<>(); + List realErrors = ReindexingUtil.partitionErrors(errors, warnings); + + assertEquals(2, warnings.size()); + 
assertEquals(1, realErrors.size()); + assertEquals("tcrs-3", realErrors.get(0).getEntity()); + } + + @Test + void partitionErrors_handlesEmptyAndNullInput() { + List warnings = new ArrayList<>(); + assertTrue(ReindexingUtil.partitionErrors(null, warnings).isEmpty()); + assertTrue(warnings.isEmpty()); + + assertTrue(ReindexingUtil.partitionErrors(List.of(), warnings).isEmpty()); + assertTrue(warnings.isEmpty()); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilTest.java index 5861a2590101..5d52eb06ff8f 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilTest.java @@ -15,6 +15,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; @@ -37,6 +38,7 @@ import org.openmetadata.service.search.SearchClient; import org.openmetadata.service.search.SearchIndexFactory; import org.openmetadata.service.search.SearchRepository; +import org.openmetadata.service.util.EntityUtil; /** * Unit tests for {@link ReindexingUtil#getSearchIndexFields(String)}. 
The interesting @@ -148,8 +150,7 @@ void filteredFieldsAreSubsetOfEntityAllowedFields(String entityType, Class en when(searchRepository.getSearchIndexFactory()).thenReturn(new SearchIndexFactory()); Set declared = Entity.getEntityFields(entityClass); - EntityRepository repo = mock(EntityRepository.class); - when(repo.getAllowedFieldsCopy()).thenReturn(new HashSet<>(declared)); + EntityRepository repo = mockRepoWithAllowedFields(declared); List filtered; try (MockedStatic entityMock = @@ -260,12 +261,27 @@ private static Stream entityTypeAndClass() { } private List withAllowedFields(String entityType, Set allowed) { - EntityRepository repo = mock(EntityRepository.class); - when(repo.getAllowedFieldsCopy()).thenReturn(new HashSet<>(allowed)); + EntityRepository repo = mockRepoWithAllowedFields(allowed); try (MockedStatic entityMock = mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) { entityMock.when(() -> Entity.getEntityRepository(eq(entityType))).thenReturn(repo); return ReindexingUtil.getSearchIndexFields(entityType); } } + + /** + * Build a mock EntityRepository whose {@code getOnlySupportedFields(...)} returns a real + * {@link EntityUtil.Fields} built against {@code allowed} with extras silently dropped — the + * same contract as the production method (see {@code EntityRepository#getOnlySupportedFields}). + * {@code ReindexingUtil.getSearchIndexFields} reaches into the repository through {@code + * Entity.getOnlySupportedFields(...)}, so this is the method that has to be stubbed. + */ + private static EntityRepository mockRepoWithAllowedFields(Set allowed) { + EntityRepository repo = mock(EntityRepository.class); + Set allowedCopy = new HashSet<>(allowed); + when(repo.getAllowedFieldsCopy()).thenReturn(allowedCopy); + when(repo.getOnlySupportedFields(anyString())) + .thenAnswer(inv -> new EntityUtil.Fields(allowedCopy, inv.getArgument(0), true)); + return repo; + } }