Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions conf/openmetadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,12 @@ logging:

database:
# the name of the JDBC driver, mysql in our case
driverClass: ${DB_DRIVER_CLASS:-com.mysql.cj.jdbc.Driver}
driverClass: ${DB_DRIVER_CLASS:-org.postgresql.Driver}
# the username and password
user: ${DB_USER:-openmetadata_user}
password: ${DB_USER_PASSWORD:-openmetadata_password}
# the JDBC URL; the database is called openmetadata_db
url: jdbc:${DB_SCHEME:-mysql}://${DB_HOST:-localhost}:${DB_PORT:-3306}/${OM_DATABASE:-openmetadata_db}?${DB_PARAMS:-allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=UTC}
  # NOTE(review): the default DB_PARAMS below (allowPublicKeyRetrieval/useSSL/serverTimezone) are
  # MySQL Connector/J options, not PostgreSQL JDBC options — confirm and replace with pgjdbc
  # equivalents (e.g. sslmode). Also 192.168.29.172 is an environment-specific LAN address and is
  # an unsafe committed default — prefer localhost and override DB_HOST per environment.
  url: jdbc:${DB_SCHEME:-postgresql}://${DB_HOST:-192.168.29.172}:${DB_PORT:-5432}/${OM_DATABASE:-openmetadata_db}?${DB_PARAMS:-allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=UTC}
Comment on lines +273 to +278

# HikariCP Connection Pool Settings - Optimized for Performance
maxSize: ${DB_CONNECTION_POOL_MAX_SIZE:-100} # Increased from 50 for better concurrency
Expand Down Expand Up @@ -474,7 +474,7 @@ jwtTokenConfiguration:
keyId: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}

elasticsearch:
searchType: ${SEARCH_TYPE:- "elasticsearch"}
searchType: ${SEARCH_TYPE:- "opensearch"}
Comment on lines 476 to +477
# Single host or comma-separated list for multiple hosts
# Examples: "localhost" or "es-node1:9200,es-node2:9200,es-node3:9200"
host: ${ELASTICSEARCH_HOST:-localhost}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,87 @@ void testClassificationTermCount(TestNamespace ns) {
assertEquals(3, withTermCount.getTermCount(), "Classification should have 3 tags");
}

@Test
void test_classificationAndTagUsageCount(TestNamespace ns) {
OpenMetadataClient client = SdkClients.adminClient();

CreateClassification createClassification = new CreateClassification();
createClassification.setName(ns.prefix("classification_usagecount"));
createClassification.setDescription("Classification for tag usage-count test");
Classification classification = createEntity(createClassification);

CreateTag createTag1 = new CreateTag();
createTag1.setName(ns.prefix("tag_a"));
createTag1.setDescription("Tag A");
createTag1.setClassification(classification.getFullyQualifiedName());
org.openmetadata.schema.entity.classification.Tag tagA = client.tags().create(createTag1);

CreateTag createTag2 = new CreateTag();
createTag2.setName(ns.prefix("tag_b"));
createTag2.setDescription("Tag B");
createTag2.setClassification(classification.getFullyQualifiedName());
org.openmetadata.schema.entity.classification.Tag tagB = client.tags().create(createTag2);

TableResourceIT tableResourceIT = new TableResourceIT();
Table table1 = tableResourceIT.createEntity(tableResourceIT.createRequest(ns.prefix("usage_t1"), ns).withTags(null));
Table table2 = tableResourceIT.createEntity(tableResourceIT.createRequest(ns.prefix("usage_t2"), ns).withTags(null));
Table table3 = tableResourceIT.createEntity(tableResourceIT.createRequest(ns.prefix("usage_t3"), ns).withTags(null));

table1.setTags(List.of(new TagLabel().withTagFQN(tagA.getFullyQualifiedName())));
tableResourceIT.patchEntity(table1.getId().toString(), table1);

table2.setTags(List.of(new TagLabel().withTagFQN(tagA.getFullyQualifiedName())));
tableResourceIT.patchEntity(table2.getId().toString(), table2);

table3.setTags(List.of(new TagLabel().withTagFQN(tagB.getFullyQualifiedName())));
Comment on lines +385 to +391
tableResourceIT.patchEntity(table3.getId().toString(), table3);

Classification withUsage =
getEntityWithFields(classification.getId().toString(), "usageCount");
assertEquals(
3,
withUsage.getUsageCount(),
"Classification usage count must include all child-tag applications (correctness regression: hierarchical hash prefix match)");

org.openmetadata.schema.entity.classification.Tag tagAWithUsage =
client.tags().get(tagA.getId().toString(), "usageCount");
assertEquals(
2,
tagAWithUsage.getUsageCount(),
"Tag A must report exact usage count (correctness regression: exact hash match)");

org.openmetadata.schema.entity.classification.Tag tagBWithUsage =
client.tags().get(tagB.getId().toString(), "usageCount");
assertEquals(1, tagBWithUsage.getUsageCount(), "Tag B must report exact usage count");

ListParams listParams =
new ListParams()
.addFilter("parent", classification.getFullyQualifiedName())
.setFields("usageCount");
ListResponse<org.openmetadata.schema.entity.classification.Tag> listed =
client.tags().list(listParams);
org.openmetadata.schema.entity.classification.Tag listedA =
listed.getData().stream()
.filter(t -> t.getId().equals(tagA.getId()))
.findFirst()
.orElse(null);
org.openmetadata.schema.entity.classification.Tag listedB =
listed.getData().stream()
.filter(t -> t.getId().equals(tagB.getId()))
.findFirst()
.orElse(null);
assertNotNull(listedA);
assertNotNull(listedB);
assertEquals(
2,
listedA.getUsageCount(),
"Bulk LIST: tag A usage count must be correct (exercises batched getTagCountsBulk path)");
assertEquals(
1,
listedB.getUsageCount(),
"Bulk LIST: tag B usage count must be correct (exercises batched getTagCountsBulk path)");
}

