Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
83b803c
Restore live index settings on per-entity distributed-promote path
harshach May 4, 2026
ba952d9
Wire jobData into per-entity reindex promotion handler
harshach May 4, 2026
80f01f6
Add regression test for live serving settings on per-entity promote
harshach May 5, 2026
c373592
Expand unit coverage around the per-entity promotion contract
harshach May 5, 2026
20e1cd4
Add integration test for live settings restoration after alias promotion
harshach May 5, 2026
a977032
Address PR review: harden settings revert + lock InOrder + drop redun…
harshach May 5, 2026
a2a2b4a
Drop verbose explanatory comments from promote-path edits
harshach May 5, 2026
f40a21b
Close Rest5Client in IT _settings helper
harshach May 5, 2026
4775e90
Tighten SearchIndexAliasPromotionIT against false-positive runs
harshach May 5, 2026
2af5827
Harden alias promotion: defer canonical delete, hard-fail on empty al…
harshach May 5, 2026
4654158
Consolidate finalizeReindex and promoteEntityIndex into one core path
harshach May 5, 2026
9a7fa49
Address PR review: post-state checks, FAILED listener, hermetic IT, I…
harshach May 6, 2026
8b4d1a8
Wrap post-state checks: indexExists / getAliases throws no longer escape
harshach May 6, 2026
2580597
Address Copilot review 4232747647: positive-evidence dataLoss, hermet…
harshach May 6, 2026
0351078
Merge branch 'main' into harshach/search-alias-promote
harshach May 6, 2026
30774c7
Wait for restore-triggered run to settle in SearchIndexAliasPromotionIT
harshach May 6, 2026
232d195
Fix AppsResourceIT.waitForAppJobCompletion case mismatch and timeout
harshach May 6, 2026
22717f5
Merge branch 'main' into harshach/search-alias-promote
mohityadav766 May 6, 2026
98b9871
Merge remote-tracking branch 'origin/main' into harshach/search-alias…
harshach May 6, 2026
aee61f2
Merge branch 'harshach/search-alias-promote' of github.com:open-metad…
harshach May 6, 2026
64a385a
Run SearchIndexAliasPromotionIT in the sequential bucket
harshach May 6, 2026
810ed16
Address Copilot PR review 4233452655
harshach May 6, 2026
cf18270
Remove SearchIndexAliasPromotionIT in favor of unit test coverage
harshach May 6, 2026
7d7e5db
Address Copilot PR review 4236718653
harshach May 6, 2026
bb973d8
Fix per-entity promote when canonical is an alias, not a concrete index
harshach May 6, 2026
0164cd8
Add ALIAS_PROMOTE_BEGIN diagnostic log per entity
harshach May 6, 2026
d80e44b
Drop heavy alias-promotion refactor; rely on PR #27930 fix already in…
harshach May 6, 2026
3e0d7ca
Merge remote-tracking branch 'origin/main' into harshach/search-alias…
harshach May 6, 2026
25240ec
Skip delete-by-alias-name when canonical is currently an alias
harshach May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
/*
* Copyright 2026 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.it.tests;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeFalse;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import es.co.elastic.clients.transport.rest5_client.low_level.Request;
import es.co.elastic.clients.transport.rest5_client.low_level.Response;
import es.co.elastic.clients.transport.rest5_client.low_level.Rest5Client;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.Isolated;
import org.openmetadata.it.bootstrap.TestSuiteBootstrap;
import org.openmetadata.it.util.SdkClients;
import org.openmetadata.it.util.TestNamespaceExtension;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.sdk.fluent.Apps;
import org.openmetadata.sdk.network.HttpClient;
import org.openmetadata.sdk.network.HttpMethod;

/**
* End-to-end regression guard for the per-entity alias promotion path.
*
* <p>The {@code SearchIndexingApplication} stages each entity's reindex into a fresh index with
* {@code BulkIndexOverrides} applied (typically {@code refresh_interval=-1},
* {@code number_of_replicas=0}, {@code translog.durability=async}) so the bulk write has minimum
* indexing-side amplification. Before swapping the alias, those overrides MUST be reverted to live
* serving values, or live writes after promotion are buffered indefinitely and only become
* searchable on a manual {@code _refresh} — surfacing as the "freshly created entity does not
* appear in search until reindex" production symptom.
*
* <p>This test triggers the bundled app with bulk overrides set, waits for completion, then
* queries {@code _settings} on a representative entity index that the app reindexed and asserts
* the live values are present, NOT the bulk overrides. Catches both halves of the original
* regression: missing {@code applyLiveServingSettings} call in {@code promoteEntityIndex}, and
* missing {@code withJobData} wiring on the per-entity handler.
*/
@Execution(ExecutionMode.SAME_THREAD)
@Isolated
@ExtendWith(TestNamespaceExtension.class)
public class SearchIndexAliasPromotionIT {

// Name of the application this test triggers and whose runs it polls.
private static final String APP_NAME = "SearchIndexingApplication";
// Prefix used to build SETTINGS_INDEX.
// NOTE(review): presumably the cluster alias the test deployment is configured with — confirm.
private static final String CLUSTER_ALIAS = "openmetadata";
// Index-or-alias name whose _settings are inspected after the reindex run.
private static final String SETTINGS_INDEX = CLUSTER_ALIAS + "_table_search_index";
// Shared Jackson mapper used to parse the _settings response body.
private static final ObjectMapper MAPPER = new ObjectMapper();

/** Registers the admin SDK client as the default client of the fluent {@code Apps} API. */
@BeforeAll
static void setup() {
  var adminClient = SdkClients.adminClient();
  Apps.setDefaultClient(adminClient);
}

/**
 * Triggers a reindex with bulk-build overrides, waits for it to finish, and verifies that every
 * concrete index behind {@code SETTINGS_INDEX} carries live serving settings rather than the
 * bulk overrides.
 *
 * <p>The test is skipped when {@code TestSuiteBootstrap.isK8sEnabled()} is true. Besides the
 * settings checks it requires that the alias resolves to at least one {@code _rebuild_} index;
 * without that guard the settings assertions could pass against an index that was never staged
 * with bulk overrides in the first place.
 *
 * @throws Exception if reading the index settings fails
 */
@Test
void perEntityPromotionRestoresLiveSettingsOnStagedIndex() throws Exception {
  assumeFalse(
      TestSuiteBootstrap.isK8sEnabled(), "App trigger not compatible with K8s pipeline backend");

  HttpClient httpClient = SdkClients.adminClient().getHttpClient();
  // Let any in-flight run settle and remember its start time so the run triggered below can be
  // told apart from it.
  waitForCurrentRunCompletion(httpClient);
  Long previousRunStartTime = readLatestRunStartTime(httpClient);
  triggerWithBulkOverrides(httpClient);
  waitForLatestRunSuccess(httpClient, previousRunStartTime);

  Map<String, JsonNode> settingsByIndex = readIndexSettings(SETTINGS_INDEX);
  assertTrue(
      !settingsByIndex.isEmpty(),
      () ->
          "No concrete index resolved for alias '"
              + SETTINGS_INDEX
              + "'. Expected the reindex to produce a staged index that now serves the alias.");

  // Guard against a false positive: prove the alias actually moved to a staged index.
  assertTrue(
      settingsByIndex.keySet().stream().anyMatch(name -> name.contains("_rebuild_")),
      () ->
          "Alias '"
              + SETTINGS_INDEX
              + "' resolves to "
              + settingsByIndex.keySet()
              + ", none of which is a '_rebuild_' staged index. The alias swap did not happen, "
              + "so the settings checks would only inspect the pre-existing live index.");

  for (Map.Entry<String, JsonNode> entry : settingsByIndex.entrySet()) {
    String concreteIndex = entry.getKey();
    JsonNode indexSettings = entry.getValue();
    String refresh = textOrNull(indexSettings.path("refresh_interval"));
    String replicas = textOrNull(indexSettings.path("number_of_replicas"));
    String durability = textOrNull(indexSettings.path("translog").path("durability"));

    // Negative checks first: they produce the most specific diagnostics for the regression.
    assertNotEquals(
        "-1",
        refresh,
        () ->
            "Index '"
                + concreteIndex
                + "' kept bulk-build refresh_interval=-1 after promotion — "
                + "applyLiveServingSettings was not invoked on the per-entity promote path.");
    assertNotEquals(
        "0",
        replicas,
        () ->
            "Index '"
                + concreteIndex
                + "' kept bulk-build number_of_replicas=0 after promotion.");
    // translog.durability may be absent from the settings response; only check it when present.
    if (durability != null) {
      assertNotEquals(
          "async",
          durability,
          () ->
              "Index '"
                  + concreteIndex
                  + "' kept bulk-build translog.durability=async after promotion.");
    }

    // Positive checks: the expected live serving values.
    assertEquals(
        "1s",
        refresh,
        () -> "Expected live refresh_interval=1s on '" + concreteIndex + "', got " + refresh);
    assertEquals(
        "1",
        replicas,
        () -> "Expected live number_of_replicas=1 on '" + concreteIndex + "', got " + replicas);
  }
}
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 9a7fa49494. triggerWithBulkOverrides now explicitly sets liveIndexSettings (1s / 1 / request) and liveIndexSettingsByEntity (empty) in the trigger payload, so the post-promotion assertions are anchored to values the test itself supplies rather than the app's persisted config from a previous run.


private static void triggerWithBulkOverrides(HttpClient httpClient) {
Map<String, Object> bulk = new HashMap<>();
bulk.put("numberOfReplicas", 0);
bulk.put("refreshInterval", "-1");
bulk.put("translogDurability", "async");
bulk.put("translogSyncInterval", "30s");

Map<String, Object> config = new HashMap<>();
config.put("entities", List.of("table"));
config.put("recreateIndex", true);
config.put("batchSize", 100);
Comment thread
harshach marked this conversation as resolved.
Outdated
config.put("bulkIndexSettings", bulk);

Awaitility.await("Trigger " + APP_NAME)
.atMost(Duration.ofMinutes(2))
.pollInterval(Duration.ofSeconds(3))
.ignoreExceptionsMatching(
e -> e.getMessage() != null && e.getMessage().contains("already running"))
.until(
() -> {
httpClient.execute(
HttpMethod.POST, "/v1/apps/trigger/" + APP_NAME, config, Void.class);
return true;
});
}

private static AppRunRecord waitForLatestRunSuccess(
HttpClient httpClient, Long previousRunStartTime) {
AppRunRecord[] holder = new AppRunRecord[1];
Awaitility.await("Reindex run completion")
.atMost(Duration.ofMinutes(10))
.pollDelay(Duration.ofSeconds(2))
.pollInterval(Duration.ofSeconds(5))
.ignoreExceptions()
.untilAsserted(
() -> {
AppRunRecord run =
httpClient.execute(
HttpMethod.GET,
"/v1/apps/name/" + APP_NAME + "/runs/latest",
null,
AppRunRecord.class);
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 4775e90f91 and reinforced in 9a7fa49494. waitForLatestRunSuccess was tightened to require strict success/completed (rejects activeError), and the IT now also asserts the alias resolves to a *_rebuild_* concrete index — proving the swap actually moved the alias rather than passing against the pre-existing live index.

assertNotNull(run);
assertNotNull(run.getStatus());
if (previousRunStartTime != null
&& run.getStartTime() != null
&& run.getStartTime() <= previousRunStartTime) {
throw new AssertionError("Latest run is still the pre-trigger one");
}
String status = run.getStatus().value();
assertTrue(
"success".equalsIgnoreCase(status)
|| "completed".equalsIgnoreCase(status)
|| "failed".equalsIgnoreCase(status)
|| "activeError".equalsIgnoreCase(status),
"Run not in terminal state yet: " + status);
holder[0] = run;
});
AppRunRecord run = holder[0];
String status = run.getStatus().value().toLowerCase();
assertNotEquals("failed", status, () -> "Reindex job failed: " + run);
Comment thread
harshach marked this conversation as resolved.
Outdated
return run;
}

/**
 * Reads the start time of the latest run of {@code APP_NAME}.
 *
 * @param httpClient client used to fetch the latest run record
 * @return the latest run's start time, or {@code null} when there is no record or the request
 *     throws
 */
private static Long readLatestRunStartTime(HttpClient httpClient) {
  final String latestRunPath = "/v1/apps/name/" + APP_NAME + "/runs/latest";
  try {
    AppRunRecord record =
        httpClient.execute(HttpMethod.GET, latestRunPath, null, AppRunRecord.class);
    if (record == null) {
      return null;
    }
    return record.getStartTime();
  } catch (Exception ignored) {
    // Any lookup failure is treated the same as "no previous run".
    return null;
  }
}

/**
 * Waits, best effort, until the latest run of {@code APP_NAME} is neither {@code running} nor
 * {@code started}.
 *
 * <p>Polls every 3 seconds for up to 5 minutes. A missing run record or missing status counts as
 * "nothing in flight". A timeout is swallowed on purpose.
 *
 * @param httpClient client used to fetch the latest run record
 */
private static void waitForCurrentRunCompletion(HttpClient httpClient) {
  final String latestRunPath = "/v1/apps/name/" + APP_NAME + "/runs/latest";
  try {
    Awaitility.await("Wait for in-flight " + APP_NAME)
        .atMost(Duration.ofMinutes(5))
        .pollInterval(Duration.ofSeconds(3))
        .ignoreExceptions()
        .until(
            () -> {
              AppRunRecord record =
                  httpClient.execute(HttpMethod.GET, latestRunPath, null, AppRunRecord.class);
              boolean nothingRecorded = record == null || record.getStatus() == null;
              if (nothingRecorded) {
                return true;
              }
              String state = record.getStatus().value().toLowerCase();
              boolean inFlight = "running".equals(state) || "started".equals(state);
              return !inFlight;
            });
  } catch (org.awaitility.core.ConditionTimeoutException ignored) {
    // Best-effort wait; the trigger logic retries on "already running".
  }
}

/**
* GET {@code <indexOrAlias>/_settings} and return a map of resolved concrete index → its
* {@code settings.index} subtree. When the argument is an alias, the response is keyed by the
* underlying concrete index, which may include the rebuild-suffixed staged-then-promoted index.
*/
private static Map<String, JsonNode> readIndexSettings(String indexOrAlias) throws Exception {
Rest5Client searchClient = TestSuiteBootstrap.createSearchClient();
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated
Request request = new Request("GET", "/" + indexOrAlias + "/_settings");
Response response = searchClient.performRequest(request);
assertEquals(200, response.getStatusCode());
String body =
new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
JsonNode root = MAPPER.readTree(body);
Map<String, JsonNode> result = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> it = root.fields();
while (it.hasNext()) {
Map.Entry<String, JsonNode> entry = it.next();
JsonNode indexSettings = entry.getValue().path("settings").path("index");
if (indexSettings.isMissingNode() || indexSettings.isNull()) {
continue;
}
result.put(entry.getKey(), indexSettings);
}
return result;
}

/**
 * Returns the text form of {@code node}, or {@code null} when the node is {@code null}, a
 * missing node, or a JSON null.
 */
private static String textOrNull(JsonNode node) {
  if (node == null || node.isMissingNode() || node.isNull()) {
    return null;
  }
  return node.asText();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,17 @@ private void initializeEntityTracker(UUID jobId, boolean recreateIndex) {
// Set up per-entity promotion callback if recreating indices
if (recreateIndex && recreateContext != null) {
this.recreateIndexHandler = Entity.getSearchRepository().createReindexHandler();
// Wire jobData into the handler so applyLiveServingSettings (in
// DefaultRecreateHandler.promoteEntityIndex) can resolve the configured live + bulk
// index settings. Without this, jobData is null on the handler instance the per-entity
// callback uses, buildRevertJson returns null, and the bulk-build overrides
// (refresh_interval=-1, replicas=0, async translog) silently become the live settings.
if (recreateIndexHandler
instanceof org.openmetadata.service.search.DefaultRecreateHandler defaultHandler
&& currentJob != null
&& currentJob.getJobConfiguration() != null) {
defaultHandler.withJobData(currentJob.getJobConfiguration());
Comment thread
harshach marked this conversation as resolved.
Outdated
}
entityTracker.setOnEntityComplete(this::promoteEntityIndex);
LOG.info(
"Per-entity promotion callback SET for job {} (recreateIndex={}, recreateContext entities={})",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc
return;
}

// Restore live serving settings on the staged index before alias swap. The bulk-build
// overrides (refresh=-1, replicas=0, async translog) must NOT survive into live serving —
// otherwise live writes after promotion are buffered indefinitely and only become
// searchable on a manual _refresh, which surfaces as the "create-then-search returns
// nothing until reindex" symptom on knowledge pages. This mirrors the call in
// finalizeReindex; the per-entity distributed promotion path was missing it.
applyLiveServingSettings(searchClient, stagedIndex, entityType);
maybeForceMerge(searchClient, stagedIndex, entityType);
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated

// Always clear staged-index routing on the way out — see the rationale in finalizeReindex.
try {
Set<String> aliasesToAttach =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,39 @@ void initializeEntityTrackerCallbackPromotesEntityWhenTrackingCompletes() throws
verify(recreateHandler).promoteEntityIndex(any(EntityReindexContext.class), eq(true));
}

/**
 * Verifies that {@code initializeEntityTracker}, invoked with {@code recreateIndex=true}, passes
 * the current job's configuration to the {@code DefaultRecreateHandler} via
 * {@code withJobData}.
 */
@Test
void initializeEntityTrackerWiresJobDataIntoDefaultRecreateHandler() throws Exception {
  UUID runId = UUID.randomUUID();

  // Job under test: a running job carrying the configuration that must reach the handler.
  EventPublisherJob expectedConfig = new EventPublisherJob().withEntities(Set.of("table"));
  SearchIndexJob activeJob =
      SearchIndexJob.builder()
          .id(runId)
          .status(IndexJobStatus.RUNNING)
          .jobConfiguration(expectedConfig)
          .build();

  // Collaborators.
  SearchRepository repositoryMock = mock(SearchRepository.class);
  DefaultRecreateHandler handlerMock = mock(DefaultRecreateHandler.class);
  ReindexContext contextMock = mock(ReindexContext.class);
  when(contextMock.getEntities()).thenReturn(Set.of("table"));
  when(coordinator.getPartitions(runId, null))
      .thenReturn(List.of(partition(runId, "table", PartitionStatus.PENDING)));

  // State the private method reads.
  setField("entityTracker", new EntityCompletionTracker(runId));
  setField("recreateContext", contextMock);
  setField("currentJob", activeJob);

  try (MockedStatic<Entity> staticEntity = mockStatic(Entity.class)) {
    staticEntity.when(Entity::getSearchRepository).thenReturn(repositoryMock);
    when(repositoryMock.createReindexHandler()).thenReturn(handlerMock);

    Class<?>[] signature = new Class<?>[] {UUID.class, boolean.class};
    invokePrivate("initializeEntityTracker", signature, runId, true);
  }

  verify(handlerMock).withJobData(expectedConfig);
}

@Test
void promoteEntityIndexUsesDefaultAndGenericHandlers() throws Exception {
ReindexContext recreateContext = mock(ReindexContext.class);
Expand Down
Loading
Loading