From 50f14067c7345cdd0704aebb43afa87b925fea11 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Tue, 28 Apr 2026 23:15:26 +0530 Subject: [PATCH 01/18] fix(search): treat stale parentOf as warning during time-series reindex (#27417) Time-series records (testCaseResolutionStatus, testCaseResult, ...) whose parentOf entity_relationship row is missing surface as "does not have expected relationship parentOf to/from entity type ..." from EntityRepository.ensureSingleRelationship and were failing the entire reindex batch. Mirror the warning-vs-failure split already used for EntityInterface sources: extract isEntityNotFoundError into ReindexingUtil, broaden it to match the relationship-not-found message, and apply the same partitioning to PaginatedEntityTimeSeriesSource.read/readWithCursor/ readNextKeyset so orphaned rows are counted as warnings via result.getWarningsCount() instead of failing the job. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../searchIndex/PaginatedEntitiesSource.java | 12 +- .../PaginatedEntityTimeSeriesSource.java | 64 +++++- .../workflows/searchIndex/ReindexingUtil.java | 35 +++ ...TimeSeriesSourceStaleRelationshipTest.java | 201 ++++++++++++++++++ .../ReindexingUtilStaleRelationshipTest.java | 96 +++++++++ 5 files changed, 391 insertions(+), 17 deletions(-) create mode 100644 openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java create mode 100644 openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index f59a1e37d121..e0aa9bc764da 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -15,6 +15,7 @@ import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isEntityNotFoundError; import java.util.ArrayList; import java.util.List; @@ -378,15 +379,4 @@ public void updateStats(int currentSuccess, int currentFailed) { public void updateStats(int currentSuccess, int currentFailed, int currentWarnings) { getUpdatedStats(stats, currentSuccess, currentFailed, currentWarnings); } - - private boolean isEntityNotFoundError(EntityError error) { - if (error == null || error.getMessage() == null) { - return false; - } - String message = error.getMessage().toLowerCase(); - return message.contains("not found") - || message.contains("instance for") - || message.contains("does not exist") - || message.contains("entitynotfoundexception"); - } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java index afc3e0cd2f30..32402a0abefd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java @@ -2,6 +2,7 @@ import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.partitionErrors; import java.util.ArrayList; import java.util.List; @@ -11,6 +12,7 @@ import lombok.extern.slf4j.Slf4j; import org.glassfish.jersey.internal.util.ExceptionUtils; import org.openmetadata.schema.EntityTimeSeriesInterface; +import org.openmetadata.schema.system.EntityError; import org.openmetadata.schema.system.IndexingError; import org.openmetadata.schema.system.StepStats; import org.openmetadata.schema.type.Include; @@ -117,9 +119,13 @@ public ResultList readWithCursor(String cur } else { result = repository.listWithOffset(currentCursor, filter, batchSize, true); } + int warningsCount = filterStaleRelationshipErrors(result); LOG.debug( - "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}", - batchSize, result.getData().size(), result.getErrors().size()); + "[PaginatedEntityTimeSeriesSource] Batch Stats :- Submitted: {} Success: {} Failed: {} Warnings: {}", + batchSize, + result.getData().size(), + result.getErrors().size(), + warningsCount); } catch (Exception e) { IndexingError indexingError = new IndexingError() @@ -149,18 +155,29 @@ private ResultList read(String cursor) result = repository.listWithOffset(cursor, filter, batchSize, true); } + int warningsCount = filterStaleRelationshipErrors(result); + if (!result.getErrors().isEmpty()) { + LOG.warn( + "[PaginatedEntityTimeSeriesSource] Real errors found: {}", result.getErrors().size()); + result.getErrors().forEach(error -> LOG.warn("Error: {}", error.getMessage())); lastFailedCursor = this.cursor.get(); if (result.getPaging().getAfter() == null) { + this.cursor.set(null); isDone.set(true); } else { this.cursor.set(result.getPaging().getAfter()); } + updateStats(result.getData().size(), result.getErrors().size(), warningsCount); return result; } LOG.debug( - "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}", - batchSize, result.getData().size(), result.getErrors().size()); + "[PaginatedEntityTimeSeriesSource] Batch Stats :- Submitted: {} Success: {} Failed: {} Warnings: {}", + batchSize, + result.getData().size(), + result.getErrors().size(), + warningsCount); + updateStats(result.getData().size(), 0, warningsCount); } catch (Exception e) { lastFailedCursor = this.cursor.get(); int remainingRecords = @@ -214,11 +231,13 @@ public ResultList readNextKeyset(String key cachedTotal, true); + int warningsCount = filterStaleRelationshipErrors(result); LOG.debug( - "[PaginatedEntityTimeSeriesSource] Keyset batch stats — Submitted: {} Success: {} Failed: {}", + "[PaginatedEntityTimeSeriesSource] Keyset batch stats — Submitted: {} Success: {} Failed: {} Warnings: {}", batchSize, result.getData().size(), - result.getErrors() != null ? result.getErrors().size() : 0); + result.getErrors() != null ? result.getErrors().size() : 0, + warningsCount); return result; } catch (Exception e) { LOG.error( @@ -269,6 +288,39 @@ public void updateStats(int currentSuccess, int currentFailed) { getUpdatedStats(stats, currentSuccess, currentFailed); } + public void updateStats(int currentSuccess, int currentFailed, int currentWarnings) { + getUpdatedStats(stats, currentSuccess, currentFailed, currentWarnings); + } + + /** + * Splits the errors on {@code result} into real failures and stale-relationship warnings, + * mutating {@code result} so its errors list contains only real failures and its warnings count + * reflects the skipped stale relationships. Returns the warnings count for callers that want to + * include it in their own logging or stats updates. + * + *

Stale relationships happen for time-series records (testCaseResolutionStatus, + * testCaseResult, ...) whose parent entity was hard-deleted out-of-band, or whose parentOf + * entity_relationship row was lost during a past migration. Such records cannot be indexed but + * should not fail the entire batch. + */ + private int filterStaleRelationshipErrors( + ResultList result) { + if (result == null || result.getErrors() == null || result.getErrors().isEmpty()) { + return 0; + } + List warnings = new ArrayList<>(); + List realErrors = partitionErrors(result.getErrors(), warnings); + if (!warnings.isEmpty()) { + LOG.debug( + "[PaginatedEntityTimeSeriesSource] {} stale-relationship warnings for entity type {}", + warnings.size(), + entityType); + } + result.setErrors(realErrors); + result.setWarningsCount(warnings.size()); + return warnings.size(); + } + public ListFilter getFilter() { ListFilter filter = new ListFilter(Include.ALL); if (ReindexingUtil.isDataInsightIndex(entityType)) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index 7ba373880dae..b1a41e8a5a12 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -65,6 +65,41 @@ public static void getUpdatedStats( (stats.getWarningRecords() != null ? stats.getWarningRecords() : 0) + currentWarnings); } + /** + * Returns true when an EntityError represents a stale relationship to a missing entity. These + * are expected during reindexing — for example when a time-series record (testCaseResolutionStatus, + * testCaseResult) was migrated without a corresponding entity_relationship row, or when an entity + * was hard-deleted out-of-band leaving its relationship rows behind. Such records cannot be + * meaningfully indexed and are reported as warnings rather than failing the entire batch. + */ + public static boolean isEntityNotFoundError(EntityError error) { + if (error == null || error.getMessage() == null) { + return false; + } + String message = error.getMessage().toLowerCase(); + return message.contains("not found") + || message.contains("instance for") + || message.contains("does not exist") + || message.contains("entitynotfoundexception") + || message.contains("expected relationship"); + } + + public static List partitionErrors( + List errors, List warningsOut) { + if (CommonUtil.nullOrEmpty(errors)) { + return new ArrayList<>(); + } + List realErrors = new ArrayList<>(errors.size()); + for (EntityError error : errors) { + if (isEntityNotFoundError(error)) { + warningsOut.add(error); + } else { + realErrors.add(error); + } + } + return realErrors; + } + public static boolean isDataInsightIndex(String entityType) { return Entity.getSearchRepository().getDataInsightReports().contains(entityType); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java new file mode 100644 index 000000000000..0381d664b411 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java @@ -0,0 +1,201 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.openmetadata.service.workflows.searchIndex; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.openmetadata.schema.EntityTimeSeriesInterface; +import org.openmetadata.schema.system.EntityError; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatus; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatusTypes; +import org.openmetadata.schema.utils.ResultList; +import org.openmetadata.service.Entity; +import org.openmetadata.service.exception.SearchIndexException; +import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.search.SearchRepository; + +/** + * Validates that {@link PaginatedEntityTimeSeriesSource} treats stale-relationship errors raised by + * {@code EntityRepository.ensureSingleRelationship} (the message from {@code + * CatalogExceptionMessage.entityRelationshipNotFound}) as warnings rather than fatal failures. + * + *

This is the production scenario from issue #27417: orphaned {@code testCaseResolutionStatus} + * rows whose parentOf {@code entity_relationship} row is missing should not fail an entire reindex + * batch. + */ +class PaginatedEntityTimeSeriesSourceStaleRelationshipTest { + + private static final String ENTITY_TYPE = Entity.TEST_CASE_RESOLUTION_STATUS; + private static final int BATCH_SIZE = 5; + + private static final String STALE_RELATIONSHIP_MESSAGE = + "Entity type testCaseResolutionStatus 7c5c3c4d-3a82-4d8c-9c4a-3e2c9b9b0d5b " + + "does not have expected relationship parentOf to/from entity type testCase"; + + private static final String REAL_DB_ERROR_MESSAGE = + "JsonProcessingException: Unrecognized field 'foo' (class TestCaseResolutionStatus)"; + + @Test + void readClassifiesStaleRelationshipErrorsAsWarnings() throws Exception { + EntityTimeSeriesRepository repository = mockRepository(); + ResultList mockedResult = + resultWith( + List.of(makeRecord("ok-1"), makeRecord("ok-2")), + List.of(error("orphan-1", STALE_RELATIONSHIP_MESSAGE))); + + when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean())) + .thenReturn(mockedResult); + + try (MockedStatic entityMock = + mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) { + stubEntityRepositoryLookups(entityMock, repository); + + PaginatedEntityTimeSeriesSource source = + new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 3); + + ResultList result = source.readNext(null); + + assertNotNull(result); + assertEquals(2, result.getData().size()); + assertTrue( + result.getErrors().isEmpty(), + () -> "stale-relationship errors should be filtered out, got " + result.getErrors()); + assertEquals(1, result.getWarningsCount()); + assertEquals(2, source.getStats().getSuccessRecords()); + assertEquals(0, source.getStats().getFailedRecords()); + assertEquals(1, source.getStats().getWarningRecords()); + } + } + + @Test + void readKeepsRealErrorsAsFailuresEvenWhenWarningsArePresent() throws Exception { + EntityTimeSeriesRepository repository = mockRepository(); + ResultList mockedResult = + resultWith( + List.of(makeRecord("ok-1")), + List.of( + error("orphan-1", STALE_RELATIONSHIP_MESSAGE), + error("broken-1", REAL_DB_ERROR_MESSAGE), + error("orphan-2", STALE_RELATIONSHIP_MESSAGE))); + + when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean())) + .thenReturn(mockedResult); + + try (MockedStatic entityMock = + mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) { + stubEntityRepositoryLookups(entityMock, repository); + + PaginatedEntityTimeSeriesSource source = + new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 4); + + ResultList result = source.readNext(null); + + assertNotNull(result); + assertEquals(1, result.getData().size()); + assertEquals(1, result.getErrors().size()); + assertEquals("broken-1", result.getErrors().get(0).getEntity()); + assertEquals(2, result.getWarningsCount()); + assertEquals(1, source.getStats().getSuccessRecords()); + assertEquals(1, source.getStats().getFailedRecords()); + assertEquals(2, source.getStats().getWarningRecords()); + } + } + + @Test + void readWithCursorFiltersStaleRelationshipErrors() throws Exception { + EntityTimeSeriesRepository repository = mockRepository(); + ResultList mockedResult = + resultWith( + List.of(makeRecord("ok-1")), List.of(error("orphan-1", STALE_RELATIONSHIP_MESSAGE))); + + when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean())) + .thenReturn(mockedResult); + + try (MockedStatic entityMock = + mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) { + stubEntityRepositoryLookups(entityMock, repository); + + PaginatedEntityTimeSeriesSource source = + new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 2); + + ResultList result = source.readWithCursor("0"); + + assertNotNull(result); + assertEquals(1, result.getData().size()); + assertTrue(result.getErrors().isEmpty()); + assertEquals(1, result.getWarningsCount()); + } + } + + @Test + void readPropagatesNonReaderExceptionsAsSearchIndexException() { + EntityTimeSeriesRepository repository = mockRepository(); + when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean())) + .thenThrow(new RuntimeException("connection refused")); + + try (MockedStatic entityMock = + mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) { + stubEntityRepositoryLookups(entityMock, repository); + + PaginatedEntityTimeSeriesSource source = + new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 1); + + org.junit.jupiter.api.Assertions.assertThrows( + SearchIndexException.class, () -> source.readNext(null)); + } + } + + @SuppressWarnings("unchecked") + private EntityTimeSeriesRepository mockRepository() { + return (EntityTimeSeriesRepository) + mock(EntityTimeSeriesRepository.class); + } + + private void stubEntityRepositoryLookups( + MockedStatic entityMock, + EntityTimeSeriesRepository repository) { + SearchRepository searchRepository = mock(SearchRepository.class); + when(searchRepository.getDataInsightReports()).thenReturn(List.of()); + entityMock.when(Entity::getSearchRepository).thenReturn(searchRepository); + entityMock + .when(() -> Entity.getEntityTimeSeriesRepository(ENTITY_TYPE)) + .thenReturn((EntityTimeSeriesRepository) repository); + } + + private static TestCaseResolutionStatus makeRecord(String name) { + return new TestCaseResolutionStatus() + .withId(UUID.randomUUID()) + .withTestCaseResolutionStatusType(TestCaseResolutionStatusTypes.New) + .withStateId(UUID.randomUUID()) + .withTimestamp(System.currentTimeMillis()); + } + + private static EntityError error(String entity, String message) { + return new EntityError().withEntity(entity).withMessage(message); + } + + private static ResultList resultWith( + List data, List errors) { + return new ResultList<>( + new ArrayList<>(data), new ArrayList<>(errors), null, null, data.size()); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java new file mode 100644 index 000000000000..12d3b044ec39 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java @@ -0,0 +1,96 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.openmetadata.service.workflows.searchIndex; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.openmetadata.schema.system.EntityError; + +/** + * Validates the stale-relationship classification used by reindex readers. The matcher must + * recognise the {@code ensureSingleRelationship} message ("does not have expected relationship + * parentOf to/from entity type ...") that surfaces during indexing of orphaned time-series records + * (e.g. {@code testCaseResolutionStatus} rows whose parentOf row was lost in the 1.4.0 migration). + */ +class ReindexingUtilStaleRelationshipTest { + + private static final String RELATIONSHIP_NOT_FOUND_MESSAGE = + "Entity type testCaseResolutionStatus 7c5c3c4d-3a82-4d8c-9c4a-3e2c9b9b0d5b " + + "does not have expected relationship parentOf to/from entity type testCase"; + + private static final String ENTITY_NOT_FOUND_MESSAGE = + "EntityNotFoundException: Instance for testCase with id abc not found"; + + private static final String REAL_ERROR_MESSAGE = + "JsonProcessingException: Unexpected character at line 12"; + + @Test + void isEntityNotFoundError_recognisesRelationshipNotFoundMessage() { + assertTrue( + ReindexingUtil.isEntityNotFoundError( + new EntityError().withMessage(RELATIONSHIP_NOT_FOUND_MESSAGE))); + } + + @Test + void isEntityNotFoundError_recognisesEntityNotFoundException() { + assertTrue( + ReindexingUtil.isEntityNotFoundError( + new EntityError().withMessage(ENTITY_NOT_FOUND_MESSAGE))); + assertTrue( + ReindexingUtil.isEntityNotFoundError( + new EntityError().withMessage("Table 'foo' not found in database 'bar'"))); + assertTrue( + ReindexingUtil.isEntityNotFoundError( + new EntityError().withMessage("Instance for testCase with id ... "))); + assertTrue( + ReindexingUtil.isEntityNotFoundError( + new EntityError().withMessage("Resource does not exist anymore"))); + } + + @Test + void isEntityNotFoundError_doesNotMatchUnrelatedMessages() { + assertFalse( + ReindexingUtil.isEntityNotFoundError(new EntityError().withMessage(REAL_ERROR_MESSAGE))); + assertFalse( + ReindexingUtil.isEntityNotFoundError( + new EntityError().withMessage("Database connection refused"))); + assertFalse(ReindexingUtil.isEntityNotFoundError(null)); + assertFalse(ReindexingUtil.isEntityNotFoundError(new EntityError())); + } + + @Test + void partitionErrors_separatesStaleRelationshipsFromRealErrors() { + List errors = + List.of( + new EntityError().withMessage(RELATIONSHIP_NOT_FOUND_MESSAGE).withEntity("tcrs-1"), + new EntityError().withMessage(ENTITY_NOT_FOUND_MESSAGE).withEntity("tcrs-2"), + new EntityError().withMessage(REAL_ERROR_MESSAGE).withEntity("tcrs-3")); + + List warnings = new ArrayList<>(); + List realErrors = ReindexingUtil.partitionErrors(errors, warnings); + + assertEquals(2, warnings.size()); + assertEquals(1, realErrors.size()); + assertEquals("tcrs-3", realErrors.get(0).getEntity()); + } + + @Test + void partitionErrors_handlesEmptyAndNullInput() { + List warnings = new ArrayList<>(); + assertTrue(ReindexingUtil.partitionErrors(null, warnings).isEmpty()); + assertTrue(warnings.isEmpty()); + + assertTrue(ReindexingUtil.partitionErrors(List.of(), warnings).isEmpty()); + assertTrue(warnings.isEmpty()); + } +} From 91bbdc168f16b8de2d9a28d29353324d50e68f55 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Tue, 28 Apr 2026 23:25:43 +0530 Subject: [PATCH 02/18] review: tighten matcher, propagate warnings on cursor/keyset paths Address PR #27800 review: - Drop bare "not found" from isEntityNotFoundError; add "entity not found" so we still match EntityNotFoundException's "Entity not found:" form but no longer misclassify "Column 'foo' not found in result set" or "SSL certificate not found" as warnings. - Call updateStats(success, failed, warnings) in readWithCursor and readNextKeyset so the source's StepStats.warningRecords reflects warnings for cursor- and keyset-based reads (was already correct in read()). - partitionErrors: requireNonNull(warningsOut) and document the contract. - Tests: cover the new bare-"not found" exclusion, the "Entity not found:" inclusion, the readWithCursor stats propagation, and the null-warningsOut guard. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../PaginatedEntityTimeSeriesSource.java | 5 +++- .../workflows/searchIndex/ReindexingUtil.java | 23 ++++++++++++++----- ...TimeSeriesSourceStaleRelationshipTest.java | 3 +++ .../ReindexingUtilStaleRelationshipTest.java | 21 +++++++++++++---- 4 files changed, 41 insertions(+), 11 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java index 32402a0abefd..ee823df1464b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java @@ -126,6 +126,7 @@ public ResultList readWithCursor(String cur result.getData().size(), result.getErrors().size(), warningsCount); + updateStats(result.getData().size(), result.getErrors().size(), warningsCount); } catch (Exception e) { IndexingError indexingError = new IndexingError() @@ -232,12 +233,14 @@ public ResultList readNextKeyset(String key true); int warningsCount = filterStaleRelationshipErrors(result); + int failedCount = result.getErrors() != null ? result.getErrors().size() : 0; LOG.debug( "[PaginatedEntityTimeSeriesSource] Keyset batch stats — Submitted: {} Success: {} Failed: {} Warnings: {}", batchSize, result.getData().size(), - result.getErrors() != null ? result.getErrors().size() : 0, + failedCount, warningsCount); + updateStats(result.getData().size(), failedCount, warningsCount); return result; } catch (Exception e) { LOG.error( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index b1a41e8a5a12..77fe97480ec7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -67,25 +67,36 @@ public static void getUpdatedStats( /** * Returns true when an EntityError represents a stale relationship to a missing entity. These - * are expected during reindexing — for example when a time-series record (testCaseResolutionStatus, - * testCaseResult) was migrated without a corresponding entity_relationship row, or when an entity - * was hard-deleted out-of-band leaving its relationship rows behind. Such records cannot be - * meaningfully indexed and are reported as warnings rather than failing the entire batch. + * are expected during reindexing — for example when a time-series record + * (testCaseResolutionStatus, testCaseResult) was migrated without a corresponding + * entity_relationship row, or when an entity was hard-deleted out-of-band leaving its + * relationship rows behind. Such records cannot be meaningfully indexed and are reported as + * warnings rather than failing the entire batch. + * + *

The patterns match the canonical message shapes from {@code CatalogExceptionMessage} and + * {@code EntityNotFoundException} — bare {@code "not found"} is intentionally NOT matched + * because it would misclassify unrelated errors like {@code "Column 'foo' not found in result + * set"} or {@code "SSL certificate not found"}. */ public static boolean isEntityNotFoundError(EntityError error) { if (error == null || error.getMessage() == null) { return false; } String message = error.getMessage().toLowerCase(); - return message.contains("not found") - || message.contains("instance for") + return message.contains("instance for") + || message.contains("entity not found") || message.contains("does not exist") || message.contains("entitynotfoundexception") || message.contains("expected relationship"); } + /** + * Splits {@code errors} into stale-relationship warnings (appended to {@code warningsOut}) and + * real failures (returned). Both lists must be mutable; {@code warningsOut} must be non-null. + */ public static List partitionErrors( List errors, List warningsOut) { + java.util.Objects.requireNonNull(warningsOut, "warningsOut must not be null"); if (CommonUtil.nullOrEmpty(errors)) { return new ArrayList<>(); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java index 0381d664b411..107dc4fa83de 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSourceStaleRelationshipTest.java @@ -143,6 +143,9 @@ void readWithCursorFiltersStaleRelationshipErrors() throws Exception { assertEquals(1, result.getData().size()); assertTrue(result.getErrors().isEmpty()); assertEquals(1, result.getWarningsCount()); + assertEquals(1, source.getStats().getSuccessRecords()); + assertEquals(0, source.getStats().getFailedRecords()); + assertEquals(1, source.getStats().getWarningRecords()); } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java index 12d3b044ec39..175946d28983 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java @@ -46,28 +46,41 @@ void isEntityNotFoundError_recognisesEntityNotFoundException() { assertTrue( ReindexingUtil.isEntityNotFoundError( new EntityError().withMessage(ENTITY_NOT_FOUND_MESSAGE))); - assertTrue( - ReindexingUtil.isEntityNotFoundError( - new EntityError().withMessage("Table 'foo' not found in database 'bar'"))); assertTrue( ReindexingUtil.isEntityNotFoundError( new EntityError().withMessage("Instance for testCase with id ... "))); assertTrue( ReindexingUtil.isEntityNotFoundError( new EntityError().withMessage("Resource does not exist anymore"))); + assertTrue( + ReindexingUtil.isEntityNotFoundError( + new EntityError().withMessage("Entity not found: testCase abc-123"))); } @Test - void isEntityNotFoundError_doesNotMatchUnrelatedMessages() { + void isEntityNotFoundError_doesNotMatchBareNotFoundOrUnrelatedMessages() { assertFalse( ReindexingUtil.isEntityNotFoundError(new EntityError().withMessage(REAL_ERROR_MESSAGE))); assertFalse( ReindexingUtil.isEntityNotFoundError( new EntityError().withMessage("Database connection refused"))); + assertFalse( + ReindexingUtil.isEntityNotFoundError( + new EntityError().withMessage("Column 'status' not found in result set"))); + assertFalse( + ReindexingUtil.isEntityNotFoundError( + new EntityError().withMessage("SSL certificate not found"))); assertFalse(ReindexingUtil.isEntityNotFoundError(null)); assertFalse(ReindexingUtil.isEntityNotFoundError(new EntityError())); } + @Test + void partitionErrors_throwsOnNullWarningsOut() { + org.junit.jupiter.api.Assertions.assertThrows( + NullPointerException.class, + () -> ReindexingUtil.partitionErrors(List.of(new EntityError().withMessage("x")), null)); + } + @Test void partitionErrors_separatesStaleRelationshipsFromRealErrors() { List errors = From 007670f34a7f30e3589758944da9abbb1734136c Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 11:28:12 +0530 Subject: [PATCH 03/18] Show warn for failing test case resolution --- .../distributed/PartitionWorker.java | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java index e209337788b2..5f962215cb06 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java @@ -491,20 +491,31 @@ private BatchResult processBatch( ResultList resultList = readEntitiesKeyset(entityType, keysetCursor, batchSize); long t1 = System.currentTimeMillis(); - if (resultList == null || resultList.getData() == null || resultList.getData().isEmpty()) { - LOG.debug("{} read={}ms returned empty", entityType, t1 - t0); - return new BatchResult(0, 0, 0, null); - } - - String nextCursor = resultList.getPaging() != null ? resultList.getPaging().getAfter() : null; - int readSuccessCount = listOrEmpty(resultList.getData()).size(); - int readErrorCount = listOrEmpty(resultList.getErrors()).size(); - int warningsCount = resultList.getWarningsCount() != null ? resultList.getWarningsCount() : 0; + int readSuccessCount = resultList != null ? listOrEmpty(resultList.getData()).size() : 0; + int readErrorCount = resultList != null ? listOrEmpty(resultList.getErrors()).size() : 0; + int warningsCount = + (resultList != null && resultList.getWarningsCount() != null) + ? resultList.getWarningsCount() + : 0; + String nextCursor = + (resultList != null && resultList.getPaging() != null) + ? resultList.getPaging().getAfter() + : null; if (statsTracker != null) { statsTracker.recordReaderBatch(readSuccessCount, readErrorCount, warningsCount); } + if (readSuccessCount == 0) { + LOG.debug( + "{} read={}ms returned no indexable rows (warnings={}, errors={})", + entityType, + t1 - t0, + warningsCount, + readErrorCount); + return new BatchResult(0, readErrorCount, warningsCount, nextCursor); + } + if (failureRecorder != null && readErrorCount > 0) { for (EntityError entityError : listOrEmpty(resultList.getErrors())) { Object rawEntity = entityError.getEntity(); From 23578b09f48b278f228a9954e00b20b70c3b505c Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 11:31:31 +0530 Subject: [PATCH 04/18] Always recreate --- .../apps/bundles/searchIndex/ReindexingConfiguration.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingConfiguration.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingConfiguration.java index 2426e63c3672..a2cd19b8b94a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingConfiguration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingConfiguration.java @@ -128,7 +128,8 @@ public static ReindexingConfiguration from(EventPublisherJob jobData) { DEFAULT_FIELD_FETCH_THREADS, DEFAULT_DOC_BUILD_THREADS, DEFAULT_STATS_INTERVAL_MS, - Boolean.TRUE.equals(jobData.getRecreateIndex()), + // Always recreate + true, Boolean.TRUE.equals(jobData.getAutoTune()), Boolean.TRUE.equals(jobData.getUseDistributedIndexing()), Boolean.TRUE.equals(jobData.getForce()), From 856fd2da8251e9a0af54df93d27ddc1cc4810522 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 11:51:33 +0530 Subject: [PATCH 05/18] review: rename matcher + record reader failures before early-return MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address two Copilot comments on PR #27800: 1. PartitionWorker.processBatch was returning early when readSuccessCount==0 without invoking failureRecorder.recordReaderEntityFailure for the per-row errors in that batch — losing entity-level failure diagnostics for "all-error" batches. Extract the failure-recording into a helper and call it before the early-return so it runs whether or not the batch contained any successful rows. 2. Rename ReindexingUtil.isEntityNotFoundError -> isStaleReferenceError to reflect that the matcher now also catches the relationship-not-found message ("does not have expected relationship ...") raised by EntityRepository.ensureSingleRelationship, not just plain entity-not-found. Update PaginatedEntitiesSource and ReindexingUtilStaleRelationshipTest call sites. EntityRepository has its own private isEntityNotFoundError helper that is unrelated and unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../distributed/PartitionWorker.java | 60 +++++++++++-------- .../searchIndex/PaginatedEntitiesSource.java | 8 +-- .../workflows/searchIndex/ReindexingUtil.java | 18 +++--- .../ReindexingUtilStaleRelationshipTest.java | 28 ++++----- 4 files changed, 64 insertions(+), 50 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java index ae0c67f976f6..990613425269 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java @@ -509,6 +509,8 @@ private BatchResult processBatch( readSuccessCount, readErrorCount, warningsCount, readDurationNanos); } + recordReaderFailures(entityType, resultList, readErrorCount); + if (readSuccessCount == 0) { LOG.debug( "{} read={}ms returned no indexable rows (warnings={}, errors={})", @@ -519,30 +521,6 @@ private BatchResult processBatch( return new BatchResult(0, readErrorCount, warningsCount, nextCursor); } - if (failureRecorder != null && readErrorCount > 0) { - for (EntityError entityError : listOrEmpty(resultList.getErrors())) { - Object rawEntity = entityError.getEntity(); - String entityId = null; - if (rawEntity instanceof EntityInterface) { - UUID id = ((EntityInterface) rawEntity).getId(); - if (id != null) { - entityId = id.toString(); - } - } else if (rawEntity != null) { - entityId = rawEntity.toString(); - } - if (entityId == null) { - LOG.warn( - "Skipping reader failure record for entityType={}: entityId is null, message={}", - entityType, - entityError.getMessage()); - continue; - } - failureRecorder.recordReaderEntityFailure( - entityType, entityId, null, entityError.getMessage()); - } - } - Map contextData = createContextData(entityType, statsTracker); long readMs = readDurationNanos / 1_000_000L; @@ -568,6 +546,40 @@ private BatchResult processBatch( } } + /** + * Persist per-entity reader failures so that downstream tooling (e.g. the failures dashboard) + * can show which specific records the reader could not hydrate. Runs whether or not the batch + * has any successful rows — losing failure diagnostics for "all-error" batches would defeat + * the point of the recorder. + */ + private void recordReaderFailures( + String entityType, ResultList resultList, int readErrorCount) { + if (failureRecorder == null || readErrorCount == 0 || resultList == null) { + return; + } + for (EntityError entityError : listOrEmpty(resultList.getErrors())) { + Object rawEntity = entityError.getEntity(); + String entityId = null; + if (rawEntity instanceof EntityInterface) { + UUID id = ((EntityInterface) rawEntity).getId(); + if (id != null) { + entityId = id.toString(); + } + } else if (rawEntity != null) { + entityId = rawEntity.toString(); + } + if (entityId == null) { + LOG.warn( + "Skipping reader failure record for entityType={}: entityId is null, message={}", + entityType, + entityError.getMessage()); + continue; + } + failureRecorder.recordReaderEntityFailure( + entityType, entityId, null, entityError.getMessage()); + } + } + /** * Read entities from the database. * diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index e0aa9bc764da..62103d33ebc8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -15,7 +15,7 @@ import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; -import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isEntityNotFoundError; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isStaleReferenceError; import java.util.ArrayList; import java.util.List; @@ -151,7 +151,7 @@ private ResultList read(String cursor) throws SearchI List warningErrors = new ArrayList<>(); for (EntityError error : result.getErrors()) { - if (isEntityNotFoundError(error)) { + if (isStaleReferenceError(error)) { warningErrors.add(error); } else { realErrors.add(error); @@ -250,7 +250,7 @@ public ResultList readWithCursor(String currentCursor if (!result.getErrors().isEmpty()) { List realErrors = new ArrayList<>(); for (EntityError error : result.getErrors()) { - if (isEntityNotFoundError(error)) { + if (isStaleReferenceError(error)) { warningsCount++; LOG.debug("Skipping entity due to missing relationship: {}", error.getMessage()); } else { @@ -305,7 +305,7 @@ public ResultList readNextKeyset(String keysetCursor) if (result.getErrors() != null && !result.getErrors().isEmpty()) { List realErrors = new ArrayList<>(); for (EntityError error : result.getErrors()) { - if (isEntityNotFoundError(error)) { + if (isStaleReferenceError(error)) { warningsCount++; LOG.debug("Skipping entity due to missing relationship: {}", error.getMessage()); } else { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index 77fe97480ec7..183e2f5c3aba 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -66,19 +66,21 @@ public static void getUpdatedStats( } /** - * Returns true when an EntityError represents a stale relationship to a missing entity. These - * are expected during reindexing — for example when a time-series record - * (testCaseResolutionStatus, testCaseResult) was migrated without a corresponding - * entity_relationship row, or when an entity was hard-deleted out-of-band leaving its - * relationship rows behind. Such records cannot be meaningfully indexed and are reported as - * warnings rather than failing the entire batch. + * Returns true when an EntityError represents a stale reference — either a missing entity + * (canonical {@code EntityNotFoundException}) or a missing entity_relationship row (raised by + * {@code EntityRepository.ensureSingleRelationship} as "does not have expected relationship + * ..."). Both are expected during reindexing of long-lived records: e.g. a + * {@code testCaseResolutionStatus} migrated without a corresponding {@code parentOf} row, or + * an entity hard-deleted out-of-band leaving its relationship rows behind. Such records + * cannot be meaningfully indexed and are reported as warnings rather than failing the entire + * batch. * *

The patterns match the canonical message shapes from {@code CatalogExceptionMessage} and * {@code EntityNotFoundException} — bare {@code "not found"} is intentionally NOT matched * because it would misclassify unrelated errors like {@code "Column 'foo' not found in result * set"} or {@code "SSL certificate not found"}. */ - public static boolean isEntityNotFoundError(EntityError error) { + public static boolean isStaleReferenceError(EntityError error) { if (error == null || error.getMessage() == null) { return false; } @@ -102,7 +104,7 @@ public static List partitionErrors( } List realErrors = new ArrayList<>(errors.size()); for (EntityError error : errors) { - if (isEntityNotFoundError(error)) { + if (isStaleReferenceError(error)) { warningsOut.add(error); } else { realErrors.add(error); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java index 175946d28983..f279448189a4 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java @@ -35,43 +35,43 @@ class ReindexingUtilStaleRelationshipTest { "JsonProcessingException: Unexpected character at line 12"; @Test - void isEntityNotFoundError_recognisesRelationshipNotFoundMessage() { + void isStaleReferenceError_recognisesRelationshipNotFoundMessage() { assertTrue( - ReindexingUtil.isEntityNotFoundError( + ReindexingUtil.isStaleReferenceError( new EntityError().withMessage(RELATIONSHIP_NOT_FOUND_MESSAGE))); } @Test - void isEntityNotFoundError_recognisesEntityNotFoundException() { + void isStaleReferenceError_recognisesEntityNotFoundException() { assertTrue( - ReindexingUtil.isEntityNotFoundError( + ReindexingUtil.isStaleReferenceError( new EntityError().withMessage(ENTITY_NOT_FOUND_MESSAGE))); assertTrue( - ReindexingUtil.isEntityNotFoundError( + ReindexingUtil.isStaleReferenceError( new EntityError().withMessage("Instance for testCase with id ... "))); assertTrue( - ReindexingUtil.isEntityNotFoundError( + ReindexingUtil.isStaleReferenceError( new EntityError().withMessage("Resource does not exist anymore"))); assertTrue( - ReindexingUtil.isEntityNotFoundError( + ReindexingUtil.isStaleReferenceError( new EntityError().withMessage("Entity not found: testCase abc-123"))); } @Test - void isEntityNotFoundError_doesNotMatchBareNotFoundOrUnrelatedMessages() { + void isStaleReferenceError_doesNotMatchBareNotFoundOrUnrelatedMessages() { assertFalse( - ReindexingUtil.isEntityNotFoundError(new EntityError().withMessage(REAL_ERROR_MESSAGE))); + ReindexingUtil.isStaleReferenceError(new EntityError().withMessage(REAL_ERROR_MESSAGE))); assertFalse( - ReindexingUtil.isEntityNotFoundError( + ReindexingUtil.isStaleReferenceError( new EntityError().withMessage("Database connection refused"))); assertFalse( - ReindexingUtil.isEntityNotFoundError( + ReindexingUtil.isStaleReferenceError( new EntityError().withMessage("Column 'status' not found in result set"))); assertFalse( - ReindexingUtil.isEntityNotFoundError( + ReindexingUtil.isStaleReferenceError( new EntityError().withMessage("SSL certificate not found"))); - assertFalse(ReindexingUtil.isEntityNotFoundError(null)); - assertFalse(ReindexingUtil.isEntityNotFoundError(new EntityError())); + assertFalse(ReindexingUtil.isStaleReferenceError(null)); + assertFalse(ReindexingUtil.isStaleReferenceError(new EntityError())); } @Test From bd34de125c7230b0b5b3672a3eb7e534d1dcf234 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 4 May 2026 06:27:23 +0000 Subject: [PATCH 06/18] fix: import java.util.Objects instead of fully qualified name in ReindexingUtil Agent-Logs-Url: https://github.com/open-metadata/OpenMetadata/sessions/e6d9ce67-1181-4e61-80f4-e7aba664dfe7 Co-authored-by: mohityadav766 <105265192+mohityadav766@users.noreply.github.com> --- .../service/workflows/searchIndex/ReindexingUtil.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index 183e2f5c3aba..c928e168b8c4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; import lombok.SneakyThrows; @@ -98,7 +99,7 @@ public static boolean isStaleReferenceError(EntityError error) { */ public static List partitionErrors( List errors, List warningsOut) { - java.util.Objects.requireNonNull(warningsOut, "warningsOut must not be null"); + Objects.requireNonNull(warningsOut, "warningsOut must not be null"); if (CommonUtil.nullOrEmpty(errors)) { return new ArrayList<>(); } From fcfbb392e90ece90578974c52a143df2aec2e62f Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 13:29:52 +0530 Subject: [PATCH 07/18] Fill End time --- .../searchIndex/ReindexingOrchestrator.java | 5 ++- .../DistributedJobStatsAggregator.java | 2 ++ .../listeners/QuartzProgressListener.java | 2 ++ .../service/apps/scheduler/AppScheduler.java | 2 +- .../apps/scheduler/OmAppJobListener.java | 31 +++++++++++++++++++ 5 files changed, 40 insertions(+), 2 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingOrchestrator.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingOrchestrator.java index 299a1ac5eabc..05c9e1f4ae1c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingOrchestrator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingOrchestrator.java @@ -24,6 +24,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.apps.bundles.searchIndex.listeners.LoggingProgressListener; import org.openmetadata.service.apps.bundles.searchIndex.listeners.SlackProgressListener; +import org.openmetadata.service.apps.scheduler.OmAppJobListener; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.SystemRepository; import org.openmetadata.service.search.SearchRepository; @@ -109,7 +110,7 @@ public void stop() { AppRunRecord appRecord = context.getJobRecord(); appRecord.setStatus(AppRunRecord.Status.STOPPED); - appRecord.setEndTime(System.currentTimeMillis()); + OmAppJobListener.fillTerminalTimings(appRecord); context.storeRunRecord(JsonUtils.pojoToJson(appRecord)); context.pushStatusUpdate(appRecord, true); sendUpdates(); @@ -368,6 +369,7 @@ private void finalizeJobExecution() { if (stopped) { AppRunRecord appRecord = context.getJobRecord(); appRecord.setStatus(AppRunRecord.Status.STOPPED); + OmAppJobListener.fillTerminalTimings(appRecord); context.storeRunRecord(JsonUtils.pojoToJson(appRecord)); } } @@ -383,6 +385,7 @@ private void sendUpdates() { private void updateRecordToDbAndNotify() { AppRunRecord appRecord = context.getJobRecord(); appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value())); + OmAppJobListener.fillTerminalTimings(appRecord); if (jobData.getFailure() != null) { appRecord.setFailureContext( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregator.java index 6fdd5707ee5e..a10c3f5ac1f1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregator.java @@ -35,6 +35,7 @@ import org.openmetadata.service.apps.bundles.searchIndex.BulkSink; import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext; import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener; +import org.openmetadata.service.apps.scheduler.OmAppJobListener; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.socket.WebSocketManager; @@ -482,6 +483,7 @@ private AppRunRecord convertToAppRunRecord( appRecord.setStartTime(appStartTime != null ? appStartTime : job.getStartedAt()); appRecord.setEndTime(job.getCompletedAt()); appRecord.setTimestamp(job.getUpdatedAt()); + OmAppJobListener.fillTerminalTimings(appRecord); // Add stats as success context SuccessContext successContext = new SuccessContext(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/listeners/QuartzProgressListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/listeners/QuartzProgressListener.java index e9e665461cf2..4a4a0a96e1c5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/listeners/QuartzProgressListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/listeners/QuartzProgressListener.java @@ -23,6 +23,7 @@ import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext; import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener; import org.openmetadata.service.apps.bundles.searchIndex.distributed.DistributedJobContext; +import org.openmetadata.service.apps.scheduler.OmAppJobListener; import org.openmetadata.service.socket.WebSocketManager; import org.quartz.JobExecutionContext; @@ -228,6 +229,7 @@ private void broadcastViaWebSocket(AppRunRecord appRecord) { private AppRunRecord getUpdatedAppRunRecord() { AppRunRecord appRecord = readExistingRecord(); appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value())); + OmAppJobListener.fillTerminalTimings(appRecord); if (jobData.getStats() != null) { SuccessContext ctx = appRecord.getSuccessContext(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index 35320b536248..45c47d97a4e1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -465,7 +465,7 @@ private void updateAndBroadcastStoppedStatus(JobExecutionContext context) { if (runRecord != null) { // Update status to STOPPED runRecord.withStatus(AppRunRecord.Status.STOPPED); - runRecord.withEndTime(System.currentTimeMillis()); + OmAppJobListener.fillTerminalTimings(runRecord); // Get WebSocket channel name String webSocketChannelName = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java index 53280d87c664..3fa26ab44035 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java @@ -47,6 +47,37 @@ protected OmAppJobListener() { this.repository = new AppRepository(); } + /** + * Populate {@code endTime} and {@code executionTime} on a terminal-state run record. Idempotent + * — does nothing if {@code endTime} is already set or if the status is non-terminal — so it is + * safe to call from progress listeners that may persist before {@link #jobWasExecuted} runs. + * + *

Without this, mid-flight writes by progress listeners (e.g. {@code QuartzProgressListener} + * firing {@code onJobFailed}) persist a terminal status to the DB without timings; if the job + * dies before {@code jobWasExecuted} fires, polling consumers see status=FAILED with no + * endTime/executionTime. + */ + public static void fillTerminalTimings(AppRunRecord record) { + if (record == null || record.getStatus() == null || !isTerminalStatus(record.getStatus())) { + return; + } + if (record.getEndTime() == null) { + record.withEndTime(System.currentTimeMillis()); + } + if (record.getExecutionTime() == null + && record.getStartTime() != null + && record.getEndTime() != null) { + record.setExecutionTime(record.getEndTime() - record.getStartTime()); + } + } + + private static boolean isTerminalStatus(AppRunRecord.Status status) { + return switch (status) { + case SUCCESS, FAILED, ACTIVE_ERROR, STOPPED, COMPLETED -> true; + default -> false; + }; + } + @Override public String getName() { return JOB_LISTENER_NAME; From 9a2d8013d7ed494ca2e47290d67fb3e182869632 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 13:45:47 +0530 Subject: [PATCH 08/18] Not * reads --- .../apps/bundles/searchIndex/EntityReader.java | 16 +--------------- .../searchIndex/distributed/PartitionWorker.java | 3 ++- .../workflows/searchIndex/ReindexingUtil.java | 15 +++++++++++++++ .../searchIndex/EntityReaderLifecycleTest.java | 5 +++-- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReader.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReader.java index a2ab5750cd50..f4137dbaa221 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReader.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReader.java @@ -3,6 +3,7 @@ import static org.openmetadata.service.Entity.QUERY_COST_RECORD; import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS; import static org.openmetadata.service.Entity.TEST_CASE_RESULT; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSearchIndexFields; import java.util.ArrayList; import java.util.List; @@ -322,21 +323,6 @@ static boolean isTransientError(SearchIndexException e) { || lower.contains("sockettimeoutexception"); } - static List getSearchIndexFields(String entityType) { - if (TIME_SERIES_ENTITIES.contains(entityType)) { - return List.of(); - } - org.openmetadata.service.search.SearchRepository repo = - org.openmetadata.service.Entity.getSearchRepository(); - if (repo == null || repo.getSearchIndexFactory() == null) { - // Fallback for environments where the search subsystem isn't bootstrapped (e.g. unit - // tests that exercise the reader without the full Entity registry). Behaves the same - // as the pre-selective-fields code path. - return List.of("*"); - } - return new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType)); - } - static int calculateNumberOfReaders(int totalEntityRecords, int batchSize) { if (batchSize <= 0) return 1; return (totalEntityRecords + batchSize - 1) / batchSize; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java index 990613425269..346205d60b43 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java @@ -43,6 +43,7 @@ import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource; import org.openmetadata.service.workflows.searchIndex.PaginatedEntityTimeSeriesSource; +import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; /** * Worker that processes a single partition of entities for search indexing. @@ -591,7 +592,7 @@ private void recordReaderFailures( private ResultList readEntitiesKeyset(String entityType, String keysetCursor, int limit) throws SearchIndexException { - List fields = TIME_SERIES_ENTITIES.contains(entityType) ? List.of() : List.of("*"); + List fields = ReindexingUtil.getSearchIndexFields(entityType); if (!TIME_SERIES_ENTITIES.contains(entityType)) { PaginatedEntitiesSource source = new PaginatedEntitiesSource(entityType, limit, fields, 0); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index c928e168b8c4..70394d2c9f97 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -222,6 +222,21 @@ public static List findReferenceInElasticSearchAcrossAllIndexes return entities; } + public static List getSearchIndexFields(String entityType) { + if (TIME_SERIES_ENTITIES.contains(entityType)) { + return List.of(); + } + org.openmetadata.service.search.SearchRepository repo = + org.openmetadata.service.Entity.getSearchRepository(); + if (repo == null || repo.getSearchIndexFactory() == null) { + // Fallback for environments where the search subsystem isn't bootstrapped (e.g. unit + // tests that exercise the reader without the full Entity registry). Behaves the same + // as the pre-selective-fields code path. + return List.of("*"); + } + return new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType)); + } + public static String escapeDoubleQuotes(String str) { return str.replace("\"", "\\\""); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReaderLifecycleTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReaderLifecycleTest.java index 23a828caa6ce..eb9699104b69 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReaderLifecycleTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/EntityReaderLifecycleTest.java @@ -29,6 +29,7 @@ import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource; import org.openmetadata.service.workflows.searchIndex.PaginatedEntityTimeSeriesSource; +import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; class EntityReaderLifecycleTest { @@ -213,8 +214,8 @@ void readEntitySwallowsInterruptedCallbacksAndDeregistersReader() throws Excepti void helperMethodsRespectTimeSeriesAndMinimumReaderRules() { assertEquals( List.of(), - EntityReader.getSearchIndexFields(ReportData.ReportDataType.ENTITY_REPORT_DATA.value())); - assertEquals(List.of("*"), EntityReader.getSearchIndexFields("table")); + ReindexingUtil.getSearchIndexFields(ReportData.ReportDataType.ENTITY_REPORT_DATA.value())); + assertEquals(List.of("*"), ReindexingUtil.getSearchIndexFields("table")); assertEquals(1, EntityReader.calculateNumberOfReaders(10, 0)); assertEquals(3, EntityReader.calculateNumberOfReaders(11, 5)); } From 6ec614e5576400ad14e7c9e2fccfed1dba66c2bf Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 14:51:14 +0530 Subject: [PATCH 09/18] Fix Fields issue --- .../java/org/openmetadata/service/Entity.java | 5 +++ .../service/jdbi3/EntityRepository.java | 7 ++++ .../openmetadata/service/util/EntityUtil.java | 33 +++++++++++++++++++ .../searchIndex/PaginatedEntitiesSource.java | 2 +- 4 files changed, 46 insertions(+), 1 deletion(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java index 64076bf5c09b..eaded403c79e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java @@ -563,6 +563,11 @@ public static Fields getFields(String entityType, List fields) { return entityRepository.getFields(String.join(",", fields)); } + public static Fields getOnlySupportedFields(String entityType, List fields) { + EntityRepository entityRepository = Entity.getEntityRepository(entityType); + return entityRepository.getOnlySupportedFields(String.join(",", fields)); + } + public static T getEntity(EntityReference ref, String fields, Include include) { if (ref == null) { return null; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index 30934d93cf37..fdfab0a7360f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -6580,6 +6580,13 @@ public final Fields getFields(String fields) { return new Fields(allowedFields, fields); } + public final Fields getOnlySupportedFields(String fields) { + if ("*".equals(fields)) { + return new Fields(allowedFields, String.join(",", allowedFields), true); + } + return new Fields(allowedFields, fields, true); + } + protected final Fields getFields(Set fields) { return new Fields(allowedFields, fields); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java index 0b585474e951..1dd664fbb4db 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/EntityUtil.java @@ -532,6 +532,39 @@ public Fields(Set allowedFields, String fieldsParam) { } } + public Fields(Set allowedFields, String fieldsParam, boolean ignoreExtra) { + if (nullOrEmpty(fieldsParam)) { + this.fieldList = new HashSet<>(); + return; + } + + Set parsedFields = parseFields(fieldsParam); + this.fieldList = validateFields(parsedFields, allowedFields, ignoreExtra); + } + + private Set validateFields( + Set inputFields, Set allowedFields, boolean ignoreExtra) { + + Set result = new HashSet<>(); + + for (String field : inputFields) { + if (allowedFields.contains(field)) { + result.add(field); + } else if (!ignoreExtra) { + throw new IllegalArgumentException(CatalogExceptionMessage.invalidField(field)); + } + } + + return result; + } + + private Set parseFields(String fieldsParam) { + return Arrays.stream(fieldsParam.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toSet()); + } + public Fields(Set allowedFields, Set fieldsParam) { if (nullOrEmpty(fieldsParam)) { fieldList = new HashSet<>(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index 62103d33ebc8..cbc03241b873 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -299,7 +299,7 @@ public ResultList readNextKeyset(String keysetCursor) keysetCursor, cachedTotalCount, true, - Entity.getFields(entityType, fields)); + Entity.getOnlySupportedFields(entityType, fields)); int warningsCount = 0; if (result.getErrors() != null && !result.getErrors().isEmpty()) { From d67704dbfb32ec567414e9ff005f37d24501293c Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 17:38:56 +0530 Subject: [PATCH 10/18] Add tags back --- .../org/openmetadata/service/search/indexes/SearchIndex.java | 1 + 1 file changed, 1 insertion(+) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java index 9760feeb256b..5291543c51a9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java @@ -67,6 +67,7 @@ public interface SearchIndex { "followers", "votes", "extension", + "tags", "certification", "dataProducts"); From a53abfccc6bb93fe30f4a4767d322386ee6409b1 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 18:10:22 +0530 Subject: [PATCH 11/18] Fix failing test --- .../service/search/SearchIndexReindexFieldsParityTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchIndexReindexFieldsParityTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchIndexReindexFieldsParityTest.java index 98c5364b9e27..e917821ddf43 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchIndexReindexFieldsParityTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchIndexReindexFieldsParityTest.java @@ -163,6 +163,7 @@ void commonReindexFieldsMatchDocumentedSet() { org.junit.jupiter.api.Assertions.assertEquals( Set.of( "owners", + "tags", "domains", "reviewers", "followers", From fb60d378bb59783e2fb412993e2b12a3ca635cc6 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 21:36:00 +0530 Subject: [PATCH 12/18] Revert "Always recreate" This reverts commit 23578b09f48b278f228a9954e00b20b70c3b505c. --- .../apps/bundles/searchIndex/ReindexingConfiguration.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingConfiguration.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingConfiguration.java index a2cd19b8b94a..2426e63c3672 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingConfiguration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingConfiguration.java @@ -128,8 +128,7 @@ public static ReindexingConfiguration from(EventPublisherJob jobData) { DEFAULT_FIELD_FETCH_THREADS, DEFAULT_DOC_BUILD_THREADS, DEFAULT_STATS_INTERVAL_MS, - // Always recreate - true, + Boolean.TRUE.equals(jobData.getRecreateIndex()), Boolean.TRUE.equals(jobData.getAutoTune()), Boolean.TRUE.equals(jobData.getUseDistributedIndexing()), Boolean.TRUE.equals(jobData.getForce()), From 060164127b22487bb1af20f782fac2dba4af1155 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 23:42:16 +0530 Subject: [PATCH 13/18] review: cover all EntityNotFoundException formats; quiet null-id WARN MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address two review concerns on PR #27800: 1. The stale-reference matcher missed EntityNotFoundException's primary factory messages — byId ("Entity with id [...] not found."), byName ("Entity with name [...] not found."), byVersion ("Entity with id [...] and version [...] not found."), and byParserSchema ("Parser schema not found ..."). These would have been classified as real failures instead of warnings. Add specific contains() patterns ("entity with id", "entity with name", "parser schema not found") that match each factory's exact prefix without reintroducing the over-broad bare "not found" check. byFilter ("Entity not found for query params [...]") was already covered by "entity not found". 2. PartitionWorker.recordReaderFailures emitted WARN per error when entityId was null. EntityTimeSeriesRepository builds EntityError with only a message (no entity reference), so every time-series error hit that branch — log spam under load. Downgrade to DEBUG with a comment explaining why the id is absent. Tests: ReindexingUtilStaleRelationshipTest now exercises every EntityNotFoundException factory message. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../distributed/PartitionWorker.java | 8 +++++-- .../workflows/searchIndex/ReindexingUtil.java | 13 +++++++---- .../ReindexingUtilStaleRelationshipTest.java | 22 ++++++++++++++++++- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java index 346205d60b43..035d57b92d88 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java @@ -570,8 +570,12 @@ private void recordReaderFailures( entityId = rawEntity.toString(); } if (entityId == null) { - LOG.warn( - "Skipping reader failure record for entityType={}: entityId is null, message={}", + // Time-series readers (EntityTimeSeriesRepository) build EntityError without an id — + // they only have access to the JSON row, not the entity reference. Per-entity recording + // requires an id, so log at DEBUG (not WARN) to avoid spamming logs for every error in + // large time-series batches. + LOG.debug( + "No entityId on reader failure for entityType={} — skipping per-entity record. message={}", entityType, entityError.getMessage()); continue; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index 70394d2c9f97..400ae2d06dde 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -76,10 +76,12 @@ public static void getUpdatedStats( * cannot be meaningfully indexed and are reported as warnings rather than failing the entire * batch. * - *

The patterns match the canonical message shapes from {@code CatalogExceptionMessage} and - * {@code EntityNotFoundException} — bare {@code "not found"} is intentionally NOT matched - * because it would misclassify unrelated errors like {@code "Column 'foo' not found in result - * set"} or {@code "SSL certificate not found"}. + *

The patterns are deliberately specific so we do not misclassify unrelated errors that + * happen to contain {@code "not found"} (e.g. {@code "Column 'foo' not found in result set"} + * or {@code "SSL certificate not found"}). They cover every {@code EntityNotFoundException} + * factory message ({@code byId}, {@code byName}, {@code byFilter}, {@code byVersion}, + * {@code byParserSchema}) plus the legacy {@code CatalogExceptionMessage.entityNotFound} + * format and the relationship-not-found shape. */ public static boolean isStaleReferenceError(EntityError error) { if (error == null || error.getMessage() == null) { @@ -88,6 +90,9 @@ public static boolean isStaleReferenceError(EntityError error) { String message = error.getMessage().toLowerCase(); return message.contains("instance for") || message.contains("entity not found") + || message.contains("entity with id") + || message.contains("entity with name") + || message.contains("parser schema not found") || message.contains("does not exist") || message.contains("entitynotfoundexception") || message.contains("expected relationship"); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java index f279448189a4..a1a26a51e33c 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilStaleRelationshipTest.java @@ -54,7 +54,27 @@ void isStaleReferenceError_recognisesEntityNotFoundException() { new EntityError().withMessage("Resource does not exist anymore"))); assertTrue( ReindexingUtil.isStaleReferenceError( - new EntityError().withMessage("Entity not found: testCase abc-123"))); + new EntityError().withMessage("Entity not found for query params [name=foo]."))); + } + + @Test + void isStaleReferenceError_recognisesEveryEntityNotFoundExceptionFactory() { + // Mirrors the exact message constants in EntityNotFoundException — every byX(...) factory + // must be classified as a stale-reference warning, not a real failure. + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage("Entity with id [abc-123] not found."))); + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError().withMessage("Entity with name [my-table] not found."))); + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError() + .withMessage("Entity with id [abc-123] and version [0.2] not found."))); + assertTrue( + ReindexingUtil.isStaleReferenceError( + new EntityError() + .withMessage("Parser schema not found for entity with id [abc-123]."))); } @Test From eac76b6eb56d556f99fb5932363b689733e46d40 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 4 May 2026 23:48:21 +0530 Subject: [PATCH 14/18] review: NPE guard, locale-stable matcher, less log spam, accurate Javadoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address remaining Copilot comments on PR #27800: 1. EntityTimeSeriesRepository.getResultList(...) returns ResultList with errors=null on the success path. PaginatedEntityTimeSeriesSource was reading result.getErrors().size() / .isEmpty() right after, which would NPE on the common no-error path. Normalize errors to an empty list inside filterStaleRelationshipErrors so callers can rely on non-null. 2. ReindexingUtil.isStaleReferenceError now lowercases with Locale.ROOT instead of the platform default — avoids Turkish-locale style edge cases where 'I' lowercases differently and substring matches fail. 3. PaginatedEntityTimeSeriesSource.read() was logging every real reader error message at WARN. For large failed batches this floods logs. Switch to a single WARN with the error count plus the first 5 message details at DEBUG (gated by isDebugEnabled). 4. OmAppJobListener.fillTerminalTimings Javadoc claimed "does nothing if endTime is already set" but the body still backfills executionTime in that case. Rewrite to accurately describe the per-field idempotency. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../apps/scheduler/OmAppJobListener.java | 22 +++++++++++----- .../PaginatedEntityTimeSeriesSource.java | 26 ++++++++++++++++--- .../workflows/searchIndex/ReindexingUtil.java | 2 +- 3 files changed, 39 insertions(+), 11 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java index 3fa26ab44035..b5d5380762f8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java @@ -48,14 +48,22 @@ protected OmAppJobListener() { } /** - * Populate {@code endTime} and {@code executionTime} on a terminal-state run record. Idempotent - * — does nothing if {@code endTime} is already set or if the status is non-terminal — so it is - * safe to call from progress listeners that may persist before {@link #jobWasExecuted} runs. + * Populate {@code endTime} and {@code executionTime} on a terminal-state run record. Each field + * is filled independently and only if currently null: * - *

Without this, mid-flight writes by progress listeners (e.g. {@code QuartzProgressListener} - * firing {@code onJobFailed}) persist a terminal status to the DB without timings; if the job - * dies before {@code jobWasExecuted} fires, polling consumers see status=FAILED with no - * endTime/executionTime. + *

+ * + *

The method is a no-op for non-terminal statuses, so it is safe to call from progress + * listeners that may persist before {@link #jobWasExecuted} runs. Without this, mid-flight + * writes by progress listeners (e.g. {@code QuartzProgressListener} firing {@code onJobFailed}) + * would persist a terminal status to the DB without timings; if the job dies before {@code + * jobWasExecuted} fires, polling consumers would see {@code status=FAILED} with no + * {@code endTime} / {@code executionTime}. */ public static void fillTerminalTimings(AppRunRecord record) { if (record == null || record.getStatus() == null || !isTerminalStatus(record.getStatus())) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java index ee823df1464b..af086bedd9e8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntityTimeSeriesSource.java @@ -30,6 +30,9 @@ @Getter public class PaginatedEntityTimeSeriesSource implements Source> { + /** Cap on per-error detail messages emitted to logs to avoid flooding under large batches. */ + private static final int MAX_ERROR_DETAILS_LOGGED = 5; + private final int batchSize; private final String entityType; private final List fields; @@ -159,9 +162,18 @@ private ResultList read(String cursor) int warningsCount = filterStaleRelationshipErrors(result); if (!result.getErrors().isEmpty()) { + int errorCount = result.getErrors().size(); LOG.warn( - "[PaginatedEntityTimeSeriesSource] Real errors found: {}", result.getErrors().size()); - result.getErrors().forEach(error -> LOG.warn("Error: {}", error.getMessage())); + "[PaginatedEntityTimeSeriesSource] {} real reader error(s) for entityType={}; " + + "first up to {} shown at DEBUG", + errorCount, + entityType, + MAX_ERROR_DETAILS_LOGGED); + if (LOG.isDebugEnabled()) { + result.getErrors().stream() + .limit(MAX_ERROR_DETAILS_LOGGED) + .forEach(error -> LOG.debug("Reader error: {}", error.getMessage())); + } lastFailedCursor = this.cursor.get(); if (result.getPaging().getAfter() == null) { this.cursor.set(null); @@ -308,7 +320,15 @@ public void updateStats(int currentSuccess, int currentFailed, int currentWarnin */ private int filterStaleRelationshipErrors( ResultList result) { - if (result == null || result.getErrors() == null || result.getErrors().isEmpty()) { + if (result == null) { + return 0; + } + // EntityTimeSeriesRepository.getResultList(...) leaves errors=null on the success path. + // Normalize so downstream callers (logging, stats) can rely on a non-null list. + if (result.getErrors() == null) { + result.setErrors(new ArrayList<>()); + } + if (result.getErrors().isEmpty()) { return 0; } List warnings = new ArrayList<>(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index 400ae2d06dde..87a07f06cc79 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -87,7 +87,7 @@ public static boolean isStaleReferenceError(EntityError error) { if (error == null || error.getMessage() == null) { return false; } - String message = error.getMessage().toLowerCase(); + String message = error.getMessage().toLowerCase(java.util.Locale.ROOT); return message.contains("instance for") || message.contains("entity not found") || message.contains("entity with id") From e16d3277405f39fc3f466bcdad3e3667af6d2d29 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Tue, 5 May 2026 10:39:23 +0530 Subject: [PATCH 15/18] Relaign Changes --- .../workflows/searchIndex/ReindexingUtil.java | 68 +------------------ 1 file changed, 3 insertions(+), 65 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index 7ed1670bdc63..50a066e04da1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -239,74 +239,12 @@ public static List getSearchIndexFields(String entityType) { // as the pre-selective-fields code path. return List.of("*"); } - return new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType)); + List allFields = + new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType)); + return new ArrayList<>(Entity.getOnlySupportedFields(entityType, allFields).getFieldList()); } public static String escapeDoubleQuotes(String str) { return str.replace("\"", "\\\""); } - - /** - * Resolve the minimal field set the reindex path must request from {@code - * EntityRepository.setFields}. Time-series entities don't go through the entity-fields machinery, - * so they get an empty list. For everything else, ask the index class via {@link - * org.openmetadata.service.search.SearchIndexFactory#getReindexFieldsFor(String)} for exactly - * the fields its document needs, then intersect with the entity's {@code allowedFields} so we - * never request a field the JSON schema doesn't declare. Single source of truth shared by - * {@code EntityReader} (single-server pipeline) and {@code PartitionWorker} (distributed - * pipeline). - * - *

Why the allowedFields intersection matters. {@link - * org.openmetadata.service.search.indexes.SearchIndex#COMMON_REINDEX_FIELDS} is the union of - * relationship/enrichment fields that could appear on any entity ({@code owners}, - * {@code domains}, {@code reviewers}, {@code followers}, {@code votes}, {@code extension}, - * {@code certification}, {@code dataProducts}). Many entity schemas omit one or more of these: - * a {@code storageService} has no {@code reviewers}/{@code votes}/{@code extension}/{@code - * certification}; an {@code ingestionPipeline} has no {@code reviewers}/{@code dataProducts}; - * a {@code user}/{@code team} omits most of them. Without filtering, {@link - * org.openmetadata.service.Entity#getFields(String, java.util.List)} routes through {@link - * org.openmetadata.service.util.EntityUtil.Fields#Fields(java.util.Set, String)} which throws - * {@code IllegalArgumentException("Invalid field name ")} on the first unknown field, killing - * the whole batch. Filtering here keeps the helper safe to call for any registered entity type. - */ - public static List getSearchIndexFields(String entityType) { - if (TIME_SERIES_ENTITIES.contains(entityType)) { - return List.of(); - } - org.openmetadata.service.search.SearchRepository repo = Entity.getSearchRepository(); - if (repo == null || repo.getSearchIndexFactory() == null) { - // Fallback for environments without a bootstrapped search subsystem (unit tests) — keep - // pre-selective-fields behaviour. - return List.of("*"); - } - Set required = repo.getSearchIndexFactory().getReindexFieldsFor(entityType); - Set allowed = lookupAllowedFields(entityType); - if (allowed == null) { - return new ArrayList<>(required); - } - List filtered = new ArrayList<>(required.size()); - for (String field : required) { - if (allowed.contains(field)) { - filtered.add(field); - } else { - LOG.debug( - "Dropping reindex field '{}' for entityType '{}': not in allowedFields", - field, - entityType); - } - } - return filtered; - } - - /** Returns the entity's {@code allowedFields}, or {@code null} if the repository isn't - * registered (test boot, plugin not loaded). Callers fall back to the unfiltered set. */ - private static Set lookupAllowedFields(String entityType) { - try { - EntityRepository repository = Entity.getEntityRepository(entityType); - return repository == null ? null : repository.getAllowedFieldsCopy(); - } catch (Exception e) { - LOG.debug("Failed to resolve allowedFields for {}: {}", entityType, e.getMessage()); - return null; - } - } } From 861aa2d338a85d5e8b2cff5a728bb03ab8b7f90e Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Tue, 5 May 2026 10:51:07 +0530 Subject: [PATCH 16/18] Add Exception handling --- .../service/workflows/searchIndex/ReindexingUtil.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index 50a066e04da1..db891b9652f7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -239,9 +239,14 @@ public static List getSearchIndexFields(String entityType) { // as the pre-selective-fields code path. return List.of("*"); } - List allFields = - new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType)); - return new ArrayList<>(Entity.getOnlySupportedFields(entityType, allFields).getFieldList()); + try { + List allFields = + new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType)); + return new ArrayList<>(Entity.getOnlySupportedFields(entityType, allFields).getFieldList()); + } catch (Exception e) { + LOG.error("Failed while looking for indexing fields. Message : {}", e.getMessage()); + return List.of("*"); + } } public static String escapeDoubleQuotes(String str) { From 9bfe79b9d6c7a1c33b2a740965577b86d2ee2018 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Tue, 5 May 2026 11:40:34 +0530 Subject: [PATCH 17/18] fix(reindex): degrade to required-set, not '*', when filter step throws MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ReindexingUtil.getSearchIndexFields was catching every exception in the filter path and returning ["*"]. That defeats the entire point of selective fields — sending all fields silently masks any drift between SearchIndex.COMMON_REINDEX_FIELDS and the entity's schema, instead of surfacing it at the PaginatedEntitiesSource boundary. Split the try/catch so: - getReindexFieldsFor() throwing → ["*"] (we have no required set to fall back to; pre-selective behavior). - getOnlySupportedFields() throwing (typically because the EntityRepository isn't registered yet — boot/test scenarios) → return the unfiltered required set. PaginatedEntitiesSource validates the fields when an actual entity flows through, so any real drift surfaces loudly rather than being silently swallowed by a "*" wildcard. Restores the assertion and intent of ReindexingUtilTest.unregisteredRepositoryReturnsRequiredUnfiltered, and fixes the parametrized parity tests that were also seeing "*" because of this regression. Also stubs repo.getOnlySupportedFields(...) in the test mocks so it returns a real EntityUtil.Fields built against the declared allowedFields — matches the production code path. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../workflows/searchIndex/ReindexingUtil.java | 29 ++++++++++++++----- .../searchIndex/ReindexingUtilTest.java | 24 ++++++++++++--- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java index db891b9652f7..34d49476a614 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtil.java @@ -234,19 +234,34 @@ public static List getSearchIndexFields(String entityType) { org.openmetadata.service.search.SearchRepository repo = org.openmetadata.service.Entity.getSearchRepository(); if (repo == null || repo.getSearchIndexFactory() == null) { - // Fallback for environments where the search subsystem isn't bootstrapped (e.g. unit - // tests that exercise the reader without the full Entity registry). Behaves the same - // as the pre-selective-fields code path. + // Search subsystem isn't bootstrapped (e.g. unit tests that exercise the reader without the + // full Entity registry). Behaves the same as the pre-selective-fields code path. return List.of("*"); } + List allFields; try { - List allFields = - new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType)); - return new ArrayList<>(Entity.getOnlySupportedFields(entityType, allFields).getFieldList()); + allFields = new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType)); } catch (Exception e) { - LOG.error("Failed while looking for indexing fields. Message : {}", e.getMessage()); + LOG.error( + "Failed to look up reindex fields for {}: {}; falling back to all-fields wildcard", + entityType, + e.getMessage()); return List.of("*"); } + try { + return new ArrayList<>(Entity.getOnlySupportedFields(entityType, allFields).getFieldList()); + } catch (Exception e) { + // Filtering failed (typically because the EntityRepository isn't registered yet — + // happens during boot or in tests). Fall back to the unfiltered required set rather than + // "*": this keeps the per-entity intent intact and lets PaginatedEntitiesSource surface + // any drift loudly instead of silently sending every field. + LOG.warn( + "Could not filter reindex fields for {} against EntityRepository.allowedFields ({}); " + + "returning unfiltered required set", + entityType, + e.getMessage()); + return allFields; + } } public static String escapeDoubleQuotes(String str) { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilTest.java index 5861a2590101..5d52eb06ff8f 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/workflows/searchIndex/ReindexingUtilTest.java @@ -15,6 +15,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; @@ -37,6 +38,7 @@ import org.openmetadata.service.search.SearchClient; import org.openmetadata.service.search.SearchIndexFactory; import org.openmetadata.service.search.SearchRepository; +import org.openmetadata.service.util.EntityUtil; /** * Unit tests for {@link ReindexingUtil#getSearchIndexFields(String)}. The interesting @@ -148,8 +150,7 @@ void filteredFieldsAreSubsetOfEntityAllowedFields(String entityType, Class en when(searchRepository.getSearchIndexFactory()).thenReturn(new SearchIndexFactory()); Set declared = Entity.getEntityFields(entityClass); - EntityRepository repo = mock(EntityRepository.class); - when(repo.getAllowedFieldsCopy()).thenReturn(new HashSet<>(declared)); + EntityRepository repo = mockRepoWithAllowedFields(declared); List filtered; try (MockedStatic entityMock = @@ -260,12 +261,27 @@ private static Stream entityTypeAndClass() { } private List withAllowedFields(String entityType, Set allowed) { - EntityRepository repo = mock(EntityRepository.class); - when(repo.getAllowedFieldsCopy()).thenReturn(new HashSet<>(allowed)); + EntityRepository repo = mockRepoWithAllowedFields(allowed); try (MockedStatic entityMock = mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) { entityMock.when(() -> Entity.getEntityRepository(eq(entityType))).thenReturn(repo); return ReindexingUtil.getSearchIndexFields(entityType); } } + + /** + * Build a mock EntityRepository whose {@code getOnlySupportedFields(...)} returns a real + * {@link EntityUtil.Fields} built against {@code allowed} with extras silently dropped — the + * same contract as the production method (see {@code EntityRepository#getOnlySupportedFields}). + * {@code ReindexingUtil.getSearchIndexFields} reaches into the repository through {@code + * Entity.getOnlySupportedFields(...)}, so this is the method that has to be stubbed. + */ + private static EntityRepository mockRepoWithAllowedFields(Set allowed) { + EntityRepository repo = mock(EntityRepository.class); + Set allowedCopy = new HashSet<>(allowed); + when(repo.getAllowedFieldsCopy()).thenReturn(allowedCopy); + when(repo.getOnlySupportedFields(anyString())) + .thenAnswer(inv -> new EntityUtil.Fields(allowedCopy, inv.getArgument(0), true)); + return repo; + } } From 75f704ab9df900b8f35c716e6486d688a43158ac Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Tue, 5 May 2026 12:08:35 +0530 Subject: [PATCH 18/18] Add Only Supported field at other call sites --- .../workflows/searchIndex/PaginatedEntitiesSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index cbc03241b873..66de553430f7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -141,7 +141,7 @@ private ResultList read(String cursor) throws SearchI batchSize, cursor, true, - Entity.getFields(entityType, fields), + Entity.getOnlySupportedFields(entityType, fields), null); // Filter out EntityNotFoundExceptions from errors - these are expected when relationships @@ -241,7 +241,7 @@ public ResultList readWithCursor(String currentCursor batchSize, currentCursor, true, - Entity.getFields(entityType, fields), + Entity.getOnlySupportedFields(entityType, fields), null); // Filter out EntityNotFoundExceptions from errors - same as in read() method