Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions bootstrap/sql/migrations/native/1.12.8/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Restore the unique constraint dropped in 1.9.9. Closes the 1.9.9 regression that caused
-- /columns?fields=profile 504s, and brings Postgres back in line with MySQL (which never
-- lost it). The leading (entityFQNHash, extension) prefix serves the column-profile batch query.
--
-- Two-phase approach:
--   1. CREATE UNIQUE INDEX CONCURRENTLY builds the index without taking an
--      ACCESS EXCLUSIVE lock, so reads and writes continue during the build.
--   2. ADD CONSTRAINT ... UNIQUE USING INDEX promotes the already-built index to a
--      constraint without re-scanning the table.
--
-- NOTE(review): CREATE INDEX CONCURRENTLY cannot run inside a transaction block --
-- confirm the migration runner executes this file in autocommit mode. Also, a failed
-- concurrent build leaves an INVALID index behind, and IF NOT EXISTS would then skip
-- the rebuild; drop any invalid index before re-running.
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS
    profiler_data_time_series_unique_hash_extension_ts
    ON profiler_data_time_series (entityFQNHash, extension, operation, timestamp);

-- Promote the index to a constraint, guarded because ADD CONSTRAINT has no
-- IF NOT EXISTS form. The guard keeps the whole migration idempotent (safe to
-- re-run), matching the IF NOT EXISTS on the index build above. Since the
-- constraint name equals the index name, no rename occurs during promotion.
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1
        FROM pg_constraint
        WHERE conname = 'profiler_data_time_series_unique_hash_extension_ts'
          AND conrelid = 'profiler_data_time_series'::regclass
    ) THEN
        ALTER TABLE profiler_data_time_series
            ADD CONSTRAINT profiler_data_time_series_unique_hash_extension_ts
            UNIQUE USING INDEX profiler_data_time_series_unique_hash_extension_ts;
    END IF;
END
$$;

-- Refresh planner statistics so the new index is considered immediately.
ANALYZE profiler_data_time_series;
Original file line number Diff line number Diff line change
Expand Up @@ -5861,4 +5861,141 @@ void test_listTablesWithColumnTags_performance(TestNamespace ns) {
assertFalse(table.getTags().isEmpty(), "Table tags should not be empty");
}
}

// ===================================================================
// REGRESSION TEST - columns API with fields=profile (collate#3488)
// ===================================================================

/**
 * Regression test for collate#3488: GET /tables/{id}/columns with {@code fields=profile}
 * must return correct per-column profile and tag data and complete quickly (the 1.9.9
 * regression caused this endpoint to 504).
 *
 * <p>Exercises three field combinations: (a) {@code profile} alone, (b) the exact
 * production combination {@code tags,customMetrics,extension,profile}, and (c)
 * {@code tags,profile} together.
 */
