Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isEntityNotFoundError;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -378,15 +379,4 @@ public void updateStats(int currentSuccess, int currentFailed) {
public void updateStats(int currentSuccess, int currentFailed, int currentWarnings) {
getUpdatedStats(stats, currentSuccess, currentFailed, currentWarnings);
}

private boolean isEntityNotFoundError(EntityError error) {
if (error == null || error.getMessage() == null) {
return false;
}
String message = error.getMessage().toLowerCase();
return message.contains("not found")
|| message.contains("instance for")
|| message.contains("does not exist")
|| message.contains("entitynotfoundexception");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.partitionErrors;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -11,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.system.EntityError;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.Include;
Expand Down Expand Up @@ -117,9 +119,13 @@ public ResultList<? extends EntityTimeSeriesInterface> readWithCursor(String cur
} else {
result = repository.listWithOffset(currentCursor, filter, batchSize, true);
}
int warningsCount = filterStaleRelationshipErrors(result);
LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, result.getData().size(), result.getErrors().size());
"[PaginatedEntityTimeSeriesSource] Batch Stats :- Submitted: {} Success: {} Failed: {} Warnings: {}",
batchSize,
result.getData().size(),
result.getErrors().size(),
Comment on lines 126 to +130
warningsCount);
} catch (Exception e) {
IndexingError indexingError =
new IndexingError()
Expand Down Expand Up @@ -149,18 +155,29 @@ private ResultList<? extends EntityTimeSeriesInterface> read(String cursor)
result = repository.listWithOffset(cursor, filter, batchSize, true);
}

int warningsCount = filterStaleRelationshipErrors(result);

