Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
50f1406
fix(search): treat stale parentOf as warning during time-series reind…
mohityadav766 Apr 28, 2026
91bbdc1
review: tighten matcher, propagate warnings on cursor/keyset paths
mohityadav766 Apr 28, 2026
007670f
Show warn for failing test case resolution
mohityadav766 May 4, 2026
23578b0
Always recreate
mohityadav766 May 4, 2026
82d5aa4
Merge remote-tracking branch 'origin/main' into fix-testcaseresolutio…
mohityadav766 May 4, 2026
856fd2d
review: rename matcher + record reader failures before early-return
mohityadav766 May 4, 2026
38b4ec9
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 4, 2026
bd34de1
fix: import java.util.Objects instead of fully qualified name in Rein…
Copilot May 4, 2026
fcfbb39
Fill End time
mohityadav766 May 4, 2026
9a2d801
Not * reads
mohityadav766 May 4, 2026
6ec614e
Fix Fields issue
mohityadav766 May 4, 2026
d67704d
Add tags back
mohityadav766 May 4, 2026
bfb575a
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 4, 2026
a53abfc
Fix failing test
mohityadav766 May 4, 2026
fb60d37
Revert "Always recreate"
mohityadav766 May 4, 2026
b4cc517
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 4, 2026
0601641
review: cover all EntityNotFoundException formats; quiet null-id WARN
mohityadav766 May 4, 2026
eac76b6
review: NPE guard, locale-stable matcher, less log spam, accurate Jav…
mohityadav766 May 4, 2026
4e5b10d
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 5, 2026
e16d327
Realign Changes
mohityadav766 May 5, 2026
b12c923
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 5, 2026
861aa2d
Add Exception handling
mohityadav766 May 5, 2026
9bfe79b
fix(reindex): degrade to required-set, not '*', when filter step throws
mohityadav766 May 5, 2026
75f704a
Add Only Supported field at other call sites
mohityadav766 May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,11 @@ public static Fields getFields(String entityType, List<String> fields) {
return entityRepository.getFields(String.join(",", fields));
}

/**
 * Build a {@link Fields} projection for {@code entityType} that keeps only the fields the
 * entity's repository actually supports, silently dropping any unsupported names instead of
 * throwing.
 *
 * @param entityType the entity type whose repository resolves the allowed field set
 * @param fields requested field names; joined into the comma-separated form the repository expects
 * @return a {@code Fields} instance restricted to supported fields
 */
public static Fields getOnlySupportedFields(String entityType, List<String> fields) {
  String joined = String.join(",", fields);
  return Entity.getEntityRepository(entityType).getOnlySupportedFields(joined);
}

