Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
50f1406
fix(search): treat stale parentOf as warning during time-series reind…
mohityadav766 Apr 28, 2026
91bbdc1
review: tighten matcher, propagate warnings on cursor/keyset paths
mohityadav766 Apr 28, 2026
007670f
Show warn for failing test case resolution
mohityadav766 May 4, 2026
23578b0
Always recreate
mohityadav766 May 4, 2026
82d5aa4
Merge remote-tracking branch 'origin/main' into fix-testcaseresolutio…
mohityadav766 May 4, 2026
856fd2d
review: rename matcher + record reader failures before early-return
mohityadav766 May 4, 2026
38b4ec9
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 4, 2026
bd34de1
fix: import java.util.Objects instead of fully qualified name in Rein…
Copilot May 4, 2026
fcfbb39
Fill End time
mohityadav766 May 4, 2026
9a2d801
Not * reads
mohityadav766 May 4, 2026
6ec614e
Fix Fields issue
mohityadav766 May 4, 2026
d67704d
Add tags back
mohityadav766 May 4, 2026
bfb575a
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 4, 2026
a53abfc
Fix failing test
mohityadav766 May 4, 2026
fb60d37
Revert "Always recreate"
mohityadav766 May 4, 2026
b4cc517
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 4, 2026
0601641
review: cover all EntityNotFoundException formats; quiet null-id WARN
mohityadav766 May 4, 2026
eac76b6
review: NPE guard, locale-stable matcher, less log spam, accurate Jav…
mohityadav766 May 4, 2026
4e5b10d
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 5, 2026
e16d327
Relaign Changes
mohityadav766 May 5, 2026
b12c923
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 5, 2026
861aa2d
Add Exception handling
mohityadav766 May 5, 2026
9bfe79b
fix(reindex): degrade to required-set, not '*', when filter step throws
mohityadav766 May 5, 2026
75f704a
Add Only Supported field at other call sites
mohityadav766 May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -491,20 +491,31 @@ private BatchResult processBatch(
ResultList<?> resultList = readEntitiesKeyset(entityType, keysetCursor, batchSize);
long t1 = System.currentTimeMillis();

if (resultList == null || resultList.getData() == null || resultList.getData().isEmpty()) {
LOG.debug("{} read={}ms returned empty", entityType, t1 - t0);
return new BatchResult(0, 0, 0, null);
}

String nextCursor = resultList.getPaging() != null ? resultList.getPaging().getAfter() : null;
int readSuccessCount = listOrEmpty(resultList.getData()).size();
int readErrorCount = listOrEmpty(resultList.getErrors()).size();
int warningsCount = resultList.getWarningsCount() != null ? resultList.getWarningsCount() : 0;
int readSuccessCount = resultList != null ? listOrEmpty(resultList.getData()).size() : 0;
int readErrorCount = resultList != null ? listOrEmpty(resultList.getErrors()).size() : 0;
int warningsCount =
(resultList != null && resultList.getWarningsCount() != null)
? resultList.getWarningsCount()
: 0;
String nextCursor =
(resultList != null && resultList.getPaging() != null)
? resultList.getPaging().getAfter()
: null;

if (statsTracker != null) {
statsTracker.recordReaderBatch(readSuccessCount, readErrorCount, warningsCount);
}

if (readSuccessCount == 0) {
LOG.debug(
"{} read={}ms returned no indexable rows (warnings={}, errors={})",
entityType,
t1 - t0,
warningsCount,
readErrorCount);
Comment thread
mohityadav766 marked this conversation as resolved.
return new BatchResult(0, readErrorCount, warningsCount, nextCursor);
}
Comment thread
mohityadav766 marked this conversation as resolved.

if (failureRecorder != null && readErrorCount > 0) {
for (EntityError entityError : listOrEmpty(resultList.getErrors())) {
Object rawEntity = entityError.getEntity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isEntityNotFoundError;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -378,15 +379,4 @@ public void updateStats(int currentSuccess, int currentFailed) {
public void updateStats(int currentSuccess, int currentFailed, int currentWarnings) {
getUpdatedStats(stats, currentSuccess, currentFailed, currentWarnings);
}

private boolean isEntityNotFoundError(EntityError error) {
if (error == null || error.getMessage() == null) {
return false;
}
String message = error.getMessage().toLowerCase();
return message.contains("not found")
|| message.contains("instance for")
|| message.contains("does not exist")
|| message.contains("entitynotfoundexception");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.partitionErrors;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -11,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.system.EntityError;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.Include;
Expand Down Expand Up @@ -117,9 +119,14 @@ public ResultList<? extends EntityTimeSeriesInterface> readWithCursor(String cur
} else {
result = repository.listWithOffset(currentCursor, filter, batchSize, true);
}
int warningsCount = filterStaleRelationshipErrors(result);
LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, result.getData().size(), result.getErrors().size());
"[PaginatedEntityTimeSeriesSource] Batch Stats :- Submitted: {} Success: {} Failed: {} Warnings: {}",
batchSize,
result.getData().size(),
result.getErrors().size(),
Comment on lines 126 to +130
warningsCount);
updateStats(result.getData().size(), result.getErrors().size(), warningsCount);
} catch (Exception e) {
IndexingError indexingError =
new IndexingError()
Expand Down Expand Up @@ -149,18 +156,29 @@ private ResultList<? extends EntityTimeSeriesInterface> read(String cursor)
result = repository.listWithOffset(cursor, filter, batchSize, true);
}

int warningsCount = filterStaleRelationshipErrors(result);

if (!result.getErrors().isEmpty()) {
LOG.warn(
"[PaginatedEntityTimeSeriesSource] Real errors found: {}", result.getErrors().size());
result.getErrors().forEach(error -> LOG.warn("Error: {}", error.getMessage()));
Comment thread
mohityadav766 marked this conversation as resolved.
Outdated
lastFailedCursor = this.cursor.get();
if (result.getPaging().getAfter() == null) {
this.cursor.set(null);
isDone.set(true);
} else {
this.cursor.set(result.getPaging().getAfter());
}
updateStats(result.getData().size(), result.getErrors().size(), warningsCount);
return result;
}
LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, result.getData().size(), result.getErrors().size());
"[PaginatedEntityTimeSeriesSource] Batch Stats :- Submitted: {} Success: {} Failed: {} Warnings: {}",
batchSize,
result.getData().size(),
result.getErrors().size(),
warningsCount);
updateStats(result.getData().size(), 0, warningsCount);
} catch (Exception e) {
lastFailedCursor = this.cursor.get();
int remainingRecords =
Expand Down Expand Up @@ -214,11 +232,15 @@ public ResultList<? extends EntityTimeSeriesInterface> readNextKeyset(String key
cachedTotal,
true);

int warningsCount = filterStaleRelationshipErrors(result);
int failedCount = result.getErrors() != null ? result.getErrors().size() : 0;
LOG.debug(
"[PaginatedEntityTimeSeriesSource] Keyset batch stats — Submitted: {} Success: {} Failed: {}",
"[PaginatedEntityTimeSeriesSource] Keyset batch stats — Submitted: {} Success: {} Failed: {} Warnings: {}",
batchSize,
result.getData().size(),
result.getErrors() != null ? result.getErrors().size() : 0);
failedCount,
warningsCount);
updateStats(result.getData().size(), failedCount, warningsCount);
return result;
} catch (Exception e) {
LOG.error(
Expand Down Expand Up @@ -269,6 +291,39 @@ public void updateStats(int currentSuccess, int currentFailed) {
getUpdatedStats(stats, currentSuccess, currentFailed);
}

public void updateStats(int currentSuccess, int currentFailed, int currentWarnings) {
getUpdatedStats(stats, currentSuccess, currentFailed, currentWarnings);
}

/**
* Splits the errors on {@code result} into real failures and stale-relationship warnings,
* mutating {@code result} so its errors list contains only real failures and its warnings count
* reflects the skipped stale relationships. Returns the warnings count for callers that want to
* include it in their own logging or stats updates.
*
* <p>Stale relationships happen for time-series records (testCaseResolutionStatus,
* testCaseResult, ...) whose parent entity was hard-deleted out-of-band, or whose parentOf
* entity_relationship row was lost during a past migration. Such records cannot be indexed but
* should not fail the entire batch.
*/
private int filterStaleRelationshipErrors(
ResultList<? extends EntityTimeSeriesInterface> result) {
if (result == null || result.getErrors() == null || result.getErrors().isEmpty()) {
return 0;
}
List<EntityError> warnings = new ArrayList<>();
List<EntityError> realErrors = partitionErrors(result.getErrors(), warnings);
if (!warnings.isEmpty()) {
LOG.debug(
"[PaginatedEntityTimeSeriesSource] {} stale-relationship warnings for entity type {}",
warnings.size(),
entityType);
}
result.setErrors(realErrors);
result.setWarningsCount(warnings.size());
return warnings.size();
}

public ListFilter getFilter() {
ListFilter filter = new ListFilter(Include.ALL);
if (ReindexingUtil.isDataInsightIndex(entityType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,52 @@ public static void getUpdatedStats(
(stats.getWarningRecords() != null ? stats.getWarningRecords() : 0) + currentWarnings);
}

/**
* Returns true when an EntityError represents a stale relationship to a missing entity. These
* are expected during reindexing — for example when a time-series record
* (testCaseResolutionStatus, testCaseResult) was migrated without a corresponding
* entity_relationship row, or when an entity was hard-deleted out-of-band leaving its
* relationship rows behind. Such records cannot be meaningfully indexed and are reported as
* warnings rather than failing the entire batch.
*
* <p>The patterns match the canonical message shapes from {@code CatalogExceptionMessage} and
* {@code EntityNotFoundException} — bare {@code "not found"} is intentionally NOT matched
* because it would misclassify unrelated errors like {@code "Column 'foo' not found in result
* set"} or {@code "SSL certificate not found"}.
*/
public static boolean isEntityNotFoundError(EntityError error) {
if (error == null || error.getMessage() == null) {
return false;
}
String message = error.getMessage().toLowerCase();
return message.contains("instance for")
Comment thread
mohityadav766 marked this conversation as resolved.
Outdated
|| message.contains("entity not found")
|| message.contains("does not exist")
|| message.contains("entitynotfoundexception")
Comment thread
mohityadav766 marked this conversation as resolved.
Outdated
|| message.contains("expected relationship");
}
Comment thread
mohityadav766 marked this conversation as resolved.
Outdated
Comment thread
mohityadav766 marked this conversation as resolved.

/**
* Splits {@code errors} into stale-relationship warnings (appended to {@code warningsOut}) and
* real failures (returned). Both lists must be mutable; {@code warningsOut} must be non-null.
*/
public static List<EntityError> partitionErrors(
List<EntityError> errors, List<EntityError> warningsOut) {
java.util.Objects.requireNonNull(warningsOut, "warningsOut must not be null");
if (CommonUtil.nullOrEmpty(errors)) {
return new ArrayList<>();
}
List<EntityError> realErrors = new ArrayList<>(errors.size());
for (EntityError error : errors) {
if (isEntityNotFoundError(error)) {
warningsOut.add(error);
} else {
Comment on lines +105 to +115
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionErrors assumes warningsOut is non-null and mutable; passing null (or an unmodifiable list) will throw at warningsOut.add(error). Since this is a public utility, consider either validating warningsOut (e.g., initialize when null / throw an explicit IllegalArgumentException) or documenting/annotating it as non-null to avoid surprising NPEs at runtime.

Copilot uses AI. Check for mistakes.
realErrors.add(error);
}
}
return realErrors;
}

public static boolean isDataInsightIndex(String entityType) {
return Entity.getSearchRepository().getDataInsightReports().contains(entityType);
}
Expand Down
Loading
Loading