if (!result.getErrors().isEmpty()) {
LOG.warn(
"[PaginatedEntityTimeSeriesSource] Real errors found: {}", result.getErrors().size());
result.getErrors().forEach(error -> LOG.warn("Error: {}", error.getMessage()));
Comment thread
mohityadav766 marked this conversation as resolved.
Outdated
lastFailedCursor = this.cursor.get();
if (result.getPaging().getAfter() == null) {
this.cursor.set(null);
isDone.set(true);
} else {
this.cursor.set(result.getPaging().getAfter());
}
updateStats(result.getData().size(), result.getErrors().size(), warningsCount);
return result;
}
LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, result.getData().size(), result.getErrors().size());
"[PaginatedEntityTimeSeriesSource] Batch Stats :- Submitted: {} Success: {} Failed: {} Warnings: {}",
batchSize,
result.getData().size(),
result.getErrors().size(),
warningsCount);
updateStats(result.getData().size(), 0, warningsCount);
} catch (Exception e) {
lastFailedCursor = this.cursor.get();
int remainingRecords =
Expand Down Expand Up @@ -214,11 +231,13 @@ public ResultList<? extends EntityTimeSeriesInterface> readNextKeyset(String key
cachedTotal,
true);

int warningsCount = filterStaleRelationshipErrors(result);
LOG.debug(
"[PaginatedEntityTimeSeriesSource] Keyset batch stats — Submitted: {} Success: {} Failed: {}",
"[PaginatedEntityTimeSeriesSource] Keyset batch stats — Submitted: {} Success: {} Failed: {} Warnings: {}",
batchSize,
result.getData().size(),
result.getErrors() != null ? result.getErrors().size() : 0);
result.getErrors() != null ? result.getErrors().size() : 0,
warningsCount);
return result;
} catch (Exception e) {
LOG.error(
Expand Down Expand Up @@ -269,6 +288,39 @@ public void updateStats(int currentSuccess, int currentFailed) {
getUpdatedStats(stats, currentSuccess, currentFailed);
}

public void updateStats(int currentSuccess, int currentFailed, int currentWarnings) {
getUpdatedStats(stats, currentSuccess, currentFailed, currentWarnings);
}

/**
* Splits the errors on {@code result} into real failures and stale-relationship warnings,
* mutating {@code result} so its errors list contains only real failures and its warnings count
* reflects the skipped stale relationships. Returns the warnings count for callers that want to
* include it in their own logging or stats updates.
*
* <p>Stale relationships happen for time-series records (testCaseResolutionStatus,
* testCaseResult, ...) whose parent entity was hard-deleted out-of-band, or whose parentOf
* entity_relationship row was lost during a past migration. Such records cannot be indexed but
* should not fail the entire batch.
*/
private int filterStaleRelationshipErrors(
ResultList<? extends EntityTimeSeriesInterface> result) {
if (result == null || result.getErrors() == null || result.getErrors().isEmpty()) {
return 0;
}
List<EntityError> warnings = new ArrayList<>();
List<EntityError> realErrors = partitionErrors(result.getErrors(), warnings);
if (!warnings.isEmpty()) {
LOG.debug(
"[PaginatedEntityTimeSeriesSource] {} stale-relationship warnings for entity type {}",
warnings.size(),
entityType);
}
result.setErrors(realErrors);
result.setWarningsCount(warnings.size());
return warnings.size();
}

public ListFilter getFilter() {
ListFilter filter = new ListFilter(Include.ALL);
if (ReindexingUtil.isDataInsightIndex(entityType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,41 @@ public static void getUpdatedStats(
(stats.getWarningRecords() != null ? stats.getWarningRecords() : 0) + currentWarnings);
}

/**
* Returns true when an EntityError represents a stale relationship to a missing entity. These
* are expected during reindexing — for example when a time-series record (testCaseResolutionStatus,
* testCaseResult) was migrated without a corresponding entity_relationship row, or when an entity
* was hard-deleted out-of-band leaving its relationship rows behind. Such records cannot be
* meaningfully indexed and are reported as warnings rather than failing the entire batch.
*/
public static boolean isEntityNotFoundError(EntityError error) {
if (error == null || error.getMessage() == null) {
return false;
}
String message = error.getMessage().toLowerCase();
return message.contains("not found")
|| message.contains("instance for")
|| message.contains("does not exist")
|| message.contains("entitynotfoundexception")
|| message.contains("expected relationship");
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated
}
Comment thread
mohityadav766 marked this conversation as resolved.

public static List<EntityError> partitionErrors(
List<EntityError> errors, List<EntityError> warningsOut) {
if (CommonUtil.nullOrEmpty(errors)) {
return new ArrayList<>();
}
List<EntityError> realErrors = new ArrayList<>(errors.size());
for (EntityError error : errors) {
if (isEntityNotFoundError(error)) {
warningsOut.add(error);
} else {
Comment on lines +105 to +115
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionErrors assumes warningsOut is non-null and mutable; passing null (or an unmodifiable list) will throw at warningsOut.add(error). Since this is a public utility, consider either validating warningsOut (e.g., initialize when null / throw an explicit IllegalArgumentException) or documenting/annotating it as non-null to avoid surprising NPEs at runtime.

Copilot uses AI. Check for mistakes.
realErrors.add(error);
}
}
return realErrors;
}

public static boolean isDataInsightIndex(String entityType) {
return Entity.getSearchRepository().getDataInsightReports().contains(entityType);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright 2026 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
Comment thread
mohityadav766 marked this conversation as resolved.
*/
package org.openmetadata.service.workflows.searchIndex;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.system.EntityError;
import org.openmetadata.schema.tests.type.TestCaseResolutionStatus;
import org.openmetadata.schema.tests.type.TestCaseResolutionStatusTypes;
import org.openmetadata.schema.utils.ResultList;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.search.SearchRepository;

/**
* Validates that {@link PaginatedEntityTimeSeriesSource} treats stale-relationship errors raised by
* {@code EntityRepository.ensureSingleRelationship} (the message from {@code
* CatalogExceptionMessage.entityRelationshipNotFound}) as warnings rather than fatal failures.
*
* <p>This is the production scenario from issue #27417: orphaned {@code testCaseResolutionStatus}
* rows whose parentOf {@code entity_relationship} row is missing should not fail an entire reindex
* batch.
*/
class PaginatedEntityTimeSeriesSourceStaleRelationshipTest {

private static final String ENTITY_TYPE = Entity.TEST_CASE_RESOLUTION_STATUS;
private static final int BATCH_SIZE = 5;

private static final String STALE_RELATIONSHIP_MESSAGE =
"Entity type testCaseResolutionStatus 7c5c3c4d-3a82-4d8c-9c4a-3e2c9b9b0d5b "
+ "does not have expected relationship parentOf to/from entity type testCase";

private static final String REAL_DB_ERROR_MESSAGE =
"JsonProcessingException: Unrecognized field 'foo' (class TestCaseResolutionStatus)";

@Test
void readClassifiesStaleRelationshipErrorsAsWarnings() throws Exception {
EntityTimeSeriesRepository<TestCaseResolutionStatus> repository = mockRepository();
ResultList<TestCaseResolutionStatus> mockedResult =
resultWith(
List.of(makeRecord("ok-1"), makeRecord("ok-2")),
List.of(error("orphan-1", STALE_RELATIONSHIP_MESSAGE)));

when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean()))
.thenReturn(mockedResult);

try (MockedStatic<Entity> entityMock =
mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) {
stubEntityRepositoryLookups(entityMock, repository);

PaginatedEntityTimeSeriesSource source =
new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 3);

ResultList<? extends EntityTimeSeriesInterface> result = source.readNext(null);

assertNotNull(result);
assertEquals(2, result.getData().size());
assertTrue(
result.getErrors().isEmpty(),
() -> "stale-relationship errors should be filtered out, got " + result.getErrors());
assertEquals(1, result.getWarningsCount());
assertEquals(2, source.getStats().getSuccessRecords());
assertEquals(0, source.getStats().getFailedRecords());
assertEquals(1, source.getStats().getWarningRecords());
}
}

@Test
void readKeepsRealErrorsAsFailuresEvenWhenWarningsArePresent() throws Exception {
EntityTimeSeriesRepository<TestCaseResolutionStatus> repository = mockRepository();
ResultList<TestCaseResolutionStatus> mockedResult =
resultWith(
List.of(makeRecord("ok-1")),
List.of(
error("orphan-1", STALE_RELATIONSHIP_MESSAGE),
error("broken-1", REAL_DB_ERROR_MESSAGE),
error("orphan-2", STALE_RELATIONSHIP_MESSAGE)));

when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean()))
.thenReturn(mockedResult);

try (MockedStatic<Entity> entityMock =
mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) {
stubEntityRepositoryLookups(entityMock, repository);

PaginatedEntityTimeSeriesSource source =
new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 4);

ResultList<? extends EntityTimeSeriesInterface> result = source.readNext(null);

assertNotNull(result);
assertEquals(1, result.getData().size());
assertEquals(1, result.getErrors().size());
assertEquals("broken-1", result.getErrors().get(0).getEntity());
assertEquals(2, result.getWarningsCount());
assertEquals(1, source.getStats().getSuccessRecords());
assertEquals(1, source.getStats().getFailedRecords());
assertEquals(2, source.getStats().getWarningRecords());
}
}

