diff --git a/bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql
deleted file mode 100644
index 1958763f08fc..000000000000
--- a/bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql
+++ /dev/null
@@ -1,57 +0,0 @@
-WITH
-field_changes AS (
-  SELECT e.id, e.extension, jt.field_name
-  FROM entity_extension AS e
-  JOIN JSON_TABLE(
-    COALESCE(JSON_EXTRACT(e.json, '$.changeDescription.fieldsAdded'), JSON_ARRAY()),
-    '$[*]' COLUMNS (field_name VARCHAR(1024) PATH '$.name')
-  ) AS jt ON TRUE
-  WHERE e.extension LIKE '%.version.%'
-
-  UNION ALL
-
-  SELECT e.id, e.extension, jt.field_name
-  FROM entity_extension AS e
-  JOIN JSON_TABLE(
-    COALESCE(JSON_EXTRACT(e.json, '$.changeDescription.fieldsUpdated'), JSON_ARRAY()),
-    '$[*]' COLUMNS (field_name VARCHAR(1024) PATH '$.name')
-  ) AS jt ON TRUE
-  WHERE e.extension LIKE '%.version.%'
-
-  UNION ALL
-
-  SELECT e.id, e.extension, jt.field_name
-  FROM entity_extension AS e
-  JOIN JSON_TABLE(
-    COALESCE(JSON_EXTRACT(e.json, '$.changeDescription.fieldsDeleted'), JSON_ARRAY()),
-    '$[*]' COLUMNS (field_name VARCHAR(1024) PATH '$.name')
-  ) AS jt ON TRUE
-  WHERE e.extension LIKE '%.version.%'
-),
-distinct_field_changes AS (
-  SELECT DISTINCT id, extension, field_name
-  FROM field_changes
-  WHERE field_name IS NOT NULL
-    AND field_name <> ''
-),
-version_metadata AS (
-  SELECT
-    e.id,
-    e.extension,
-    CAST(SUBSTRING_INDEX(e.extension, '.version.', -1) AS DOUBLE) AS version_num,
-    CASE
-      WHEN COUNT(fc.field_name) = 0 THEN JSON_ARRAY()
-      ELSE JSON_ARRAYAGG(fc.field_name)
-    END AS changed_field_keys
-  FROM entity_extension AS e
-  LEFT JOIN distinct_field_changes AS fc
-    ON fc.id = e.id
-    AND fc.extension = e.extension
-  WHERE e.extension LIKE '%.version.%'
-  GROUP BY e.id, e.extension
-)
-UPDATE entity_extension AS e
-JOIN version_metadata AS vm
-  ON vm.id = e.id AND vm.extension = e.extension
-SET e.versionNum = vm.version_num,
-    e.changedFieldKeys = vm.changed_field_keys;
diff --git a/bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql
deleted file mode 100644
index 0a96628fd32c..000000000000
--- a/bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql
+++ /dev/null
@@ -1,9 +0,0 @@
-ALTER TABLE entity_extension
-  ADD COLUMN versionNum DOUBLE NULL,
-  ADD COLUMN changedFieldKeys JSON NULL;
-
-CREATE INDEX idx_entity_extension_version_order
-  ON entity_extension (id, versionNum);
-
-CREATE INDEX idx_entity_extension_changed_field_keys
-  ON entity_extension ((CAST(changedFieldKeys->'$' AS CHAR(512) ARRAY)));
diff --git a/bootstrap/sql/migrations/native/1.12.7/postgres/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.12.7/postgres/postDataMigrationSQLScript.sql
deleted file mode 100644
index dcaea9a5813c..000000000000
--- a/bootstrap/sql/migrations/native/1.12.7/postgres/postDataMigrationSQLScript.sql
+++ /dev/null
@@ -1,49 +0,0 @@
-WITH version_metadata AS (
-  SELECT
-    e.id,
-    e.extension,
-    split_part(e.extension, '.version.', 2)::DOUBLE PRECISION AS version_num,
-    COALESCE(
-      (
-        SELECT jsonb_agg(field_name ORDER BY field_name)
-        FROM (
-          SELECT DISTINCT
-            field_change ->> 'name' AS field_name
-          FROM jsonb_array_elements(
-            COALESCE(e.json -> 'changeDescription' -> 'fieldsAdded', '[]'::jsonb)
-          ) AS field_change
-          WHERE field_change ->> 'name' IS NOT NULL
-            AND field_change ->> 'name' <> ''
-
-          UNION
-
-          SELECT DISTINCT
-            field_change ->> 'name' AS field_name
-          FROM jsonb_array_elements(
-            COALESCE(e.json -> 'changeDescription' -> 'fieldsUpdated', '[]'::jsonb)
-          ) AS field_change
-          WHERE field_change ->> 'name' IS NOT NULL
-            AND field_change ->> 'name' <> ''
-
-          UNION
-
-          SELECT DISTINCT
-            field_change ->> 'name' AS field_name
-          FROM jsonb_array_elements(
-            COALESCE(e.json -> 'changeDescription' -> 'fieldsDeleted', '[]'::jsonb)
-          ) AS field_change
-          WHERE field_change ->> 'name' IS NOT NULL
-            AND field_change ->> 'name' <> ''
-        ) AS exact_field_names
-      ),
-      '[]'::jsonb
-    ) AS changed_field_keys
-  FROM entity_extension AS e
-  WHERE e.extension LIKE '%.version.%'
-)
-UPDATE entity_extension AS e
-SET versionNum = version_metadata.version_num,
-    changedFieldKeys = version_metadata.changed_field_keys
-FROM version_metadata
-WHERE e.id = version_metadata.id
-  AND e.extension = version_metadata.extension;
diff --git a/bootstrap/sql/migrations/native/1.12.7/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.12.7/postgres/schemaChanges.sql
deleted file mode 100644
index 99bc7b7ff037..000000000000
--- a/bootstrap/sql/migrations/native/1.12.7/postgres/schemaChanges.sql
+++ /dev/null
@@ -1,11 +0,0 @@
-ALTER TABLE entity_extension
-  ADD COLUMN IF NOT EXISTS versionNum DOUBLE PRECISION,
-  ADD COLUMN IF NOT EXISTS changedFieldKeys JSONB;
-
-CREATE INDEX IF NOT EXISTS idx_entity_extension_version_order
-  ON entity_extension (id, versionNum DESC)
-  WHERE versionNum IS NOT NULL;
-
-CREATE INDEX IF NOT EXISTS idx_entity_extension_changed_field_keys
-  ON entity_extension USING GIN (changedFieldKeys)
-  WHERE changedFieldKeys IS NOT NULL;
diff --git a/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql
index 576f7b739682..646da758f9da 100644
--- a/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql
+++ b/bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql
@@ -135,3 +135,10 @@ CREATE TABLE IF NOT EXISTS task_form_schema_entity (
     KEY idx_task_form_schema_task_type (taskType),
     KEY idx_task_form_schema_deleted (deleted)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+
+ALTER TABLE entity_extension -- NOTE(review): no IF NOT EXISTS guard here, unlike the Postgres script
+  ADD COLUMN versionNum DOUBLE NULL, -- nullable, populated by the v200 Java backfill
+  ADD COLUMN changedFieldKeys JSON NULL;
+
+CREATE INDEX idx_entity_extension_version_order -- the only index this script adds for the new columns
+  ON entity_extension (id, versionNum);
diff --git a/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql
index 288caf8cf0be..93a3490f48a2 100644
--- a/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql
+++ b/bootstrap/sql/migrations/native/2.0.0/postgres/schemaChanges.sql
@@ -140,3 +140,15 @@ CREATE TABLE IF NOT EXISTS task_form_schema_entity (
 CREATE INDEX IF NOT EXISTS idx_task_form_schema_name ON task_form_schema_entity (name);
 CREATE INDEX IF NOT EXISTS idx_task_form_schema_tasktype ON task_form_schema_entity (tasktype);
 CREATE INDEX IF NOT EXISTS idx_task_form_schema_deleted ON task_form_schema_entity (deleted);
+
+ALTER TABLE entity_extension -- IF NOT EXISTS keeps both column additions safe to re-run
+  ADD COLUMN IF NOT EXISTS versionNum DOUBLE PRECISION, -- nullable, populated by the v200 Java backfill
+  ADD COLUMN IF NOT EXISTS changedFieldKeys JSONB;
+
+CREATE INDEX IF NOT EXISTS idx_entity_extension_version_order
+  ON entity_extension (id, versionNum DESC) -- partial index: only rows that have a version number
+  WHERE versionNum IS NOT NULL;
+
+CREATE INDEX IF NOT EXISTS idx_entity_extension_changed_field_keys
+  ON entity_extension USING GIN (changedFieldKeys) -- partial GIN index over the JSONB key array
+  WHERE changedFieldKeys IS NOT NULL;
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java
index ef9b989c8c8c..ffe9dddaaa05 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java
@@ -3,14 +3,17 @@
 import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL;
 import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addTableColumnSearchSettings;
 import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillAnnouncementRelationships;
+import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillVersionMetadata;
 import static 
org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateLegacyActivityThreadsToActivityStream;
 import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateSuggestionsToTaskEntity;
 import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateThreadTasksToTaskEntity;
 
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.openmetadata.service.migration.api.MigrationProcessImpl;
 import org.openmetadata.service.migration.utils.MigrationFile;
 
+@Slf4j
 public class Migration extends MigrationProcessImpl {
 
   public Migration(MigrationFile migrationFile) {
@@ -25,5 +28,13 @@ public void runDataMigration() {
     migrateThreadTasksToTaskEntity(handle, MYSQL);
     migrateLegacyActivityThreadsToActivityStream(handle, MYSQL);
     backfillAnnouncementRelationships(handle);
+    try { // best-effort step: a failure is logged below and not rethrown
+      backfillVersionMetadata(handle);
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to backfill versionNum and changedFieldKeys in v200 migration. "
+              + "Version timeline filtering may not work correctly until the migration is re-run.",
+          e);
+    }
   }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java
index dccef4e9ff43..72570e7ebd03 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java
@@ -3,14 +3,17 @@
 import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES;
 import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addTableColumnSearchSettings;
 import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillAnnouncementRelationships;
+import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillVersionMetadata;
 import static 
org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateLegacyActivityThreadsToActivityStream;
 import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateSuggestionsToTaskEntity;
 import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateThreadTasksToTaskEntity;
 
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.openmetadata.service.migration.api.MigrationProcessImpl;
 import org.openmetadata.service.migration.utils.MigrationFile;
 
+@Slf4j
 public class Migration extends MigrationProcessImpl {
 
   public Migration(MigrationFile migrationFile) {
@@ -25,5 +28,13 @@ public void runDataMigration() {
     migrateThreadTasksToTaskEntity(handle, POSTGRES);
     migrateLegacyActivityThreadsToActivityStream(handle, POSTGRES);
     backfillAnnouncementRelationships(handle);
+    try { // best-effort step: a failure is logged below and not rethrown
+      backfillVersionMetadata(handle);
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to backfill versionNum and changedFieldKeys in v200 migration. "
+              + "Version timeline filtering may not work correctly until the migration is re-run.",
+          e);
+    }
   }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java
index 41f92dbb8199..80e320727486 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java
@@ -5,11 +5,14 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import lombok.extern.slf4j.Slf4j;
 import org.jdbi.v3.core.Handle;
+import org.jdbi.v3.core.statement.PreparedBatch;
 import org.openmetadata.schema.api.search.SearchSettings;
 import 
org.openmetadata.schema.entity.activity.ActivityEvent;
 import org.openmetadata.schema.entity.feed.Announcement;
@@ -25,6 +28,7 @@
 import org.openmetadata.service.jdbi3.CollectionDAO;
 import org.openmetadata.service.jdbi3.locator.ConnectionType;
 import org.openmetadata.service.migration.utils.SearchSettingsMergeUtil;
+import org.openmetadata.service.resources.databases.DatasourceConfig;
 import org.openmetadata.service.resources.feeds.MessageParser;
 import org.openmetadata.service.util.EntityUtil;
 import org.openmetadata.service.util.FullyQualifiedName;
@@ -34,6 +38,20 @@
 public class MigrationUtil {
   private static final String TABLE_COLUMN_ASSET_TYPE = "tableColumn";
 
+  private static final int BATCH_SIZE = 5000;
+
+  private static final String UPDATE_MYSQL =
+      "UPDATE entity_extension SET versionNum = :versionNum, changedFieldKeys = :changedFieldKeys "
+          + "WHERE id = :id AND extension = :extension";
+
+  private static final String UPDATE_POSTGRES =
+      "UPDATE entity_extension SET versionNum = :versionNum, changedFieldKeys = :changedFieldKeys::jsonb "
+          + "WHERE id = :id AND extension = :extension";
+
+  private static final String UPDATE_VERSION_NUM_ONLY =
+      "UPDATE entity_extension SET versionNum = :versionNum "
+          + "WHERE id = :id AND extension = :extension";
+
   private MigrationUtil() {}
 
   public static void addTableColumnSearchSettings() {
@@ -1341,4 +1359,223 @@ private static void insertTaskDomainRelationships(
       }
     }
   }
+
+  /**
+   * Backfills {@code versionNum} and {@code changedFieldKeys} for {@code entity_extension} rows
+   * whose extension contains {@code .version.} and whose {@code versionNum} is still NULL.
+   *
+   * <p>Rows are read in pages of {@code BATCH_SIZE} using a keyset cursor on (id, extension) and
+   * written back as one prepared batch per page. The {@code versionNum IS NULL} filter means
+   * rows that already carry a version number are not selected again on a later run.
+   *
+   * @param handle JDBI handle used for the paging query and for the updates
+   */
+  public static void backfillVersionMetadata(Handle handle) {
+    String updateSql =
+        Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())
+            ? UPDATE_MYSQL
+            : UPDATE_POSTGRES;
+    LOG.info("Starting backfill of versionNum and changedFieldKeys in entity_extension");
+    String lastId = "";
+    String lastExt = "";
+    int totalProcessed = 0;
+    List<Map<String, Object>> batch;
+    do {
+      batch =
+          handle
+              .createQuery(
+                  "SELECT id, extension, json FROM entity_extension "
+                      + "WHERE (id, extension) > (:lastId, :lastExt) "
+                      + "AND extension LIKE '%.version.%' "
+                      + "AND versionNum IS NULL "
+                      + "ORDER BY id, extension "
+                      + "LIMIT :limit")
+              .bind("lastId", lastId)
+              .bind("lastExt", lastExt)
+              .bind("limit", BATCH_SIZE)
+              .mapToMap()
+              .list();
+      PreparedBatch preparedBatch = handle.prepareBatch(updateSql);
+      int batchCount = 0;
+      for (Map<String, Object> row : batch) {
+        Object extObj = row.get("extension");
+        Object idObj = row.get("id");
+        if (extObj == null || idObj == null) {
+          LOG.warn("Skipping entity_extension row with null id or extension");
+          continue;
+        }
+        String extension = extObj.toString();
+        String id = idObj.toString();
+        // Advance the keyset cursor so the next page starts after this row.
+        lastId = id;
+        lastExt = extension;
+        preparedBatch
+            .bind("versionNum", extractVersionNum(extension))
+            .bind("changedFieldKeys", extractChangedFieldKeysQuietly(row.get("json"), extension))
+            .bind("id", id)
+            .bind("extension", extension)
+            .add();
+        batchCount++;
+      }
+      if (batchCount > 0) {
+        executeWithFallback(handle, updateSql, preparedBatch, batch);
+      }
+      totalProcessed += batch.size();
+      if (!batch.isEmpty()) {
+        LOG.info("Backfilled {} entity_extension rows so far", totalProcessed);
+      }
+    } while (batch.size() == BATCH_SIZE); // a short page means no matching rows remain
+    LOG.info("Backfill complete: {} total rows processed", totalProcessed);
+  }
+
+  /**
+   * Null-safe, non-throwing wrapper around {@link #extractChangedFieldKeys(String)}.
+   *
+   * @param jsonObj value of the {@code json} column; may be null
+   * @param extension extension of the row, used only in the warning message
+   * @return JSON array text of changed field names, or {@code "[]"} for null or unparseable input
+   */
+  private static String extractChangedFieldKeysQuietly(Object jsonObj, String extension) {
+    if (jsonObj == null) {
+      return "[]";
+    }
+    try {
+      return extractChangedFieldKeys(jsonObj.toString());
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to extract changedFieldKeys for extension {}, using empty array", extension, e);
+      return "[]";
+    }
+  }
+
+  /** Executes the batch; on failure retries its rows one statement at a time. */
+  private static void executeWithFallback(
+      Handle handle,
+      String updateSql,
+      PreparedBatch preparedBatch,
+      List<Map<String, Object>> batch) {
+    try {
+      preparedBatch.execute();
+    } catch (Exception e) {
+      LOG.warn("Batch update failed, falling back to per-row updates", e);
+      fallbackPerRow(handle, updateSql, batch);
+    }
+  }
+
+  /**
+   * Updates each row individually. When the full update fails for a row, a second statement that
+   * sets only {@code versionNum} is attempted; a row is skipped only if both statements fail.
+   *
+   * <p>NOTE(review): on PostgreSQL a failed statement inside an open transaction aborts that
+   * transaction, so these retries presumably rely on the handle running in auto-commit - confirm.
+   */
+  private static void fallbackPerRow(
+      Handle handle, String updateSql, List<Map<String, Object>> batch) {
+    for (Map<String, Object> row : batch) {
+      Object extObj = row.get("extension");
+      Object idObj = row.get("id");
+      if (extObj == null || idObj == null) {
+        continue;
+      }
+      String extension = extObj.toString();
+      String id = idObj.toString();
+      if (!extension.contains(".version.")) {
+        continue;
+      }
+      double versionNum = extractVersionNum(extension);
+      String changedFieldKeys = extractChangedFieldKeysQuietly(row.get("json"), extension);
+      try {
+        handle
+            .createUpdate(updateSql)
+            .bind("versionNum", versionNum)
+            .bind("changedFieldKeys", changedFieldKeys)
+            .bind("id", id)
+            .bind("extension", extension)
+            .execute();
+      } catch (Exception fullEx) {
+        // Record why the full update failed before degrading to the versionNum-only update.
+        LOG.warn(
+            "Full update failed for id={} extension={}, retrying with versionNum only",
+            id,
+            extension,
+            fullEx);
+        try {
+          handle
+              .createUpdate(UPDATE_VERSION_NUM_ONLY)
+              .bind("versionNum", versionNum)
+              .bind("id", id)
+              .bind("extension", extension)
+              .execute();
+        } catch (Exception versionOnlyEx) {
+          LOG.warn(
+              "Skipping row id={} extension={} after both updates failed",
+              id,
+              extension,
+              versionOnlyEx);
+        }
+      }
+    }
+  }
+
+  /**
+   * Parses the number that follows the last {@code .version.} marker in an extension key.
+   *
+   * @param extension extension key, for example {@code <prefix>.version.0.1}
+   * @return the parsed version, or {@code 0.0} when the marker is missing or the suffix is not a
+   *     finite number
+   */
+  static double extractVersionNum(String extension) {
+    int idx = extension.lastIndexOf(".version.");
+    if (idx < 0) {
+      return 0.0;
+    }
+    try {
+      double parsed = Double.parseDouble(extension.substring(idx + ".version.".length()));
+      // parseDouble also accepts "NaN" and "Infinity", which are not usable version numbers.
+      return Double.isFinite(parsed) ? parsed : 0.0;
+    } catch (NumberFormatException e) {
+      return 0.0;
+    }
+  }
+
+  /**
+   * Collects the distinct, non-empty {@code name} values found under {@code
+   * changeDescription.fieldsAdded}, {@code fieldsUpdated} and {@code fieldsDeleted}.
+   *
+   * @param json entity version document as JSON text
+   * @return JSON array text of the names sorted with {@code String::compareTo}; {@code "[]"} when
+   *     {@code changeDescription} is absent or null
+   */
+  static String extractChangedFieldKeys(String json) {
+    JsonNode root = JsonUtils.readTree(json);
+    JsonNode changeDescription = root.get("changeDescription");
+    if (changeDescription == null || changeDescription.isNull()) {
+      return "[]";
+    }
+
+    Set<String> fieldNames = new LinkedHashSet<>();
+    collectFieldNames(fieldNames, changeDescription.get("fieldsAdded"));
+    collectFieldNames(fieldNames, changeDescription.get("fieldsUpdated"));
+    collectFieldNames(fieldNames, changeDescription.get("fieldsDeleted"));
+
+    List<String> sorted = new ArrayList<>(fieldNames);
+    sorted.sort(String::compareTo);
+    return JsonUtils.pojoToJson(sorted);
+  }
+
+  /** Adds every non-null, non-empty {@code name} of the given field-change array to the set. */
+  private static void collectFieldNames(Set<String> fieldNames, JsonNode fieldChanges) {
+    if (fieldChanges == null || !fieldChanges.isArray()) {
+      return;
+    }
+    for (JsonNode fieldChange : fieldChanges) {
+      JsonNode nameNode = fieldChange.get("name");
+      if (nameNode != null && !nameNode.isNull()) {
+        String name = nameNode.asText();
+        if (!name.isEmpty()) {
+          fieldNames.add(name);
+        }
+      }
+    }
+  }
 }
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java
index 185d6403e5f4..b9d504cb1c17 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/MigrationSqlStatementHashTest.java
@@ -19,25 +19,15 @@
 class MigrationSqlStatementHashTest {
 
   @Test
-  void mysql1127MigrationStatementsHaveUniqueHashesWithinEachFile() {
+  void mysql200EntityExtensionVersionSchemaHasUniqueStatementHashes() {
     assertUniqueStatementHashes(
-        resolveRepoRoot()
-            .resolve("bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql"));
-    assertUniqueStatementHashes(
-        resolveRepoRoot()
-            .resolve(
-                "bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql"));
+        resolveRepoRoot().resolve("bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql"));
   }
 
   @Test
-  void mysql1127MigrationFilesDoNotQueryInformationSchema() throws Exception {
-    assertDoesNotReferenceInformationSchema(
-        resolveRepoRoot()
-            .resolve("bootstrap/sql/migrations/native/1.12.7/mysql/schemaChanges.sql"));
+  void mysql200EntityExtensionVersionSchemaDoesNotQueryInformationSchema() throws Exception {
     assertDoesNotReferenceInformationSchema(
-        resolveRepoRoot()
-            .resolve(
-                "bootstrap/sql/migrations/native/1.12.7/mysql/postDataMigrationSQLScript.sql"));
+        resolveRepoRoot().resolve("bootstrap/sql/migrations/native/2.0.0/mysql/schemaChanges.sql"));
   }
 
   private void assertUniqueStatementHashes(Path sqlFile) {
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java
index c7dea2da01b3..153aa9bef244 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java
@@ -15,15 +15,22 @@
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyDouble;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static 
org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL;
@@ -33,18 +40,23 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.lang.reflect.Method;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.jdbi.v3.core.Handle;
+import org.jdbi.v3.core.statement.PreparedBatch;
+import org.jdbi.v3.core.statement.Update;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
 import org.openmetadata.schema.entity.activity.ActivityEvent;
 import org.openmetadata.schema.entity.feed.Thread;
 import org.openmetadata.schema.type.EntityReference;
 import org.openmetadata.service.Entity;
 import org.openmetadata.service.jdbi3.locator.ConnectionType;
+import org.openmetadata.service.resources.databases.DatasourceConfig;
 import org.openmetadata.service.resources.feeds.MessageParser;
 
 class MigrationUtilTest {
@@ -431,6 +443,414 @@ void buildActivityEventFromLegacyThreadReturnsNullForNonSystemThread() throws Ex
     assertNull(event);
   }
 
+  // ── extractVersionNum ──────────────────────────────────────────────────────
+  //
+  // Mirrors SQL: CAST(SUBSTRING_INDEX(extension, '.version.', -1) AS DOUBLE)
+  // SUBSTRING_INDEX with count=-1 returns everything after the LAST delimiter.
+
+  @Test
+  void extractVersionNumFromStandardMinorVersion() {
+    assertEquals(0.1, MigrationUtil.extractVersionNum("uuid.version.0.1"), 1e-9);
+  }
+
+  @Test
+  void extractVersionNumFromHigherVersion() {
+    assertEquals(1.5, MigrationUtil.extractVersionNum("uuid.version.1.5"), 1e-9);
+  }
+
+  @Test
+  void extractVersionNumFromLargeVersion() {
+    assertEquals(10.0, MigrationUtil.extractVersionNum("uuid.version.10.0"), 1e-9);
+  }
+
+  @Test
+  void extractVersionNumFromWholeNumberVersion() {
+    assertEquals(1.0, MigrationUtil.extractVersionNum("uuid.version.1"), 1e-9);
+  }
+
+  @Test
+  void extractVersionNumReturnsZeroWhenNoVersionSegment() {
+    assertEquals(0.0, MigrationUtil.extractVersionNum("uuid.notype.something"), 1e-9);
+  }
+
+  @Test
+  void extractVersionNumReturnsZeroForEmptyString() {
+    assertEquals(0.0, MigrationUtil.extractVersionNum(""), 1e-9);
+  }
+
+  @Test
+  void extractVersionNumReturnsZeroWhenSuffixNotParseable() {
+    assertEquals(0.0, MigrationUtil.extractVersionNum("uuid.version.notanumber"), 1e-9);
+  }
+
+  @Test
+  void extractVersionNumUsesLastOccurrenceMatchingSqlSubstringIndex() {
+    // SQL SUBSTRING_INDEX(str, '.version.', -1) takes after the LAST occurrence.
+    // "a.version.x.version.2.0" → SQL gives "2.0" → 2.0
+    assertEquals(2.0, MigrationUtil.extractVersionNum("a.version.x.version.2.0"), 1e-9);
+  }
+
+  @Test
+  void extractVersionNumFromRealWorldExtensionFormat() {
+    String uuid = UUID.randomUUID().toString();
+    assertEquals(0.2, MigrationUtil.extractVersionNum(uuid + ".version.0.2"), 1e-9);
+  }
+
+  // ── extractChangedFieldKeys ────────────────────────────────────────────────
+  //
+  // Mirrors SQL UNION ALL + DISTINCT across fieldsAdded/Updated/Deleted .name,
+  // with NULL and empty-string filtering. Java additionally sorts alphabetically.
+
+  @Test
+  void extractChangedFieldKeysFromAllThreeArrays() {
+    String json =
+        """
+        {
+          "changeDescription": {
+            "fieldsAdded": [{"name": "tags"}],
+            "fieldsUpdated": [{"name": "description"}],
+            "fieldsDeleted": [{"name": "owner"}]
+          }
+        }
+        """;
+    assertEquals(
+        "[\"description\",\"owner\",\"tags\"]", MigrationUtil.extractChangedFieldKeys(json));
+  }
+
+  @Test
+  void extractChangedFieldKeysSortsAlphabetically() {
+    String json =
+        """
+        {
+          "changeDescription": {
+            "fieldsAdded": [{"name": "z_col"}, {"name": "a_col"}, {"name": "m_col"}],
+            "fieldsUpdated": [],
+            "fieldsDeleted": []
+          }
+        }
+        """;
+    assertEquals("[\"a_col\",\"m_col\",\"z_col\"]", MigrationUtil.extractChangedFieldKeys(json));
+  }
+
+  @Test
+  void extractChangedFieldKeysDeduplicatesSameNameAcrossAddedAndUpdated() {
+    String json =
+        """
+        {
+          "changeDescription": {
+            "fieldsAdded": [{"name": "tags"}],
+            "fieldsUpdated": [{"name": "tags"}],
+            "fieldsDeleted": []
+          }
+        }
+        """;
+    assertEquals("[\"tags\"]", MigrationUtil.extractChangedFieldKeys(json));
+  }
+
+  @Test
+  void extractChangedFieldKeysDeduplicatesAcrossAllThreeArrays() {
+    String json =
+        """
+        {
+          "changeDescription": {
+            "fieldsAdded": [{"name": "b"}, {"name": "a"}, {"name": "c"}],
+            "fieldsUpdated": [{"name": "a"}, {"name": "d"}],
+            "fieldsDeleted": [{"name": "e"}, {"name": "b"}]
+          }
+        }
+        """;
+    assertEquals("[\"a\",\"b\",\"c\",\"d\",\"e\"]", MigrationUtil.extractChangedFieldKeys(json));
+  }
+
+  @Test
+  void extractChangedFieldKeysReturnsEmptyArrayWhenChangeDescriptionAbsent() {
+    assertEquals("[]", MigrationUtil.extractChangedFieldKeys("{\"name\": \"some_entity\"}"));
+  }
+
+  @Test
+  void extractChangedFieldKeysReturnsEmptyArrayWhenChangeDescriptionIsNull() {
+    assertEquals("[]", MigrationUtil.extractChangedFieldKeys("{\"changeDescription\": null}"));
+  }
+
+  @Test
+  void extractChangedFieldKeysReturnsEmptyArrayWhenAllArraysEmpty() {
+    String json =
+        """
+        {
+          "changeDescription": {
+            "fieldsAdded": [],
+            "fieldsUpdated": [],
+            "fieldsDeleted": []
+          }
+        }
+        """;
+    assertEquals("[]", MigrationUtil.extractChangedFieldKeys(json));
+  }
+
+  @Test
+  void extractChangedFieldKeysFiltersOutNullNameEntries() {
+    String json =
+        """
+        {
+          "changeDescription": {
+            "fieldsAdded": [{"name": null}, {"name": "description"}],
+            "fieldsUpdated": [],
+            "fieldsDeleted": []
+          }
+        }
+        """;
+    assertEquals("[\"description\"]", MigrationUtil.extractChangedFieldKeys(json));
+  }
+
+  @Test
+  void extractChangedFieldKeysFiltersOutEmptyStringNames() {
+    String json =
+        """
+        {
+          "changeDescription": {
+            "fieldsAdded": [{"name": ""}, {"name": "tags"}],
+            "fieldsUpdated": [],
+            "fieldsDeleted": []
+          }
+        }
+        """;
+    assertEquals("[\"tags\"]", MigrationUtil.extractChangedFieldKeys(json));
+  }
+
+  @Test
+  void extractChangedFieldKeysWithOnlyFieldsAddedOthersAbsent() {
+    String json =
+        """
+        {
+          "changeDescription": {
+            "fieldsAdded": [{"name": "owner"}]
+          }
+        }
+        """;
+    assertDoesNotThrow(
+        () -> assertEquals("[\"owner\"]", MigrationUtil.extractChangedFieldKeys(json)));
+  }
+
+  @Test
+  void extractChangedFieldKeysWithFieldEntryMissingNameKey() {
+    String json =
+        """
+        {
+          "changeDescription": {
+            "fieldsAdded": [{"oldValue": "x", "newValue": "y"}],
+            "fieldsUpdated": [{"name": "tags"}],
+            "fieldsDeleted": []
+          }
+        }
+        """;
+    assertDoesNotThrow(
+        () -> assertEquals("[\"tags\"]", MigrationUtil.extractChangedFieldKeys(json)));
+  }
+
+  // ── backfillVersionMetadata ────────────────────────────────────────────────
+
+  @Test
+  void backfillIsNoOpWhenQueryReturnsEmptyBatch() {
+    Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS);
+    PreparedBatch preparedBatch = batchWithSelfReturningBinds();
+    // Stub the same bind chain that backfillVersionMetadata issues (String, String, int);
+    // a shorter chain never matches and the test would only pass on default mock values.
+    when(bHandle
+            .createQuery(any(String.class))
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyInt())
+            .mapToMap()
+            .list())
+        .thenReturn(List.of());
+    when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch);
+
+    try (MockedStatic<DatasourceConfig> ds = mockStatic(DatasourceConfig.class)) {
+      DatasourceConfig cfg = mock(DatasourceConfig.class);
+      ds.when(DatasourceConfig::getInstance).thenReturn(cfg);
+      when(cfg.isMySQL()).thenReturn(true);
+
+      assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle));
+      verify(preparedBatch, never()).execute();
+      verify(bHandle, never()).createUpdate(any(String.class));
+    }
+  }
+
+  /** PreparedBatch mock whose String and double bind overloads return the mock itself. */
+  private static PreparedBatch batchWithSelfReturningBinds() {
+    PreparedBatch pb = mock(PreparedBatch.class, RETURNS_DEEP_STUBS);
+    when(pb.bind(anyString(), anyString())).thenReturn(pb);
+    when(pb.bind(anyString(), anyDouble())).thenReturn(pb);
+    return pb;
+  }
+
+  @Test
+  void backfillSelectsMysqlSqlWhenDatasourceIsMySQL() {
+    Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS);
+    PreparedBatch preparedBatch = batchWithSelfReturningBinds();
+    when(bHandle
+            .createQuery(any(String.class))
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyInt())
+            .mapToMap()
+            .list())
+        .thenReturn(List.of(versionRow("uuid.version.0.1", emptyChangeDescriptionJson())));
+    when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch);
+
+    try (MockedStatic<DatasourceConfig> ds = mockStatic(DatasourceConfig.class)) {
+      DatasourceConfig cfg = mock(DatasourceConfig.class);
+      ds.when(DatasourceConfig::getInstance).thenReturn(cfg);
+      when(cfg.isMySQL()).thenReturn(true);
+
+      assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle));
+
+      ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+      verify(bHandle).prepareBatch(sqlCaptor.capture());
+      String sql = sqlCaptor.getValue();
+      assertTrue(sql.contains("changedFieldKeys = :changedFieldKeys"));
+      assertFalse(sql.contains("::jsonb"));
+    }
+  }
+
+  @Test
+  void backfillSelectsPostgresSqlWhenDatasourceIsPostgres() {
+    Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS);
+    PreparedBatch preparedBatch = batchWithSelfReturningBinds();
+    when(bHandle
+            .createQuery(any(String.class))
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyInt())
+            .mapToMap()
+            .list())
+        .thenReturn(List.of(versionRow("uuid.version.1.0", emptyChangeDescriptionJson())));
+    when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch);
+
+    try (MockedStatic<DatasourceConfig> ds = mockStatic(DatasourceConfig.class)) {
+      DatasourceConfig cfg = mock(DatasourceConfig.class);
+      ds.when(DatasourceConfig::getInstance).thenReturn(cfg);
+      when(cfg.isMySQL()).thenReturn(false);
+
+      assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle));
+
+      ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+      verify(bHandle).prepareBatch(sqlCaptor.capture());
+      assertTrue(sqlCaptor.getValue().contains("::jsonb"));
+    }
+  }
+
+  @Test
+  void backfillFallsBackToVersionNumOnlyUpdateWhenFullUpdateFails() {
+    Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS);
+    PreparedBatch preparedBatch = batchWithSelfReturningBinds();
+    when(bHandle
+            .createQuery(any(String.class))
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyInt())
+            .mapToMap()
+            .list())
+        .thenReturn(List.of(versionRow("uuid.version.0.1", emptyChangeDescriptionJson())));
+    when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch);
+    when(preparedBatch.execute()).thenThrow(new RuntimeException("Simulated batch failure"));
+    Update failingUpdate = mock(Update.class, RETURNS_DEEP_STUBS);
+    when(failingUpdate.bind(anyString(), anyString())).thenReturn(failingUpdate);
+    when(failingUpdate.bind(anyString(), anyDouble())).thenReturn(failingUpdate);
+    when(failingUpdate.execute())
+        .thenThrow(new RuntimeException("Per-row full update also failed"));
+    Update versionNumOnlyUpdate = mock(Update.class, RETURNS_DEEP_STUBS);
+    when(bHandle.createUpdate(argThat(s -> s != null && s.contains("changedFieldKeys"))))
+        .thenReturn(failingUpdate);
+    when(bHandle.createUpdate(argThat(s -> s != null && !s.contains("changedFieldKeys"))))
+        .thenReturn(versionNumOnlyUpdate);
+
+    try (MockedStatic<DatasourceConfig> ds = mockStatic(DatasourceConfig.class)) {
+      DatasourceConfig cfg = mock(DatasourceConfig.class);
+      ds.when(DatasourceConfig::getInstance).thenReturn(cfg);
+      when(cfg.isMySQL()).thenReturn(true);
+
+      assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle));
+
+      verify(bHandle, times(1))
+          .createUpdate(argThat(s -> s != null && s.contains("changedFieldKeys")));
+      verify(bHandle, times(1))
+          .createUpdate(argThat(s -> s != null && !s.contains("changedFieldKeys")));
+    }
+  }
+
+  @Test
+  void backfillHandlesNullJsonColumnWithoutException() {
+    Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS);
+    PreparedBatch preparedBatch = batchWithSelfReturningBinds();
+    Map<String, Object> row = new HashMap<>();
+    row.put("id", UUID.randomUUID().toString());
+    row.put("extension", "uuid.version.0.1");
+    row.put("json", null);
+    when(bHandle
+            .createQuery(any(String.class))
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyInt())
+            .mapToMap()
+            .list())
+        .thenReturn(List.of(row));
+    when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch);
+
+    try (MockedStatic<DatasourceConfig> ds = mockStatic(DatasourceConfig.class)) {
+      DatasourceConfig cfg = mock(DatasourceConfig.class);
+      ds.when(DatasourceConfig::getInstance).thenReturn(cfg);
+      when(cfg.isMySQL()).thenReturn(true);
+
+      assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle));
+      verify(preparedBatch).execute();
+    }
+  }
+
+  @Test
+  void backfillProcessesMultipleRowsInOneBatch() {
+    Handle bHandle = mock(Handle.class, RETURNS_DEEP_STUBS);
+    PreparedBatch preparedBatch = batchWithSelfReturningBinds();
+    List<Map<String, Object>> rows =
+        List.of(
+            versionRow("uuid1.version.0.1", emptyChangeDescriptionJson()),
+            versionRow("uuid2.version.1.0", emptyChangeDescriptionJson()),
+            versionRow("uuid3.version.2.0", emptyChangeDescriptionJson()));
+    when(bHandle
+            .createQuery(any(String.class))
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyString())
+            .bind(anyString(), anyInt())
+            .mapToMap()
+            .list())
+        .thenReturn(rows);
+    when(bHandle.prepareBatch(any(String.class))).thenReturn(preparedBatch);
+
+    try (MockedStatic<DatasourceConfig> ds = mockStatic(DatasourceConfig.class)) {
+      DatasourceConfig cfg = mock(DatasourceConfig.class);
+      ds.when(DatasourceConfig::getInstance).thenReturn(cfg);
+      when(cfg.isMySQL()).thenReturn(true);
+
+      assertDoesNotThrow(() -> MigrationUtil.backfillVersionMetadata(bHandle));
+      verify(preparedBatch, times(1)).execute();
+      verify(preparedBatch, times(3)).add();
+    }
+  }
+
+  /** Builds a row map with a random id and the given extension and json values. */
+  private static Map<String, Object> versionRow(String extension, String json) {
+    Map<String, Object> row = new HashMap<>();
+    row.put("id", UUID.randomUUID().toString());
+    row.put("extension", extension);
+    row.put("json", json);
+    return row;
+  }
+
+  /** Entity JSON whose changeDescription has three empty field-change arrays. */
+  private static String emptyChangeDescriptionJson() {
+    return "{\"changeDescription\":{\"fieldsAdded\":[],\"fieldsUpdated\":[],\"fieldsDeleted\":[]}}";
+  }
+
   private Object invokePrivateStatic(String methodName, Class<?>[] parameterTypes, Object... args)
       throws Exception {
     Method method = MigrationUtil.class.getDeclaredMethod(methodName, parameterTypes);