@Test
@Execution(ExecutionMode.SAME_THREAD)
void test_getColumnsWithProfileField_correctnessAndNoBatchRegression(TestNamespace ns) {
  OpenMetadataClient client = SdkClients.adminClient();

  // --- Fixture: service -> schema -> table with three columns; id and email carry a tag ---
  DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns);
  DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service);

  CreateClassification createClassification =
      new CreateClassification()
          .withName(ns.prefix("cls"))
          .withDescription("Classification for profile regression test");
  Classification cls = client.classifications().create(createClassification);

  CreateTag createTag =
      new CreateTag()
          .withName(ns.prefix("tag"))
          .withDescription("Tag for profile regression test")
          .withClassification(cls.getName());
  Tag tag = client.tags().create(createTag);

  TagLabel tagLabel =
      new TagLabel()
          .withTagFQN(tag.getFullyQualifiedName())
          .withSource(TagLabel.TagSource.CLASSIFICATION);

  Column idCol = ColumnBuilder.of("id", "BIGINT").primaryKey().notNull().build();
  idCol.setTags(List.of(tagLabel));
  Column emailCol = ColumnBuilder.of("email", "VARCHAR").dataLength(255).build();
  emailCol.setTags(List.of(tagLabel));
  // The "name" column intentionally gets neither tag nor profile, to verify absent data
  // is returned as null rather than leaking another column's values.
  Column nameCol = ColumnBuilder.of("name", "VARCHAR").dataLength(255).build();

  CreateTable createRequest = createRequest(ns.prefix("profile_regression_table"), ns);
  createRequest.setDatabaseSchema(schema.getFullyQualifiedName());
  createRequest.setColumns(List.of(idCol, emailCol, nameCol));
  Table table = client.tables().create(createRequest);

  // --- Fixture: profiles for id and email only ---
  Long timestamp = System.currentTimeMillis();
  ColumnProfile idProfile =
      new ColumnProfile()
          .withName("id")
          .withMin(1.0)
          .withMax(999.0)
          .withUniqueCount(100.0)
          .withTimestamp(timestamp);
  ColumnProfile emailProfile =
      new ColumnProfile()
          .withName("email")
          .withNullCount(5.0)
          .withNullProportion(0.05)
          .withTimestamp(timestamp);

  TableProfile tableProfile =
      new TableProfile().withRowCount(100.0).withColumnCount(3.0).withTimestamp(timestamp);

  CreateTableProfile createProfile =
      new CreateTableProfile()
          .withTableProfile(tableProfile)
          .withColumnProfile(List.of(idProfile, emailProfile));
  client.tables().updateTableProfile(table.getId(), createProfile);

  // Verify the three field combinations exercised below don't regress:
  // (a) fields=profile — profile data returned, no full-table-scan on profiler_data_time_series
  TableColumnList withProfile =
      assertTimeout(
          Duration.ofSeconds(30),
          () -> client.tables().getColumns(table.getId(), "profile"),
          "columns?fields=profile should complete within 30s");

  assertEquals(3, withProfile.getData().size());
  Column returnedId = findColumnByName(withProfile, "id");
  Column returnedName = findColumnByName(withProfile, "name");
  assertNotNull(returnedId, "id column should be present");
  assertNotNull(returnedId.getProfile(), "id column should have profile data");
  assertEquals(1.0, returnedId.getProfile().getMin(), "id column min should match");
  assertEquals(999.0, returnedId.getProfile().getMax(), "id column max should match");
  assertNotNull(returnedName, "name column should be present");
  assertNull(returnedName.getProfile(), "name column has no profile, should be null");

  // (b) fields=tags,customMetrics,extension,profile — the exact production query
  TableColumnList withAllFields =
      assertTimeout(
          Duration.ofSeconds(30),
          () -> client.tables().getColumns(table.getId(), "tags,customMetrics,extension,profile"),
          "columns?fields=tags,customMetrics,extension,profile should complete within 30s");

  assertEquals(3, withAllFields.getData().size());

  Column idResult = findColumnByName(withAllFields, "id");
  assertNotNull(idResult, "id column must be present");
  assertNotNull(idResult.getProfile(), "id column must have profile");
  assertNotNull(idResult.getTags(), "id column must have tags");
  assertFalse(idResult.getTags().isEmpty(), "id column tags must not be empty");
  assertTrue(
      idResult.getTags().stream()
          .anyMatch(t -> tag.getFullyQualifiedName().equals(t.getTagFQN())),
      "id column should carry the test tag");

  // (c) fields=tags,profile — both tags and profile are populated correctly when requested
  // together (the dedup of populateEntityFieldTags is exercised here, but this test
  // verifies the observable contract — tags + profile both present on the result —
  // not the internal call count)
  TableColumnList withTagsAndProfile =
      assertTimeout(
          Duration.ofSeconds(30),
          () -> client.tables().getColumns(table.getId(), "tags,profile"),
          "columns?fields=tags,profile should complete within 30s");

  assertEquals(3, withTagsAndProfile.getData().size());
  Column idTagsProfile = findColumnByName(withTagsAndProfile, "id");
  assertNotNull(idTagsProfile);
  assertNotNull(idTagsProfile.getTags());
  assertFalse(
      idTagsProfile.getTags().isEmpty(), "Tags must be present even when profile requested");
  assertNotNull(idTagsProfile.getProfile(), "Profile must be present when profile requested");
}