@Test
void readWithCursorFiltersStaleRelationshipErrors() throws Exception {
EntityTimeSeriesRepository<TestCaseResolutionStatus> repository = mockRepository();
ResultList<TestCaseResolutionStatus> mockedResult =
resultWith(
List.of(makeRecord("ok-1")), List.of(error("orphan-1", STALE_RELATIONSHIP_MESSAGE)));

when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean()))
.thenReturn(mockedResult);

try (MockedStatic<Entity> entityMock =
mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) {
stubEntityRepositoryLookups(entityMock, repository);

PaginatedEntityTimeSeriesSource source =
new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 2);

ResultList<? extends EntityTimeSeriesInterface> result = source.readWithCursor("0");

assertNotNull(result);
assertEquals(1, result.getData().size());
assertTrue(result.getErrors().isEmpty());
assertEquals(1, result.getWarningsCount());
}
}

@Test
void readPropagatesNonReaderExceptionsAsSearchIndexException() {
EntityTimeSeriesRepository<TestCaseResolutionStatus> repository = mockRepository();
when(repository.listWithOffset(any(), any(ListFilter.class), anyInt(), anyBoolean()))
.thenThrow(new RuntimeException("connection refused"));

try (MockedStatic<Entity> entityMock =
mockStatic(Entity.class, org.mockito.Mockito.CALLS_REAL_METHODS)) {
stubEntityRepositoryLookups(entityMock, repository);

PaginatedEntityTimeSeriesSource source =
new PaginatedEntityTimeSeriesSource(ENTITY_TYPE, BATCH_SIZE, List.of(), 1);

org.junit.jupiter.api.Assertions.assertThrows(
SearchIndexException.class, () -> source.readNext(null));
}
}

@SuppressWarnings("unchecked")
private EntityTimeSeriesRepository<TestCaseResolutionStatus> mockRepository() {
return (EntityTimeSeriesRepository<TestCaseResolutionStatus>)
mock(EntityTimeSeriesRepository.class);
}

private void stubEntityRepositoryLookups(
MockedStatic<Entity> entityMock,
EntityTimeSeriesRepository<TestCaseResolutionStatus> repository) {
SearchRepository searchRepository = mock(SearchRepository.class);
when(searchRepository.getDataInsightReports()).thenReturn(List.of());
entityMock.when(Entity::getSearchRepository).thenReturn(searchRepository);
entityMock
.when(() -> Entity.getEntityTimeSeriesRepository(ENTITY_TYPE))
.thenReturn((EntityTimeSeriesRepository) repository);
}

private static TestCaseResolutionStatus makeRecord(String name) {
return new TestCaseResolutionStatus()
.withId(UUID.randomUUID())
.withTestCaseResolutionStatusType(TestCaseResolutionStatusTypes.New)
.withStateId(UUID.randomUUID())
.withTimestamp(System.currentTimeMillis());
}

private static EntityError error(String entity, String message) {
return new EntityError().withEntity(entity).withMessage(message);
}

private static ResultList<TestCaseResolutionStatus> resultWith(
List<TestCaseResolutionStatus> data, List<EntityError> errors) {
return new ResultList<>(
new ArrayList<>(data), new ArrayList<>(errors), null, null, data.size());
}
}
Loading
Loading