From 338fdf8d3053ae26cef3100bb385cff3c190d078 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Wed, 29 Apr 2026 17:19:13 +0530 Subject: [PATCH 1/3] fix: move version metadata migration from 1.12.7 to 2.0.0 and remove crashing MySQL multi-valued index --- .../mysql/postDataMigrationSQLScript.sql | 57 --- .../native/1.12.7/mysql/schemaChanges.sql | 9 - .../postgres/postDataMigrationSQLScript.sql | 49 --- .../native/1.12.7/postgres/schemaChanges.sql | 11 - .../native/2.0.0/mysql/schemaChanges.sql | 7 + .../native/2.0.0/postgres/schemaChanges.sql | 12 + .../migration/mysql/v200/Migration.java | 11 + .../migration/postgres/v200/Migration.java | 11 + .../migration/utils/v200/MigrationUtil.java | 137 +++++++ .../utils/MigrationSqlStatementHashTest.java | 16 +- .../utils/v200/MigrationUtilTest.java | 349 ++++++++++++++++++ 11 files changed, 531 insertions(+), 138 deletions(-) delete mode 100644 bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql delete mode 100644 bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql delete mode 100644 bootstrap/sql/migrations/native/1.12.7/postgres/postDataMigrationSQLScript.sql delete mode 100644 bootstrap/sql/migrations/native/1.12.7/postgres/schemaChanges.sql diff --git a/bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql deleted file mode 100644 index 1958763f08fc..000000000000 --- a/bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql +++ /dev/null @@ -1,57 +0,0 @@ -WITH -field_changes AS ( - SELECT e.id, e.extension, jt.field_name - FROM entity_extension AS e - JOIN JSON_TABLE( - COALESCE(JSON_EXTRACT(e.json, '$.changeDescription.fieldsAdded'), JSON_ARRAY()), - '$[*]' COLUMNS (field_name VARCHAR(1024) PATH '$.name') - ) AS jt ON TRUE - WHERE e.extension LIKE '%.version.%' - - UNION ALL - - SELECT e.id, e.extension, jt.field_name - FROM entity_extension AS e - JOIN 
JSON_TABLE( - COALESCE(JSON_EXTRACT(e.json, '$.changeDescription.fieldsUpdated'), JSON_ARRAY()), - '$[*]' COLUMNS (field_name VARCHAR(1024) PATH '$.name') - ) AS jt ON TRUE - WHERE e.extension LIKE '%.version.%' - - UNION ALL - - SELECT e.id, e.extension, jt.field_name - FROM entity_extension AS e - JOIN JSON_TABLE( - COALESCE(JSON_EXTRACT(e.json, '$.changeDescription.fieldsDeleted'), JSON_ARRAY()), - '$[*]' COLUMNS (field_name VARCHAR(1024) PATH '$.name') - ) AS jt ON TRUE - WHERE e.extension LIKE '%.version.%' -), -distinct_field_changes AS ( - SELECT DISTINCT id, extension, field_name - FROM field_changes - WHERE field_name IS NOT NULL - AND field_name <> '' -), -version_metadata AS ( - SELECT - e.id, - e.extension, - CAST(SUBSTRING_INDEX(e.extension, '.version.', -1) AS DOUBLE) AS version_num, - CASE - WHEN COUNT(fc.field_name) = 0 THEN JSON_ARRAY() - ELSE JSON_ARRAYAGG(fc.field_name) - END AS changed_field_keys - FROM entity_extension AS e - LEFT JOIN distinct_field_changes AS fc - ON fc.id = e.id - AND fc.extension = e.extension - WHERE e.extension LIKE '%.version.%' - GROUP BY e.id, e.extension -) -UPDATE entity_extension AS e -JOIN version_metadata AS vm - ON vm.id = e.id AND vm.extension = e.extension -SET e.versionNum = vm.version_num, - e.changedFieldKeys = vm.changed_field_keys; diff --git a/bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql deleted file mode 100644 index 0a96628fd32c..000000000000 --- a/bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql +++ /dev/null @@ -1,9 +0,0 @@ -ALTER TABLE entity_extension - ADD COLUMN versionNum DOUBLE NULL, - ADD COLUMN changedFieldKeys JSON NULL; - -CREATE INDEX idx_entity_extension_version_order - ON entity_extension (id, versionNum); - -CREATE INDEX idx_entity_extension_changed_field_keys - ON entity_extension ((CAST(changedFieldKeys->'$' AS CHAR(512) ARRAY))); diff --git 
a/bootstrap/sql/migrations/native/1.12.7/postgres/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.12.7/postgres/postDataMigrationSQLScript.sql deleted file mode 100644 index dcaea9a5813c..000000000000 --- a/bootstrap/sql/migrations/native/1.12.7/postgres/postDataMigrationSQLScript.sql +++ /dev/null @@ -1,49 +0,0 @@ -WITH version_metadata AS ( - SELECT - e.id, - e.extension, - split_part(e.extension, '.version.', 2)::DOUBLE PRECISION AS version_num, - COALESCE( - ( - SELECT jsonb_agg(field_name ORDER BY field_name) - FROM ( - SELECT DISTINCT - field_change ->> 'name' AS field_name - FROM jsonb_array_elements( - COALESCE(e.json -> 'changeDescription' -> 'fieldsAdded', '[]'::jsonb) - ) AS field_change - WHERE field_change ->> 'name' IS NOT NULL - AND field_change ->> 'name' <> '' - - UNION - - SELECT DISTINCT - field_change ->> 'name' AS field_name - FROM jsonb_array_elements( - COALESCE(e.json -> 'changeDescription' -> 'fieldsUpdated', '[]'::jsonb) - ) AS field_change - WHERE field_change ->> 'name' IS NOT NULL - AND field_change ->> 'name' <> '' - - UNION - - SELECT DISTINCT - field_change ->> 'name' AS field_name - FROM jsonb_array_elements( - COALESCE(e.json -> 'changeDescription' -> 'fieldsDeleted', '[]'::jsonb) - ) AS field_change - WHERE field_change ->> 'name' IS NOT NULL - AND field_change ->> 'name' <> '' - ) AS exact_field_names - ), - '[]'::jsonb - ) AS changed_field_keys - FROM entity_extension AS e - WHERE e.extension LIKE '%.version.%' -) -UPDATE entity_extension AS e -SET versionNum = version_metadata.version_num, - changedFieldKeys = version_metadata.changed_field_keys -FROM version_metadata -WHERE e.id = version_metadata.id - AND e.extension = version_metadata.extension; diff --git a/bootstrap/sql/migrations/native/1.12.7/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.12.7/postgres/schemaChanges.sql deleted file mode 100644 index 99bc7b7ff037..000000000000 --- 
a/bootstrap/sql/migrations/native/1.12.7/postgres/schemaChanges.sql +++ /dev/null @@ -1,11 +0,0 @@ -ALTER TABLE entity_extension - ADD COLUMN IF NOT EXISTS versionNum DOUBLE PRECISION, - ADD COLUMN IF NOT EXISTS changedFieldKeys JSONB; - -CREATE INDEX IF NOT EXISTS idx_entity_extension_version_order - ON entity_extension (id, versionNum DESC) - WHERE versionNum IS NOT NULL; - -CREATE INDEX IF NOT EXISTS idx_entity_extension_changed_field_keys - ON entity_extension USING GIN (changedFieldKeys) - WHERE changedFieldKeys IS NOT NULL; diff --git a/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql index 576f7b739682..646da758f9da 100644 --- a/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql @@ -135,3 +135,10 @@ CREATE TABLE IF NOT EXISTS task_form_schema_entity ( KEY idx_task_form_schema_task_type (taskType), KEY idx_task_form_schema_deleted (deleted) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; + +ALTER TABLE entity_extension + ADD COLUMN versionNum DOUBLE NULL, + ADD COLUMN changedFieldKeys JSON NULL; + +CREATE INDEX idx_entity_extension_version_order + ON entity_extension (id, versionNum); diff --git a/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql index 288caf8cf0be..93a3490f48a2 100644 --- a/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql @@ -140,3 +140,15 @@ CREATE TABLE IF NOT EXISTS task_form_schema_entity ( CREATE INDEX IF NOT EXISTS idx_task_form_schema_name ON task_form_schema_entity (name); CREATE INDEX IF NOT EXISTS idx_task_form_schema_tasktype ON task_form_schema_entity (tasktype); CREATE INDEX IF NOT EXISTS idx_task_form_schema_deleted ON task_form_schema_entity (deleted); + +ALTER TABLE entity_extension + 
ADD COLUMN IF NOT EXISTS versionNum DOUBLE PRECISION, + ADD COLUMN IF NOT EXISTS changedFieldKeys JSONB; + +CREATE INDEX IF NOT EXISTS idx_entity_extension_version_order + ON entity_extension (id, versionNum DESC) + WHERE versionNum IS NOT NULL; + +CREATE INDEX IF NOT EXISTS idx_entity_extension_changed_field_keys + ON entity_extension USING GIN (changedFieldKeys) + WHERE changedFieldKeys IS NOT NULL; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java index ef9b989c8c8c..ffe9dddaaa05 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java @@ -3,14 +3,17 @@ import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addTableColumnSearchSettings; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillAnnouncementRelationships; +import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillVersionMetadata; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateLegacyActivityThreadsToActivityStream; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateSuggestionsToTaskEntity; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateThreadTasksToTaskEntity; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.openmetadata.service.migration.api.MigrationProcessImpl; import org.openmetadata.service.migration.utils.MigrationFile; +@Slf4j public class Migration extends MigrationProcessImpl { public Migration(MigrationFile migrationFile) { @@ -25,5 +28,13 @@ public void runDataMigration() { migrateThreadTasksToTaskEntity(handle, MYSQL); 
migrateLegacyActivityThreadsToActivityStream(handle, MYSQL); backfillAnnouncementRelationships(handle); + try { + backfillVersionMetadata(handle); + } catch (Exception e) { + LOG.error( + "Failed to backfill versionNum and changedFieldKeys in v200 migration. " + + "Version timeline filtering may not work correctly until the migration is re-run.", + e); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java index dccef4e9ff43..72570e7ebd03 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java @@ -3,14 +3,17 @@ import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addTableColumnSearchSettings; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillAnnouncementRelationships; +import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillVersionMetadata; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateLegacyActivityThreadsToActivityStream; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateSuggestionsToTaskEntity; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateThreadTasksToTaskEntity; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.openmetadata.service.migration.api.MigrationProcessImpl; import org.openmetadata.service.migration.utils.MigrationFile; +@Slf4j public class Migration extends MigrationProcessImpl { public Migration(MigrationFile migrationFile) { @@ -25,5 +28,13 @@ public void runDataMigration() { migrateThreadTasksToTaskEntity(handle, POSTGRES); 
migrateLegacyActivityThreadsToActivityStream(handle, POSTGRES); backfillAnnouncementRelationships(handle); + try { + backfillVersionMetadata(handle); + } catch (Exception e) { + LOG.error( + "Failed to backfill versionNum and changedFieldKeys in v200 migration. " + + "Version timeline filtering may not work correctly until the migration is re-run.", + e); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java index 41f92dbb8199..a86dbe6626a1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java @@ -5,8 +5,10 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.core.Handle; @@ -25,6 +27,7 @@ import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.locator.ConnectionType; import org.openmetadata.service.migration.utils.SearchSettingsMergeUtil; +import org.openmetadata.service.resources.databases.DatasourceConfig; import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.FullyQualifiedName; @@ -34,6 +37,20 @@ public class MigrationUtil { private static final String TABLE_COLUMN_ASSET_TYPE = "tableColumn"; + private static final int BATCH_SIZE = 1000; + + private static final String UPDATE_MYSQL = + "UPDATE entity_extension SET versionNum = :versionNum, changedFieldKeys = :changedFieldKeys " + + "WHERE id = :id AND extension = :extension"; + + private static final String UPDATE_POSTGRES = + 
"UPDATE entity_extension SET versionNum = :versionNum, changedFieldKeys = :changedFieldKeys::jsonb " + + "WHERE id = :id AND extension = :extension"; + + private static final String UPDATE_VERSION_NUM_ONLY = + "UPDATE entity_extension SET versionNum = :versionNum " + + "WHERE id = :id AND extension = :extension"; + private MigrationUtil() {} public static void addTableColumnSearchSettings() { @@ -1341,4 +1358,124 @@ private static void insertTaskDomainRelationships( } } } + + public static void backfillVersionMetadata(Handle handle) { + String updateSql = + Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL()) + ? UPDATE_MYSQL + : UPDATE_POSTGRES; + + LOG.info("Starting backfill of versionNum and changedFieldKeys in entity_extension"); + int totalProcessed = 0; + List> batch; + + do { + batch = + handle + .createQuery( + "SELECT id, extension, json FROM entity_extension " + + "WHERE extension LIKE '%.version.%' " + + "AND versionNum IS NULL " + + "LIMIT :limit") + .bind("limit", BATCH_SIZE) + .mapToMap() + .list(); + + for (Map row : batch) { + String extension = row.get("extension").toString(); + String id = row.get("id").toString(); + Object jsonObj = row.get("json"); + double versionNum = extractVersionNum(extension); + String changedFieldKeys = "[]"; + + if (jsonObj != null) { + try { + changedFieldKeys = extractChangedFieldKeys(jsonObj.toString()); + } catch (Exception e) { + LOG.warn( + "Failed to extract changedFieldKeys for extension {}, using empty array", + extension, + e); + } + } + + try { + handle + .createUpdate(updateSql) + .bind("versionNum", versionNum) + .bind("changedFieldKeys", changedFieldKeys) + .bind("id", id) + .bind("extension", extension) + .execute(); + } catch (Exception e) { + LOG.warn( + "Full update failed for extension {}, falling back to versionNum-only update", + extension, + e); + // Guarantee versionNum is set so this row is not retried in the next batch. 
+ // If this also fails we have a real DB issue — let the exception propagate to stop + // the loop rather than hammering a broken database indefinitely. + handle + .createUpdate(UPDATE_VERSION_NUM_ONLY) + .bind("versionNum", versionNum) + .bind("id", id) + .bind("extension", extension) + .execute(); + } + } + + totalProcessed += batch.size(); + if (!batch.isEmpty()) { + LOG.info("Backfilled {} entity_extension rows so far", totalProcessed); + } + } while (batch.size() == BATCH_SIZE); + + LOG.info( + "Backfill of versionNum and changedFieldKeys complete: {} total rows processed", + totalProcessed); + } + + static double extractVersionNum(String extension) { + int idx = extension.lastIndexOf(".version."); + if (idx < 0) { + return 0.0; + } + try { + return Double.parseDouble(extension.substring(idx + ".version.".length())); + } catch (NumberFormatException e) { + return 0.0; + } + } + + static String extractChangedFieldKeys(String json) { + JsonNode root = JsonUtils.readTree(json); + JsonNode changeDescription = root.get("changeDescription"); + if (changeDescription == null || changeDescription.isNull()) { + return "[]"; + } + + Set fieldNames = new LinkedHashSet<>(); + collectFieldNames(fieldNames, changeDescription.get("fieldsAdded")); + collectFieldNames(fieldNames, changeDescription.get("fieldsUpdated")); + collectFieldNames(fieldNames, changeDescription.get("fieldsDeleted")); + + List sorted = new ArrayList<>(fieldNames); + sorted.sort(String::compareTo); + return JsonUtils.pojoToJson(sorted); + } + + private static void collectFieldNames(Set fieldNames, JsonNode fieldChanges) { + if (fieldChanges == null || !fieldChanges.isArray()) { + return; + } + for (JsonNode fieldChange : fieldChanges) { + JsonNode nameNode = fieldChange.get("name"); + if (nameNode != null && !nameNode.isNull()) { + String name = nameNode.asText(); + if (!name.isEmpty()) { + fieldNames.add(name); + } + } + } + } } diff --git 
a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java index 185d6403e5f4..784364174ada 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java @@ -19,25 +19,17 @@ class MigrationSqlStatementHashTest { @Test - void mysql1127MigrationStatementsHaveUniqueHashesWithinEachFile() { + void mysql200EntityExtensionVersionSchemaHasUniqueStatementHashes() { assertUniqueStatementHashes( resolveRepoRoot() - .resolve("bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql")); - assertUniqueStatementHashes( - resolveRepoRoot() - .resolve( - "bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql")); + .resolve("bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql")); } @Test - void mysql1127MigrationFilesDoNotQueryInformationSchema() throws Exception { - assertDoesNotReferenceInformationSchema( - resolveRepoRoot() - .resolve("bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql")); + void mysql200EntityExtensionVersionSchemaDoesNotQueryInformationSchema() throws Exception { assertDoesNotReferenceInformationSchema( resolveRepoRoot() - .resolve( - "bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql")); + .resolve("bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql")); } private void assertUniqueStatementHashes(Path sqlFile) { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java index c7dea2da01b3..a0567ffe0ecc 100644 --- 
a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java @@ -15,15 +15,21 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL; @@ -33,10 +39,15 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import java.lang.reflect.Method; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.statement.Update; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import org.openmetadata.service.resources.databases.DatasourceConfig; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -431,6 +442,344 @@ void buildActivityEventFromLegacyThreadReturnsNullForNonSystemThread() throws Ex assertNull(event); } + // ── 
extractVersionNum ────────────────────────────────────────────────────── + // + // Mirrors SQL: CAST(SUBSTRING_INDEX(extension, '.version.', -1) AS DOUBLE) + // SUBSTRING_INDEX with count=-1 returns everything after the LAST delimiter. + + @Test + void extractVersionNumFromStandardMinorVersion() { + assertEquals(0.1, MigrationUtil.extractVersionNum("uuid.version.0.1"), 1e-9); + } + + @Test + void extractVersionNumFromHigherVersion() { + assertEquals(1.5, MigrationUtil.extractVersionNum("uuid.version.1.5"), 1e-9); + } + + @Test + void extractVersionNumFromLargeVersion() { + assertEquals(10.0, MigrationUtil.extractVersionNum("uuid.version.10.0"), 1e-9); + } + + @Test + void extractVersionNumFromWholeNumberVersion() { + assertEquals(1.0, MigrationUtil.extractVersionNum("uuid.version.1"), 1e-9); + } + + @Test + void extractVersionNumReturnsZeroWhenNoVersionSegment() { + assertEquals(0.0, MigrationUtil.extractVersionNum("uuid.notype.something"), 1e-9); + } + + @Test + void extractVersionNumReturnsZeroForEmptyString() { + assertEquals(0.0, MigrationUtil.extractVersionNum(""), 1e-9); + } + + @Test + void extractVersionNumReturnsZeroWhenSuffixNotParseable() { + assertEquals(0.0, MigrationUtil.extractVersionNum("uuid.version.notanumber"), 1e-9); + } + + @Test + void extractVersionNumUsesLastOccurrenceMatchingSqlSubstringIndex() { + // SQL SUBSTRING_INDEX(str, '.version.', -1) takes after the LAST occurrence. + // "a.version.x.version.2.0" → SQL gives "2.0" → 2.0 + assertEquals(2.0, MigrationUtil.extractVersionNum("a.version.x.version.2.0"), 1e-9); + } + + @Test + void extractVersionNumFromRealWorldExtensionFormat() { + String uuid = UUID.randomUUID().toString(); + assertEquals(0.2, MigrationUtil.extractVersionNum(uuid + ".version.0.2"), 1e-9); + } + + // ── extractChangedFieldKeys ──────────────────────────────────────────────── + // + // Mirrors SQL UNION ALL + DISTINCT across fieldsAdded/Updated/Deleted .name, + // with NULL and empty-string filtering. 
Java additionally sorts alphabetically. + + @Test + void extractChangedFieldKeysFromAllThreeArrays() { + String json = + """ + { + "changeDescription": { + "fieldsAdded": [{"name": "tags"}], + "fieldsUpdated": [{"name": "description"}], + "fieldsDeleted": [{"name": "owner"}] + } + } + """; + assertEquals( + "[\"description\",\"owner\",\"tags\"]", MigrationUtil.extractChangedFieldKeys(json)); + } + + @Test + void extractChangedFieldKeysSortsAlphabetically() { + String json = + """ + { + "changeDescription": { + "fieldsAdded": [{"name": "z_col"}, {"name": "a_col"}, {"name": "m_col"}], + "fieldsUpdated": [], + "fieldsDeleted": [] + } + } + """; + assertEquals("[\"a_col\",\"m_col\",\"z_col\"]", MigrationUtil.extractChangedFieldKeys(json)); + } + + @Test + void extractChangedFieldKeysDeduplicatesSameNameAcrossAddedAndUpdated() { + String json = + """ + { + "changeDescription": { + "fieldsAdded": [{"name": "tags"}], + "fieldsUpdated": [{"name": "tags"}], + "fieldsDeleted": [] + } + } + """; + assertEquals("[\"tags\"]", MigrationUtil.extractChangedFieldKeys(json)); + } + + @Test + void extractChangedFieldKeysDeduplicatesAcrossAllThreeArrays() { + String json = + """ + { + "changeDescription": { + "fieldsAdded": [{"name": "b"}, {"name": "a"}, {"name": "c"}], + "fieldsUpdated": [{"name": "a"}, {"name": "d"}], + "fieldsDeleted": [{"name": "e"}, {"name": "b"}] + } + } + """; + assertEquals("[\"a\",\"b\",\"c\",\"d\",\"e\"]", MigrationUtil.extractChangedFieldKeys(json)); + } + + @Test + void extractChangedFieldKeysReturnsEmptyArrayWhenChangeDescriptionAbsent() { + assertEquals( + "[]", MigrationUtil.extractChangedFieldKeys("{\"name\": \"some_entity\"}")); + } + + @Test + void extractChangedFieldKeysReturnsEmptyArrayWhenChangeDescriptionIsNull() { + assertEquals( + "[]", MigrationUtil.extractChangedFieldKeys("{\"changeDescription\": null}")); + } + + @Test + void extractChangedFieldKeysReturnsEmptyArrayWhenAllArraysEmpty() { + String json = + """ + { + "changeDescription": { + 
"fieldsAdded": [], + "fieldsUpdated": [], + "fieldsDeleted": [] + } + } + """; + assertEquals("[]", MigrationUtil.extractChangedFieldKeys(json)); + } + + @Test + void extractChangedFieldKeysFiltersOutNullNameEntries() { + String json = + """ + { + "changeDescription": { + "fieldsAdded": [{"name": null}, {"name": "description"}], + "fieldsUpdated": [], + "fieldsDeleted": [] + } + } + """; + assertEquals("[\"description\"]", MigrationUtil.extractChangedFieldKeys(json)); + } + + @Test + void extractChangedFieldKeysFiltersOutEmptyStringNames() { + String json = + """ + { + "changeDescription": { + "fieldsAdded": [{"name": ""}, {"name": "tags"}], + "fieldsUpdated": [], + "fieldsDeleted": [] + } + } + """; + assertEquals("[\"tags\"]", MigrationUtil.extractChangedFieldKeys(json)); + } + + @Test + void extractChangedFieldKeysWithOnlyFieldsAddedOthersAbsent() { + String json = + """ + { + "changeDescription": { + "fieldsAdded": [{"name": "owner"}] + } + } + """; + assertDoesNotThrow( + () -> assertEquals("[\"owner\"]", MigrationUtil.extractChangedFieldKeys(json))); + } + + @Test + void extractChangedFieldKeysWithFieldEntryMissingNameKey() { + String json = + """ + { + "changeDescription": { + "fieldsAdded": [{"oldValue": "x", "newValue": "y"}], + "fieldsUpdated": [{"name": "tags"}], + "fieldsDeleted": [] + } + } + """; + assertDoesNotThrow( + () -> assertEquals("[\"tags\"]", MigrationUtil.extractChangedFieldKeys(json))); + } + + // ── backfillVersionMetadata ──────────────────────────────────────────────── + + @Test + void backfillIsNoOpWhenQueryReturnsEmptyBatch() { + Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS); + when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) + .thenReturn(List.of()); + + assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle)); + verify(bHandle, never()).createUpdate(any(String.class)); + } + + @Test + void backfillSelectsMysqlSqlWhenDatasourceIsMySQL() { + Handle bHandle = 
mock(Handle.class, RETURNS_DEEP_STUBS); + when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) + .thenReturn(List.of(versionRow("uuid.version.0.1", emptyChangeDescriptionJson()))); + + try (MockedStatic ds = mockStatic(DatasourceConfig.class)) { + DatasourceConfig cfg = mock(DatasourceConfig.class); + ds.when(DatasourceConfig::getInstance).thenReturn(cfg); + when(cfg.isMySQL()).thenReturn(true); + + assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle)); + + ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); + verify(bHandle).createUpdate(sqlCaptor.capture()); + String sql = sqlCaptor.getValue(); + assertTrue(sql.contains("changedFieldKeys = :changedFieldKeys")); + assertFalse(sql.contains("::jsonb")); + } + } + + @Test + void backfillSelectsPostgresSqlWhenDatasourceIsPostgres() { + Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS); + when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) + .thenReturn(List.of(versionRow("uuid.version.1.0", emptyChangeDescriptionJson()))); + + try (MockedStatic ds = mockStatic(DatasourceConfig.class)) { + DatasourceConfig cfg = mock(DatasourceConfig.class); + ds.when(DatasourceConfig::getInstance).thenReturn(cfg); + when(cfg.isMySQL()).thenReturn(false); + + assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle)); + + ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); + verify(bHandle).createUpdate(sqlCaptor.capture()); + assertTrue(sqlCaptor.getValue().contains("::jsonb")); + } + } + + @Test + void backfillFallsBackToVersionNumOnlyUpdateWhenFullUpdateFails() { + Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS); + when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) + .thenReturn(List.of(versionRow("uuid.version.0.1", emptyChangeDescriptionJson()))); + + when(bHandle.createUpdate(argThat(s -> s != null && s.contains("changedFieldKeys")))) + 
.thenThrow(new RuntimeException("Simulated column-too-wide error")); + Update fallbackUpdate = mock(Update.class, RETURNS_DEEP_STUBS); + when(bHandle.createUpdate(argThat(s -> s != null && !s.contains("changedFieldKeys")))) + .thenReturn(fallbackUpdate); + + try (MockedStatic ds = mockStatic(DatasourceConfig.class)) { + DatasourceConfig cfg = mock(DatasourceConfig.class); + ds.when(DatasourceConfig::getInstance).thenReturn(cfg); + when(cfg.isMySQL()).thenReturn(true); + + assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle)); + + verify(bHandle, times(1)) + .createUpdate(argThat(s -> s != null && s.contains("changedFieldKeys"))); + verify(bHandle, times(1)) + .createUpdate(argThat(s -> s != null && !s.contains("changedFieldKeys"))); + } + } + + @Test + void backfillHandlesNullJsonColumnWithoutException() { + Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS); + Map row = new HashMap<>(); + row.put("id", UUID.randomUUID().toString()); + row.put("extension", "uuid.version.0.1"); + row.put("json", null); + when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) + .thenReturn(List.of(row)); + + try (MockedStatic ds = mockStatic(DatasourceConfig.class)) { + DatasourceConfig cfg = mock(DatasourceConfig.class); + ds.when(DatasourceConfig::getInstance).thenReturn(cfg); + when(cfg.isMySQL()).thenReturn(true); + + assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle)); + verify(bHandle).createUpdate(any(String.class)); + } + } + + @Test + void backfillProcessesMultipleRowsInOneBatch() { + Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS); + List> batch = + List.of( + versionRow("uuid1.version.0.1", emptyChangeDescriptionJson()), + versionRow("uuid2.version.1.0", emptyChangeDescriptionJson()), + versionRow("uuid3.version.2.0", emptyChangeDescriptionJson())); + when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) + .thenReturn(batch); + + try (MockedStatic 
ds = mockStatic(DatasourceConfig.class)) { + DatasourceConfig cfg = mock(DatasourceConfig.class); + ds.when(DatasourceConfig::getInstance).thenReturn(cfg); + when(cfg.isMySQL()).thenReturn(true); + + assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle)); + verify(bHandle, times(3)).createUpdate(any(String.class)); + } + } + + private static Map versionRow(String extension, String json) { + Map row = new HashMap<>(); + row.put("id", UUID.randomUUID().toString()); + row.put("extension", extension); + row.put("json", json); + return row; + } + + private static String emptyChangeDescriptionJson() { + return + "{\"changeDescription\":{\"fieldsAdded\":[],\"fieldsUpdated\":[],\"fieldsDeleted\":[]}}"; + } + private Object invokePrivateStatic(String methodName, Class[] parameterTypes, Object... args) throws Exception { Method method = MigrationUtil.class.getDeclaredMethod(methodName, parameterTypes); From 7c3afeb476a5e224b1357cf705af7d4fdf280dd0 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Wed, 29 Apr 2026 17:37:28 +0530 Subject: [PATCH 2/3] fix: null guard for id/extension in backfillVersionMetadata, remove duplicate import --- .../migration/utils/v200/MigrationUtil.java | 10 ++++++++-- .../utils/MigrationSqlStatementHashTest.java | 6 ++---- .../migration/utils/v200/MigrationUtilTest.java | 14 +++++--------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java index a86dbe6626a1..d427be427143 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java @@ -1382,8 +1382,14 @@ public static void backfillVersionMetadata(Handle handle) { .list(); for (Map row : batch) { - String 
extension = row.get("extension").toString(); - String id = row.get("id").toString(); + Object extObj = row.get("extension"); + Object idObj = row.get("id"); + if (extObj == null || idObj == null) { + LOG.warn("Skipping entity_extension row with null id or extension"); + continue; + } + String extension = extObj.toString(); + String id = idObj.toString(); Object jsonObj = row.get("json"); double versionNum = extractVersionNum(extension); String changedFieldKeys = "[]"; diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java index 784364174ada..b9d504cb1c17 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java @@ -21,15 +21,13 @@ class MigrationSqlStatementHashTest { @Test void mysql200EntityExtensionVersionSchemaHasUniqueStatementHashes() { assertUniqueStatementHashes( - resolveRepoRoot() - .resolve("bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql")); + resolveRepoRoot().resolve("bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql")); } @Test void mysql200EntityExtensionVersionSchemaDoesNotQueryInformationSchema() throws Exception { assertDoesNotReferenceInformationSchema( - resolveRepoRoot() - .resolve("bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql")); + resolveRepoRoot().resolve("bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql")); } private void assertUniqueStatementHashes(Path sqlFile) { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java index a0567ffe0ecc..0fbe07b7e01c 100644 --- 
a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java @@ -45,17 +45,16 @@ import java.util.UUID; import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.statement.Update; -import org.mockito.ArgumentCaptor; -import org.mockito.MockedStatic; -import org.openmetadata.service.resources.databases.DatasourceConfig; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; import org.openmetadata.schema.entity.activity.ActivityEvent; import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.service.Entity; import org.openmetadata.service.jdbi3.locator.ConnectionType; +import org.openmetadata.service.resources.databases.DatasourceConfig; import org.openmetadata.service.resources.feeds.MessageParser; class MigrationUtilTest { @@ -563,14 +562,12 @@ void extractChangedFieldKeysDeduplicatesAcrossAllThreeArrays() { @Test void extractChangedFieldKeysReturnsEmptyArrayWhenChangeDescriptionAbsent() { - assertEquals( - "[]", MigrationUtil.extractChangedFieldKeys("{\"name\": \"some_entity\"}")); + assertEquals("[]", MigrationUtil.extractChangedFieldKeys("{\"name\": \"some_entity\"}")); } @Test void extractChangedFieldKeysReturnsEmptyArrayWhenChangeDescriptionIsNull() { - assertEquals( - "[]", MigrationUtil.extractChangedFieldKeys("{\"changeDescription\": null}")); + assertEquals("[]", MigrationUtil.extractChangedFieldKeys("{\"changeDescription\": null}")); } @Test @@ -776,8 +773,7 @@ private static Map versionRow(String extension, String json) { } private static String emptyChangeDescriptionJson() { - return - "{\"changeDescription\":{\"fieldsAdded\":[],\"fieldsUpdated\":[],\"fieldsDeleted\":[]}}"; + return 
"{\"changeDescription\":{\"fieldsAdded\":[],\"fieldsUpdated\":[],\"fieldsDeleted\":[]}}"; } private Object invokePrivateStatic(String methodName, Class[] parameterTypes, Object... args) From 322436a79fe64f322242007888c1f01f55f14071 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Wed, 29 Apr 2026 19:34:38 +0530 Subject: [PATCH 3/3] perf: cursor-based pagination + PreparedBatch for backfillVersionMetadata Replace WHERE versionNum IS NULL full-table scan (EXPLAIN: type=ALL, 2.1M rows/batch) with PK cursor pagination (EXPLAIN: type=index, key=PRIMARY, 5000 rows/batch). Use PreparedBatch to reduce UPDATE round trips from 5000 to 1 per batch. Bump BATCH_SIZE 1000 -> 5000. --- .../migration/utils/v200/MigrationUtil.java | 128 ++++++++++++------ .../utils/v200/MigrationUtilTest.java | 85 ++++++++++-- 2 files changed, 156 insertions(+), 57 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java index d427be427143..80e320727486 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java @@ -12,6 +12,7 @@ import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.statement.PreparedBatch; import org.openmetadata.schema.api.search.SearchSettings; import org.openmetadata.schema.entity.activity.ActivityEvent; import org.openmetadata.schema.entity.feed.Announcement; @@ -37,7 +38,7 @@ public class MigrationUtil { private static final String TABLE_COLUMN_ASSET_TYPE = "tableColumn"; - private static final int BATCH_SIZE = 1000; + private static final int BATCH_SIZE = 5000; private static final String UPDATE_MYSQL = "UPDATE entity_extension SET versionNum = :versionNum, changedFieldKeys = 
:changedFieldKeys " @@ -1364,23 +1365,28 @@ public static void backfillVersionMetadata(Handle handle) { Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL()) ? UPDATE_MYSQL : UPDATE_POSTGRES; - LOG.info("Starting backfill of versionNum and changedFieldKeys in entity_extension"); + String lastId = ""; + String lastExt = ""; int totalProcessed = 0; List<Map<String, Object>> batch; - do { batch = handle .createQuery( "SELECT id, extension, json FROM entity_extension " - + "WHERE extension LIKE '%.version.%' " + + "WHERE (id, extension) > (:lastId, :lastExt) " + + "AND extension LIKE '%.version.%' " + "AND versionNum IS NULL " + + "ORDER BY id, extension " + "LIMIT :limit") + .bind("lastId", lastId) + .bind("lastExt", lastExt) .bind("limit", BATCH_SIZE) .mapToMap() .list(); - + PreparedBatch preparedBatch = handle.prepareBatch(updateSql); + int batchCount = 0; for (Map<String, Object> row : batch) { Object extObj = row.get("extension"); Object idObj = row.get("id"); @@ -1390,55 +1396,93 @@ public static void backfillVersionMetadata(Handle handle) { } String extension = extObj.toString(); String id = idObj.toString(); - Object jsonObj = row.get("json"); - double versionNum = extractVersionNum(extension); - String changedFieldKeys = "[]"; + lastId = id; + lastExt = extension; + preparedBatch + .bind("versionNum", extractVersionNum(extension)) + .bind("changedFieldKeys", extractChangedFieldKeysQuietly(row.get("json"), extension)) + .bind("id", id) + .bind("extension", extension) + .add(); + batchCount++; + } + if (batchCount > 0) { + executeWithFallback(handle, updateSql, preparedBatch, batch); + } + totalProcessed += batch.size(); + if (!batch.isEmpty()) { + LOG.info("Backfilled {} entity_extension rows so far", totalProcessed); + } + } while (batch.size() == BATCH_SIZE); + LOG.info("Backfill complete: {} total rows processed", totalProcessed); + } - if (jsonObj != null) { - try { - changedFieldKeys = extractChangedFieldKeys(jsonObj.toString()); - } catch (Exception e) { - LOG.warn( - "Failed to 
extract changedFieldKeys for extension {}, using empty array", - extension, - e); - } - } + private static String extractChangedFieldKeysQuietly(Object jsonObj, String extension) { + if (jsonObj == null) { + return "[]"; + } + try { + return extractChangedFieldKeys(jsonObj.toString()); + } catch (Exception e) { + LOG.warn( + "Failed to extract changedFieldKeys for extension {}, using empty array", extension, e); + return "[]"; + } + } + + private static void executeWithFallback( + Handle handle, + String updateSql, + PreparedBatch preparedBatch, + List<Map<String, Object>> batch) { + try { + preparedBatch.execute(); + } catch (Exception e) { + LOG.warn("Batch update failed, falling back to per-row updates", e); + fallbackPerRow(handle, updateSql, batch); + } + } + private static void fallbackPerRow( + Handle handle, String updateSql, List<Map<String, Object>> batch) { + for (Map<String, Object> row : batch) { + Object extObj = row.get("extension"); + Object idObj = row.get("id"); + if (extObj == null || idObj == null) { + continue; + } + String extension = extObj.toString(); + String id = idObj.toString(); + if (!extension.contains(".version.")) { + continue; + } + double versionNum = extractVersionNum(extension); + String changedFieldKeys = extractChangedFieldKeysQuietly(row.get("json"), extension); + try { + handle + .createUpdate(updateSql) + .bind("versionNum", versionNum) + .bind("changedFieldKeys", changedFieldKeys) + .bind("id", id) + .bind("extension", extension) + .execute(); + } catch (Exception fullEx) { try { handle - .createUpdate(updateSql) + .createUpdate(UPDATE_VERSION_NUM_ONLY) .bind("versionNum", versionNum) - .bind("changedFieldKeys", changedFieldKeys) .bind("id", id) .bind("extension", extension) .execute(); - } catch (Exception e) { + } catch (Exception versionOnlyEx) { LOG.warn( - "Full update failed for extension {}, falling back to versionNum-only update", + "Skipping row id={} extension={} after both updates failed", + id, extension, - e); - // Guarantee versionNum is set so this row is not 
retried in the next batch. - // If this also fails we have a real DB issue — let the exception propagate to stop - // the loop rather than hammering a broken database indefinitely. - handle - .createUpdate(UPDATE_VERSION_NUM_ONLY) - .bind("versionNum", versionNum) - .bind("id", id) - .bind("extension", extension) - .execute(); + versionOnlyEx); } } - - totalProcessed += batch.size(); - if (!batch.isEmpty()) { - LOG.info("Backfilled {} entity_extension rows so far", totalProcessed); - } - } while (batch.size() == BATCH_SIZE); - - LOG.info( - "Backfill of versionNum and changedFieldKeys complete: {} total rows processed", - totalProcessed); + } } static double extractVersionNum(String extension) { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java index 0fbe07b7e01c..153aa9bef244 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; @@ -44,6 +45,7 @@ import java.util.Map; import java.util.UUID; import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.statement.PreparedBatch; import org.jdbi.v3.core.statement.Update; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -657,11 +659,26 @@ void backfillIsNoOpWhenQueryReturnsEmptyBatch() { verify(bHandle, never()).createUpdate(any(String.class)); } + private static 
PreparedBatch batchWithSelfReturningBinds() { + PreparedBatch pb = mock(PreparedBatch.class, RETURNS_DEEP_STUBS); + when(pb.bind(anyString(), anyString())).thenReturn(pb); + when(pb.bind(anyString(), anyDouble())).thenReturn(pb); + return pb; + } + @Test void backfillSelectsMysqlSqlWhenDatasourceIsMySQL() { Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS); - when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) + PreparedBatch preparedBatch = batchWithSelfReturningBinds(); + when(bHandle + .createQuery(any(String.class)) + .bind(anyString(), anyString()) + .bind(anyString(), anyString()) + .bind(anyString(), anyInt()) + .mapToMap() + .list()) .thenReturn(List.of(versionRow("uuid.version.0.1", emptyChangeDescriptionJson()))); + when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch); try (MockedStatic<DatasourceConfig> ds = mockStatic(DatasourceConfig.class)) { DatasourceConfig cfg = mock(DatasourceConfig.class); @@ -671,7 +688,7 @@ void backfillSelectsMysqlSqlWhenDatasourceIsMySQL() { assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle)); ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class); - verify(bHandle).createUpdate(sqlCaptor.capture()); + verify(bHandle).prepareBatch(sqlCaptor.capture()); String sql = sqlCaptor.getValue(); assertTrue(sql.contains("changedFieldKeys = :changedFieldKeys")); assertFalse(sql.contains("::jsonb")); @@ -681,8 +698,16 @@ void backfillSelectsMysqlSqlWhenDatasourceIsMySQL() { @Test void backfillSelectsPostgresSqlWhenDatasourceIsPostgres() { Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS); - when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) + PreparedBatch preparedBatch = batchWithSelfReturningBinds(); + when(bHandle + .createQuery(any(String.class)) + .bind(anyString(), anyString()) + .bind(anyString(), anyString()) + .bind(anyString(), anyInt()) + .mapToMap() + .list()) 
.thenReturn(List.of(versionRow("uuid.version.1.0", emptyChangeDescriptionJson()))); + when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch); try (MockedStatic ds = mockStatic(DatasourceConfig.class)) { DatasourceConfig cfg = mock(DatasourceConfig.class); @@ -692,7 +717,7 @@ void backfillSelectsPostgresSqlWhenDatasourceIsPostgres() { assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle)); ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); - verify(bHandle).createUpdate(sqlCaptor.capture()); + verify(bHandle).prepareBatch(sqlCaptor.capture()); assertTrue(sqlCaptor.getValue().contains("::jsonb")); } } @@ -700,14 +725,27 @@ void backfillSelectsPostgresSqlWhenDatasourceIsPostgres() { @Test void backfillFallsBackToVersionNumOnlyUpdateWhenFullUpdateFails() { Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS); - when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) + PreparedBatch preparedBatch = batchWithSelfReturningBinds(); + when(bHandle + .createQuery(any(String.class)) + .bind(anyString(), anyString()) + .bind(anyString(), anyString()) + .bind(anyString(), anyInt()) + .mapToMap() + .list()) .thenReturn(List.of(versionRow("uuid.version.0.1", emptyChangeDescriptionJson()))); - + when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch); + when(preparedBatch.execute()).thenThrow(new RuntimeException("Simulated batch failure")); + Update failingUpdate = mock(Update.class, RETURNS_DEEP_STUBS); + when(failingUpdate.bind(anyString(), anyString())).thenReturn(failingUpdate); + when(failingUpdate.bind(anyString(), anyDouble())).thenReturn(failingUpdate); + when(failingUpdate.execute()) + .thenThrow(new RuntimeException("Per-row full update also failed")); + Update versionNumOnlyUpdate = mock(Update.class, RETURNS_DEEP_STUBS); when(bHandle.createUpdate(argThat(s -> s != null && s.contains("changedFieldKeys")))) - .thenThrow(new RuntimeException("Simulated column-too-wide 
error")); - Update fallbackUpdate = mock(Update.class, RETURNS_DEEP_STUBS); + .thenReturn(failingUpdate); when(bHandle.createUpdate(argThat(s -> s != null && !s.contains("changedFieldKeys")))) - .thenReturn(fallbackUpdate); + .thenReturn(versionNumOnlyUpdate); try (MockedStatic ds = mockStatic(DatasourceConfig.class)) { DatasourceConfig cfg = mock(DatasourceConfig.class); @@ -726,12 +764,20 @@ void backfillFallsBackToVersionNumOnlyUpdateWhenFullUpdateFails() { @Test void backfillHandlesNullJsonColumnWithoutException() { Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS); + PreparedBatch preparedBatch = batchWithSelfReturningBinds(); Map row = new HashMap<>(); row.put("id", UUID.randomUUID().toString()); row.put("extension", "uuid.version.0.1"); row.put("json", null); - when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) + when(bHandle + .createQuery(any(String.class)) + .bind(anyString(), anyString()) + .bind(anyString(), anyString()) + .bind(anyString(), anyInt()) + .mapToMap() + .list()) .thenReturn(List.of(row)); + when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch); try (MockedStatic ds = mockStatic(DatasourceConfig.class)) { DatasourceConfig cfg = mock(DatasourceConfig.class); @@ -739,20 +785,28 @@ void backfillHandlesNullJsonColumnWithoutException() { when(cfg.isMySQL()).thenReturn(true); assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle)); - verify(bHandle).createUpdate(any(String.class)); + verify(preparedBatch).execute(); } } @Test void backfillProcessesMultipleRowsInOneBatch() { Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS); - List> batch = + PreparedBatch preparedBatch = batchWithSelfReturningBinds(); + List> rows = List.of( versionRow("uuid1.version.0.1", emptyChangeDescriptionJson()), versionRow("uuid2.version.1.0", emptyChangeDescriptionJson()), versionRow("uuid3.version.2.0", emptyChangeDescriptionJson())); - 
when(bHandle.createQuery(any(String.class)).bind(anyString(), anyInt()).mapToMap().list()) - .thenReturn(batch); + when(bHandle + .createQuery(any(String.class)) + .bind(anyString(), anyString()) + .bind(anyString(), anyString()) + .bind(anyString(), anyInt()) + .mapToMap() + .list()) + .thenReturn(rows); + when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch); try (MockedStatic ds = mockStatic(DatasourceConfig.class)) { DatasourceConfig cfg = mock(DatasourceConfig.class); @@ -760,7 +814,8 @@ void backfillProcessesMultipleRowsInOneBatch() { when(cfg.isMySQL()).thenReturn(true); assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle)); - verify(bHandle, times(3)).createUpdate(any(String.class)); + verify(preparedBatch, times(1)).execute(); + verify(preparedBatch, times(3)).add(); } }