public static <T> T getEntity(EntityReference ref, String fields, Include include) {
if (ref == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.openmetadata.service.Entity.QUERY_COST_RECORD;
import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS;
import static org.openmetadata.service.Entity.TEST_CASE_RESULT;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSearchIndexFields;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.searchIndex.listeners.LoggingProgressListener;
import org.openmetadata.service.apps.bundles.searchIndex.listeners.SlackProgressListener;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.SystemRepository;
import org.openmetadata.service.search.SearchRepository;
Expand Down Expand Up @@ -109,7 +110,7 @@ public void stop() {

AppRunRecord appRecord = context.getJobRecord();
appRecord.setStatus(AppRunRecord.Status.STOPPED);
appRecord.setEndTime(System.currentTimeMillis());
OmAppJobListener.fillTerminalTimings(appRecord);
context.storeRunRecord(JsonUtils.pojoToJson(appRecord));
context.pushStatusUpdate(appRecord, true);
sendUpdates();
Expand Down Expand Up @@ -368,6 +369,7 @@ private void finalizeJobExecution() {
if (stopped) {
AppRunRecord appRecord = context.getJobRecord();
appRecord.setStatus(AppRunRecord.Status.STOPPED);
OmAppJobListener.fillTerminalTimings(appRecord);
context.storeRunRecord(JsonUtils.pojoToJson(appRecord));
}
}
Expand All @@ -383,6 +385,7 @@ private void sendUpdates() {
private void updateRecordToDbAndNotify() {
AppRunRecord appRecord = context.getJobRecord();
appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value()));
OmAppJobListener.fillTerminalTimings(appRecord);

if (jobData.getFailure() != null) {
appRecord.setFailureContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.openmetadata.service.apps.bundles.searchIndex.BulkSink;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.socket.WebSocketManager;

Expand Down Expand Up @@ -562,6 +563,7 @@ private AppRunRecord convertToAppRunRecord(
appRecord.setStartTime(appStartTime != null ? appStartTime : job.getStartedAt());
appRecord.setEndTime(job.getCompletedAt());
appRecord.setTimestamp(job.getUpdatedAt());
OmAppJobListener.fillTerminalTimings(appRecord);

// Add stats as success context
SuccessContext successContext = new SuccessContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,15 +506,16 @@ private BatchResult processBatch(
ResultList<?> resultList = readEntitiesKeyset(entityType, keysetCursor, batchSize);
long readDurationNanos = System.nanoTime() - readStartNanos;

if (resultList == null || resultList.getData() == null || resultList.getData().isEmpty()) {
LOG.debug("{} read={}ms returned empty", entityType, readDurationNanos / 1_000_000L);
return new BatchResult(0, 0, 0, null);
}

String nextCursor = resultList.getPaging() != null ? resultList.getPaging().getAfter() : null;
int readSuccessCount = listOrEmpty(resultList.getData()).size();
int readErrorCount = listOrEmpty(resultList.getErrors()).size();
int warningsCount = resultList.getWarningsCount() != null ? resultList.getWarningsCount() : 0;
int readSuccessCount = resultList != null ? listOrEmpty(resultList.getData()).size() : 0;
int readErrorCount = resultList != null ? listOrEmpty(resultList.getErrors()).size() : 0;
int warningsCount =
(resultList != null && resultList.getWarningsCount() != null)
? resultList.getWarningsCount()
: 0;
String nextCursor =
(resultList != null && resultList.getPaging() != null)
? resultList.getPaging().getAfter()
: null;

if (statsTracker != null) {
// Reader timing = wall-clock time of the keyset DB read (listAfter + setFieldsInBulk
Expand All @@ -523,28 +524,16 @@ private BatchResult processBatch(
readSuccessCount, readErrorCount, warningsCount, readDurationNanos);
}

if (failureRecorder != null && readErrorCount > 0) {
for (EntityError entityError : listOrEmpty(resultList.getErrors())) {
Object rawEntity = entityError.getEntity();
String entityId = null;
if (rawEntity instanceof EntityInterface) {
UUID id = ((EntityInterface) rawEntity).getId();
if (id != null) {
entityId = id.toString();
}
} else if (rawEntity != null) {
entityId = rawEntity.toString();
}
if (entityId == null) {
LOG.warn(
"Skipping reader failure record for entityType={}: entityId is null, message={}",
entityType,
entityError.getMessage());
continue;
}
failureRecorder.recordReaderEntityFailure(
entityType, entityId, null, entityError.getMessage());
}
recordReaderFailures(entityType, resultList, readErrorCount);

if (readSuccessCount == 0) {
LOG.debug(
"{} read={}ms returned no indexable rows (warnings={}, errors={})",
entityType,
readDurationNanos / 1_000_000L,
warningsCount,
readErrorCount);
Comment thread
mohityadav766 marked this conversation as resolved.
return new BatchResult(0, readErrorCount, warningsCount, nextCursor);
}
Comment thread
mohityadav766 marked this conversation as resolved.

Map<String, Object> contextData = createContextData(entityType, statsTracker);
Expand Down Expand Up @@ -572,6 +561,44 @@ private BatchResult processBatch(
}
}

/**
 * Persist per-entity reader failures so that downstream tooling (e.g. the failures dashboard)
 * can show which specific records the reader could not hydrate. Runs whether or not the batch
 * has any successful rows — losing failure diagnostics for "all-error" batches would defeat
 * the point of the recorder.
 *
 * @param entityType entity type being reindexed; used to tag the recorded failure
 * @param resultList batch result whose {@code errors} list is inspected; may be null
 * @param readErrorCount precomputed error count; when 0 this method is a no-op
 */
private void recordReaderFailures(
    String entityType, ResultList<?> resultList, int readErrorCount) {
  if (failureRecorder == null || readErrorCount == 0 || resultList == null) {
    return;
  }
  for (EntityError entityError : listOrEmpty(resultList.getErrors())) {
    Object rawEntity = entityError.getEntity();
    String entityId = null;
    // Pattern-matching instanceof (Java 16+) avoids the redundant explicit cast.
    if (rawEntity instanceof EntityInterface entity) {
      UUID id = entity.getId();
      if (id != null) {
        entityId = id.toString();
      }
    } else if (rawEntity != null) {
      entityId = rawEntity.toString();
    }
    if (entityId == null) {
      // Time-series readers (EntityTimeSeriesRepository) build EntityError without an id —
      // they only have access to the JSON row, not the entity reference. Per-entity recording
      // requires an id, so log at DEBUG (not WARN) to avoid spamming logs for every error in
      // large time-series batches.
      LOG.debug(
          "No entityId on reader failure for entityType={} — skipping per-entity record. message={}",
          entityType,
          entityError.getMessage());
      continue;
    }
    failureRecorder.recordReaderEntityFailure(
        entityType, entityId, null, entityError.getMessage());
  }
}

/**
* Read entities from the database.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener;
import org.openmetadata.service.apps.bundles.searchIndex.distributed.DistributedJobContext;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.socket.WebSocketManager;
import org.quartz.JobExecutionContext;

Expand Down Expand Up @@ -228,6 +229,7 @@ private void broadcastViaWebSocket(AppRunRecord appRecord) {
private AppRunRecord getUpdatedAppRunRecord() {
AppRunRecord appRecord = readExistingRecord();
appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value()));
OmAppJobListener.fillTerminalTimings(appRecord);

if (jobData.getStats() != null) {
SuccessContext ctx = appRecord.getSuccessContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ private void updateAndBroadcastStoppedStatus(JobExecutionContext context) {
if (runRecord != null) {
// Update status to STOPPED
runRecord.withStatus(AppRunRecord.Status.STOPPED);
runRecord.withEndTime(System.currentTimeMillis());
OmAppJobListener.fillTerminalTimings(runRecord);

// Get WebSocket channel name
String webSocketChannelName =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,45 @@ protected OmAppJobListener() {
this.repository = new AppRepository();
}

/**
 * Populate {@code endTime} and {@code executionTime} on a terminal-state run record. Each field
 * is filled independently and only when currently null:
 *
 * <ul>
 *   <li>{@code endTime} defaults to {@code System.currentTimeMillis()} if absent.
 *   <li>{@code executionTime} is derived as {@code endTime - startTime} if absent and both
 *       endpoints are known — so callers that pre-populated {@code endTime} (e.g. from
 *       {@code job.getCompletedAt()}) still get an accurate {@code executionTime}.
 * </ul>
 *
 * <p>No-op for null records, null statuses, and non-terminal statuses, which makes it safe to
 * call from progress listeners that may persist a record before {@link #jobWasExecuted} runs;
 * without it, a mid-flight terminal write could reach the DB with no timings.
 *
 * @param record run record to amend in place; may be null
 */
public static void fillTerminalTimings(AppRunRecord record) {
  if (record == null) {
    return;
  }
  AppRunRecord.Status status = record.getStatus();
  if (status == null || !isTerminalStatus(status)) {
    return; // only terminal runs get their timings finalized
  }
  Long end = record.getEndTime();
  if (end == null) {
    end = System.currentTimeMillis();
    record.withEndTime(end);
  }
  if (record.getExecutionTime() == null && record.getStartTime() != null) {
    record.setExecutionTime(end - record.getStartTime());
  }
}

/** Returns true when {@code status} is a terminal state (the run will not transition further). */
private static boolean isTerminalStatus(AppRunRecord.Status status) {
  switch (status) {
    case SUCCESS:
    case FAILED:
    case ACTIVE_ERROR:
    case STOPPED:
    case COMPLETED:
      return true;
    default:
      return false;
  }
}

@Override
public String getName() {
return JOB_LISTENER_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6607,6 +6607,13 @@ public final Fields getFields(String fields) {
return new Fields(allowedFields, fields);
}

public final Fields getOnlySupportedFields(String fields) {
if ("*".equals(fields)) {
return new Fields(allowedFields, String.join(",", allowedFields), true);
}
return new Fields(allowedFields, fields, true);
Comment thread
mohityadav766 marked this conversation as resolved.
}

protected final Fields getFields(Set<String> fields) {
return new Fields(allowedFields, fields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public interface SearchIndex {
"followers",
"votes",
"extension",
"tags",
Comment thread
mohityadav766 marked this conversation as resolved.
"certification",
"dataProducts");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,39 @@ public Fields(Set<String> allowedFields, String fieldsParam) {
}
}

/**
 * Parse {@code fieldsParam} and validate each name against {@code allowedFields}.
 *
 * @param allowedFields the set of field names this entity supports
 * @param fieldsParam comma-separated requested fields; null/empty yields an empty field list
 * @param ignoreExtra when true, unsupported names are dropped; when false, they raise
 *     {@code IllegalArgumentException}
 */
public Fields(Set<String> allowedFields, String fieldsParam, boolean ignoreExtra) {
  if (nullOrEmpty(fieldsParam)) {
    this.fieldList = new HashSet<>();
  } else {
    this.fieldList = validateFields(parseFields(fieldsParam), allowedFields, ignoreExtra);
  }
}

/**
 * Filter {@code inputFields} down to names present in {@code allowedFields}.
 *
 * @param inputFields parsed, trimmed candidate field names
 * @param allowedFields names this entity supports
 * @param ignoreExtra lenient mode: silently drop unknown names instead of failing
 * @return the subset of {@code inputFields} that is supported
 * @throws IllegalArgumentException in strict mode when an unknown field is requested
 */
private Set<String> validateFields(
    Set<String> inputFields, Set<String> allowedFields, boolean ignoreExtra) {
  Set<String> validated = new HashSet<>();
  for (String candidate : inputFields) {
    if (!allowedFields.contains(candidate)) {
      if (ignoreExtra) {
        continue; // lenient mode: unsupported field is simply skipped
      }
      throw new IllegalArgumentException(CatalogExceptionMessage.invalidField(candidate));
    }
    validated.add(candidate);
  }
  return validated;
}

/**
 * Split a comma-separated field list into a set of trimmed, non-empty names
 * (tolerates inputs like {@code "a, b,,c "}).
 */
private Set<String> parseFields(String fieldsParam) {
  Set<String> parsed = new HashSet<>();
  for (String token : fieldsParam.split(",")) {
    String trimmed = token.trim();
    if (!trimmed.isEmpty()) {
      parsed.add(trimmed);
    }
  }
  return parsed;
}

public Fields(Set<String> allowedFields, Set<String> fieldsParam) {
if (nullOrEmpty(fieldsParam)) {
fieldList = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isStaleReferenceError;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -140,7 +141,7 @@ private ResultList<? extends EntityInterface> read(String cursor) throws SearchI
batchSize,
cursor,
true,
Entity.getFields(entityType, fields),
Entity.getOnlySupportedFields(entityType, fields),
null);

// Filter out EntityNotFoundExceptions from errors - these are expected when relationships
Expand All @@ -150,7 +151,7 @@ private ResultList<? extends EntityInterface> read(String cursor) throws SearchI
List<EntityError> warningErrors = new ArrayList<>();

for (EntityError error : result.getErrors()) {
if (isEntityNotFoundError(error)) {
if (isStaleReferenceError(error)) {
warningErrors.add(error);
} else {
realErrors.add(error);
Expand Down Expand Up @@ -240,7 +241,7 @@ public ResultList<? extends EntityInterface> readWithCursor(String currentCursor
batchSize,
currentCursor,
true,
Entity.getFields(entityType, fields),
Entity.getOnlySupportedFields(entityType, fields),
null);

// Filter out EntityNotFoundExceptions from errors - same as in read() method
Expand All @@ -249,7 +250,7 @@ public ResultList<? extends EntityInterface> readWithCursor(String currentCursor
if (!result.getErrors().isEmpty()) {
List<EntityError> realErrors = new ArrayList<>();
for (EntityError error : result.getErrors()) {
if (isEntityNotFoundError(error)) {
if (isStaleReferenceError(error)) {
warningsCount++;
LOG.debug("Skipping entity due to missing relationship: {}", error.getMessage());
} else {
Expand Down Expand Up @@ -298,13 +299,13 @@ public ResultList<? extends EntityInterface> readNextKeyset(String keysetCursor)
keysetCursor,
cachedTotalCount,
true,
Entity.getFields(entityType, fields));
Entity.getOnlySupportedFields(entityType, fields));
Comment thread
mohityadav766 marked this conversation as resolved.

Comment thread
mohityadav766 marked this conversation as resolved.
int warningsCount = 0;
if (result.getErrors() != null && !result.getErrors().isEmpty()) {
List<EntityError> realErrors = new ArrayList<>();
for (EntityError error : result.getErrors()) {
if (isEntityNotFoundError(error)) {
if (isStaleReferenceError(error)) {
warningsCount++;
LOG.debug("Skipping entity due to missing relationship: {}", error.getMessage());
} else {
Expand Down Expand Up @@ -378,15 +379,4 @@ public void updateStats(int currentSuccess, int currentFailed) {
public void updateStats(int currentSuccess, int currentFailed, int currentWarnings) {
getUpdatedStats(stats, currentSuccess, currentFailed, currentWarnings);
}

/**
 * Heuristically detect "entity/relationship no longer exists" reader errors so they can be
 * downgraded to warnings instead of hard failures.
 *
 * @param error the reader error; null or message-less errors are not matched
 * @return true when the message looks like an EntityNotFoundException-style failure
 */
private boolean isEntityNotFoundError(EntityError error) {
  if (error == null || error.getMessage() == null) {
    return false;
  }
  // Locale.ROOT keeps the match stable regardless of the JVM default locale
  // (e.g. the Turkish dotless-i would otherwise break matching on "I").
  String message = error.getMessage().toLowerCase(Locale.ROOT);
  return message.contains("not found")
      || message.contains("instance for")
      || message.contains("does not exist")
      || message.contains("entitynotfoundexception");
}
}
Loading
Loading