diff --git a/message/controller.proto b/message/controller.proto index 217db023e37..67ec1514519 100644 --- a/message/controller.proto +++ b/message/controller.proto @@ -244,8 +244,12 @@ message PrometheusMetricTarget { message PrometheusLabel { required uint32 id = 1; - required string name = 2; - required string value = 3; + // Deprecated: use name_id instead. + optional string name = 2; + // Deprecated: use value_id instead. + optional string value = 3; + optional uint32 name_id = 4; + optional uint32 value_id = 5; } message PrometheusTargetRequest { diff --git a/server/controller/db/metadb/migrator/schema/const.go b/server/controller/db/metadb/migrator/schema/const.go index 42b28c1f8ca..26a1c49e4db 100644 --- a/server/controller/db/metadb/migrator/schema/const.go +++ b/server/controller/db/metadb/migrator/schema/const.go @@ -20,5 +20,5 @@ const ( RAW_SQL_ROOT_DIR = "/etc/metadb/schema/rawsql" DB_VERSION_TABLE = "db_version" - DB_VERSION_EXPECTED = "7.1.0.39" + DB_VERSION_EXPECTED = "7.1.0.40" ) diff --git a/server/controller/db/metadb/migrator/schema/rawsql/mysql/ddl_create_table.sql b/server/controller/db/metadb/migrator/schema/rawsql/mysql/ddl_create_table.sql index e94be82edfd..a806dacde6c 100644 --- a/server/controller/db/metadb/migrator/schema/rawsql/mysql/ddl_create_table.sql +++ b/server/controller/db/metadb/migrator/schema/rawsql/mysql/ddl_create_table.sql @@ -2570,10 +2570,11 @@ TRUNCATE TABLE prometheus_label_value; CREATE TABLE IF NOT EXISTS prometheus_label ( `id` INT NOT NULL AUTO_INCREMENT PRIMARY KEY, - `name` VARCHAR(256) NOT NULL, - `value` TEXT, + `name_id` INT NOT NULL DEFAULT 0, + `value_id` INT NOT NULL DEFAULT 0, `synced_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP + `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE INDEX `name_id_value_id_index` (`name_id`, `value_id`) USING BTREE )ENGINE=innodb AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin; TRUNCATE TABLE prometheus_label; diff --git a/server/controller/db/metadb/migrator/schema/rawsql/mysql/issu/7.1.0.40.sql b/server/controller/db/metadb/migrator/schema/rawsql/mysql/issu/7.1.0.40.sql new file mode 100644 index 00000000000..ce0615a3753 --- /dev/null +++ b/server/controller/db/metadb/migrator/schema/rawsql/mysql/issu/7.1.0.40.sql @@ -0,0 +1,135 @@ +-- Step 1: Truncate prometheus data tables. +-- Prometheus metrics/labels are re-synced from agents on next startup, so no data +-- migration is needed — a clean slate is both correct and instant. +TRUNCATE TABLE prometheus_metric_name; +TRUNCATE TABLE prometheus_label_name; +TRUNCATE TABLE prometheus_label_value; +TRUNCATE TABLE prometheus_label; +TRUNCATE TABLE prometheus_metric_app_label_layout; +TRUNCATE TABLE ch_app_label; +TRUNCATE TABLE ch_prometheus_label_name; +TRUNCATE TABLE ch_prometheus_metric_app_label_layout; +TRUNCATE TABLE ch_prometheus_metric_name; + +-- Step 2: DDL — add name_id, value_id columns to prometheus_label (idempotent). +DROP PROCEDURE IF EXISTS ColumnExists; + +CREATE PROCEDURE ColumnExists( + IN p_table_name VARCHAR(255), + IN p_col_name VARCHAR(255), + OUT p_exists TINYINT(1) +) +BEGIN + SELECT COUNT(*) > 0 + INTO p_exists + FROM information_schema.columns + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = p_table_name + AND COLUMN_NAME = p_col_name; +END; + +DROP PROCEDURE IF EXISTS AddColumnIfNotExists; + +CREATE PROCEDURE AddColumnIfNotExists( + IN tableName VARCHAR(255), + IN colName VARCHAR(255), + IN colType VARCHAR(255), + IN afterCol VARCHAR(255) +) +BEGIN + CALL ColumnExists(tableName, colName, @exists); + IF NOT @exists THEN + SET @sql = CONCAT('ALTER TABLE ', tableName, ' ADD COLUMN ', colName, ' ', colType, ' AFTER ', afterCol); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END IF; +END; + +CALL AddColumnIfNotExists('prometheus_label', 'name_id', 'INT NOT NULL DEFAULT 0', 'id'); +CALL AddColumnIfNotExists('prometheus_label', 'value_id', 'INT NOT NULL DEFAULT 0', 'name_id'); + +DROP PROCEDURE IF EXISTS ColumnExists; +DROP PROCEDURE IF EXISTS AddColumnIfNotExists; + +-- Step 3: Add UNIQUE index on (name_id, value_id) before any data is inserted. +-- The tables are truncated below so no deduplication is needed first. +DROP PROCEDURE IF EXISTS AddUniqueIndexIfNotExists; + +CREATE PROCEDURE AddUniqueIndexIfNotExists( + IN tableName VARCHAR(255), + IN indexName VARCHAR(255), + IN indexDef VARCHAR(1024) +) +BEGIN + DECLARE cnt INT; + SELECT COUNT(*) INTO cnt + FROM information_schema.statistics + WHERE table_schema = DATABASE() + AND table_name = tableName + AND index_name = indexName; + IF cnt = 0 THEN + SET @sql = CONCAT('ALTER TABLE ', tableName, ' ADD UNIQUE INDEX ', indexName, ' ', indexDef); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END IF; +END; + +CALL AddUniqueIndexIfNotExists('prometheus_label', 'name_id_value_id_index', '(name_id, value_id)'); +DROP PROCEDURE IF EXISTS AddUniqueIndexIfNotExists; + +-- Step 4: Rename unused tables to _bak. +DROP PROCEDURE IF EXISTS RenameTableIfExists; + +CREATE PROCEDURE RenameTableIfExists( + IN oldName VARCHAR(255), + IN newName VARCHAR(255) +) +BEGIN + DECLARE cnt INT; + SELECT COUNT(*) INTO cnt + FROM information_schema.tables + WHERE table_schema = DATABASE() + AND table_name = oldName; + IF cnt > 0 THEN + SET @sql = CONCAT('RENAME TABLE ', oldName, ' TO ', newName); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END IF; +END; + +CALL RenameTableIfExists('prometheus_metric_target', 'prometheus_metric_target_bak'); +CALL RenameTableIfExists('prometheus_metric_label_name', 'prometheus_metric_label_name_bak'); +DROP PROCEDURE IF EXISTS RenameTableIfExists; + +-- Step 4: Drop unused columns. +DROP PROCEDURE IF EXISTS DropColumnIfExists; + +CREATE PROCEDURE DropColumnIfExists( + IN tableName VARCHAR(255), + IN colName VARCHAR(255) +) +BEGIN + DECLARE cnt INT; + SELECT COUNT(*) INTO cnt + FROM information_schema.columns + WHERE table_schema = DATABASE() + AND table_name = tableName + AND column_name = colName; + IF cnt > 0 THEN + SET @sql = CONCAT('ALTER TABLE ', tableName, ' DROP COLUMN ', colName); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END IF; +END; + +CALL DropColumnIfExists('prometheus_label', 'name'); +CALL DropColumnIfExists('prometheus_label', 'value'); +DROP PROCEDURE IF EXISTS DropColumnIfExists; + +-- Update DB version +UPDATE db_version SET version='7.1.0.40'; + diff --git a/server/controller/db/metadb/model/prometheus_model.go b/server/controller/db/metadb/model/prometheus_model.go index 24d77285eee..ae7b60f0273 100644 --- a/server/controller/db/metadb/model/prometheus_model.go +++ b/server/controller/db/metadb/model/prometheus_model.go @@ -52,8 +52,8 @@ type PrometheusLabelValue struct { type PrometheusLabel struct { PrometheusAutoIncID `gorm:"embedded"` PrometheusOperatedTime `gorm:"embedded"` - Name string `gorm:"column:name;type:varchar(256);not null"` - Value string `gorm:"column:value;type:text;default:''"` + NameID int `gorm:"column:name_id;type:int;not null;default:0"` + ValueID int `gorm:"column:value_id;type:int;not null;default:0"` } type PrometheusMetricLabelName struct { diff --git a/server/controller/prometheus/cache/cache.go b/server/controller/prometheus/cache/cache.go index bdce7c0e619..5c9e9ef71e4 100644 --- a/server/controller/prometheus/cache/cache.go +++ b/server/controller/prometheus/cache/cache.go @@ -122,10 +122,9 @@ func GetDebugCache(t controller.PrometheusCacheType) []byte { temp := map[string]interface{}{ "name_to_id": make(map[string]interface{}), } - tempCache.MetricName.nameToID.Range(func(key, value any) bool { - temp["name_to_id"].(map[string]interface{})[key.(string)] = value - return true - }) + for k, v := range tempCache.MetricName.GetNameToID() { + temp["name_to_id"].(map[string]interface{})[k] = v + } if len(temp["name_to_id"].(map[string]interface{})) > 0 { content["metric_name"] = temp } @@ -134,10 +133,9 @@ func GetDebugCache(t controller.PrometheusCacheType) []byte { temp := map[string]interface{}{ "name_to_id": make(map[string]interface{}), } - tempCache.LabelName.nameToID.Range(func(key, value any) bool { - temp["name_to_id"].(map[string]interface{})[key.(string)] = value - return true - }) + for k, v := range tempCache.LabelName.GetNameToID() { + temp["name_to_id"].(map[string]interface{})[k] = v + } if len(temp["name_to_id"].(map[string]interface{})) > 0 { content["label_name"] = temp } @@ -146,10 +144,10 @@ func GetDebugCache(t controller.PrometheusCacheType) []byte { temp := map[string]interface{}{ "value_to_id": make(map[string]interface{}), } - tempCache.LabelValue.GetValueToID().Range(func(key string, value int) bool { + + for key, value := range tempCache.LabelValue.GetValueToID() { temp["value_to_id"].(map[string]interface{})[key] = value - return true - }) + } if len(temp["value_to_id"].(map[string]interface{})) > 0 { content["label_value"] = temp @@ -159,10 +157,9 @@ func GetDebugCache(t controller.PrometheusCacheType) []byte { temp := map[string]interface{}{ "layout_key_to_index": make(map[string]interface{}), } - tempCache.MetricAndAPPLabelLayout.layoutKeyToIndex.Range(func(key, value any) bool { - temp["layout_key_to_index"].(map[string]interface{})[marshal(key)] = value - return true - }) + for k, v := range tempCache.MetricAndAPPLabelLayout.GetLayoutKeyToIndex() { + temp["layout_key_to_index"].(map[string]interface{})[marshal(k)] = v + } if len(temp["layout_key_to_index"].(map[string]interface{})) > 0 { content["metric_and_app_label_layout"] = temp } @@ -171,8 +168,8 @@ func GetDebugCache(t controller.PrometheusCacheType) []byte { temp := map[string]interface{}{ "key_to_id": make(map[string]interface{}), } - for iter := range tempCache.Label.keyToID.Iter() { - temp["key_to_id"].(map[string]interface{})[iter.Key.String()] = iter.Val + for key, value := range tempCache.Label.GetKeyToID() { + temp["key_to_id"].(map[string]interface{})[key.String()] = value } if len(temp["key_to_id"].(map[string]interface{})) > 0 { diff --git a/server/controller/prometheus/cache/cache_db_test.go b/server/controller/prometheus/cache/cache_db_test.go new file mode 100644 index 00000000000..98b9448ab55 --- /dev/null +++ b/server/controller/prometheus/cache/cache_db_test.go @@ -0,0 +1,542 @@ +/** + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cache + +import ( + "fmt" + "os" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/schema" + + "github.com/deepflowio/deepflow/server/controller/db/metadb" + metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" + "github.com/deepflowio/deepflow/server/controller/prometheus/common" +) + +// ============================================================================ +// 测试基础设施:SQLite 内存数据库 + ORG 构造 +// ============================================================================ + +const testDBFile = "/tmp/prometheus_cache_test.db" + +// setupTestDB 创建 SQLite 数据库并自动建表 +func setupTestDB(t testing.TB) *gorm.DB { + t.Helper() + os.Remove(testDBFile) + db, err := gorm.Open( + sqlite.Open(testDBFile), + &gorm.Config{NamingStrategy: schema.NamingStrategy{SingularTable: true}}, + ) + require.NoError(t, err) + + sqlDB, err := db.DB() + require.NoError(t, err) + sqlDB.SetMaxIdleConns(50) + sqlDB.SetMaxOpenConns(100) + sqlDB.SetConnMaxLifetime(time.Hour) + + // 开启 WAL 模式提升写入性能 + db.Exec("PRAGMA journal_mode=WAL") + db.Exec("PRAGMA synchronous=OFF") + + // AutoMigrate prometheus 表(layout 需手动建表,因 SQLite 不支持 unsigned) + err = db.AutoMigrate( + &metadbmodel.PrometheusMetricName{}, + &metadbmodel.PrometheusLabelName{}, + &metadbmodel.PrometheusLabelValue{}, + &metadbmodel.PrometheusLabel{}, + ) + require.NoError(t, err) + + // SQLite 不支持 "tinyint(3) unsigned",手动建表 + err = db.Exec(`CREATE TABLE IF NOT EXISTS prometheus_metric_app_label_layout ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created_at DATETIME, + synced_at DATETIME, + metric_name VARCHAR(256) NOT NULL, + app_label_name VARCHAR(256) NOT NULL, + app_label_column_index TINYINT NOT NULL + )`).Error + require.NoError(t, err) + + return db +} + +func cleanupTestDB(t testing.TB) { + t.Helper() + os.Remove(testDBFile) +} + +// newTestORG 构造一个使用 SQLite 数据库的 ORG,绕开真实的 metadb.GetDB +func newTestORG(gormDB *gorm.DB) *common.ORG { + return &common.ORG{ + ID: 1, + DB: &metadb.DB{DB: gormDB}, + } +} + +// batchInsert 通用批量插入,每批 batchSize 条 +func batchInsert[T any](db *gorm.DB, items []T, batchSize int) error { + for i := 0; i < len(items); i += batchSize { + end := i + batchSize + if end > len(items) { + end = len(items) + } + if err := db.Create(items[i:end]).Error; err != nil { + return err + } + } + return nil +} + +// ============================================================================ +// 数据生成器(DB 版本 — 带 time 字段,写入真实行) +// ============================================================================ + +func generateDBMetricNames(n int) []metadbmodel.PrometheusMetricName { + items := make([]metadbmodel.PrometheusMetricName, n) + now := time.Now() + for i := 0; i < n; i++ { + items[i] = metadbmodel.PrometheusMetricName{ + PrometheusID: metadbmodel.PrometheusID{ID: i + 1}, + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + Name: fmt.Sprintf("metric_%d", i), + } + } + return items +} + +func generateDBLabelNames(n int) []metadbmodel.PrometheusLabelName { + items := make([]metadbmodel.PrometheusLabelName, n) + now := time.Now() + for i := 0; i < n; i++ { + items[i] = metadbmodel.PrometheusLabelName{ + PrometheusID: metadbmodel.PrometheusID{ID: i + 1}, + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + Name: fmt.Sprintf("ln_%d", i), + } + } + return items +} + +func generateDBLabelValues(n int) []metadbmodel.PrometheusLabelValue { + items := make([]metadbmodel.PrometheusLabelValue, n) + now := time.Now() + for i := 0; i < n; i++ { + items[i] = metadbmodel.PrometheusLabelValue{ + PrometheusID: metadbmodel.PrometheusID{ID: i + 1}, + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + Value: fmt.Sprintf("lv_%d", i), + } + } + return items +} + +func generateDBLabels(n int) []metadbmodel.PrometheusLabel { + items := make([]metadbmodel.PrometheusLabel, n) + now := time.Now() + for i := 0; i < n; i++ { + items[i] = metadbmodel.PrometheusLabel{ + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + NameID: i + 1, + ValueID: i + 1, + } + } + return items +} + +func generateDBLayouts(n int) []metadbmodel.PrometheusMetricAPPLabelLayout { + items := make([]metadbmodel.PrometheusMetricAPPLabelLayout, n) + now := time.Now() + for i := 0; i < n; i++ { + items[i] = metadbmodel.PrometheusMetricAPPLabelLayout{ + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + MetricName: fmt.Sprintf("metric_%d", i/10), + APPLabelName: fmt.Sprintf("app_label_%d", i%10), + APPLabelColumnIndex: uint8(i%10 + 1), + } + } + return items +} + +// ============================================================================ +// Select 精简列正确性测试 +// ============================================================================ + +func TestSelect_MetricName_LoadOnlyIDAndName(t *testing.T) { + db := setupTestDB(t) + defer cleanupTestDB(t) + + items := generateDBMetricNames(1000) + require.NoError(t, batchInsert(db, items, 500)) + + mn := newTestMetricName() + mn.org = newTestORG(db) + + err := mn.refresh() + require.NoError(t, err) + + assert.Equal(t, 1000, len(mn.GetNameToID())) + for i := 0; i < 1000; i++ { + id, ok := mn.GetIDByName(fmt.Sprintf("metric_%d", i)) + assert.True(t, ok) + assert.Equal(t, i+1, id) + } +} + +func TestSelect_LabelName_LoadOnlyIDAndName(t *testing.T) { + db := setupTestDB(t) + defer cleanupTestDB(t) + + items := generateDBLabelNames(1000) + require.NoError(t, batchInsert(db, items, 500)) + + ln := newTestLabelName() + ln.org = newTestORG(db) + + err := ln.refresh() + require.NoError(t, err) + + assert.Equal(t, 1000, len(ln.GetNameToID())) + for i := 0; i < 1000; i++ { + id, ok := ln.GetIDByName(fmt.Sprintf("ln_%d", i)) + assert.True(t, ok) + assert.Equal(t, i+1, id) + } +} + +func TestSelect_LabelValue_LoadOnlyIDAndValue(t *testing.T) { + db := setupTestDB(t) + defer cleanupTestDB(t) + + items := generateDBLabelValues(10000) + require.NoError(t, batchInsert(db, items, 1000)) + + lv := newTestLabelValue() + lv.org = newTestORG(db) + + err := lv.refresh() + require.NoError(t, err) + + snapshot := lv.GetValueToID() + assert.Equal(t, 10000, countStringIntMap(snapshot)) + for i := 0; i < 10000; i++ { + id, ok := lv.GetIDByValue(fmt.Sprintf("lv_%d", i)) + assert.True(t, ok, "value lv_%d should exist", i) + assert.Equal(t, i+1, id) + } +} + +func TestSelect_Label_LoadOnlyIDNameValue(t *testing.T) { + db := setupTestDB(t) + defer cleanupTestDB(t) + + items := generateDBLabels(5000) + require.NoError(t, batchInsert(db, items, 1000)) + + l := newTestLabel() + l.org = newTestORG(db) + + err := l.refresh() + require.NoError(t, err) + + snapshot := l.GetKeyToID() + assert.Equal(t, 5000, countLabelConcurrentMap(snapshot)) + for i := 0; i < 5000; i++ { + _, ok := l.GetIDByKey(NewLabelKey(i+1, i+1)) + assert.True(t, ok, "label name_id=%d/value_id=%d should exist", i+1, i+1) + } +} + +func TestSelect_Layout_LoadOnlyNeededColumns(t *testing.T) { + db := setupTestDB(t) + defer cleanupTestDB(t) + + items := generateDBLayouts(500) + require.NoError(t, batchInsert(db, items, 200)) + + mll := newTestLayout() + mll.org = newTestORG(db) + + err := mll.refresh() + require.NoError(t, err) + + assert.NotZero(t, len(mll.GetLayoutKeyToIndex())) + for _, item := range items { + idx, ok := mll.GetIndexByKey(NewLayoutKey(item.MetricName, item.APPLabelName)) + assert.True(t, ok, "layout %s/%s should exist", item.MetricName, item.APPLabelName) + assert.Equal(t, item.APPLabelColumnIndex, idx) + } +} + +// ============================================================================ +// 覆盖式 refresh + Select 联合测试:验证 swap 后旧条目消失 +// ============================================================================ + +func TestSelect_Label_RefreshDiscardsDeletedRows(t *testing.T) { + db := setupTestDB(t) + defer cleanupTestDB(t) + + // 第一轮:插入 200 条并 refresh + items := generateDBLabels(200) + require.NoError(t, batchInsert(db, items, 100)) + + l := newTestLabel() + l.org = newTestORG(db) + require.NoError(t, l.refresh()) + assert.Equal(t, 200, countLabelConcurrentMap(l.GetKeyToID())) + + // 模拟 Cleaner 删除后 100 条 + db.Where("id > ?", 100).Delete(&metadbmodel.PrometheusLabel{}) + + // 第二轮 refresh — 应该只剩 100 条 + require.NoError(t, l.refresh()) + assert.Equal(t, 100, countLabelConcurrentMap(l.GetKeyToID())) + + // 被删除的条目不可访问 + _, ok := l.GetIDByKey(NewLabelKey(151, 151)) + assert.False(t, ok, "deleted label should not exist after refresh") +} + +func TestSelect_MetricName_RefreshDiscardsDeletedRows(t *testing.T) { + db := setupTestDB(t) + defer cleanupTestDB(t) + + items := generateDBMetricNames(200) + require.NoError(t, batchInsert(db, items, 100)) + + mn := newTestMetricName() + mn.org = newTestORG(db) + require.NoError(t, mn.refresh()) + assert.Equal(t, 200, len(mn.GetNameToID())) + + // 删除后半部分 + db.Where("id > ?", 100).Delete(&metadbmodel.PrometheusMetricName{}) + + require.NoError(t, mn.refresh()) + assert.Equal(t, 100, len(mn.GetNameToID())) + + _, ok := mn.GetIDByName("metric_150") + assert.False(t, ok) +} + +// ============================================================================ +// 大数据量测试:label_value 100 万行 +// ============================================================================ + +func TestSelect_LabelValue_1M_Refresh(t *testing.T) { + if testing.Short() { + t.Skip("skipping 1M label_value test in short mode") + } + + const N = 1_000_000 + db := setupTestDB(t) + defer cleanupTestDB(t) + + // ---- 写入 100 万行 ---- + t.Logf("inserting %d label_values into SQLite...", N) + insertStart := time.Now() + items := generateDBLabelValues(N) + require.NoError(t, batchInsert(db, items, 5000)) + t.Logf("insert completed in %v", time.Since(insertStart)) + + // 验证行数 + var count int64 + db.Model(&metadbmodel.PrometheusLabelValue{}).Count(&count) + require.Equal(t, int64(N), count) + + // ---- refresh(Select 精简列)性能测试 ---- + lv := newTestLabelValue() + lv.org = newTestORG(db) + + runtime.GC() + var m1 runtime.MemStats + runtime.ReadMemStats(&m1) + + refreshStart := time.Now() + err := lv.refresh() + refreshDuration := time.Since(refreshStart) + require.NoError(t, err) + + var m2 runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&m2) + + snapshot := lv.GetValueToID() + assert.Equal(t, N, countStringIntMap(snapshot)) + t.Logf("refresh %d label_values: duration=%v, HeapInuse before=%dMB after=%dMB", + N, refreshDuration, m1.HeapInuse/1024/1024, m2.HeapInuse/1024/1024) + + // ---- 正确性抽检 ---- + for _, idx := range []int{0, 1, 999, 10000, 99999, 500000, 999999} { + id, ok := lv.GetIDByValue(fmt.Sprintf("lv_%d", idx)) + assert.True(t, ok, "lv_%d should exist", idx) + assert.Equal(t, idx+1, id) + } + + // ---- 模拟 Cleaner 删除 90% 数据后 refresh ---- + t.Log("deleting 90% rows to test shrink...") + db.Where("id > ?", N/10).Delete(&metadbmodel.PrometheusLabelValue{}) + + runtime.GC() + var m3 runtime.MemStats + runtime.ReadMemStats(&m3) + + refreshStart2 := time.Now() + require.NoError(t, lv.refresh()) + refreshDuration2 := time.Since(refreshStart2) + + runtime.GC() + runtime.GC() + var m4 runtime.MemStats + runtime.ReadMemStats(&m4) + + remaining := countStringIntMap(lv.GetValueToID()) + assert.Equal(t, N/10, remaining) + t.Logf("refresh after delete: remaining=%d, duration=%v, HeapInuse before=%dMB after=%dMB", + remaining, refreshDuration2, m3.HeapInuse/1024/1024, m4.HeapInuse/1024/1024) + + // 被删除的条目不可访问 + _, ok := lv.GetIDByValue(fmt.Sprintf("lv_%d", N-1)) + assert.False(t, ok, "deleted value should not exist after refresh") +} + +// ============================================================================ +// 大数据量测试:label 50 万行(Name+Value 组合键) +// ============================================================================ + +func TestSelect_Label_500K_Refresh(t *testing.T) { + if testing.Short() { + t.Skip("skipping 500K label test in short mode") + } + + const N = 500_000 + db := setupTestDB(t) + defer cleanupTestDB(t) + + t.Logf("inserting %d labels into SQLite...", N) + insertStart := time.Now() + items := generateDBLabels(N) + require.NoError(t, batchInsert(db, items, 5000)) + t.Logf("insert completed in %v", time.Since(insertStart)) + + l := newTestLabel() + l.org = newTestORG(db) + + refreshStart := time.Now() + err := l.refresh() + refreshDuration := time.Since(refreshStart) + require.NoError(t, err) + + snapshot := l.GetKeyToID() + assert.Equal(t, N, countLabelConcurrentMap(snapshot)) + t.Logf("refresh %d labels: duration=%v", N, refreshDuration) + + // 正确性抜检 + for _, idx := range []int{0, 100, 9999, 250000, 499999} { + _, ok := l.GetIDByKey(NewLabelKey(idx+1, idx+1)) + assert.True(t, ok, "label name_id=%d/value_id=%d should exist", idx+1, idx+1) + } +} + +// // ============================================================================ +// // Benchmark:对比 Select 列裁剪 vs 全列加载的 refresh 性能 +// // ============================================================================ + +// func BenchmarkRefresh_LabelValue_WithSelect(b *testing.B) { +// db := setupTestDB(b) +// defer cleanupTestDB(b) + +// const N = 100_000 +// items := generateDBLabelValues(N) +// require.NoError(b, batchInsert(db, items, 5000)) + +// lv := newTestLabelValue() +// lv.org = newTestORG(db) + +// b.ResetTimer() +// for i := 0; i < b.N; i++ { +// lv.refresh() +// } +// } + +// func BenchmarkRefresh_LabelValue_WithoutSelect(b *testing.B) { +// db := setupTestDB(b) +// defer cleanupTestDB(b) + +// const N = 100_000 +// items := generateDBLabelValues(N) +// require.NoError(b, batchInsert(db, items, 5000)) + +// org := newTestORG(db) + +// b.ResetTimer() +// for i := 0; i < b.N; i++ { +// // 模拟不加 Select 的全列加载 +// var labelValues []*metadbmodel.PrometheusLabelValue +// org.DB.Find(&labelValues) +// newMap := make(map[string]int, len(labelValues)) +// for _, item := range labelValues { +// newMap[item.Value] = item.ID +// } +// } +// } + +// func BenchmarkRefresh_Label_WithSelect(b *testing.B) { +// db := setupTestDB(b) +// defer cleanupTestDB(b) + +// const N = 100_000 +// items := generateDBLabels(N) +// require.NoError(b, batchInsert(db, items, 5000)) + +// l := newTestLabel() +// l.org = newTestORG(db) + +// b.ResetTimer() +// for i := 0; i < b.N; i++ { +// l.refresh() +// } +// } + +// func BenchmarkRefresh_Label_WithoutSelect(b *testing.B) { +// db := setupTestDB(b) +// defer cleanupTestDB(b) + +// const N = 100_000 +// items := generateDBLabels(N) +// require.NoError(b, batchInsert(db, items, 5000)) + +// org := newTestORG(db) + +// b.ResetTimer() +// for i := 0; i < b.N; i++ { +// var labels []*metadbmodel.PrometheusLabel +// org.DB.Find(&labels) +// newMap := make(map[LabelKey]int, len(labels)) +// for _, item := range labels { +// newMap[NewLabelKey(item.Name, item.Value)] = item.ID +// } +// } +// } diff --git a/server/controller/prometheus/cache/cache_test.go b/server/controller/prometheus/cache/cache_test.go new file mode 100644 index 00000000000..d947b73cd63 --- /dev/null +++ b/server/controller/prometheus/cache/cache_test.go @@ -0,0 +1,1089 @@ +/** + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cache + +import ( + "fmt" + "math/rand" + "os" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/deepflowio/deepflow/message/controller" + metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" +) + +// ============================================================================ +// Helper: 数据生成器 +// ============================================================================ + +func generateLabelKeys(n int) []LabelKey { + keys := make([]LabelKey, n) + for i := 0; i < n; i++ { + keys[i] = NewLabelKey(i+1, i+1) + } + return keys +} + +func generateLabelCacheEntries(n int) []LabelCacheEntry { + entries := make([]LabelCacheEntry, n) + for i := 0; i < n; i++ { + entries[i] = LabelCacheEntry{NameID: i + 1, ValueID: i + 1, LabelID: i + 1} + } + return entries +} + +func generateProtoLabels(n int) []*controller.PrometheusLabel { + labels := make([]*controller.PrometheusLabel, n) + for i := 0; i < n; i++ { + labels[i] = &controller.PrometheusLabel{ + Name: proto.String(fmt.Sprintf("label_name_%d", i)), + Value: proto.String(fmt.Sprintf("label_value_%d", i)), + Id: proto.Uint32(uint32(i + 1)), + } + } + return labels +} + +func generateProtoMetricNames(n int) []*controller.PrometheusMetricName { + names := make([]*controller.PrometheusMetricName, n) + for i := 0; i < n; i++ { + names[i] = &controller.PrometheusMetricName{ + Name: proto.String(fmt.Sprintf("metric_%d", i)), + Id: proto.Uint32(uint32(i + 1)), + } + } + return names +} + +func generateProtoLabelNames(n int) []*controller.PrometheusLabelName { + names := make([]*controller.PrometheusLabelName, n) + for i := 0; i < n; i++ { + names[i] = &controller.PrometheusLabelName{ + Name: proto.String(fmt.Sprintf("ln_%d", i)), + Id: proto.Uint32(uint32(i + 1)), + } + } + return names +} + +func generateProtoLabelValues(n int) []*controller.PrometheusLabelValue { + values := make([]*controller.PrometheusLabelValue, n) + for i := 0; i < n; i++ { + values[i] = &controller.PrometheusLabelValue{ + Value: proto.String(fmt.Sprintf("lv_%d", i)), + Id: proto.Uint32(uint32(i + 1)), + } + } + return values +} + +func generateProtoLayouts(n int) []*controller.PrometheusMetricAPPLabelLayout { + layouts := make([]*controller.PrometheusMetricAPPLabelLayout, n) + for i := 0; i < n; i++ { + layouts[i] = &controller.PrometheusMetricAPPLabelLayout{ + MetricName: proto.String(fmt.Sprintf("metric_%d", i/10)), + AppLabelName: proto.String(fmt.Sprintf("app_label_%d", i%10)), + AppLabelColumnIndex: proto.Uint32(uint32(i%10 + 1)), + } + } + return layouts +} + +func generateMockDBLabels(size int) []*metadbmodel.PrometheusLabel { + mockData := make([]*metadbmodel.PrometheusLabel, size) + for i := 0; i < size; i++ { + mockData[i] = &metadbmodel.PrometheusLabel{ + NameID: i + 1, + ValueID: i + 1, + PrometheusAutoIncID: metadbmodel.PrometheusAutoIncID{ + ID: i + 1, + }, + } + } + return mockData +} + +func generateMockDBLabelValues(size int) []*metadbmodel.PrometheusLabelValue { + mockData := make([]*metadbmodel.PrometheusLabelValue, size) + for i := 0; i < size; i++ { + mockData[i] = &metadbmodel.PrometheusLabelValue{ + Value: fmt.Sprintf("label_value_%d", i), + PrometheusID: metadbmodel.PrometheusID{ + ID: i + 1, + }, + } + } + return mockData +} + +// newTestLabel 创建一个不依赖 DB 的 label 实例 +func newTestLabel() *label { + return newLabel(nil) +} + +func newTestMetricName() *metricName { + return newMetricName(nil) +} + +func newTestLabelName() *labelName { + return newLabelName(nil) +} + +func newTestLabelValue() *labelValue { + return newLabelValue(nil) +} + +func newTestLayout() *metricAndAPPLabelLayout { + return newMetricAndAPPLabelLayout(nil) +} + +func countStringIntMap(m map[string]int) int { + return len(m) +} +func countLabelConcurrentMap(m map[LabelKey]int) int { + return len(m) +} + +func buildLabelConcurrentMap(n int) map[LabelKey]int { + m := make(map[LabelKey]int, n) + for i := 0; i < n; i++ { + m[NewLabelKey(i+1, i+1)] = i + 1 + } + return m +} +func buildLabelValueMap(n int) map[string]int { + m := make(map[string]int, n) + for i := 0; i < n; i++ { + m[fmt.Sprintf("lv_%d", i)] = i + 1 + } + return m +} + +func resetLabelState(l *label, active map[LabelKey]int) { + l.mu.Lock() + l.pending = make(map[LabelKey]int) + l.mu.Unlock() + l.replaceActive(active) +} + +func resetLabelValueState(lv *labelValue, active map[string]int) { + lv.mu.Lock() + lv.pending = make(map[string]int) + lv.mu.Unlock() + lv.replaceActive(active) +} + +func resetMetricNameState(mn *metricName, n int) { + items := make([]*metadbmodel.PrometheusMetricName, n) + for i := 0; i < n; i++ { + items[i] = &metadbmodel.PrometheusMetricName{Name: fmt.Sprintf("metric_%d", i)} + items[i].ID = i + 1 + } + mn.processLoadedData(items) +} + +func resetLabelNameState(ln *labelName, n int) { + newActive := make(map[string]int, n) + for i := 0; i < n; i++ { + newActive[fmt.Sprintf("ln_%d", i)] = i + 1 + } + ln.mu.Lock() + ln.pendingNameToID = make(map[string]int) + ln.mu.Unlock() + ln.replaceActive(newActive) +} + +func resetLayoutState(mll *metricAndAPPLabelLayout, n int) { + newActive := make(map[LayoutKey]uint8, n) + for i := 0; i < n; i++ { + newActive[NewLayoutKey(fmt.Sprintf("metric_%d", i/10), fmt.Sprintf("app_label_%d", i%10))] = uint8(i%10 + 1) + } + mll.mu.Lock() + mll.pending = make(map[LayoutKey]uint8) + mll.mu.Unlock() + mll.replaceActive(newActive) +} + +func refreshLabelCurrent(l *label, batch []LabelCacheEntry) { + l.Add(batch) +} + +func refreshLabelValueCurrent(lv *labelValue, batch []*controller.PrometheusLabelValue) { + lv.Add(batch) +} + +type labelRefreshEntry struct { + key LabelKey + id int +} + +type labelValueRefreshEntry struct { + value string + id int +} + +func generateLabelRefreshEntries(n int) []labelRefreshEntry { + entries := make([]labelRefreshEntry, n) + for i := 0; i < n; i++ { + entries[i] = labelRefreshEntry{ + key: NewLabelKey(i+1, i+1), + id: i + 1, + } + } + return entries +} + +func generateLabelValueRefreshEntries(n int) []labelValueRefreshEntry { + entries := make([]labelValueRefreshEntry, n) + for i := 0; i < n; i++ { + entries[i] = labelValueRefreshEntry{ + value: fmt.Sprintf("lv_%d", i), + id: i + 1, + } + } + return entries +} + +func refreshLabelEntriesCurrent(l *label, entries []labelRefreshEntry) { + l.mu.Lock() + defer l.mu.Unlock() + for _, entry := range entries { + l.pending[entry.key] = entry.id + } +} +func refreshLabelValueEntriesCurrent(lv *labelValue, entries []labelValueRefreshEntry) { + lv.mu.Lock() + defer lv.mu.Unlock() + for _, entry := range entries { + lv.pending[entry.value] = entry.id + } +} +func benchmarkScaleEnabled(env string) bool { + return os.Getenv(env) != "" +} + +func benchmarkLabelLookupSizes() []int { + sizes := []int{1000, 100_000, 1_000_000} + if benchmarkScaleEnabled("PROM_CACHE_BENCH_LARGE") { + sizes = append(sizes, 5_000_000) + } + if benchmarkScaleEnabled("PROM_CACHE_BENCH_HUGE") { + sizes = append(sizes, 10_000_000) + } + return sizes +} + +func benchmarkLabelValueLookupSizes() []int { + sizes := []int{1000, 100_000, 1_000_000} + if benchmarkScaleEnabled("PROM_CACHE_BENCH_LARGE") { + sizes = append(sizes, 5_000_000) + } + if benchmarkScaleEnabled("PROM_CACHE_BENCH_HUGE") { + sizes = append(sizes, 10_000_000) + } + return sizes +} + +func benchmarkRefreshSizes() []int { + sizes := []int{10_000, 100_000, 1_000_000} + if benchmarkScaleEnabled("PROM_CACHE_BENCH_LARGE") { + sizes = append(sizes, 5_000_000) + } + if benchmarkScaleEnabled("PROM_CACHE_BENCH_HUGE") { + sizes = append(sizes, 10_000_000) + } + return sizes +} + +func benchmarkProtoRefreshSizes() []int { + return []int{10_000, 100_000, 500_000} +} + +// ============================================================================ +// 第一部分:正确性测试 — 基本功能 +// ============================================================================ + +func TestLabel_AddAndGet(t *testing.T) { + l := newTestLabel() + entries := generateLabelCacheEntries(100) + + l.Add(entries) + + for _, e := range entries { + id, ok := l.GetIDByKey(NewLabelKey(e.NameID, e.ValueID)) + assert.True(t, ok) + assert.Equal(t, e.LabelID, id) + } + + // 不存在的 key + _, ok := l.GetIDByKey(NewLabelKey(9999, 9999)) + assert.False(t, ok) +} + +func TestMetricName_AddAndGet(t *testing.T) { + mn := newTestMetricName() + batch := generateProtoMetricNames(100) + + mn.Add(batch) + + for _, item := range batch { + id, ok := mn.GetIDByName(item.GetName()) + assert.True(t, ok) + assert.Equal(t, int(item.GetId()), id) + } +} + +func TestLabelName_AddAndGet(t *testing.T) { + ln := newTestLabelName() + batch := generateProtoLabelNames(100) + + ln.Add(batch) + + for _, item := range batch { + id, ok := ln.GetIDByName(item.GetName()) + assert.True(t, ok) + assert.Equal(t, int(item.GetId()), id) + } +} + +func TestLabelValue_AddAndGet(t *testing.T) { + lv := newTestLabelValue() + batch := generateProtoLabelValues(100) + + lv.Add(batch) + + for _, item := range batch { + id, ok := lv.GetIDByValue(item.GetValue()) + assert.True(t, ok) + assert.Equal(t, int(item.GetId()), id) + } + // 不存在的 value + _, ok := lv.GetIDByValue("nonexistent") + assert.False(t, ok) +} + +func TestLayout_AddAndGet(t *testing.T) { + mll := newTestLayout() + batch := generateProtoLayouts(100) + + mll.Add(batch) + + for _, item := range batch { + idx, ok := mll.GetIndexByKey(NewLayoutKey(item.GetMetricName(), item.GetAppLabelName())) + assert.True(t, ok) + assert.Equal(t, uint8(item.GetAppLabelColumnIndex()), idx) + } +} + +// ============================================================================ +// 第二部分:快照测试 — 快照隔离性 +// ============================================================================ + +func TestLabel_GetKeyToID_SnapshotIsolation(t *testing.T) { + l := newTestLabel() + l.Add(generateLabelCacheEntries(100)) + + // 取快照 + snapshot := l.GetKeyToID() + assert.Equal(t, 100, countLabelConcurrentMap(snapshot)) + + // 取完快照后追加数据 + extra := []LabelCacheEntry{ + {NameID: 9999, ValueID: 9999, LabelID: 999}, + } + l.Add(extra) + + // 快照不受影响 + assert.Equal(t, 100, countLabelConcurrentMap(snapshot)) + _, exists := snapshot[NewLabelKey(9999, 9999)] + assert.False(t, exists) + + // 但新查询可以看到 + id, ok := l.GetIDByKey(NewLabelKey(9999, 9999)) + assert.True(t, ok) + assert.Equal(t, 999, id) +} + +func TestMetricName_GetNameToID_SnapshotIsolation(t *testing.T) { + mn := newTestMetricName() + mn.Add(generateProtoMetricNames(50)) + + snapshot := mn.GetNameToID() + assert.Equal(t, 50, len(snapshot)) + + mn.Add([]*controller.PrometheusMetricName{ + {Name: proto.String("extra_metric"), Id: proto.Uint32(999)}, + }) + + assert.Equal(t, 50, len(snapshot)) // 快照隔离 +} +func TestLabelValue_GetValueToID_SnapshotIsolation(t *testing.T) { + lv := newTestLabelValue() + lv.Add(generateProtoLabelValues(100)) + + // 取快照 + snapshot := lv.GetValueToID() + assert.Equal(t, 100, countStringIntMap(snapshot)) + + // 取完快照后追加数据 + extra := []*controller.PrometheusLabelValue{ + {Value: proto.String("extra_value"), Id: proto.Uint32(999)}, + } + lv.Add(extra) + + // 快照不受影响 + assert.Equal(t, 100, countStringIntMap(snapshot)) + _, exists := snapshot["extra_value"] + assert.False(t, exists) + + // 但新查询可以看到 + id, ok := lv.GetIDByValue("extra_value") + assert.True(t, ok) + assert.Equal(t, 999, id) +} + +// ============================================================================ +// 第三部分:Snapshot-and-Swap — 模拟 refresh 覆盖 +// ============================================================================ + +func TestLabel_SnapshotSwap_DiscardsOldEntries(t *testing.T) { + l := newTestLabel() + + // 初始加载 200 条 + l.Add(generateLabelCacheEntries(200)) + assert.Equal(t, 200, countLabelConcurrentMap(l.GetKeyToID())) + + // 模拟 refresh:只有前 100 条仍在 DB 中,后 100 条已被 Cleaner 删除 + resetLabelState(l, buildLabelConcurrentMap(100)) + + // 验证旧条目已消失 + assert.Equal(t, 100, countLabelConcurrentMap(l.GetKeyToID())) + _, ok := l.GetIDByKey(NewLabelKey(151, 151)) + assert.False(t, ok, "deleted entry should not exist after snapshot-swap") +} + +func TestMetricName_SnapshotSwap_DiscardsOldEntries(t *testing.T) { + mn := newTestMetricName() + // 模拟第一次 refresh:200 条 + resetMetricNameState(mn, 200) + // 模拟第二次 refresh:只有前 50 条(后 150 条被 Cleaner 删除) + resetMetricNameState(mn, 50) + + assert.Equal(t, 50, len(mn.GetNameToID())) + _, ok := mn.GetIDByName("metric_100") + assert.False(t, ok) +} +func TestLabelValue_SnapshotSwap_DiscardsOldEntries(t *testing.T) { + lv := newTestLabelValue() + + // 初始加载 200 条 + lv.Add(generateProtoLabelValues(200)) + assert.Equal(t, 200, countStringIntMap(lv.GetValueToID())) + + // 模拟 refresh:只有前 100 条仍在 DB 中,后 100 条已被 Cleaner 删除 + resetLabelValueState(lv, buildLabelValueMap(100)) + + // 验证旧条目已消失 + assert.Equal(t, 100, countStringIntMap(lv.GetValueToID())) + _, ok := lv.GetIDByValue("lv_150") + assert.False(t, ok, "deleted entry should not exist after snapshot-swap") +} + +// ============================================================================ +// 第四部分:并发正确性 — race detector 测试 +// go test -race -run TestConcurrent +// ============================================================================ + +func TestConcurrentLabel_ReadDuringSwap(t *testing.T) { + l := newTestLabel() + l.Add(generateLabelCacheEntries(1000)) + + const ( + numReaders = 8 + numSwaps = 20 + readsPerRdr = 5000 + ) + + var wg sync.WaitGroup + errCount := atomic.Int64{} + + // 持续读 + for r := 0; r < numReaders; r++ { + wg.Add(1) + go func() { + defer wg.Done() + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < readsPerRdr; i++ { + idx := rng.Intn(1000) + key := NewLabelKey(idx+1, idx+1) + if _, ok := l.GetIDByKey(key); !ok { + // refresh 期间旧数据可能暂时不可见,这是预期行为 + errCount.Add(1) + } + } + }() + } + + // 持续 swap + wg.Add(1) + go func() { + defer wg.Done() + for s := 0; s < numSwaps; s++ { + resetLabelState(l, buildLabelConcurrentMap(1000)) + runtime.Gosched() + } + }() + + // 持续 Add + wg.Add(1) + go func() { + defer wg.Done() + for s := 0; s < numSwaps; s++ { + l.Add([]LabelCacheEntry{ + {NameID: 99999, ValueID: 99999, LabelID: 99999}, + }) + runtime.Gosched() + } + }() + + wg.Wait() + t.Logf("reads that missed (expected during swap): %d / %d", errCount.Load(), int64(numReaders*readsPerRdr)) +} + +func TestConcurrentMetricName_ReadDuringSwap(t *testing.T) { + mn := newTestMetricName() + resetMetricNameState(mn, 1000) + + items := make([]*metadbmodel.PrometheusMetricName, 1000) + for i := 0; i < 1000; i++ { + items[i] = &metadbmodel.PrometheusMetricName{Name: fmt.Sprintf("metric_%d", i)} + items[i].ID = i + 1 + } + + var wg sync.WaitGroup + + for r := 0; r < 8; r++ { + wg.Add(1) + go func() { + defer wg.Done() + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < 5000; i++ { + idx := rng.Intn(1000) + mn.GetIDByName(fmt.Sprintf("metric_%d", idx)) + } + }() + } + + wg.Add(1) + go func() { + defer wg.Done() + for s := 0; s < 20; s++ { + mn.processLoadedData(items) + runtime.Gosched() + } + }() + + wg.Wait() +} + +func TestConcurrentLabelValue_ReadDuringSwap(t *testing.T) { + lv := newTestLabelValue() + lv.Add(generateProtoLabelValues(1000)) + + const ( + numReaders = 8 + numSwaps = 20 + readsPerRdr = 5000 + ) + + var wg sync.WaitGroup + errCount := atomic.Int64{} + + // 持续读 + for r := 0; r < numReaders; r++ { + wg.Add(1) + go func() { + defer wg.Done() + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < readsPerRdr; i++ { + value := fmt.Sprintf("lv_%d", rng.Intn(1000)) + if _, ok := lv.GetIDByValue(value); !ok { + // refresh 期间旧数据可能暂时不可见,这是预期行为 + errCount.Add(1) + } + } + }() + } + + // 持续 swap + wg.Add(1) + go func() { + defer wg.Done() + for s := 0; s < numSwaps; s++ { + resetLabelValueState(lv, buildLabelValueMap(1000)) + runtime.Gosched() + } + }() + + // 持续 Add + wg.Add(1) + go func() { + defer wg.Done() + for s := 0; s < numSwaps; s++ { + lv.Add([]*controller.PrometheusLabelValue{ + {Value: proto.String("hot_value"), Id: proto.Uint32(99999)}, + }) + runtime.Gosched() + } + }() + + wg.Wait() + t.Logf("reads that missed (expected during swap): %d / %d", errCount.Load(), int64(numReaders*readsPerRdr)) +} + +func TestConcurrentLabelValue_SnapshotDuringSwap(t *testing.T) { + lv := newTestLabelValue() + lv.Add(generateProtoLabelValues(500)) + + var wg sync.WaitGroup + + // 读者不断拿快照并遍历 + for r := 0; r < 4; r++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + snapshot := lv.GetValueToID() + // 遍历快照——不应 panic 或 data race + count := 0 + for range snapshot { + count++ + } + _ = count + } + }() + } + + // 写者不断 swap + wg.Add(1) + go func() { + defer wg.Done() + for s := 0; s < 50; s++ { + resetLabelValueState(lv, buildLabelValueMap(500)) + } + }() + + wg.Wait() +} + +func TestConcurrentLayout_ReadDuringSwap(t *testing.T) { + mll := newTestLayout() + resetLayoutState(mll, 1000) + + items := make([]*metadbmodel.PrometheusMetricAPPLabelLayout, 1000) + for i := 0; i < 1000; i++ { + items[i] = &metadbmodel.PrometheusMetricAPPLabelLayout{ + MetricName: fmt.Sprintf("metric_%d", i/10), + APPLabelName: fmt.Sprintf("app_label_%d", i%10), + APPLabelColumnIndex: uint8(i%10 + 1), + } + } + + var wg sync.WaitGroup + + for r := 0; r < 8; r++ { + wg.Add(1) + go func() { + defer wg.Done() + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < 5000; i++ { + idx := rng.Intn(1000) + mll.GetIndexByKey(NewLayoutKey( + fmt.Sprintf("metric_%d", idx/10), + fmt.Sprintf("app_label_%d", idx%10), + )) + } + }() + } + + wg.Add(1) + go func() { + defer wg.Done() + for s := 0; s < 20; s++ { + mll.processLoadedData(items) + runtime.Gosched() + } + }() + + wg.Wait() +} + +func TestConcurrentLabel_SnapshotDuringSwap(t *testing.T) { + l := newTestLabel() + l.Add(generateLabelCacheEntries(500)) + + var wg sync.WaitGroup + + // 读者不断拿快照并遍历 + for r := 0; r < 4; r++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + snapshot := l.GetKeyToID() + // 遍历快照——不应 panic 或 data race + count := 0 + for range snapshot { + count++ + } + _ = count + } + }() + } + + // 写者不断 swap + wg.Add(1) + go func() { + defer wg.Done() + for s := 0; s < 50; s++ { + resetLabelState(l, buildLabelConcurrentMap(500)) + } + }() + + wg.Wait() +} + +// ============================================================================ +// 第五部分:大数据量内存验证 +// ============================================================================ + +func TestLabel_LargeScale_MemoryRelease(t *testing.T) { + if testing.Short() { + t.Skip("skipping large-scale test in short mode") + } + + const N = 1_000_000 + + l := newTestLabel() + + // 阶段1:加载百万条数据 + batch := make([]LabelCacheEntry, N) + for i := 0; i < N; i++ { + batch[i] = LabelCacheEntry{NameID: i + 1, ValueID: i + 1, LabelID: i + 1} + } + l.Add(batch) + + require.Equal(t, N, countLabelConcurrentMap(l.GetKeyToID())) + + var m1 runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&m1) + t.Logf("after loading %d entries: HeapInuse = %d MB", N, m1.HeapInuse/1024/1024) + + // 阶段2:模拟 refresh——只保留 10% 的数据(模拟 Cleaner 删除后 refresh) + kept := N / 10 + newMap := make(map[LabelKey]int, kept) + for i := 0; i < kept; i++ { + newMap[NewLabelKey(i+1, i+1)] = i + 1 + } + resetLabelState(l, newMap) + + // 触发 GC 让旧 map 被回收 + runtime.GC() + runtime.GC() // 双次 GC 确保 finalizer 运行 + + var m2 runtime.MemStats + runtime.ReadMemStats(&m2) + t.Logf("after swap to %d entries: HeapInuse = %d MB", kept, m2.HeapInuse/1024/1024) + + require.Equal(t, kept, countLabelConcurrentMap(l.GetKeyToID())) + + // 旧 map 被回收后,HeapInuse 应显著下降 + // 允许一定误差(其他 goroutine 可能有分配),但至少应降低 50% + if m2.HeapInuse >= m1.HeapInuse { + t.Logf("WARNING: HeapInuse did not decrease after swap (m1=%d, m2=%d). "+ + "This may be due to GC timing or other allocations.", m1.HeapInuse, m2.HeapInuse) + } +} + +func TestLabelValue_LargeScale_MemoryRelease(t *testing.T) { + if testing.Short() { + t.Skip("skipping large-scale test in short mode") + } + + const N = 1_000_000 + + lv := newTestLabelValue() + + // 阶段1:加载百万条数据 + batch := make([]*controller.PrometheusLabelValue, N) + for i := 0; i < N; i++ { + batch[i] = &controller.PrometheusLabelValue{ + Value: proto.String(fmt.Sprintf("lv_%d", i)), + Id: proto.Uint32(uint32(i + 1)), + } + } + lv.Add(batch) + + require.Equal(t, N, countStringIntMap(lv.GetValueToID())) + + var m1 runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&m1) + t.Logf("after loading %d entries: HeapInuse = %d MB", N, m1.HeapInuse/1024/1024) + + // 阶段2:模拟 refresh——只保留 10% 的数据(模拟 Cleaner 删除后 refresh) + kept := N / 10 + newMap := make(map[string]int, kept) + for i := 0; i < kept; i++ { + newMap[fmt.Sprintf("lv_%d", i)] = i + 1 + } + resetLabelValueState(lv, newMap) + + // 触发 GC 让旧 map 被回收 + runtime.GC() + runtime.GC() // 双次 GC 确保 finalizer 运行 + + var m2 runtime.MemStats + runtime.ReadMemStats(&m2) + t.Logf("after swap to %d entries: HeapInuse = %d MB", kept, m2.HeapInuse/1024/1024) + + require.Equal(t, kept, countStringIntMap(lv.GetValueToID())) + + // 旧 map 被回收后,HeapInuse 应显著下降 + if m2.HeapInuse >= m1.HeapInuse { + t.Logf("WARNING: HeapInuse did not decrease after swap (m1=%d, m2=%d). "+ + "This may be due to GC timing or other allocations.", m1.HeapInuse, m2.HeapInuse) + } +} + +// ============================================================================ +// 第六部分:Benchmark — 性能基准 +// ============================================================================ + +// --- label --- + +func BenchmarkLabel_Add(b *testing.B) { + for _, size := range []int{1000, 10_000, 100_000} { + batch := generateLabelCacheEntries(size) + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + for i := 0; i < b.N; i++ { + l := newTestLabel() + l.Add(batch) + } + }) + } +} + +func BenchmarkLabel_GetIDByKey(b *testing.B) { + for _, size := range benchmarkLabelLookupSizes() { + l := newTestLabel() + l.Add(generateLabelCacheEntries(size)) + keys := generateLabelKeys(size) + + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for pb.Next() { + l.GetIDByKey(keys[rng.Intn(size)]) + } + }) + }) + } +} + +func BenchmarkLabel_GetKeyToID_Snapshot(b *testing.B) { + for _, size := range []int{1000, 10_000, 100_000} { + l := newTestLabel() + l.Add(generateLabelCacheEntries(size)) + + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = l.GetKeyToID() + } + }) + } +} + +func BenchmarkLabel_SnapshotSwap(b *testing.B) { + for _, size := range []int{1000, 10_000, 100_000} { + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + l := newTestLabel() + for i := 0; i < b.N; i++ { + newMap := make(map[LabelKey]int, size) + for j := 0; j < size; j++ { + newMap[NewLabelKey(j+1, j+1)] = j + 1 + } + resetLabelState(l, newMap) + } + }) + } +} + +func BenchmarkLabel_Refresh(b *testing.B) { + for _, size := range benchmarkRefreshSizes() { + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + l := newTestLabel() + mockData := generateMockDBLabels(size) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + l.processLoadedData(mockData) + } + }) + } +} + +// --- metricName --- + +func BenchmarkMetricName_GetIDByName(b *testing.B) { + for _, size := range []int{1000, 100_000, 500_000} { + mn := newTestMetricName() + mn.Add(generateProtoMetricNames(size)) + + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for pb.Next() { + mn.GetIDByName(fmt.Sprintf("metric_%d", rng.Intn(size))) + } + }) + }) + } +} + +// --- labelValue --- +func BenchmarkLabelValue_GetIDByValue(b *testing.B) { + for _, size := range benchmarkLabelValueLookupSizes() { + lv := newTestLabelValue() + lv.Add(generateProtoLabelValues(size)) + + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for pb.Next() { + lv.GetIDByValue(fmt.Sprintf("lv_%d", rng.Intn(size))) + } + }) + }) + } +} + +func BenchmarkLabelValue_GetValueToID_Snapshot(b *testing.B) { + for _, size := range []int{1000, 10_000, 100_000} { + lv := newTestLabelValue() + lv.Add(generateProtoLabelValues(size)) + + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = lv.GetValueToID() + } + }) + } +} + +func BenchmarkLabelValue_SnapshotSwap(b *testing.B) { + for _, size := range []int{1000, 10_000, 100_000} { + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + lv := newTestLabelValue() + for i := 0; i < b.N; i++ { + newMap := make(map[string]int, size) + for j := 0; j < size; j++ { + newMap[fmt.Sprintf("lv_%d", j)] = j + 1 + } + resetLabelValueState(lv, newMap) + } + }) + } +} + +func BenchmarkLabelValue_Refresh(b *testing.B) { + for _, size := range benchmarkRefreshSizes() { + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + lv := newTestLabelValue() + mockData := generateMockDBLabelValues(size) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + newActive := make(map[string]int, len(mockData)) + newActiveR := make(map[int]string, len(mockData)) + for _, item := range mockData { + newActive[item.Value] = item.ID + newActiveR[item.ID] = item.Value + } + lv.activeR.Store(newActiveR) + lv.replaceActive(newActive) + } + }) + } +} + +// --- layout --- + +func BenchmarkLayout_GetIndexByKey(b *testing.B) { + for _, size := range []int{100, 1000, 10_000} { + mll := newTestLayout() + mll.Add(generateProtoLayouts(size)) + + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for pb.Next() { + idx := rng.Intn(size) + mll.GetIndexByKey(NewLayoutKey( + fmt.Sprintf("metric_%d", idx/10), + fmt.Sprintf("app_label_%d", idx%10), + )) + } + }) + }) + } +} + +// --- 并发混合负载 benchmark --- + +func BenchmarkLabel_MixedReadWrite(b *testing.B) { + for _, size := range []int{10_000, 100_000} { + l := newTestLabel() + l.Add(generateLabelCacheEntries(size)) + keys := generateLabelKeys(size) + + b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for pb.Next() { + if rng.Intn(100) < 95 { // 95% read + l.GetIDByKey(keys[rng.Intn(size)]) + } else { // 5% write + hotID := rng.Intn(100) + size + 1 + l.Add([]LabelCacheEntry{ + {NameID: hotID, ValueID: hotID, LabelID: hotID}, + }) + } + } + }) + }) + } +} diff --git a/server/controller/prometheus/cache/label.go b/server/controller/prometheus/cache/label.go index 1e8b00a1b66..f20db41fe5b 100644 --- a/server/controller/prometheus/cache/label.go +++ b/server/controller/prometheus/cache/label.go @@ -17,74 +17,144 @@ package cache import ( - cmap "github.com/orcaman/concurrent-map/v2" + "fmt" + "sync" + "sync/atomic" - "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" "github.com/deepflowio/deepflow/server/controller/prometheus/common" ) +// LabelKey identifies a prometheus_label by its foreign-key IDs, avoiding +// repeated string storage for the hot in-memory map. type LabelKey struct { - Name string - Value string + NameID int + ValueID int } func (k LabelKey) String() string { - return k.Name + "-" + k.Value + return fmt.Sprintf("%d-%d", k.NameID, k.ValueID) } -func NewLabelKey(name, value string) LabelKey { - return LabelKey{ - Name: name, - Value: value, - } +func NewLabelKey(nameID, valueID int) LabelKey { + return LabelKey{NameID: nameID, ValueID: valueID} +} + +// LabelCacheEntry is used by callers to batch-add newly encoded labels to +// the cache after the name_id and value_id are known. +type LabelCacheEntry struct { + NameID int + ValueID int + LabelID int } type label struct { org *common.ORG - keyToID cmap.ConcurrentMap[LabelKey, int] + active atomic.Value + pending map[LabelKey]int + mu sync.RWMutex } func newLabel(org *common.ORG) *label { - return &label{ + l := &label{ org: org, - keyToID: cmap.NewStringer[LabelKey, int](), + pending: make(map[LabelKey]int), + } + l.active.Store(make(map[LabelKey]int)) + return l +} + +func (l *label) getActive() map[LabelKey]int { + if active := l.active.Load(); active != nil { + return active.(map[LabelKey]int) + } + return map[LabelKey]int{} +} + +func cloneLabelMap(src map[LabelKey]int, extra int) map[LabelKey]int { + dst := make(map[LabelKey]int, len(src)+extra) + for key, value := range src { + dst[key] = value } + return dst +} + +func (l *label) replaceActive(newActive map[LabelKey]int) { + l.active.Store(newActive) } -func (l *label) GetKeyToID() cmap.ConcurrentMap[LabelKey, int] { - return l.keyToID +func (l *label) GetKeyToID() map[LabelKey]int { + active := l.getActive() + + l.mu.RLock() + pendingLen := len(l.pending) + snapshot := cloneLabelMap(active, pendingLen) + for key, value := range l.pending { + snapshot[key] = value + } + l.mu.RUnlock() + + return snapshot } func (l *label) GetIDByKey(key LabelKey) (int, bool) { - if item, ok := l.keyToID.Get(key); ok { + if item, ok := l.getActive()[key]; ok { + return item, true + } + + l.mu.RLock() + defer l.mu.RUnlock() + if item, ok := l.pending[key]; ok { return item, true } return 0, false } -func (l *label) Add(batch []*controller.PrometheusLabel) { - for _, item := range batch { - k := NewLabelKey(item.GetName(), item.GetValue()) - l.keyToID.Set(k, int(item.GetId())) +// Add inserts newly encoded labels into the pending map. Callers must resolve +// name_id and value_id before calling (see LabelCacheEntry). +func (l *label) Add(entries []LabelCacheEntry) { + l.mu.Lock() + defer l.mu.Unlock() + for _, e := range entries { + l.pending[NewLabelKey(e.NameID, e.ValueID)] = e.LabelID } } func (l *label) refresh(args ...interface{}) error { - ls, err := l.load() + var count int64 + if err := l.org.DB.Model(&metadbmodel.PrometheusLabel{}).Count(&count).Error; err != nil { + return err + } + + rows, err := l.org.DB.Model(&metadbmodel.PrometheusLabel{}).Select("id", "name_id", "value_id").Rows() if err != nil { return err } - for _, item := range ls { - k := NewLabelKey(item.Name, item.Value) - l.keyToID.Set(k, item.ID) + defer rows.Close() + + newActive := make(map[LabelKey]int, count) + for rows.Next() { + var id, nameID, valueID int + if scanErr := rows.Scan(&id, &nameID, &valueID); scanErr != nil { + log.Errorf("stream scan prometheus_label interrupted: %v", scanErr, l.org.LogPrefix) + return scanErr + } + newActive[NewLabelKey(nameID, valueID)] = id + } + if err := rows.Err(); err != nil { + log.Errorf("stream read prometheus_label error: %v", err, l.org.LogPrefix) + return err + } + + l.mu.Lock() + pending := l.pending + l.pending = make(map[LabelKey]int) + for key, value := range pending { + newActive[key] = value } - return nil -} + l.mu.Unlock() -func (l *label) load() ([]*metadbmodel.PrometheusLabel, error) { - var labels []*metadbmodel.PrometheusLabel - err := l.org.DB.Find(&labels).Error - return labels, err + l.replaceActive(newActive) + return nil } diff --git a/server/controller/prometheus/cache/label_layout.go b/server/controller/prometheus/cache/label_layout.go index 47c38f25ded..62c9cf45159 100644 --- a/server/controller/prometheus/cache/label_layout.go +++ b/server/controller/prometheus/cache/label_layout.go @@ -18,6 +18,7 @@ package cache import ( "sync" + "sync/atomic" "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" @@ -42,47 +43,93 @@ func NewLayoutKey(metricName, labelName string) LayoutKey { type appLabelNameToValue map[string]string type metricAndAPPLabelLayout struct { - org *common.ORG + org *common.ORG + active atomic.Value // map[LayoutKey]uint8 - layoutKeyToIndex sync.Map + mu sync.RWMutex + pending map[LayoutKey]uint8 } func newMetricAndAPPLabelLayout(org *common.ORG) *metricAndAPPLabelLayout { - return &metricAndAPPLabelLayout{ - org: org, + mll := &metricAndAPPLabelLayout{ + org: org, + pending: make(map[LayoutKey]uint8), } + mll.active.Store(make(map[LayoutKey]uint8)) + return mll } -func (mll *metricAndAPPLabelLayout) Get() *sync.Map { - return &mll.layoutKeyToIndex +func (mll *metricAndAPPLabelLayout) getActive() map[LayoutKey]uint8 { + if active := mll.active.Load(); active != nil { + return active.(map[LayoutKey]uint8) + } + return map[LayoutKey]uint8{} +} + +func (mll *metricAndAPPLabelLayout) replaceActive(newActive map[LayoutKey]uint8) { + mll.active.Store(newActive) } func (mll *metricAndAPPLabelLayout) GetIndexByKey(key LayoutKey) (uint8, bool) { - if index, ok := mll.layoutKeyToIndex.Load(key); ok { - return index.(uint8), true + if index, ok := mll.getActive()[key]; ok { + return index, true + } + mll.mu.RLock() + defer mll.mu.RUnlock() + index, ok := mll.pending[key] + return index, ok +} + +func (mll *metricAndAPPLabelLayout) GetLayoutKeyToIndex() map[LayoutKey]uint8 { + active := mll.getActive() + mll.mu.RLock() + snapshot := make(map[LayoutKey]uint8, len(active)+len(mll.pending)) + for k, v := range active { + snapshot[k] = v + } + for k, v := range mll.pending { + snapshot[k] = v } - return 0, false + mll.mu.RUnlock() + return snapshot } func (mll *metricAndAPPLabelLayout) Add(batch []*controller.PrometheusMetricAPPLabelLayout) { + mll.mu.Lock() + defer mll.mu.Unlock() for _, m := range batch { - mll.layoutKeyToIndex.Store(NewLayoutKey(m.GetMetricName(), m.GetAppLabelName()), uint8(m.GetAppLabelColumnIndex())) + mll.pending[NewLayoutKey(m.GetMetricName(), m.GetAppLabelName())] = uint8(m.GetAppLabelColumnIndex()) } } func (mll *metricAndAPPLabelLayout) refresh(args ...interface{}) error { - metricAPPLabelLayouts, err := mll.load() + items, err := mll.load() if err != nil { return err } - for _, l := range metricAPPLabelLayouts { - mll.layoutKeyToIndex.Store(NewLayoutKey(l.MetricName, l.APPLabelName), uint8(l.APPLabelColumnIndex)) - } + mll.processLoadedData(items) return nil } +func (mll *metricAndAPPLabelLayout) processLoadedData(items []*metadbmodel.PrometheusMetricAPPLabelLayout) { + newActive := make(map[LayoutKey]uint8, len(items)) + for _, item := range items { + newActive[NewLayoutKey(item.MetricName, item.APPLabelName)] = uint8(item.APPLabelColumnIndex) + } + + mll.mu.Lock() + pending := mll.pending + mll.pending = make(map[LayoutKey]uint8) + mll.mu.Unlock() + + for k, v := range pending { + newActive[k] = v + } + mll.replaceActive(newActive) +} + func (mml *metricAndAPPLabelLayout) load() ([]*metadbmodel.PrometheusMetricAPPLabelLayout, error) { var metricAPPLabelLayouts []*metadbmodel.PrometheusMetricAPPLabelLayout - err := mml.org.DB.Find(&metricAPPLabelLayouts).Error + err := mml.org.DB.Select("metric_name", "app_label_name", "app_label_column_index").Find(&metricAPPLabelLayouts).Error return metricAPPLabelLayouts, err } diff --git a/server/controller/prometheus/cache/label_name.go b/server/controller/prometheus/cache/label_name.go index e98249790a8..4aed293b106 100644 --- a/server/controller/prometheus/cache/label_name.go +++ b/server/controller/prometheus/cache/label_name.go @@ -18,8 +18,7 @@ package cache import ( "sync" - - "github.com/cornelk/hashmap" + "sync/atomic" "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" @@ -27,54 +26,107 @@ import ( ) type labelName struct { - org *common.ORG + org *common.ORG + active atomic.Value // map[string]int (nameToID) + activeR atomic.Value // map[int]string (idToName), rebuilt on every refresh - nameToID sync.Map - idToName *hashmap.Map[int, string] + mu sync.RWMutex + pendingNameToID map[string]int } func newLabelName(org *common.ORG) *labelName { - return &labelName{ - org: org, - idToName: hashmap.New[int, string](), + ln := &labelName{ + org: org, + pendingNameToID: make(map[string]int), } + ln.active.Store(make(map[string]int)) + return ln } -func (ln *labelName) GetIDByName(n string) (int, bool) { - if id, ok := ln.nameToID.Load(n); ok { - return id.(int), true +func (ln *labelName) getActive() map[string]int { + if active := ln.active.Load(); active != nil { + return active.(map[string]int) } - return 0, false + return map[string]int{} } +func (ln *labelName) replaceActive(newActive map[string]int) { + ln.active.Store(newActive) +} + +// GetNameByID returns the label name string for a given ID. +// Only reflects the last committed refresh snapshot (pending entries are excluded). func (ln *labelName) GetNameByID(id int) (string, bool) { - if name, ok := ln.idToName.Get(id); ok { - return name, true + if r := ln.activeR.Load(); r != nil { + name, ok := r.(map[int]string)[id] + return name, ok } return "", false } +func (ln *labelName) GetIDByName(n string) (int, bool) { + if id, ok := ln.getActive()[n]; ok { + return id, true + } + ln.mu.RLock() + defer ln.mu.RUnlock() + id, ok := ln.pendingNameToID[n] + return id, ok +} + +func (ln *labelName) GetNameToID() map[string]int { + active := ln.getActive() + ln.mu.RLock() + snapshot := make(map[string]int, len(active)+len(ln.pendingNameToID)) + for k, v := range active { + snapshot[k] = v + } + for k, v := range ln.pendingNameToID { + snapshot[k] = v + } + ln.mu.RUnlock() + return snapshot +} + func (ln *labelName) Add(batch []*controller.PrometheusLabelName) { + ln.mu.Lock() + defer ln.mu.Unlock() for _, item := range batch { - ln.nameToID.Store(item.GetName(), int(item.GetId())) - ln.idToName.Set(int(item.GetId()), item.GetName()) + ln.pendingNameToID[item.GetName()] = int(item.GetId()) } } func (ln *labelName) refresh(args ...interface{}) error { - labelNames, err := ln.load() + items, err := ln.load() if err != nil { return err } - for _, item := range labelNames { - ln.nameToID.Store(item.Name, item.ID) - ln.idToName.Set(item.ID, item.Name) - } + ln.processLoadedData(items) return nil } +func (ln *labelName) processLoadedData(items []*metadbmodel.PrometheusLabelName) { + newActive := make(map[string]int, len(items)) + newActiveR := make(map[int]string, len(items)) + for _, item := range items { + newActive[item.Name] = item.ID + newActiveR[item.ID] = item.Name + } + + ln.mu.Lock() + pending := ln.pendingNameToID + ln.pendingNameToID = make(map[string]int) + ln.mu.Unlock() + + for k, v := range pending { + newActive[k] = v + } + ln.activeR.Store(newActiveR) + ln.replaceActive(newActive) +} + func (ln *labelName) load() ([]*metadbmodel.PrometheusLabelName, error) { var labelNames []*metadbmodel.PrometheusLabelName - err := ln.org.DB.Find(&labelNames).Error + err := ln.org.DB.Select("id", "name").Find(&labelNames).Error return labelNames, err } diff --git a/server/controller/prometheus/cache/label_value.go b/server/controller/prometheus/cache/label_value.go index 284454ee84f..b82a07308a2 100644 --- a/server/controller/prometheus/cache/label_value.go +++ b/server/controller/prometheus/cache/label_value.go @@ -17,7 +17,8 @@ package cache import ( - "github.com/cornelk/hashmap" + "sync" + "sync/atomic" "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" @@ -27,46 +28,124 @@ import ( type labelValue struct { org *common.ORG - valueToID *hashmap.Map[string, int] + active atomic.Value // map[string]int (valueToID) + activeR atomic.Value // map[int]string (idToValue), rebuilt on every refresh + pending map[string]int + mu sync.RWMutex +} + +func (lv *labelValue) replaceActive(newActive map[string]int) { + lv.active.Store(newActive) +} + +// GetValueByID returns the label value string for a given ID. +// Only reflects the last committed refresh snapshot (pending entries are excluded). +func (lv *labelValue) GetValueByID(id int) (string, bool) { + if r := lv.activeR.Load(); r != nil { + value, ok := r.(map[int]string)[id] + return value, ok + } + return "", false } func newLabelValue(org *common.ORG) *labelValue { - return &labelValue{ - org: org, - valueToID: hashmap.New[string, int](), + lv := &labelValue{ + org: org, + active: atomic.Value{}, + pending: make(map[string]int), + } + lv.active.Store(make(map[string]int)) + return lv +} + +func (lv *labelValue) getActive() map[string]int { + if active := lv.active.Load(); active != nil { + return active.(map[string]int) + } + return map[string]int{} +} + +func cloneValueMap(src map[string]int, extra int) map[string]int { + dst := make(map[string]int, len(src)+extra) + for key, value := range src { + dst[key] = value } + return dst } func (lv *labelValue) GetIDByValue(v string) (int, bool) { - if id, ok := lv.valueToID.Get(v); ok { - return id, true + if item, ok := lv.getActive()[v]; ok { + return item, true + } + + lv.mu.RLock() + defer lv.mu.RUnlock() + if item, ok := lv.pending[v]; ok { + return item, true } return 0, false } -func (lv *labelValue) GetValueToID() *hashmap.Map[string, int] { - return lv.valueToID +func (lv *labelValue) GetValueToID() map[string]int { + active := lv.getActive() + + lv.mu.RLock() + pendingLen := len(lv.pending) + snapshot := cloneValueMap(active, pendingLen) + for key, value := range lv.pending { + snapshot[key] = value + } + lv.mu.RUnlock() + + return snapshot } func (lv *labelValue) Add(batch []*controller.PrometheusLabelValue) { + lv.mu.Lock() + defer lv.mu.Unlock() for _, item := range batch { - lv.valueToID.Set(item.GetValue(), int(item.GetId())) + lv.pending[item.GetValue()] = int(item.GetId()) } } func (lv *labelValue) refresh(args ...interface{}) error { - labelValues, err := lv.load() + var count int64 + if err := lv.org.DB.Model(&metadbmodel.PrometheusLabelValue{}).Count(&count).Error; err != nil { + return err + } + + rows, err := lv.org.DB.Model(&metadbmodel.PrometheusLabelValue{}).Select("id", "value").Rows() if err != nil { return err } - for _, item := range labelValues { - lv.valueToID.Set(item.Value, item.ID) + defer rows.Close() + + newActive := make(map[string]int, count) + newActiveR := make(map[int]string, count) + for rows.Next() { + var id int + var value string + if scanErr := rows.Scan(&id, &value); scanErr != nil { + log.Errorf("stream scan prometheus_label_value interrupted: %v", scanErr, lv.org.LogPrefix) + return scanErr + } + newActive[value] = id + newActiveR[id] = value + } + if err := rows.Err(); err != nil { + log.Errorf("stream read prometheus_label_value error: %v", err, lv.org.LogPrefix) + return err } - return nil -} -func (lv *labelValue) load() ([]*metadbmodel.PrometheusLabelValue, error) { - var labelValues []*metadbmodel.PrometheusLabelValue - err := lv.org.DB.Find(&labelValues).Error - return labelValues, err + lv.mu.Lock() + pending := lv.pending + lv.pending = make(map[string]int) + for key, value := range pending { + newActive[key] = value + } + lv.mu.Unlock() + + lv.activeR.Store(newActiveR) + lv.replaceActive(newActive) + return nil } diff --git a/server/controller/prometheus/cache/metric_name.go b/server/controller/prometheus/cache/metric_name.go index cfc2759bc6e..088b3cc0f0f 100644 --- a/server/controller/prometheus/cache/metric_name.go +++ b/server/controller/prometheus/cache/metric_name.go @@ -18,6 +18,7 @@ package cache import ( "sync" + "sync/atomic" "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" @@ -25,55 +26,93 @@ import ( ) type metricName struct { - org *common.ORG + org *common.ORG + active atomic.Value // map[string]int - nameToID sync.Map - idToName sync.Map + mu sync.RWMutex + pendingNameToID map[string]int } func newMetricName(org *common.ORG) *metricName { - return &metricName{org: org} + mn := &metricName{ + org: org, + pendingNameToID: make(map[string]int), + } + mn.active.Store(make(map[string]int)) + return mn +} + +func (mn *metricName) getActive() map[string]int { + if active := mn.active.Load(); active != nil { + return active.(map[string]int) + } + return map[string]int{} } -func (mn *metricName) Get() *sync.Map { - return &mn.nameToID +func (mn *metricName) replaceActive(newActive map[string]int) { + mn.active.Store(newActive) } func (mn *metricName) GetIDByName(n string) (int, bool) { - if id, ok := mn.nameToID.Load(n); ok { - return id.(int), true + if id, ok := mn.getActive()[n]; ok { + return id, true } - return 0, false + mn.mu.RLock() + defer mn.mu.RUnlock() + id, ok := mn.pendingNameToID[n] + return id, ok } -func (mn *metricName) GetNameByID(id int) (string, bool) { - if name, ok := mn.idToName.Load(id); ok { - return name.(string), true +func (mn *metricName) GetNameToID() map[string]int { + active := mn.getActive() + mn.mu.RLock() + snapshot := make(map[string]int, len(active)+len(mn.pendingNameToID)) + for k, v := range active { + snapshot[k] = v + } + for k, v := range mn.pendingNameToID { + snapshot[k] = v } - return "", false + mn.mu.RUnlock() + return snapshot } func (mn *metricName) Add(batch []*controller.PrometheusMetricName) { + mn.mu.Lock() + defer mn.mu.Unlock() for _, item := range batch { - mn.nameToID.Store(item.GetName(), int(item.GetId())) - mn.idToName.Store(int(item.GetId()), item.GetName()) + mn.pendingNameToID[item.GetName()] = int(item.GetId()) } } func (mn *metricName) refresh(args ...interface{}) error { - metricNames, err := mn.load() + items, err := mn.load() if err != nil { return err } - for _, item := range metricNames { - mn.nameToID.Store(item.Name, item.ID) - mn.idToName.Store(item.ID, item.Name) - } + mn.processLoadedData(items) return nil } +func (mn *metricName) processLoadedData(items []*metadbmodel.PrometheusMetricName) { + newActive := make(map[string]int, len(items)) + for _, item := range items { + newActive[item.Name] = item.ID + } + + mn.mu.Lock() + pending := mn.pendingNameToID + mn.pendingNameToID = make(map[string]int) + mn.mu.Unlock() + + for k, v := range pending { + newActive[k] = v + } + mn.replaceActive(newActive) +} + func (mn *metricName) load() ([]*metadbmodel.PrometheusMetricName, error) { var metricNames []*metadbmodel.PrometheusMetricName - err := mn.org.DB.Find(&metricNames).Error + err := mn.org.DB.Select("id", "name").Find(&metricNames).Error return metricNames, err } diff --git a/server/controller/prometheus/cleaner.go b/server/controller/prometheus/cleaner.go index 7891df3a137..a4ee582e332 100644 --- a/server/controller/prometheus/cleaner.go +++ b/server/controller/prometheus/cleaner.go @@ -233,7 +233,7 @@ func (d *deleter) deleteExpiredMetricName() error { func (d *deleter) deleteExpiredLabel() error { toDelete := make([]metadbmodel.PrometheusLabel, 0) for _, item := range d.dataToCheck.labels { - if !d.activeData.getLabel(item.Name, item.Value) { + if !d.activeData.getLabel(item.NameID, item.ValueID) { toDelete = append(toDelete, item) } } @@ -478,6 +478,7 @@ func (q *querier) getRegionToDomainNamePrefix() (map[string]string, error) { } type activeData struct { + cache *cache.Cache metricNames map[string]struct{} // for prometheus_metric_name labelNames map[string]struct{} // for prometheus_label_name labelValues map[string]struct{} // for prometheus_label_value @@ -487,6 +488,7 @@ type activeData struct { func newActiveData(c *cache.Cache) *activeData { return &activeData{ + cache: c, metricNames: make(map[string]struct{}), labelNames: make(map[string]struct{}), labelValues: make(map[string]struct{}), @@ -517,8 +519,8 @@ func (d *activeData) getLabelValue(value string) bool { return ok } -func (d *activeData) getLabel(name, value string) bool { - _, ok := d.labels[newLabelKey(name, value)] +func (d *activeData) getLabel(nameID, valueID int) bool { + _, ok := d.labels[newLabelKey(nameID, valueID)] return ok } @@ -540,7 +542,14 @@ func (d *activeData) appendLabelValue(value string) { } func (d *activeData) appendLabel(name, value string) { - d.labels[newLabelKey(name, value)] = struct{}{} + // Resolve to int IDs via the cache to keep the labels map compact. + nameID, okN := d.cache.LabelName.GetIDByName(name) + valueID, okV := d.cache.LabelValue.GetIDByValue(value) + if !okN || !okV { + // Name or value not yet registered — label can't exist in DB either. + return + } + d.labels[newLabelKey(nameID, valueID)] = struct{}{} } func (d *activeData) appendMetricLabelName(metricName, labelName string) { @@ -560,15 +569,12 @@ func newMetricLabelNameKey(metricName, labelName string) metricLabelNameKey { } type labelKey struct { - name string - value string + nameID int + valueID int } -func newLabelKey(name, value string) labelKey { - return labelKey{ - name: name, - value: value, - } +func newLabelKey(nameID, valueID int) labelKey { + return labelKey{nameID: nameID, valueID: valueID} } type dataToCheck struct { diff --git a/server/controller/prometheus/common/errgroup.go b/server/controller/prometheus/common/errgroup.go index c4e844d4c53..893f346be57 100644 --- a/server/controller/prometheus/common/errgroup.go +++ b/server/controller/prometheus/common/errgroup.go @@ -18,14 +18,29 @@ package common import ( "context" + "fmt" + "runtime/debug" "golang.org/x/sync/errgroup" + + "github.com/deepflowio/deepflow/server/libs/logger" ) +var log = logger.MustGetLogger("prometheus.synchronizer.common") + type ErrFunc func(...interface{}) error func AppendErrGroupWithContext(ctx context.Context, eg *errgroup.Group, f ErrFunc, args ...interface{}) { - eg.Go(func() error { + eg.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + // NOTE: fatal errors such as "concurrent map read and map write" are + // thrown by the runtime and cannot be caught here; prevent them by + // fixing the underlying data race instead. + log.Errorf("prometheus goroutine recovered from panic: %v\n%s", r, debug.Stack()) + err = fmt.Errorf("panic: %v", r) + } + }() select { case <-ctx.Done(): return ctx.Err() @@ -36,7 +51,13 @@ func AppendErrGroupWithContext(ctx context.Context, eg *errgroup.Group, f ErrFun } func AppendErrGroup(eg *errgroup.Group, f ErrFunc, args ...interface{}) { - eg.Go(func() error { + eg.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + log.Errorf("prometheus goroutine recovered from panic: %v\n%s", r, debug.Stack()) + err = fmt.Errorf("panic: %v", r) + } + }() return f(args...) }) } diff --git a/server/controller/prometheus/config/config.go b/server/controller/prometheus/config/config.go index 10e21e267f6..75505f59879 100644 --- a/server/controller/prometheus/config/config.go +++ b/server/controller/prometheus/config/config.go @@ -17,8 +17,8 @@ package config type Config struct { - SynchronizerCacheRefreshInterval int `default:"60" yaml:"synchronizer_cache_refresh_interval"` - EncoderCacheRefreshInterval int `default:"3600" yaml:"encoder_cache_refresh_interval"` + SynchronizerCacheRefreshInterval int `default:"300" yaml:"synchronizer_cache_refresh_interval"` + EncoderCacheRefreshInterval int `default:"36000" yaml:"encoder_cache_refresh_interval"` ResourceMaxID0 int `default:"64000" yaml:"resource_max_id_0"` ResourceMaxID1 int `default:"499999" yaml:"resource_max_id_1"` APPLabelIndexMax int `default:"255" yaml:"app_label_index"` diff --git a/server/controller/prometheus/encoder/encoder.go b/server/controller/prometheus/encoder/encoder.go index 68d12cc65fa..649e5409ff6 100644 --- a/server/controller/prometheus/encoder/encoder.go +++ b/server/controller/prometheus/encoder/encoder.go @@ -51,7 +51,7 @@ func newEncoder(cfg prometheuscfg.Config, orgID int) (*Encoder, error) { e.metricName = newMetricName(org, cfg.ResourceMaxID1) e.labelName = newLabelName(org, cfg.ResourceMaxID0) e.labelValue = newLabelValue(org) - e.label = newLabel(org) + e.label = newLabel(org, e.labelName, e.labelValue) e.LabelLayout = newLabelLayout(org, cfg) return e, nil } diff --git a/server/controller/prometheus/encoder/label.go b/server/controller/prometheus/encoder/label.go index 2b71b703dae..1bb41240eb7 100644 --- a/server/controller/prometheus/encoder/label.go +++ b/server/controller/prometheus/encoder/label.go @@ -31,45 +31,90 @@ type label struct { org *common.ORG lock sync.Mutex resourceType string - labelKeyToID sync.Map - labelIDToKey sync.Map + labelKeyToID map[cache.LabelKey]int + labelName *labelName + labelValue *labelValue + + isRefreshing bool + pendingKeys map[cache.LabelKey]int } -func newLabel(org *common.ORG) *label { +func newLabel(org *common.ORG, ln *labelName, lv *labelValue) *label { return &label{ org: org, resourceType: "label", + labelKeyToID: make(map[cache.LabelKey]int), + labelName: ln, + labelValue: lv, } } func (l *label) store(item *metadbmodel.PrometheusLabel) { - l.labelKeyToID.Store(cache.NewLabelKey(item.Name, item.Value), item.ID) - l.labelIDToKey.Store(item.ID, cache.NewLabelKey(item.Name, item.Value)) -} + key := cache.NewLabelKey(item.NameID, item.ValueID) + l.labelKeyToID[key] = item.ID -func (l *label) getKey(id int) (cache.LabelKey, bool) { - if item, ok := l.labelIDToKey.Load(id); ok { - return item.(cache.LabelKey), true + if l.isRefreshing { + l.pendingKeys[key] = item.ID } - return cache.LabelKey{}, false } func (l *label) getID(key cache.LabelKey) (int, bool) { - if item, ok := l.labelKeyToID.Load(key); ok { - return item.(int), true - } - return 0, false + id, ok := l.labelKeyToID[key] + return id, ok +} + +func (l *label) MarkRefresh() { + l.lock.Lock() + defer l.lock.Unlock() + l.isRefreshing = true + l.pendingKeys = make(map[cache.LabelKey]int) +} + +func (l *label) MarkRefreshDone() { + l.lock.Lock() + defer l.lock.Unlock() + l.isRefreshing = false + l.pendingKeys = nil } func (l *label) refresh(args ...interface{}) error { - var items []*metadbmodel.PrometheusLabel - err := l.org.DB.Find(&items).Error + l.MarkRefresh() + defer l.MarkRefreshDone() + + log.Info("TODO start") + + var count int64 + if err := l.org.DB.Model(&metadbmodel.PrometheusLabel{}).Count(&count).Error; err != nil { + return err + } + + rows, err := l.org.DB.Model(&metadbmodel.PrometheusLabel{}).Select("id", "name_id", "value_id").Rows() if err != nil { return err } - for _, item := range items { - l.store(item) + defer rows.Close() + + newMap := make(map[cache.LabelKey]int, count) + for rows.Next() { + var id, nameID, valueID int + if scanErr := rows.Scan(&id, &nameID, &valueID); scanErr != nil { + return scanErr + } + newMap[cache.NewLabelKey(nameID, valueID)] = id + } + if err := rows.Err(); err != nil { + return err + } + + log.Info("TODO end") + + l.lock.Lock() + for k, v := range l.pendingKeys { + newMap[k] = v } + l.labelKeyToID = newMap + l.lock.Unlock() + return nil } @@ -77,38 +122,51 @@ func (l *label) encode(toAdd []*controller.PrometheusLabelRequest) ([]*controlle l.lock.Lock() defer l.lock.Unlock() + type pendingLabel struct { + item *metadbmodel.PrometheusLabel + } + resp := make([]*controller.PrometheusLabel, 0) - var dbToAdd []*metadbmodel.PrometheusLabel + var pending []pendingLabel for _, item := range toAdd { n := item.GetName() v := item.GetValue() - if id, ok := l.getID(cache.NewLabelKey(n, v)); ok { + nameID, okN := l.labelName.getID(n) + valueID, okV := l.labelValue.getID(v) + if !okN || !okV { + // Name or value not yet encoded — skip; the next encode round + // (after the caller retries) will succeed once they are present. + log.Warningf("label (%s=%s): name_id or value_id not found, skipping", n, v, l.org.LogPrefix) + continue + } + if id, ok := l.getID(cache.NewLabelKey(nameID, valueID)); ok { resp = append(resp, &controller.PrometheusLabel{ - Name: &n, - Value: &v, - Id: proto.Uint32(uint32(id)), + Id: proto.Uint32(uint32(id)), + NameId: proto.Uint32(uint32(nameID)), + ValueId: proto.Uint32(uint32(valueID)), }) continue } - dbToAdd = append(dbToAdd, &metadbmodel.PrometheusLabel{ - Name: n, - Value: v, + pending = append(pending, pendingLabel{ + item: &metadbmodel.PrometheusLabel{NameID: nameID, ValueID: valueID}, }) } - err := addBatch(l.org.DB, dbToAdd, l.resourceType) - if err != nil { + dbToAdd := make([]*metadbmodel.PrometheusLabel, len(pending)) + for i := range pending { + dbToAdd[i] = pending[i].item + } + if err := addBatch(l.org.DB, dbToAdd, l.resourceType); err != nil { log.Errorf("add %s error: %s", l.resourceType, err.Error(), l.org.LogPrefix) return nil, err } - for _, item := range dbToAdd { - l.store(item) + for i, p := range pending { + l.store(dbToAdd[i]) resp = append(resp, &controller.PrometheusLabel{ - Name: &item.Name, - Value: &item.Value, - Id: proto.Uint32(uint32(item.ID)), + Id: proto.Uint32(uint32(dbToAdd[i].ID)), + NameId: proto.Uint32(uint32(p.item.NameID)), + ValueId: proto.Uint32(uint32(p.item.ValueID)), }) - } return resp, nil } diff --git a/server/controller/prometheus/encoder/label_layout.go b/server/controller/prometheus/encoder/label_layout.go index b713948e99f..d9fdfef7b58 100644 --- a/server/controller/prometheus/encoder/label_layout.go +++ b/server/controller/prometheus/encoder/label_layout.go @@ -155,7 +155,13 @@ func (ll *labelLayout) refresh(args ...interface{}) error { ia, _ := ll.createIndexAllocatorIfNotExists(mn) ia.refresh(lnToIdx) } + ll.lock.Lock() + currentAllocators := make(map[string]*indexAllocator, len(ll.metricNameToIdxAllocator)) for mn, ia := range ll.metricNameToIdxAllocator { + currentAllocators[mn] = ia + } + ll.lock.Unlock() + for mn, ia := range currentAllocators { if _, ok := mnToLnIdx[mn]; !ok { ia.refresh(make(map[string]int)) } @@ -193,15 +199,15 @@ func (ll *labelLayout) createIndexAllocatorIfNotExists(metricName string) (*inde ia := newIndexAllocator(ll.org, metricName, ll.appLabelIndexMax) ia.refresh(make(map[string]int)) ll.metricNameToIdxAllocator[metricName] = ia - return ll.metricNameToIdxAllocator[metricName], nil + return ia, nil } -func (ll *labelLayout) getIndexAllocator(metricName string) (*indexAllocator, bool) { - ll.lock.Lock() - defer ll.lock.Unlock() - allocator, ok := ll.metricNameToIdxAllocator[metricName] - return allocator, ok -} +// func (ll *labelLayout) getIndexAllocator(metricName string) (*indexAllocator, bool) { +// ll.lock.Lock() +// defer ll.lock.Unlock() +// allocator, ok := ll.metricNameToIdxAllocator[metricName] +// return allocator, ok +// } func (ll *labelLayout) SingleEncode(metricName string, labelNames []string) ([]*controller.PrometheusMetricAPPLabelLayout, error) { log.Infof("encode metric: %s app label names: %v", metricName, labelNames, ll.org.LogPrefix) diff --git a/server/controller/prometheus/encoder/label_value.go b/server/controller/prometheus/encoder/label_value.go index 4d03045680b..67f33db3e61 100644 --- a/server/controller/prometheus/encoder/label_value.go +++ b/server/controller/prometheus/encoder/label_value.go @@ -30,26 +30,75 @@ type labelValue struct { org *common.ORG lock sync.Mutex resourceType string - strToID sync.Map + strToID map[string]int + + isRefreshing bool + pendingKeys map[string]int } func newLabelValue(org *common.ORG) *labelValue { return &labelValue{ org: org, resourceType: "label_value", + strToID: make(map[string]int), } } +func (lv *labelValue) MarkRefresh() { + lv.lock.Lock() + defer lv.lock.Unlock() + lv.isRefreshing = true + lv.pendingKeys = make(map[string]int) +} + +func (lv *labelValue) MarkRefreshDone() { + lv.lock.Lock() + defer lv.lock.Unlock() + lv.isRefreshing = false + lv.pendingKeys = nil +} + func (lv *labelValue) refresh(args ...interface{}) error { - var items []*metadbmodel.PrometheusLabelValue - err := lv.org.DB.Unscoped().Find(&items).Error + lv.MarkRefresh() + defer lv.MarkRefreshDone() + + log.Info("TODO start") + + var count int64 + if err := lv.org.DB.Model(&metadbmodel.PrometheusLabelValue{}).Count(&count).Error; err != nil { + log.Errorf("db query %s failed: %v", lv.resourceType, err, lv.org.LogPrefix) + return err + } + + rows, err := lv.org.DB.Model(&metadbmodel.PrometheusLabelValue{}).Select("id", "value").Rows() if err != nil { log.Errorf("db query %s failed: %v", lv.resourceType, err, lv.org.LogPrefix) return err } - for _, item := range items { - lv.store(item) + defer rows.Close() + + newMap := make(map[string]int, count) + for rows.Next() { + var id int + var value string + if scanErr := rows.Scan(&id, &value); scanErr != nil { + log.Errorf("db stream scan %s interrupted: %v", lv.resourceType, scanErr, lv.org.LogPrefix) + return scanErr + } + newMap[value] = id + } + if err := rows.Err(); err != nil { + log.Errorf("db stream %s error: %v", lv.resourceType, err, lv.org.LogPrefix) + return err } + + lv.lock.Lock() + for k, v := range lv.pendingKeys { + newMap[k] = v + } + lv.strToID = newMap + lv.lock.Unlock() + return nil } @@ -61,7 +110,7 @@ func (lv *labelValue) encode(strs []string) ([]*controller.PrometheusLabelValue, dbToAdd := make([]*metadbmodel.PrometheusLabelValue, 0) for i := range strs { str := strs[i] - if id, ok := lv.getID(str); ok { + if id, ok := lv.getIDLocked(str); ok { resp = append(resp, &controller.PrometheusLabelValue{Value: &str, Id: proto.Uint32(uint32(id))}) continue } @@ -83,13 +132,23 @@ func (lv *labelValue) encode(strs []string) ([]*controller.PrometheusLabelValue, return resp, nil } +// getIDLocked reads strToID without acquiring lv.lock. Caller must hold lv.lock. +func (lv *labelValue) getIDLocked(str string) (int, bool) { + id, ok := lv.strToID[str] + return id, ok +} + +// getID is safe for concurrent callers; it acquires lv.lock internally. func (lv *labelValue) getID(str string) (int, bool) { - if item, ok := lv.strToID.Load(str); ok { - return item.(int), true - } - return 0, false + lv.lock.Lock() + defer lv.lock.Unlock() + return lv.getIDLocked(str) } func (lv *labelValue) store(item *metadbmodel.PrometheusLabelValue) { - lv.strToID.Store(item.Value, item.ID) + lv.strToID[item.Value] = item.ID + + if lv.isRefreshing { + lv.pendingKeys[item.Value] = item.ID + } } diff --git a/server/controller/prometheus/label.go b/server/controller/prometheus/label.go index 14a9630b7d5..5b9ada57a24 100644 --- a/server/controller/prometheus/label.go +++ b/server/controller/prometheus/label.go @@ -260,7 +260,7 @@ func (s *LabelSynchronizer) generateSyncRequest(toEncode *dataToEncode) *control return res }(toEncode.metricAPPLabelLayouts.ToSlice()), - Labels: func(ks []cache.LabelKey) []*controller.PrometheusLabelRequest { + Labels: func(ks []labelStrKey) []*controller.PrometheusLabelRequest { res := make([]*controller.PrometheusLabelRequest, 0, len(ks)) for i := range ks { res = append(res, &controller.PrometheusLabelRequest{ @@ -363,10 +363,25 @@ func (s *LabelSynchronizer) addMetricAPPLabelLayoutCache(arg ...interface{}) err func (s *LabelSynchronizer) addLabelCache(arg ...interface{}) error { ls := arg[0].([]*controller.PrometheusLabel) - s.cache.Label.Add(ls) + entries := make([]cache.LabelCacheEntry, 0, len(ls)) + for _, l := range ls { + entries = append(entries, cache.LabelCacheEntry{ + NameID: int(l.GetNameId()), + ValueID: int(l.GetValueId()), + LabelID: int(l.GetId()), + }) + } + s.cache.Label.Add(entries) return nil } +// labelStrKey is a temporary string-based label identifier used only within +// a single encode request to deduplicate pending labels before IDs are assigned. +type labelStrKey struct { + Name string + Value string +} + type dataToEncode struct { cache *cache.Cache @@ -374,7 +389,7 @@ type dataToEncode struct { labelNames mapset.Set[string] labelValues mapset.Set[string] metricAPPLabelLayouts mapset.Set[cache.LayoutKey] - labels mapset.Set[cache.LabelKey] + labels mapset.Set[labelStrKey] metricNameToLabelNames map[string]mapset.Set[string] } @@ -386,7 +401,7 @@ func newDataToEncode(c *cache.Cache) *dataToEncode { labelNames: mapset.NewSet[string](), labelValues: mapset.NewSet[string](), metricAPPLabelLayouts: mapset.NewSet[cache.LayoutKey](), - labels: mapset.NewSet[cache.LabelKey](), + labels: mapset.NewSet[labelStrKey](), metricNameToLabelNames: make(map[string]mapset.Set[string], 0), } } @@ -426,7 +441,15 @@ func (d *dataToEncode) appendMetricAPPLabelLayout(metricName, labelName string) } func (d *dataToEncode) tryAppendLabel(name, value string) { - if _, ok := d.cache.Label.GetIDByKey(cache.NewLabelKey(name, value)); !ok { - d.labels.Add(cache.NewLabelKey(name, value)) + // If name_id and value_id are already known, use the int-based label cache + // for O(1) lookup before deciding to encode. Otherwise the label is new + // (name or value not yet registered) and must be encoded regardless. + nameID, okN := d.cache.LabelName.GetIDByName(name) + valueID, okV := d.cache.LabelValue.GetIDByValue(value) + if okN && okV { + if _, ok := d.cache.Label.GetIDByKey(cache.NewLabelKey(nameID, valueID)); ok { + return + } } + d.labels.Add(labelStrKey{Name: name, Value: value}) } diff --git a/server/controller/prometheus/synchronizer.go b/server/controller/prometheus/synchronizer.go index da7aa669ec6..fe7ac6b10bd 100644 --- a/server/controller/prometheus/synchronizer.go +++ b/server/controller/prometheus/synchronizer.go @@ -18,6 +18,8 @@ package prometheus import ( // "sort" + "fmt" + "strings" mapset "github.com/deckarep/golang-set/v2" "google.golang.org/protobuf/proto" @@ -30,6 +32,27 @@ import ( var log = logger.MustGetLogger("prometheus.synchronizer") +const ( + maxLogCount = 10 + maxLogSize = 64 * 1024 // 64KB +) + +func logNotFoundDetail(items interface{}) string { + logItems := items.([]interface{}) + if len(logItems) > maxLogCount { + logItems = logItems[:maxLogCount] + } + logItemsStr := make([]string, 0, len(logItems)) + for _, item := range logItems { + logItemsStr = append(logItemsStr, fmt.Sprintf("%v", item)) + } + itemsStr := strings.Join(logItemsStr, ",") + if len(itemsStr) > maxLogSize { + itemsStr = itemsStr[:maxLogSize] + "... (truncated)" + } + return fmt.Sprintf("count: %d, <= %d items: %s", len(logItems), maxLogCount, itemsStr) +} + type counter struct { SendMetricCount uint64 SendLabelCount uint64 @@ -51,30 +74,27 @@ func newSynchronizer(c *cache.Cache) Synchronizer { } func (s *Synchronizer) assembleMetricLabelFully() ([]*trident.MetricLabelResponse, error) { - var err error nonLabelNames := mapset.NewSet[string]() metricNameToAPPLabelNames := make(map[string][]*trident.LabelResponse, 0) - s.cache.MetricAndAPPLabelLayout.Get().Range(func(k, v interface{}) bool { - key := k.(cache.LayoutKey) - labelNameID, ok := s.cache.LabelName.GetIDByName(key.LabelName) + for k, v := range s.cache.MetricAndAPPLabelLayout.GetLayoutKeyToIndex() { + labelNameID, ok := s.cache.LabelName.GetIDByName(k.LabelName) if !ok { - nonLabelNames.Add(key.LabelName) - return true + nonLabelNames.Add(k.LabelName) + continue } - metricNameToAPPLabelNames[key.MetricName] = append( - metricNameToAPPLabelNames[key.MetricName], + metricNameToAPPLabelNames[k.MetricName] = append( + metricNameToAPPLabelNames[k.MetricName], &trident.LabelResponse{ - Name: &key.LabelName, + Name: &k.LabelName, NameId: proto.Uint32(uint32(labelNameID)), - AppLabelColumnIndex: proto.Uint32(uint32(v.(uint8))), + AppLabelColumnIndex: proto.Uint32(uint32(v)), }) - return true - }) + } mLabels := make([]*trident.MetricLabelResponse, 0) - s.cache.MetricName.Get().Range(func(k, v interface{}) bool { - metricName := k.(string) - metricID := v.(int) + for k, v := range s.cache.MetricName.GetNameToID() { + metricName := k + metricID := v mLabels = append( mLabels, &trident.MetricLabelResponse{ @@ -84,43 +104,42 @@ func (s *Synchronizer) assembleMetricLabelFully() ([]*trident.MetricLabelRespons LabelIds: metricNameToAPPLabelNames[metricName], }) s.counter.SendMetricCount++ - return true - }) + } if nonLabelNames.Cardinality() > 0 { - log.Warningf("label name id not found, names: %v", nonLabelNames.ToSlice(), s.org.LogPrefix) + log.Warningf("ids of label names not found, %s", logNotFoundDetail(nonLabelNames.ToSlice())) } - return mLabels, err + return mLabels, nil } func (s *Synchronizer) assembleLabelFully() ([]*trident.LabelResponse, error) { ls := make([]*trident.LabelResponse, 0) - nonLabelNames := mapset.NewSet[string]() - nonLabelValues := mapset.NewSet[string]() - for iter := range s.cache.Label.GetKeyToID().IterBuffered() { - k := iter.Key - ni, ok := s.cache.LabelName.GetIDByName(k.Name) - if !ok { - nonLabelNames.Add(k.Name) + nonNameIDs := mapset.NewSet[int]() + nonValueIDs := mapset.NewSet[int]() + for k := range s.cache.Label.GetKeyToID() { + name, okN := s.cache.LabelName.GetNameByID(k.NameID) + if !okN { + nonNameIDs.Add(k.NameID) continue } - vi, ok := s.cache.LabelValue.GetIDByValue(k.Value) - if !ok { - nonLabelValues.Add(k.Value) + value, okV := s.cache.LabelValue.GetValueByID(k.ValueID) + if !okV { + nonValueIDs.Add(k.ValueID) continue } + n, v := name, value ls = append(ls, &trident.LabelResponse{ - Name: &k.Name, - Value: &k.Value, - NameId: proto.Uint32(uint32(ni)), - ValueId: proto.Uint32(uint32(vi)), + Name: &n, + Value: &v, + NameId: proto.Uint32(uint32(k.NameID)), + ValueId: proto.Uint32(uint32(k.ValueID)), }) s.counter.SendLabelCount++ } - if nonLabelNames.Cardinality() > 0 { - log.Warningf("label name id not found, names: %v", nonLabelNames.ToSlice(), s.org.LogPrefix) + if nonNameIDs.Cardinality() > 0 { + log.Warningf("strings for label name ids not found, %s", logNotFoundDetail(nonNameIDs.ToSlice())) } - if nonLabelValues.Cardinality() > 0 { - log.Warningf("label value id not found, values: %v", nonLabelValues.ToSlice(), s.org.LogPrefix) + if nonValueIDs.Cardinality() > 0 { + log.Warningf("strings for label value ids not found, %s", logNotFoundDetail(nonValueIDs.ToSlice())) } return ls, nil } diff --git a/server/controller/tagrecorder/ch_app_label.go b/server/controller/tagrecorder/ch_app_label.go index 02629ab259b..dea79009fdf 100644 --- a/server/controller/tagrecorder/ch_app_label.go +++ b/server/controller/tagrecorder/ch_app_label.go @@ -17,7 +17,7 @@ package tagrecorder import ( - "slices" + // "slices" "github.com/deepflowio/deepflow/server/controller/db/metadb" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" @@ -40,39 +40,52 @@ func NewChAPPLabel() *ChAPPLabel { func (l *ChAPPLabel) generateNewData(db *metadb.DB) (map[PrometheusAPPLabelKey]metadbmodel.ChAPPLabel, bool) { log.Infof("generate data for %s", l.resourceTypeName, db.LogPrefixORGID) - var prometheusLabels []metadbmodel.PrometheusLabel - err := db.Unscoped().Find(&prometheusLabels).Error - + labelRows, err := db.Unscoped().Model(&metadbmodel.PrometheusLabel{}).Select("id", "name_id", "value_id").Rows() if err != nil { log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err), db.LogPrefixORGID) return nil, false } + defer labelRows.Close() - appLabelSlice, ok := l.generateAPPLabelData(db) + var prometheusLabels []metadbmodel.PrometheusLabel + for labelRows.Next() { + var item metadbmodel.PrometheusLabel + if scanErr := labelRows.Scan(&item.ID, &item.NameID, &item.ValueID); scanErr != nil { + log.Errorf("stream scan %s prometheus_label interrupted: %v", l.resourceTypeName, scanErr, db.LogPrefixORGID) + return nil, false + } + prometheusLabels = append(prometheusLabels, item) + } + if err := labelRows.Err(); err != nil { + log.Errorf("stream read %s prometheus_label error: %v", l.resourceTypeName, err, db.LogPrefixORGID) + return nil, false + } + + _, ok := l.generateAPPLabelData(db) - labelNameIDMap, valueNameIDMap, ok := l.generateNameIDData(db) + _, _, ok = l.generateNameIDData(db) if !ok { return nil, false } keyToItem := make(map[PrometheusAPPLabelKey]metadbmodel.ChAPPLabel) - for _, prometheusLabel := range prometheusLabels { - labelName := prometheusLabel.Name - if slices.Contains(appLabelSlice, labelName) { - labelNameID, nameOK := labelNameIDMap[labelName] - labelValue := prometheusLabel.Value - labelValueID, valueOK := valueNameIDMap[labelValue] - if !nameOK || !valueOK { - log.Warningf("label name or value not found in db, labelName: %s, labelValue: %s", labelName, labelValue) - continue - } - keyToItem[PrometheusAPPLabelKey{LabelNameID: labelNameID, LabelValueID: labelValueID}] = metadbmodel.ChAPPLabel{ - LabelNameID: labelNameID, - LabelValue: labelValue, - LabelValueID: labelValueID, - } - } - + for _, _ = range prometheusLabels { + // @jinzhou TODO + // labelName := prometheusLabel.Name + // if slices.Contains(appLabelSlice, labelName) { + // labelNameID, nameOK := labelNameIDMap[labelName] + // labelValue := prometheusLabel.Value + // labelValueID, valueOK := valueNameIDMap[labelValue] + // if !nameOK || !valueOK { + // log.Warningf("label name or value not found in db, labelName: %s, labelValue: %s", labelName, labelValue) + // continue + // } + // keyToItem[PrometheusAPPLabelKey{LabelNameID: labelNameID, LabelValueID: labelValueID}] = metadbmodel.ChAPPLabel{ + // LabelNameID: labelNameID, + // LabelValue: labelValue, + // LabelValueID: labelValueID, + // } + // } } return keyToItem, true } @@ -111,29 +124,44 @@ func (l *ChAPPLabel) generateAPPLabelData(db *metadb.DB) ([]string, bool) { func (l *ChAPPLabel) generateNameIDData(db *metadb.DB) (map[string]int, map[string]int, bool) { labelNameIDMap := make(map[string]int) valueNameIDMap := make(map[string]int) - var prometheusLabelNames []metadbmodel.PrometheusLabelName - var prometheusLabelValues []metadbmodel.PrometheusLabelValue - - err := db.Unscoped().Find(&prometheusLabelNames).Error - + nameRows, err := db.Unscoped().Model(&metadbmodel.PrometheusLabelName{}).Select("id", "name").Rows() if err != nil { log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err), db.LogPrefixORGID) return nil, nil, false } + defer nameRows.Close() + for nameRows.Next() { + var id int + var name string + if scanErr := nameRows.Scan(&id, &name); scanErr != nil { + log.Errorf("stream scan %s prometheus_label_name interrupted: %v", l.resourceTypeName, scanErr, db.LogPrefixORGID) + return nil, nil, false + } + labelNameIDMap[name] = id + } + if err := nameRows.Err(); err != nil { + log.Errorf("stream read %s prometheus_label_name error: %v", l.resourceTypeName, err, db.LogPrefixORGID) + return nil, nil, false + } - err = db.Unscoped().Find(&prometheusLabelValues).Error - + valueRows, err := db.Unscoped().Model(&metadbmodel.PrometheusLabelValue{}).Select("id", "value").Rows() if err != nil { log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err), db.LogPrefixORGID) return nil, nil, false } - - for _, prometheusLabelName := range prometheusLabelNames { - labelNameIDMap[prometheusLabelName.Name] = prometheusLabelName.ID + defer valueRows.Close() + for valueRows.Next() { + var id int + var value string + if scanErr := valueRows.Scan(&id, &value); scanErr != nil { + log.Errorf("stream scan %s prometheus_label_value interrupted: %v", l.resourceTypeName, scanErr, db.LogPrefixORGID) + return nil, nil, false + } + valueNameIDMap[value] = id } - - for _, prometheusLabelValue := range prometheusLabelValues { - valueNameIDMap[prometheusLabelValue.Value] = prometheusLabelValue.ID + if err := valueRows.Err(); err != nil { + log.Errorf("stream read %s prometheus_label_value error: %v", l.resourceTypeName, err, db.LogPrefixORGID) + return nil, nil, false } return labelNameIDMap, valueNameIDMap, true } diff --git a/server/server.yaml b/server/server.yaml index 0a37779ca23..5f1c2573cc4 100644 --- a/server/server.yaml +++ b/server/server.yaml @@ -449,9 +449,9 @@ controller: prometheus: # synchronizer cache refresh interval, unit: second - synchronizer_cache_refresh_interval: 60 + synchronizer_cache_refresh_interval: 300 # encoder cache refresh interval, unit: second - encoder_cache_refresh_interval: 3600 + encoder_cache_refresh_interval: 36000 # time interval for regularly clearing prometheus expired data, unit: minute, default: 60 * 24 # time interval should be greater than or equal to ingester: prometheus-label-cache-expiration configuration data_clean_interval: 1440