@Test
void test_entityStatusUpdateAndPatch(TestNamespace ns) {
// Create a classification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6563,59 +6563,99 @@ int getTagCount(
hash = true)
String tagFqnHash);

/**
* Get tag usage counts for multiple tags.
* This method retrieves counts for exact tag matches and their children in one query.
*/
@SqlQuery(
"SELECT tagFQN, count FROM ("
+ " SELECT ? as tagFQN, COUNT(DISTINCT targetFQNHash) as count "
+ " FROM tag_usage "
+ " WHERE source = ? AND (tagFQNHash = MD5(?) OR tagFQNHash LIKE CONCAT(MD5(?), '.%'))"
+ ") t WHERE tagFQN IN (<tagFQNs>)")
"SELECT tagFQNHash AS tagFQN, COUNT(*) AS count "
+ "FROM tag_usage "
+ "WHERE source = :source AND tagFQNHash IN (<hashes>) "
+ "GROUP BY tagFQNHash")
@RegisterRowMapper(TagCountMapper.class)
@Deprecated
List<Map.Entry<String, Integer>> getTagCountsBulkComplex(
@Bind("tagFQN") String sampleTagFQN,
@Bind("source") int source,
@Bind("tagFQNHash") String tagFQNHash,
@Bind("tagFQNHashPrefix") String tagFQNHashPrefix,
@BindList("tagFQNs") List<String> tagFQNs);
List<Map.Entry<String, Integer>> getTagUsageCountsByExactHashes(
@Bind("source") int source, @BindList("hashes") List<String> hashes);
Comment on lines 6566 to +6573

int TAG_COUNT_BATCH_CHUNK_SIZE = 1000;

