Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
50f1406
fix(search): treat stale parentOf as warning during time-series reind…
mohityadav766 Apr 28, 2026
91bbdc1
review: tighten matcher, propagate warnings on cursor/keyset paths
mohityadav766 Apr 28, 2026
007670f
Show warn for failing test case resolution
mohityadav766 May 4, 2026
23578b0
Always recreate
mohityadav766 May 4, 2026
82d5aa4
Merge remote-tracking branch 'origin/main' into fix-testcaseresolutio…
mohityadav766 May 4, 2026
856fd2d
review: rename matcher + record reader failures before early-return
mohityadav766 May 4, 2026
38b4ec9
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 4, 2026
bd34de1
fix: import java.util.Objects instead of fully qualified name in Rein…
Copilot May 4, 2026
fcfbb39
Fill End time
mohityadav766 May 4, 2026
9a2d801
Not * reads
mohityadav766 May 4, 2026
6ec614e
Fix Fields issue
mohityadav766 May 4, 2026
d67704d
Add tags back
mohityadav766 May 4, 2026
bfb575a
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 4, 2026
a53abfc
Fix failing test
mohityadav766 May 4, 2026
fb60d37
Revert "Always recreate"
mohityadav766 May 4, 2026
b4cc517
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 4, 2026
0601641
review: cover all EntityNotFoundException formats; quiet null-id WARN
mohityadav766 May 4, 2026
eac76b6
review: NPE guard, locale-stable matcher, less log spam, accurate Jav…
mohityadav766 May 4, 2026
4e5b10d
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 5, 2026
e16d327
Realign Changes
mohityadav766 May 5, 2026
b12c923
Merge branch 'main' into fix-testcaseresolutionstatus
mohityadav766 May 5, 2026
861aa2d
Add Exception handling
mohityadav766 May 5, 2026
9bfe79b
fix(reindex): degrade to required-set, not '*', when filter step throws
mohityadav766 May 5, 2026
75f704a
Add Only Supported field at other call sites
mohityadav766 May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,11 @@ public static Fields getFields(String entityType, List<String> fields) {
return entityRepository.getFields(String.join(",", fields));
}

/**
 * Resolve the requested fields for {@code entityType}, delegating to the entity's repository,
 * which silently drops any field name the repository does not support (lenient counterpart of
 * {@code getFields}).
 */
public static Fields getOnlySupportedFields(String entityType, List<String> fields) {
  String joined = String.join(",", fields);
  return Entity.getEntityRepository(entityType).getOnlySupportedFields(joined);
}

public static <T> T getEntity(EntityReference ref, String fields, Include include) {
if (ref == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.openmetadata.service.Entity.QUERY_COST_RECORD;
import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS;
import static org.openmetadata.service.Entity.TEST_CASE_RESULT;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSearchIndexFields;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -322,21 +323,6 @@ static boolean isTransientError(SearchIndexException e) {
|| lower.contains("sockettimeoutexception");
}

/**
 * Fields to hydrate when reindexing {@code entityType}. Time-series entities need no extra
 * fields. When the search subsystem isn't bootstrapped (e.g. unit tests exercising the reader
 * without the full Entity registry) fall back to "*", matching the pre-selective-fields path.
 */
static List<String> getSearchIndexFields(String entityType) {
  if (TIME_SERIES_ENTITIES.contains(entityType)) {
    return List.of();
  }
  org.openmetadata.service.search.SearchRepository repo =
      org.openmetadata.service.Entity.getSearchRepository();
  boolean searchBootstrapped = repo != null && repo.getSearchIndexFactory() != null;
  return searchBootstrapped
      ? new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType))
      : List.of("*");
}