/** Returns the column with the given name from the list, or {@code null} when absent. */
private static Column findColumnByName(TableColumnList columns, String name) {
  return columns.getData().stream()
      .filter(c -> name.equals(c.getName()))
      .findFirst()
      .orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,13 @@ void bulkUpsertExtensions(
List<ExtensionRecord> getExtensions(
@BindUUID("id") UUID id, @Bind("extensionPrefix") String extensionPrefix);

/**
 * Lists all extension records stored for the given entity id that share the given
 * JSON schema, ordered by extension name. Enables batch-loading related extensions
 * (e.g. all column extensions of a table) in a single query instead of one query
 * per extension row.
 *
 * @param id entity id whose extension rows are fetched
 * @param jsonSchema the jsonschema value to filter extension rows by
 * @return matching (extension, json) records, ordered by extension name
 */
@RegisterRowMapper(ExtensionMapper.class)
@SqlQuery(
"SELECT extension, json FROM entity_extension WHERE id = :id AND jsonschema = :jsonSchema "
+ "ORDER BY extension")
List<ExtensionRecord> getExtensionsByJsonSchema(
@BindUUID("id") UUID id, @Bind("jsonSchema") String jsonSchema);

@ConnectionAwareSqlQuery(
value =
"SELECT json FROM ("
Expand Down Expand Up @@ -9521,7 +9528,8 @@ default String getTimeSeriesTableName() {
+ " GROUP BY entityFQNHash"
+ ") latest "
+ "ON p.entityFQNHash = latest.entityFQNHash AND p.timestamp = latest.latestTs "
+ "WHERE p.extension = :extension")
+ "WHERE p.extension = :extension "
+ "AND p.entityFQNHash IN (<entityFQNHashes>)")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know what performance benefit we get from adding entityFQNHashes here, given that the inner-join subquery already filters on the same predicate?

@RegisterRowMapper(LatestExtensionRecordMapper.class)
List<LatestExtensionRecord> getLatestExtensionsBatch(
@Define("table") String table,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public class TableRepository extends EntityRepository<Table> {
public static final String TABLE_COLUMN_EXTENSION = "table.column";
public static final String TABLE_EXTENSION = "table.table";
public static final String CUSTOM_METRICS_EXTENSION = "customMetrics.";
public static final String COLUMN_EXTENSION_JSON_SCHEMA = "columnExtension";
public static final String TABLE_PROFILER_CONFIG = "tableProfilerConfig";
private static final ReadPrefetchKey PREFETCH_DEFAULT_FIELDS =
ReadPrefetchKey.TABLE_DEFAULT_FIELDS;
Expand Down Expand Up @@ -2891,20 +2892,53 @@ private ResultList<Column> getTableColumnsInternal(
}

if (fieldsParam != null && fieldsParam.contains("customMetrics")) {
List<ExtensionRecord> allColumnMetricRecords =
daoCollection
.entityExtensionDAO()
.getExtensions(table.getId(), CUSTOM_METRICS_EXTENSION + TABLE_COLUMN_EXTENSION);
Map<String, List<CustomMetric>> metricsByColumn = new HashMap<>();
for (ExtensionRecord record : allColumnMetricRecords) {
CustomMetric metric = JsonUtils.readValue(record.extensionJson(), CustomMetric.class);
if (metric != null && metric.getColumnName() != null) {
metricsByColumn
.computeIfAbsent(metric.getColumnName(), k -> new ArrayList<>())
.add(metric);
}
}
for (Column column : paginatedColumns) {
column.setCustomMetrics(getCustomMetrics(table, column.getName()));
column.setCustomMetrics(metricsByColumn.getOrDefault(column.getName(), List.of()));
}
}

if (fieldsParam != null && fieldsParam.contains("extension")) {
List<ExtensionRecord> allColumnExtensions =
daoCollection
.entityExtensionDAO()
.getExtensionsByJsonSchema(table.getId(), COLUMN_EXTENSION_JSON_SCHEMA);
Map<String, Object> extensionByColumnHash = new HashMap<>();
for (ExtensionRecord record : allColumnExtensions) {
try {
extensionByColumnHash.put(
record.extensionName(), JsonUtils.readValue(record.extensionJson(), Object.class));
} catch (Exception e) {
LOG.warn(
"Failed to deserialize column extension for table {}: {}",
table.getId(),
e.getMessage());
}
}
for (Column column : paginatedColumns) {
column.setExtension(getColumnExtension(table.getId(), column.getFullyQualifiedName()));
column.setExtension(
extensionByColumnHash.get(
FullyQualifiedName.buildHash(column.getFullyQualifiedName())));
}
}

if (fieldsParam != null && fieldsParam.contains("profile")) {
setColumnProfile(paginatedColumns);
populateEntityFieldTags(entityType, paginatedColumns, table.getFullyQualifiedName(), true);
if (!fieldsParam.contains("tags")) {
populateEntityFieldTags(entityType, paginatedColumns, table.getFullyQualifiedName(), true);
}
paginatedColumns =
piiOwners != null
? PIIMasker.getTableProfile(piiOwners, paginatedColumns, authorizer, securityContext)
Expand Down Expand Up @@ -3227,8 +3261,21 @@ private ResultList<Column> searchTableColumnsInternal(

Fields fields = getFields(fieldsParam);
if (fields.contains("customMetrics") || fields.contains("*")) {
List<ExtensionRecord> allColumnMetricRecords =
daoCollection
.entityExtensionDAO()
.getExtensions(table.getId(), CUSTOM_METRICS_EXTENSION + TABLE_COLUMN_EXTENSION);
Map<String, List<CustomMetric>> metricsByColumn = new HashMap<>();
for (ExtensionRecord record : allColumnMetricRecords) {
CustomMetric metric = JsonUtils.readValue(record.extensionJson(), CustomMetric.class);
if (metric != null && metric.getColumnName() != null) {
metricsByColumn
.computeIfAbsent(metric.getColumnName(), k -> new ArrayList<>())
.add(metric);
}
}
for (Column column : paginatedResults) {
column.setCustomMetrics(getCustomMetrics(table, column.getName()));
column.setCustomMetrics(metricsByColumn.getOrDefault(column.getName(), List.of()));
}
}

Expand Down
Loading