@@ -563,6 +563,11 @@ public static Fields getFields(String entityType, List<String> fields) {
return entityRepository.getFields(String.join(",", fields));
}

public static Fields getOnlySupportedFields(String entityType, List<String> fields) {
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
return entityRepository.getOnlySupportedFields(String.join(",", fields));
}

public static <T> T getEntity(EntityReference ref, String fields, Include include) {
if (ref == null) {
return null;
@@ -3,6 +3,7 @@
import static org.openmetadata.service.Entity.QUERY_COST_RECORD;
import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS;
import static org.openmetadata.service.Entity.TEST_CASE_RESULT;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getSearchIndexFields;

import java.util.ArrayList;
import java.util.List;
@@ -322,21 +323,6 @@ static boolean isTransientError(SearchIndexException e) {
|| lower.contains("sockettimeoutexception");
}

static List<String> getSearchIndexFields(String entityType) {
if (TIME_SERIES_ENTITIES.contains(entityType)) {
return List.of();
}
org.openmetadata.service.search.SearchRepository repo =
org.openmetadata.service.Entity.getSearchRepository();
if (repo == null || repo.getSearchIndexFactory() == null) {
// Fallback for environments where the search subsystem isn't bootstrapped (e.g. unit
// tests that exercise the reader without the full Entity registry). Behaves the same
// as the pre-selective-fields code path.
return List.of("*");
}
return new ArrayList<>(repo.getSearchIndexFactory().getReindexFieldsFor(entityType));
}

static int calculateNumberOfReaders(int totalEntityRecords, int batchSize) {
if (batchSize <= 0) return 1;
return (totalEntityRecords + batchSize - 1) / batchSize;
@@ -24,6 +24,7 @@
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.searchIndex.listeners.LoggingProgressListener;
import org.openmetadata.service.apps.bundles.searchIndex.listeners.SlackProgressListener;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.SystemRepository;
import org.openmetadata.service.search.SearchRepository;
@@ -109,7 +110,7 @@ public void stop() {

AppRunRecord appRecord = context.getJobRecord();
appRecord.setStatus(AppRunRecord.Status.STOPPED);
appRecord.setEndTime(System.currentTimeMillis());
OmAppJobListener.fillTerminalTimings(appRecord);
context.storeRunRecord(JsonUtils.pojoToJson(appRecord));
context.pushStatusUpdate(appRecord, true);
sendUpdates();
@@ -368,6 +369,7 @@ private void finalizeJobExecution() {
if (stopped) {
AppRunRecord appRecord = context.getJobRecord();
appRecord.setStatus(AppRunRecord.Status.STOPPED);
OmAppJobListener.fillTerminalTimings(appRecord);
context.storeRunRecord(JsonUtils.pojoToJson(appRecord));
}
}
@@ -383,6 +385,7 @@ private void sendUpdates() {
private void updateRecordToDbAndNotify() {
AppRunRecord appRecord = context.getJobRecord();
appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value()));
OmAppJobListener.fillTerminalTimings(appRecord);

if (jobData.getFailure() != null) {
appRecord.setFailureContext(
@@ -35,6 +35,7 @@
import org.openmetadata.service.apps.bundles.searchIndex.BulkSink;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.socket.WebSocketManager;

@@ -482,6 +483,7 @@ private AppRunRecord convertToAppRunRecord(
appRecord.setStartTime(appStartTime != null ? appStartTime : job.getStartedAt());
appRecord.setEndTime(job.getCompletedAt());
appRecord.setTimestamp(job.getUpdatedAt());
OmAppJobListener.fillTerminalTimings(appRecord);

// Add stats as success context
SuccessContext successContext = new SuccessContext();
@@ -43,6 +43,7 @@
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntityTimeSeriesSource;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;

/**
* Worker that processes a single partition of entities for search indexing.
@@ -491,15 +492,16 @@ private BatchResult processBatch(
ResultList<?> resultList = readEntitiesKeyset(entityType, keysetCursor, batchSize);
long readDurationNanos = System.nanoTime() - readStartNanos;

if (resultList == null || resultList.getData() == null || resultList.getData().isEmpty()) {
LOG.debug("{} read={}ms returned empty", entityType, readDurationNanos / 1_000_000L);
return new BatchResult(0, 0, 0, null);
}

String nextCursor = resultList.getPaging() != null ? resultList.getPaging().getAfter() : null;
int readSuccessCount = listOrEmpty(resultList.getData()).size();
int readErrorCount = listOrEmpty(resultList.getErrors()).size();
int warningsCount = resultList.getWarningsCount() != null ? resultList.getWarningsCount() : 0;
int readSuccessCount = resultList != null ? listOrEmpty(resultList.getData()).size() : 0;
int readErrorCount = resultList != null ? listOrEmpty(resultList.getErrors()).size() : 0;
int warningsCount =
(resultList != null && resultList.getWarningsCount() != null)
? resultList.getWarningsCount()
: 0;
String nextCursor =
(resultList != null && resultList.getPaging() != null)
? resultList.getPaging().getAfter()
: null;

if (statsTracker != null) {
// Reader timing = wall-clock time of the keyset DB read (listAfter + setFieldsInBulk
@@ -508,28 +510,16 @@
readSuccessCount, readErrorCount, warningsCount, readDurationNanos);
}

if (failureRecorder != null && readErrorCount > 0) {
for (EntityError entityError : listOrEmpty(resultList.getErrors())) {
Object rawEntity = entityError.getEntity();
String entityId = null;
if (rawEntity instanceof EntityInterface) {
UUID id = ((EntityInterface) rawEntity).getId();
if (id != null) {
entityId = id.toString();
}
} else if (rawEntity != null) {
entityId = rawEntity.toString();
}
if (entityId == null) {
LOG.warn(
"Skipping reader failure record for entityType={}: entityId is null, message={}",
entityType,
entityError.getMessage());
continue;
}
failureRecorder.recordReaderEntityFailure(
entityType, entityId, null, entityError.getMessage());
}
recordReaderFailures(entityType, resultList, readErrorCount);

if (readSuccessCount == 0) {
LOG.debug(
"{} read={}ms returned no indexable rows (warnings={}, errors={})",
entityType,
readDurationNanos / 1_000_000L,
warningsCount,
readErrorCount);
return new BatchResult(0, readErrorCount, warningsCount, nextCursor);
}

Map<String, Object> contextData = createContextData(entityType, statsTracker);
@@ -557,6 +547,44 @@ private BatchResult processBatch(
}
}

/**
* Persist per-entity reader failures so that downstream tooling (e.g. the failures dashboard)
* can show which specific records the reader could not hydrate. Runs whether or not the batch
* has any successful rows — losing failure diagnostics for "all-error" batches would defeat
* the point of the recorder.
*/
private void recordReaderFailures(
String entityType, ResultList<?> resultList, int readErrorCount) {
if (failureRecorder == null || readErrorCount == 0 || resultList == null) {
return;
}
for (EntityError entityError : listOrEmpty(resultList.getErrors())) {
Object rawEntity = entityError.getEntity();
String entityId = null;
if (rawEntity instanceof EntityInterface) {
UUID id = ((EntityInterface) rawEntity).getId();
if (id != null) {
entityId = id.toString();
}
} else if (rawEntity != null) {
entityId = rawEntity.toString();
}
if (entityId == null) {
// Time-series readers (EntityTimeSeriesRepository) build EntityError without an id —
// they only have access to the JSON row, not the entity reference. Per-entity recording
// requires an id, so log at DEBUG (not WARN) to avoid spamming logs for every error in
// large time-series batches.
LOG.debug(
"No entityId on reader failure for entityType={} — skipping per-entity record. message={}",
entityType,
entityError.getMessage());
continue;
}
failureRecorder.recordReaderEntityFailure(
entityType, entityId, null, entityError.getMessage());
}
}
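
A minimal sketch of the two error shapes this helper distinguishes (illustrative only, not part of this change; it assumes the generated EntityError type exposes the usual withEntity/withMessage builders and borrows Table from the schema module, with made-up names):

// Error that still carries the hydrated entity: the id becomes the per-entity failure key.
EntityInterface table = new Table().withId(UUID.randomUUID()).withName("orders");
EntityError hydratedError =
    new EntityError().withEntity(table).withMessage("failed to build columns");
// recordReaderFailures would call
// failureRecorder.recordReaderEntityFailure(entityType, table.getId().toString(), null, "failed to build columns").

// Error with no entity attached (or an EntityInterface without an id): there is nothing to
// key the record on, so it is logged at DEBUG and skipped.
EntityError anonymousError = new EntityError().withMessage("corrupt time-series row");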

/**
* Read entities from the database.
*
@@ -568,7 +596,7 @@ private BatchResult processBatch(
private ResultList<?> readEntitiesKeyset(String entityType, String keysetCursor, int limit)
throws SearchIndexException {

List<String> fields = TIME_SERIES_ENTITIES.contains(entityType) ? List.of() : List.of("*");
List<String> fields = ReindexingUtil.getSearchIndexFields(entityType);

if (!TIME_SERIES_ENTITIES.contains(entityType)) {
PaginatedEntitiesSource source = new PaginatedEntitiesSource(entityType, limit, fields, 0);
@@ -23,6 +23,7 @@
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener;
import org.openmetadata.service.apps.bundles.searchIndex.distributed.DistributedJobContext;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.socket.WebSocketManager;
import org.quartz.JobExecutionContext;

@@ -228,6 +229,7 @@ private void broadcastViaWebSocket(AppRunRecord appRecord) {
private AppRunRecord getUpdatedAppRunRecord() {
AppRunRecord appRecord = readExistingRecord();
appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value()));
OmAppJobListener.fillTerminalTimings(appRecord);

if (jobData.getStats() != null) {
SuccessContext ctx = appRecord.getSuccessContext();
@@ -465,7 +465,7 @@ private void updateAndBroadcastStoppedStatus(JobExecutionContext context) {
if (runRecord != null) {
// Update status to STOPPED
runRecord.withStatus(AppRunRecord.Status.STOPPED);
runRecord.withEndTime(System.currentTimeMillis());
OmAppJobListener.fillTerminalTimings(runRecord);

// Get WebSocket channel name
String webSocketChannelName =
@@ -47,6 +47,45 @@ protected OmAppJobListener() {
this.repository = new AppRepository();
}

/**
* Populate {@code endTime} and {@code executionTime} on a terminal-state run record. Each field
* is filled independently and only if currently null:
*
* <ul>
* <li>{@code endTime} defaults to {@code System.currentTimeMillis()} if absent.
* <li>{@code executionTime} is computed from {@code endTime - startTime} if absent and both
* endpoints are available — this means callers that pre-populated {@code endTime} (e.g.
* from {@code job.getCompletedAt()}) still get an accurate {@code executionTime}.
* </ul>
*
* <p>The method is a no-op for non-terminal statuses, so it is safe to call from progress
* listeners that may persist before {@link #jobWasExecuted} runs. Without this, mid-flight
* writes by progress listeners (e.g. {@code QuartzProgressListener} firing {@code onJobFailed})
* would persist a terminal status to the DB without timings; if the job dies before {@code
* jobWasExecuted} fires, polling consumers would see {@code status=FAILED} with no
* {@code endTime} / {@code executionTime}.
*/
public static void fillTerminalTimings(AppRunRecord record) {
if (record == null || record.getStatus() == null || !isTerminalStatus(record.getStatus())) {
return;
}
if (record.getEndTime() == null) {
record.withEndTime(System.currentTimeMillis());
}
if (record.getExecutionTime() == null
&& record.getStartTime() != null
&& record.getEndTime() != null) {
record.setExecutionTime(record.getEndTime() - record.getStartTime());
}
}

private static boolean isTerminalStatus(AppRunRecord.Status status) {
return switch (status) {
case SUCCESS, FAILED, ACTIVE_ERROR, STOPPED, COMPLETED -> true;
default -> false;
};
}
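
A quick sketch of the two fill paths (timestamps arbitrary; the setters are the same generated AppRunRecord accessors used elsewhere in this diff):

// Terminal record with neither timing set: endTime defaults to "now" and executionTime
// is derived as endTime - startTime.
AppRunRecord failed = new AppRunRecord();
failed.setStatus(AppRunRecord.Status.FAILED);
failed.setStartTime(1700000000000L);
OmAppJobListener.fillTerminalTimings(failed);

// Terminal record whose endTime was already populated (e.g. from job.getCompletedAt()):
// endTime is preserved and only executionTime (5000 ms here) is computed.
AppRunRecord stopped = new AppRunRecord();
stopped.setStatus(AppRunRecord.Status.STOPPED);
stopped.setStartTime(1700000000000L);
stopped.setEndTime(1700000005000L);
OmAppJobListener.fillTerminalTimings(stopped);

// Records in a non-terminal status are left untouched, so progress listeners can call this
// unconditionally before persisting.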

@Override
public String getName() {
return JOB_LISTENER_NAME;
@@ -6607,6 +6607,13 @@ public final Fields getFields(String fields) {
return new Fields(allowedFields, fields);
}

public final Fields getOnlySupportedFields(String fields) {
if ("*".equals(fields)) {
return new Fields(allowedFields, String.join(",", allowedFields), true);
}
return new Fields(allowedFields, fields, true);
}
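
Roughly, for any EntityRepository (here called repository; the field names are hypothetical), the new lenient lookup behaves like this:

// "*" materialises every field in allowedFields instead of being passed through verbatim.
Fields everything = repository.getOnlySupportedFields("*");

// Unsupported names are dropped rather than rejected: "owners" is kept (assuming it is in
// allowedFields for this entity type) while "notARealField" is silently ignored, instead of
// surfacing the invalidField error the strict getFields path raises.
Fields tolerant = repository.getOnlySupportedFields("owners,notARealField");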

protected final Fields getFields(Set<String> fields) {
return new Fields(allowedFields, fields);
}
@@ -67,6 +67,7 @@ public interface SearchIndex {
"followers",
"votes",
"extension",
"tags",
"certification",
"dataProducts");

@@ -532,6 +532,39 @@ public Fields(Set<String> allowedFields, String fieldsParam) {
}
}

public Fields(Set<String> allowedFields, String fieldsParam, boolean ignoreExtra) {
if (nullOrEmpty(fieldsParam)) {
this.fieldList = new HashSet<>();
return;
}

Set<String> parsedFields = parseFields(fieldsParam);
this.fieldList = validateFields(parsedFields, allowedFields, ignoreExtra);
}

private Set<String> validateFields(
Set<String> inputFields, Set<String> allowedFields, boolean ignoreExtra) {

Set<String> result = new HashSet<>();

for (String field : inputFields) {
if (allowedFields.contains(field)) {
result.add(field);
} else if (!ignoreExtra) {
throw new IllegalArgumentException(CatalogExceptionMessage.invalidField(field));
}
}

return result;
}

private Set<String> parseFields(String fieldsParam) {
return Arrays.stream(fieldsParam.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toSet());
}
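
A minimal sketch of the new lenient constructor against the validateFields/parseFields logic above (the allowed set is made up):

Set<String> allowed = Set.of("owners", "tags", "domain");

// With ignoreExtra=true, unknown names are filtered out, and parseFields trims whitespace
// and drops empty segments around the commas.
Fields lenient = new Fields(allowed, " owners , unknownField ,", true);
// lenient now holds exactly {"owners"}.

// With ignoreExtra=false the same input would throw IllegalArgumentException via
// CatalogExceptionMessage.invalidField("unknownField").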

public Fields(Set<String> allowedFields, Set<String> fieldsParam) {
if (nullOrEmpty(fieldsParam)) {
fieldList = new HashSet<>();