static int calculateNumberOfReaders(int totalEntityRecords, int batchSize) {
if (batchSize <= 0) return 1;
return (totalEntityRecords + batchSize - 1) / batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public static ReindexingConfiguration from(EventPublisherJob jobData) {
DEFAULT_FIELD_FETCH_THREADS,
DEFAULT_DOC_BUILD_THREADS,
DEFAULT_STATS_INTERVAL_MS,
Boolean.TRUE.equals(jobData.getRecreateIndex()),
// Always recreate
true,
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated
Comment thread
mohityadav766 marked this conversation as resolved.
Outdated
Comment thread
mohityadav766 marked this conversation as resolved.
Outdated
Boolean.TRUE.equals(jobData.getAutoTune()),
Comment thread
mohityadav766 marked this conversation as resolved.
Boolean.TRUE.equals(jobData.getUseDistributedIndexing()),
Boolean.TRUE.equals(jobData.getForce()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.searchIndex.listeners.LoggingProgressListener;
import org.openmetadata.service.apps.bundles.searchIndex.listeners.SlackProgressListener;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.SystemRepository;
import org.openmetadata.service.search.SearchRepository;
Expand Down Expand Up @@ -109,7 +110,7 @@ public void stop() {

AppRunRecord appRecord = context.getJobRecord();
appRecord.setStatus(AppRunRecord.Status.STOPPED);
appRecord.setEndTime(System.currentTimeMillis());
OmAppJobListener.fillTerminalTimings(appRecord);
context.storeRunRecord(JsonUtils.pojoToJson(appRecord));
context.pushStatusUpdate(appRecord, true);
sendUpdates();
Expand Down Expand Up @@ -368,6 +369,7 @@ private void finalizeJobExecution() {
if (stopped) {
AppRunRecord appRecord = context.getJobRecord();
appRecord.setStatus(AppRunRecord.Status.STOPPED);
OmAppJobListener.fillTerminalTimings(appRecord);
context.storeRunRecord(JsonUtils.pojoToJson(appRecord));
}
}
Expand All @@ -383,6 +385,7 @@ private void sendUpdates() {
private void updateRecordToDbAndNotify() {
AppRunRecord appRecord = context.getJobRecord();
appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value()));
OmAppJobListener.fillTerminalTimings(appRecord);

if (jobData.getFailure() != null) {
appRecord.setFailureContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.openmetadata.service.apps.bundles.searchIndex.BulkSink;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.socket.WebSocketManager;

Expand Down Expand Up @@ -482,6 +483,7 @@ private AppRunRecord convertToAppRunRecord(
appRecord.setStartTime(appStartTime != null ? appStartTime : job.getStartedAt());
appRecord.setEndTime(job.getCompletedAt());
appRecord.setTimestamp(job.getUpdatedAt());
OmAppJobListener.fillTerminalTimings(appRecord);

// Add stats as success context
SuccessContext successContext = new SuccessContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntityTimeSeriesSource;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;

/**
* Worker that processes a single partition of entities for search indexing.
Expand Down Expand Up @@ -491,15 +492,16 @@ private BatchResult processBatch(
ResultList<?> resultList = readEntitiesKeyset(entityType, keysetCursor, batchSize);
long readDurationNanos = System.nanoTime() - readStartNanos;

if (resultList == null || resultList.getData() == null || resultList.getData().isEmpty()) {
LOG.debug("{} read={}ms returned empty", entityType, readDurationNanos / 1_000_000L);
return new BatchResult(0, 0, 0, null);
}

String nextCursor = resultList.getPaging() != null ? resultList.getPaging().getAfter() : null;
int readSuccessCount = listOrEmpty(resultList.getData()).size();
int readErrorCount = listOrEmpty(resultList.getErrors()).size();
int warningsCount = resultList.getWarningsCount() != null ? resultList.getWarningsCount() : 0;
int readSuccessCount = resultList != null ? listOrEmpty(resultList.getData()).size() : 0;
int readErrorCount = resultList != null ? listOrEmpty(resultList.getErrors()).size() : 0;
int warningsCount =
(resultList != null && resultList.getWarningsCount() != null)
? resultList.getWarningsCount()
: 0;
String nextCursor =
(resultList != null && resultList.getPaging() != null)
? resultList.getPaging().getAfter()
: null;

if (statsTracker != null) {
// Reader timing = wall-clock time of the keyset DB read (listAfter + setFieldsInBulk
Expand All @@ -508,28 +510,16 @@ private BatchResult processBatch(
readSuccessCount, readErrorCount, warningsCount, readDurationNanos);
}

if (failureRecorder != null && readErrorCount > 0) {
for (EntityError entityError : listOrEmpty(resultList.getErrors())) {
Object rawEntity = entityError.getEntity();
String entityId = null;
if (rawEntity instanceof EntityInterface) {
UUID id = ((EntityInterface) rawEntity).getId();
if (id != null) {
entityId = id.toString();
}
} else if (rawEntity != null) {
entityId = rawEntity.toString();
}
if (entityId == null) {
LOG.warn(
"Skipping reader failure record for entityType={}: entityId is null, message={}",
entityType,
entityError.getMessage());
continue;
}
failureRecorder.recordReaderEntityFailure(
entityType, entityId, null, entityError.getMessage());
}
recordReaderFailures(entityType, resultList, readErrorCount);

if (readSuccessCount == 0) {
LOG.debug(
"{} read={}ms returned no indexable rows (warnings={}, errors={})",
entityType,
readDurationNanos / 1_000_000L,
warningsCount,
readErrorCount);
Comment thread
mohityadav766 marked this conversation as resolved.
return new BatchResult(0, readErrorCount, warningsCount, nextCursor);
}
Comment thread
mohityadav766 marked this conversation as resolved.

Map<String, Object> contextData = createContextData(entityType, statsTracker);
Expand Down Expand Up @@ -557,6 +547,40 @@ private BatchResult processBatch(
}
}

/**
* Persist per-entity reader failures so that downstream tooling (e.g. the failures dashboard)
* can show which specific records the reader could not hydrate. Runs whether or not the batch
* has any successful rows — losing failure diagnostics for "all-error" batches would defeat
* the point of the recorder.
*/
private void recordReaderFailures(
String entityType, ResultList<?> resultList, int readErrorCount) {
if (failureRecorder == null || readErrorCount == 0 || resultList == null) {
return;
}
for (EntityError entityError : listOrEmpty(resultList.getErrors())) {
Object rawEntity = entityError.getEntity();
String entityId = null;
if (rawEntity instanceof EntityInterface) {
UUID id = ((EntityInterface) rawEntity).getId();
if (id != null) {
entityId = id.toString();
}
} else if (rawEntity != null) {
entityId = rawEntity.toString();
}
if (entityId == null) {
LOG.warn(
"Skipping reader failure record for entityType={}: entityId is null, message={}",
entityType,
entityError.getMessage());
continue;
Comment thread
mohityadav766 marked this conversation as resolved.
Outdated
}
failureRecorder.recordReaderEntityFailure(
entityType, entityId, null, entityError.getMessage());
}
}

/**
* Read entities from the database.
*
Expand All @@ -568,7 +592,7 @@ private BatchResult processBatch(
private ResultList<?> readEntitiesKeyset(String entityType, String keysetCursor, int limit)
throws SearchIndexException {

List<String> fields = TIME_SERIES_ENTITIES.contains(entityType) ? List.of() : List.of("*");
List<String> fields = ReindexingUtil.getSearchIndexFields(entityType);

if (!TIME_SERIES_ENTITIES.contains(entityType)) {
PaginatedEntitiesSource source = new PaginatedEntitiesSource(entityType, limit, fields, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener;
import org.openmetadata.service.apps.bundles.searchIndex.distributed.DistributedJobContext;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.socket.WebSocketManager;
import org.quartz.JobExecutionContext;

Expand Down Expand Up @@ -228,6 +229,7 @@ private void broadcastViaWebSocket(AppRunRecord appRecord) {
private AppRunRecord getUpdatedAppRunRecord() {
AppRunRecord appRecord = readExistingRecord();
appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value()));
OmAppJobListener.fillTerminalTimings(appRecord);

if (jobData.getStats() != null) {
SuccessContext ctx = appRecord.getSuccessContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ private void updateAndBroadcastStoppedStatus(JobExecutionContext context) {
if (runRecord != null) {
// Update status to STOPPED
runRecord.withStatus(AppRunRecord.Status.STOPPED);
runRecord.withEndTime(System.currentTimeMillis());
OmAppJobListener.fillTerminalTimings(runRecord);

// Get WebSocket channel name
String webSocketChannelName =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,37 @@ protected OmAppJobListener() {
this.repository = new AppRepository();
}

/**
 * Populate {@code endTime} and {@code executionTime} on a terminal-state run record. Idempotent
 * — does nothing if {@code endTime} is already set or if the status is non-terminal — so it is
 * safe to call from progress listeners that may persist before {@link #jobWasExecuted} runs.
 *
 * <p>Without this, mid-flight writes by progress listeners (e.g. {@code QuartzProgressListener}
 * firing {@code onJobFailed}) persist a terminal status to the DB without timings; if the job
 * dies before {@code jobWasExecuted} fires, polling consumers see status=FAILED with no
 * endTime/executionTime.
 */
public static void fillTerminalTimings(AppRunRecord record) {
  boolean terminal =
      record != null && record.getStatus() != null && isTerminalStatus(record.getStatus());
  if (!terminal) {
    return;
  }
  if (record.getEndTime() == null) {
    record.withEndTime(System.currentTimeMillis());
  }
  // executionTime is derived, never overwritten once present.
  if (record.getExecutionTime() == null
      && record.getStartTime() != null
      && record.getEndTime() != null) {
    record.setExecutionTime(record.getEndTime() - record.getStartTime());
  }
}

/** A status is terminal when no further state transitions are expected for the run. */
private static boolean isTerminalStatus(AppRunRecord.Status status) {
  switch (status) {
    case SUCCESS:
    case FAILED:
    case ACTIVE_ERROR:
    case STOPPED:
    case COMPLETED:
      return true;
    default:
      return false;
  }
}

@Override
public String getName() {
return JOB_LISTENER_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6580,6 +6580,13 @@ public final Fields getFields(String fields) {
return new Fields(allowedFields, fields);
}

public final Fields getOnlySupportedFields(String fields) {
if ("*".equals(fields)) {
return new Fields(allowedFields, String.join(",", allowedFields), true);
}
return new Fields(allowedFields, fields, true);
Comment thread
mohityadav766 marked this conversation as resolved.
}

protected final Fields getFields(Set<String> fields) {
return new Fields(allowedFields, fields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public interface SearchIndex {
"followers",
"votes",
"extension",
"tags",
Comment thread
mohityadav766 marked this conversation as resolved.
"certification",
"dataProducts");
Comment thread
mohityadav766 marked this conversation as resolved.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,39 @@ public Fields(Set<String> allowedFields, String fieldsParam) {
}
}

/**
 * Lenient variant of {@link #Fields(Set, String)}: when {@code ignoreExtra} is true, field
 * names outside {@code allowedFields} are silently skipped instead of raising.
 */
public Fields(Set<String> allowedFields, String fieldsParam, boolean ignoreExtra) {
  if (nullOrEmpty(fieldsParam)) {
    this.fieldList = new HashSet<>();
  } else {
    this.fieldList = validateFields(parseFields(fieldsParam), allowedFields, ignoreExtra);
  }
}

/**
 * Intersect {@code inputFields} with {@code allowedFields}. An unknown field either raises
 * ({@code ignoreExtra == false}) or is dropped ({@code ignoreExtra == true}).
 */
private Set<String> validateFields(
    Set<String> inputFields, Set<String> allowedFields, boolean ignoreExtra) {
  Set<String> valid = new HashSet<>();
  for (String field : inputFields) {
    if (!allowedFields.contains(field)) {
      if (ignoreExtra) {
        continue;
      }
      throw new IllegalArgumentException(CatalogExceptionMessage.invalidField(field));
    }
    valid.add(field);
  }
  return valid;
}

/** Split a comma-separated fields parameter into trimmed, non-empty, de-duplicated names. */
private Set<String> parseFields(String fieldsParam) {
  Set<String> fields = new HashSet<>();
  for (String raw : fieldsParam.split(",")) {
    String trimmed = raw.trim();
    if (!trimmed.isEmpty()) {
      fields.add(trimmed);
    }
  }
  return fields;
}

public Fields(Set<String> allowedFields, Set<String> fieldsParam) {
if (nullOrEmpty(fieldsParam)) {
fieldList = new HashSet<>();
Expand Down
Loading
Loading