18 changes: 1 addition & 17 deletions bin/distributed-test/scripts/trigger-reindex.sh
@@ -8,7 +8,6 @@ PROJECT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
 
 # Default values
 SERVER_URL="http://localhost:8585"
-RECREATE_INDEX=false
 ENTITY_TYPES=""
 BATCH_SIZE=100
 PARTITION_SIZE=10000
@@ -20,10 +19,6 @@ while [[ $# -gt 0 ]]; do
       SERVER_URL="$2"
       shift 2
       ;;
-    --recreate)
-      RECREATE_INDEX=true
-      shift
-      ;;
     --entities)
       ENTITY_TYPES="$2"
       shift 2
@@ -41,7 +36,6 @@ while [[ $# -gt 0 ]]; do
     echo ""
     echo "Options:"
     echo "  --server URL          Target server URL (default: http://localhost:8585)"
-    echo "  --recreate            Drop and recreate indices before reindexing"
     echo "  --entities TYPES      Comma-separated entity types to reindex (default: all)"
     echo "  --batch-size NUM      Batch size for indexing (default: 100)"
     echo "  --partition-size NUM  Partition size for distributed indexing (default: 10000, range: 1000-50000)"
@@ -51,7 +45,6 @@ while [[ $# -gt 0 ]]; do
     echo "Examples:"
     echo "  $0                                  # Reindex all on server 1"
     echo "  $0 --server http://localhost:8587   # Trigger on server 2"
-    echo "  $0 --recreate                       # Drop and recreate indices"
     echo "  $0 --entities table,dashboard       # Reindex only tables and dashboards"
     echo "  $0 --partition-size 2000            # Use smaller partitions for better distribution"
     exit 0
@@ -67,7 +60,7 @@ echo "======================================"
 echo "Triggering Search Reindexing"
 echo "======================================"
 echo "Server: $SERVER_URL"
-echo "Recreate indices: $RECREATE_INDEX"
+echo "Indexing mode: staged indexes with alias promotion"
 echo "Batch size: $BATCH_SIZE"
 echo "Partition size: $PARTITION_SIZE"
 if [ -n "$ENTITY_TYPES" ]; then
@@ -96,13 +89,6 @@ fi
 echo "Authenticated successfully."
 echo ""
 
-# Build the reindex request body
-if [ "$RECREATE_INDEX" == "true" ]; then
-  RECREATE_FLAG="true"
-else
-  RECREATE_FLAG="false"
-fi
-
 # Build entities array
 if [ -n "$ENTITY_TYPES" ]; then
   # Convert comma-separated to JSON array
@@ -113,11 +99,9 @@ fi
 
 REQUEST_BODY=$(cat <<EOF
 {
-  "recreateIndex": $RECREATE_FLAG,
   "entities": $ENTITIES_JSON,
   "batchSize": $BATCH_SIZE,
   "partitionSize": $PARTITION_SIZE,
-  "useDistributedIndexing": true,
   "runMode": "BATCH"
 }
 EOF
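For quick local verification (not part of this diff), a minimal sketch of checking the promotion the script now reports, assuming a default local Elasticsearch on port 9200 and the table alias name asserted by the new integration test below:

# Hypothetical check: list the physical index behind the canonical table alias.
curl -s "http://localhost:9200/_alias/openmetadata_table_search_index" | jq 'keys'
# After each successful rerun this should name a fresh
# openmetadata_table_search_index_rebuild_* index, and the previous one
# should no longer carry the alias.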
@@ -0,0 +1,258 @@
/*
* Copyright 2026 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.it.tests;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeFalse;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.Isolated;
import org.openmetadata.it.bootstrap.TestSuiteBootstrap;
import org.openmetadata.it.factories.DatabaseSchemaTestFactory;
import org.openmetadata.it.factories.DatabaseServiceTestFactory;
import org.openmetadata.it.factories.TableTestFactory;
import org.openmetadata.it.util.SdkClients;
import org.openmetadata.it.util.TestNamespace;
import org.openmetadata.it.util.TestNamespaceExtension;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.data.DatabaseSchema;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.sdk.network.HttpClient;
import org.openmetadata.sdk.network.HttpMethod;
import org.openmetadata.service.Entity;
import org.openmetadata.service.search.SearchClient;

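/**
 * Verifies the staged-index promotion flow for search reindexing: each
 * table-only rerun of the SearchIndexingApplication should build a fresh
 * "_rebuild_"-prefixed index, move the canonical and short table aliases onto
 * it, and leave the previously promoted index without either alias.
 */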
@Execution(ExecutionMode.SAME_THREAD)
@Isolated
@ExtendWith(TestNamespaceExtension.class)
public class SearchIndexPromotionIT {

private static final String APP_NAME = "SearchIndexingApplication";
private static final String TABLE_ENTITY = "table";
private static final String TABLE_CANONICAL_ALIAS = "openmetadata_table_search_index";
private static final String TABLE_SHORT_ALIAS = "openmetadata_table";
private static final String TABLE_REBUILD_PREFIX = TABLE_CANONICAL_ALIAS + "_rebuild_";
private static final Set<String> SUCCESS_STATUSES = Set.of("success", "completed");
private static final Set<String> TERMINAL_STATUSES =
Set.of("success", "completed", "failed", "activeerror", "stopped");

@BeforeAll
static void setup() {
SdkClients.adminClient();
}

@Test
void tableOnlyRerunPromotesNewStagedIndex(TestNamespace ns) {
assumeFalse(
TestSuiteBootstrap.isK8sEnabled(), "App trigger not compatible with K8s pipeline backend");

createTableForReindex(ns);

HttpClient httpClient = SdkClients.adminClient().getHttpClient();
waitForCurrentRunCompletion(httpClient);

String initialTarget = readSingleTableAliasTargetIfPresent();
Long previousRunStartTime = readLatestRunStartTime(httpClient);
triggerTableReindex(httpClient);
AppRunRecord firstRun = waitForLatestRunSuccess(httpClient, previousRunStartTime);
String firstTarget = waitForPromotedTableAlias(initialTarget);

triggerTableReindex(httpClient);
waitForLatestRunSuccess(httpClient, firstRun.getStartTime());
String secondTarget = waitForPromotedTableAlias(firstTarget);

assertNotEquals(firstTarget, secondTarget, "Second reindex should promote a new staged index");
assertPreviousTargetIsNotServing(firstTarget);
}

private static void createTableForReindex(TestNamespace ns) {
DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service);
Table table =
TableTestFactory.createWithName(ns, schema.getFullyQualifiedName(), "promotion_table");

assertNotNull(table.getId(), "Test table should be created before reindex");
}

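// Triggers a table-only run of the indexing app, retrying while a previous
// run is still in flight (such triggers are rejected with "already running").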
private static void triggerTableReindex(HttpClient httpClient) {
Map<String, Object> config = new HashMap<>();
config.put("entities", List.of(TABLE_ENTITY));
config.put("batchSize", 100);

Awaitility.await("Trigger table-only " + APP_NAME)
.atMost(Duration.ofMinutes(2))
.pollInterval(Duration.ofSeconds(3))
.ignoreExceptionsMatching(
e -> e.getMessage() != null && e.getMessage().contains("already running"))
.until(
() -> {
httpClient.execute(
HttpMethod.POST, "/v1/apps/trigger/" + APP_NAME, config, Void.class);
return true;
});
}

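// Waits until a run newer than previousRunStartTime reaches a terminal state,
// then asserts it succeeded. The one-element array carries the record out of
// the Awaitility lambda.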
private static AppRunRecord waitForLatestRunSuccess(
HttpClient httpClient, Long previousRunStartTime) {
AppRunRecord[] holder = new AppRunRecord[1];

Awaitility.await("Table reindex run completion")
.atMost(Duration.ofMinutes(5))
.pollDelay(Duration.ofSeconds(2))
.pollInterval(Duration.ofSeconds(5))
.ignoreExceptions()
.untilAsserted(
() -> {
AppRunRecord run = readLatestRun(httpClient);
assertNotNull(run);
assertNotNull(run.getStatus());
if (previousRunStartTime != null
&& run.getStartTime() != null
&& run.getStartTime() <= previousRunStartTime) {
throw new AssertionError(
"Latest run is still the pre-trigger one (startTime="
+ run.getStartTime()
+ ", previous="
+ previousRunStartTime
+ ")");
}
String status = normalizedStatus(run);
assertTrue(
TERMINAL_STATUSES.contains(status), "Run not in terminal state: " + status);
holder[0] = run;
});

AppRunRecord run = holder[0];
assertTrue(
SUCCESS_STATUSES.contains(normalizedStatus(run)),
() -> "Expected successful table reindex run but got: " + run);
return run;
}

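// Polls until the canonical alias points at a staged rebuild index (a
// different one than previousTarget on reruns) and the short alias includes
// that same index.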
private static String waitForPromotedTableAlias(String previousTarget) {
String[] target = new String[1];

Awaitility.await("Table alias promotion")
.atMost(Duration.ofMinutes(2))
.pollDelay(Duration.ofSeconds(1))
.pollInterval(Duration.ofSeconds(2))
.ignoreExceptions()
.untilAsserted(
() -> {
String currentTarget = readSingleTableAliasTarget();
assertTrue(
currentTarget.startsWith(TABLE_REBUILD_PREFIX),
"Table alias should point at a staged rebuild index, got " + currentTarget);
if (previousTarget != null) {
assertNotEquals(
previousTarget,
currentTarget,
"Table alias should move to a new staged index after rerun");
}
Set<String> shortAliasTargets = searchClient().getIndicesByAlias(TABLE_SHORT_ALIAS);
assertTrue(
shortAliasTargets.contains(currentTarget),
"Short table alias should include the promoted staged table index");
target[0] = currentTarget;
});

return target[0];
}

private static String readSingleTableAliasTargetIfPresent() {
Set<String> targets = searchClient().getIndicesByAlias(TABLE_CANONICAL_ALIAS);
if (targets.isEmpty()) {
return null;
}
assertEquals(1, targets.size(), "Table canonical alias should have a single target");
return targets.iterator().next();
}

private static String readSingleTableAliasTarget() {
String target = readSingleTableAliasTargetIfPresent();
assertNotNull(target, "Table canonical alias should point at a promoted index");
return target;
}

private static void assertPreviousTargetIsNotServing(String previousTarget) {
SearchClient client = searchClient();
if (!client.indexExists(previousTarget)) {
return;
}

Set<String> aliases = client.getAliases(previousTarget);
assertFalse(
aliases.contains(TABLE_CANONICAL_ALIAS),
"Previous staged index should no longer have the canonical table alias");
assertFalse(
aliases.contains(TABLE_SHORT_ALIAS),
"Previous staged index should no longer have the short table alias");
}

private static Long readLatestRunStartTime(HttpClient httpClient) {
try {
AppRunRecord latest = readLatestRun(httpClient);
return latest == null ? null : latest.getStartTime();
} catch (Exception ignored) {
return null;
}
}

private static AppRunRecord readLatestRun(HttpClient httpClient) {
return httpClient.execute(
HttpMethod.GET, "/v1/apps/name/" + APP_NAME + "/runs/latest", null, AppRunRecord.class);
}

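// Lets any in-flight run drain to a non-running state before triggering; if
// it never does, the "already running" retry in triggerTableReindex covers it.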
private static void waitForCurrentRunCompletion(HttpClient httpClient) {
try {
Awaitility.await("Wait for in-flight " + APP_NAME)
.atMost(Duration.ofMinutes(5))
.pollInterval(Duration.ofSeconds(3))
.ignoreExceptions()
.until(
() -> {
AppRunRecord latest = readLatestRun(httpClient);
if (latest == null || latest.getStatus() == null) {
return true;
}
String status = normalizedStatus(latest);
return !"running".equals(status) && !"started".equals(status);
});
} catch (org.awaitility.core.ConditionTimeoutException ignored) {
// The trigger retry loop handles "already running" if the current run continues.
}
}

private static String normalizedStatus(AppRunRecord run) {
return run.getStatus().value().toLowerCase();
}

private static SearchClient searchClient() {
return Entity.getSearchRepository().getSearchClient();
}
}
@@ -89,7 +89,6 @@
 import org.openmetadata.service.apps.scheduler.AppScheduler;
 import org.openmetadata.service.audit.AuditLogEventPublisher;
 import org.openmetadata.service.audit.AuditLogRepository;
-import org.openmetadata.service.cache.CacheConfig;
 import org.openmetadata.service.config.CacheConfiguration;
 import org.openmetadata.service.config.OMWebBundle;
 import org.openmetadata.service.config.OMWebConfiguration;
@@ -397,7 +396,7 @@ public void run(OpenMetadataApplicationConfig catalogConfig, Environment environ
             jdbi.onDemand(CollectionDAO.class), Entity.getSearchRepository()));
 
     // Register Distributed Job Participant for distributed search indexing
-    registerDistributedJobParticipant(environment, jdbi, catalogConfig.getCacheConfig());
+    registerDistributedJobParticipant(environment, jdbi);
     registerDistributedRdfJobParticipant(environment, jdbi);
 
     // Register Event publishers
@@ -1132,24 +1131,18 @@ private void initializeWebsockets(
     }
   }
 
-  protected void registerDistributedJobParticipant(
-      Environment environment, Jdbi jdbi, CacheConfig cacheConfig) {
+  protected void registerDistributedJobParticipant(Environment environment, Jdbi jdbi) {
     try {
       CollectionDAO collectionDAO = jdbi.onDemand(CollectionDAO.class);
       SearchRepository searchRepository = Entity.getSearchRepository();
       String serverId = ServerIdentityResolver.getInstance().getServerId();
 
       DistributedJobParticipant participant =
-          new DistributedJobParticipant(collectionDAO, searchRepository, serverId, cacheConfig);
+          new DistributedJobParticipant(collectionDAO, searchRepository, serverId);
       environment.lifecycle().manage(participant);
 
-      String notifierType =
-          (cacheConfig != null && cacheConfig.provider == CacheConfig.Provider.redis)
-              ? "Redis Pub/Sub"
-              : "database polling";
       LOG.info(
-          "Registered DistributedJobParticipant for distributed search indexing using {}",
-          notifierType);
+          "Registered DistributedJobParticipant for distributed search indexing using database polling");
     } catch (Exception e) {
       LOG.warn("Failed to register DistributedJobParticipant", e);
     }