/**
* Returns usage count per tagFQN = exact-match rows + descendant rows.
*
* <p>Hashes are pre-computed via FullyQualifiedName.buildHash to match the hierarchical
* format stored in tag_usage.tagFQNHash. Both branches are chunked at
* {@link #TAG_COUNT_BATCH_CHUNK_SIZE} so neither the IN-list nor the UNION-ALL of
* prefix-LIKE inputs grows unbounded.
*
* <p>Per-chunk query count is exactly 2 (one exact-match GROUP BY + one batched
* descendant GROUP BY) regardless of how many tags the chunk contains — eliminating the
* N+1 round-trip pattern that would scale poorly for large tag pages.
*/
default Map<String, Integer> getTagCountsBulk(int source, List<String> tagFQNs) {
if (tagFQNs == null || tagFQNs.isEmpty()) {
return Collections.emptyMap();
}

Map<String, Integer> resultMap = new HashMap<>();
LinkedHashMap<String, String> fqnByHash = new LinkedHashMap<>();
for (String tagFQN : tagFQNs) {
fqnByHash.put(FullyQualifiedName.buildHash(tagFQN), tagFQN);
}

// Process tags in batches to create a single efficient query
// We'll use a UNION ALL approach which is more compatible with JDBI
StringBuilder queryBuilder = new StringBuilder();
List<String> params = new ArrayList<>();
Map<String, Integer> result = new HashMap<>();
for (String tagFQN : tagFQNs) {
result.put(tagFQN, 0);
}
Comment thread
gitar-bot[bot] marked this conversation as resolved.

for (int i = 0; i < tagFQNs.size(); i++) {
if (i > 0) {
queryBuilder.append(" UNION ALL ");
List<String> allHashes = new ArrayList<>(fqnByHash.keySet());
for (int i = 0; i < allHashes.size(); i += TAG_COUNT_BATCH_CHUNK_SIZE) {
List<String> chunk =
allHashes.subList(i, Math.min(i + TAG_COUNT_BATCH_CHUNK_SIZE, allHashes.size()));

for (Map.Entry<String, Integer> row : getTagUsageCountsByExactHashes(source, chunk)) {
String tagFQN = fqnByHash.get(row.getKey());
if (tagFQN != null) {
result.merge(tagFQN, row.getValue(), Integer::sum);
}
}

for (Map.Entry<String, Integer> row : batchedDescendantCounts(source, chunk)) {
String tagFQN = fqnByHash.get(row.getKey());
if (tagFQN != null && row.getValue() > 0) {
result.merge(tagFQN, row.getValue(), Integer::sum);
}
}
queryBuilder.append(
"SELECT ? as tagFQN, COUNT(DISTINCT targetFQNHash) as count "
+ "FROM tag_usage "
+ "WHERE source = ? AND (tagFQNHash = MD5(?) OR tagFQNHash LIKE CONCAT(MD5(?), '.%'))");
params.add(tagFQNs.get(i)); // tagFQN for selection
params.add(String.valueOf(source)); // source
params.add(tagFQNs.get(i)); // tagFQN for MD5
params.add(tagFQNs.get(i)); // tagFQN for LIKE
}

// For now, fall back to individual queries until we have a better solution
// This ensures correctness while we work on optimization
for (String tagFQN : tagFQNs) {
int count = getTagCount(source, tagFQN);
resultMap.put(tagFQN, count);
}

return resultMap;
return result;
}

/**
 * Single-round-trip descendant count for up to {@link #TAG_COUNT_BATCH_CHUNK_SIZE} root
 * hashes. Builds a UNION-ALL of (rootHash, hashPrefix) input rows and joins it to
 * tag_usage with an indexed LIKE per row. Every value is bound as a named parameter —
 * no string interpolation of user data, safe against injection.
 *
 * @param source     ordinal of the tag source to filter on
 * @param rootHashes root hashes whose descendants should be counted; null/empty yields
 *                   an empty list
 * @return (rootHash, descendantCount) entries; roots with no descendants produce no row
 */
default List<Map.Entry<String, Integer>> batchedDescendantCounts(
    int source, List<String> rootHashes) {
  if (rootHashes == null || rootHashes.isEmpty()) {
    return Collections.emptyList();
  }

  // One "SELECT :hN AS rootHash, :pN AS hashPrefix" row per input hash.
  List<String> inputRows = new ArrayList<>(rootHashes.size());
  for (int idx = 0; idx < rootHashes.size(); idx++) {
    inputRows.add("SELECT :h" + idx + " AS rootHash, :p" + idx + " AS hashPrefix");
  }

  String sql =
      "SELECT i.rootHash AS tagFQN, COUNT(*) AS count FROM ("
          + String.join(" UNION ALL ", inputRows)
          + ") i JOIN tag_usage tu ON tu.source = :source "
          + "AND tu.tagFQNHash LIKE i.hashPrefix "
          + "GROUP BY i.rootHash";

  return org.openmetadata.service.Entity.getJdbi()
      .withHandle(
          handle -> {
            var query = handle.createQuery(sql);
            query.bind("source", source);
            for (int idx = 0; idx < rootHashes.size(); idx++) {
              query.bind("h" + idx, rootHashes.get(idx));
              query.bind("p" + idx, rootHashes.get(idx) + ".%");
            }
            return query.map(new TagCountMapper()).list();
          });
}

@SqlUpdate("DELETE FROM tag_usage where targetFQNHash = :targetFQNHash")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,51 +732,10 @@ private Map<String, Integer> batchFetchUsageCounts(List<Tag> tags) {
if (tags == null || tags.isEmpty()) {
return Map.of();
}

// Build and execute a single query for all tags
var tagFQNs = tags.stream().map(Tag::getFullyQualifiedName).toList();

// Build UNION query that gets counts for all tags in one go
var queryBuilder = new StringBuilder();
tagFQNs.forEach(
tagFQN -> {
if (!queryBuilder.isEmpty()) {
queryBuilder.append(" UNION ALL ");
}
var escapedFQN = tagFQN.replace("'", "''");
queryBuilder.append(
"""
SELECT '%s' as tagFQN,
COUNT(DISTINCT targetFQNHash) as count
FROM tag_usage
WHERE source = %d
AND (tagFQNHash = MD5('%s') OR tagFQNHash LIKE CONCAT(MD5('%s'), '.%%'))
"""
.formatted(
escapedFQN, TagSource.CLASSIFICATION.ordinal(), escapedFQN, escapedFQN));
});

try {
var results =
Entity.getJdbi()
.withHandle(handle -> handle.createQuery(queryBuilder.toString()).mapToMap().list());

return results.stream()
.filter(row -> row.get("tagFQN") != null)
.collect(
Collectors.toMap(
row -> (String) row.get("tagFQN"),
row -> {
var count = (Number) row.get("count");
return count != null ? count.intValue() : 0;
}));
} catch (Exception e) {
LOG.error("Error batch fetching usage counts", e);
// Fall back to individual queries
return daoCollection
.tagUsageDAO()
.getTagCountsBulk(TagSource.CLASSIFICATION.ordinal(), tagFQNs);
}
List<String> tagFQNs = tags.stream().map(Tag::getFullyQualifiedName).toList();
return daoCollection
.tagUsageDAO()
.getTagCountsBulk(TagSource.CLASSIFICATION.ordinal(), tagFQNs);
}

@Override
Expand Down
Loading