diff --git a/server/controller/prometheus/cache/cache.go b/server/controller/prometheus/cache/cache.go index 2ef1372ec0f..c8ae103ef3e 100644 --- a/server/controller/prometheus/cache/cache.go +++ b/server/controller/prometheus/cache/cache.go @@ -25,6 +25,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/deepflowio/deepflow/message/controller" + metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" "github.com/deepflowio/deepflow/server/controller/prometheus/common" "github.com/deepflowio/deepflow/server/libs/logger" ) @@ -40,8 +41,11 @@ type Cache struct { org *common.ORG ctx context.Context - canRefresh chan bool refreshInterval time.Duration + lastRefresh time.Time + + refreshing bool + refreshCond *sync.Cond MetricName *metricName LabelName *labelName @@ -62,14 +66,13 @@ func newCache(orgID int) (*Cache, error) { lv := newLabelValue(org) c := &Cache{ org: org, - canRefresh: make(chan bool, 1), + refreshCond: sync.NewCond(&sync.Mutex{}), MetricName: mn, LabelName: ln, LabelValue: lv, MetricAndAPPLabelLayout: newMetricAndAPPLabelLayout(org), Label: newLabel(org, ln, lv), } - c.canRefresh <- true return c, nil } @@ -77,37 +80,146 @@ func (c *Cache) GetORG() *common.ORG { return c.org } -func (c *Cache) Refresh() (err error) { -LOOP: - for { - select { - case <-c.canRefresh: - err = c.refresh() - c.canRefresh <- true - break LOOP - default: - time.Sleep(time.Second) - log.Infof("last refresh cache not completed now", c.org.LogPrefix) +func (c *Cache) GetMetricNameID(name string) (int, bool) { + return c.MetricName.GetID(name) +} + +func (c *Cache) SetMetricNameID(name string, id int) { + c.MetricName.setID(name, id) +} + +func (c *Cache) GetLabelNameID(name string) (int, bool) { + return c.LabelName.GetID(name) +} + +func (c *Cache) GetLabelValueID(value string) (int, bool) { + return c.LabelValue.GetID(value) +} + +func (c *Cache) GetLabelID(name, value string) (int, bool) { + return c.Label.GetIDByKey(NewLabelKey(name, value)) +} + +func (c *Cache) GetLabelKeyToID() map[LabelKey]int { + return c.Label.GetKeyToID() +} + +func (c *Cache) GetLabelNameByID(id int) (string, bool) { + return c.LabelName.GetNameByID(id) +} + +func (c *Cache) GetLabelValueByID(id int) (string, bool) { + return c.LabelValue.GetValueByID(id) +} + +func (c *Cache) GetMetricNameToID() map[string]int { + return c.MetricName.GetNameToID() +} + +func (c *Cache) GetMetricAndAPPLabelLayout() map[LayoutKey]uint8 { + return c.MetricAndAPPLabelLayout.GetLayoutKeyToIndex() +} + +func (c *Cache) GetMetricAndAPPLabelLayoutIndex(key LayoutKey) (uint8, bool) { + return c.MetricAndAPPLabelLayout.GetIndexByKey(key) +} + +func (c *Cache) AddMetricAndAPPLabelLayoutsFromGrpc(batch []*controller.PrometheusMetricAPPLabelLayout) { + c.MetricAndAPPLabelLayout.AddFromGrpc(batch) +} + +func (c *Cache) SetLabelNameID(name string, id int) { + c.LabelName.setID(name, id) +} + +func (c *Cache) SetLabelValueID(value string, id int) { + c.LabelValue.setID(value, id) +} + +func (c *Cache) AddMetricNames(batch []*metadbmodel.PrometheusMetricName) { + c.MetricName.Add(batch) +} + +func (c *Cache) AddMetricNamesFromGrpc(batch []*controller.PrometheusMetricName) { + c.MetricName.AddFromGrpc(batch) +} + +func (c *Cache) AddLabelNames(batch []*metadbmodel.PrometheusLabelName) { + c.LabelName.Add(batch) +} + +func (c *Cache) AddLabelNamesFromGrpc(batch []*controller.PrometheusLabelName) { + c.LabelName.AddFromGrpc(batch) +} + +func (c *Cache) AddLabelValues(batch []*metadbmodel.PrometheusLabelValue) { + c.LabelValue.Add(batch) +} + +func (c *Cache) AddLabelValuesFromGrpc(batch []*controller.PrometheusLabelValue) { + c.LabelValue.AddFromGrpc(batch) +} + +func (c *Cache) AddLabels(batch []*metadbmodel.PrometheusLabel) { + c.Label.Add(batch) +} + +func (c *Cache) AddLabelsFromGrpc(batch []*controller.PrometheusLabel) { + c.Label.AddFromGrpc(batch) +} + +func (c *Cache) Refresh(wait bool) error { + c.refreshCond.L.Lock() + if c.refreshing { + if wait { + // wait for refresh to complete + for c.refreshing { + c.refreshCond.Wait() + } + c.refreshCond.L.Unlock() + return nil } + c.refreshCond.L.Unlock() + return nil } - return + + if !wait && c.refreshInterval > 0 && !c.lastRefresh.IsZero() && time.Since(c.lastRefresh) < c.refreshInterval { + c.refreshCond.L.Unlock() + return nil + } + + c.refreshing = true + c.refreshCond.L.Unlock() + + err := c.doRefresh() + + c.refreshCond.L.Lock() + c.refreshing = false + c.refreshCond.Broadcast() + c.refreshCond.L.Unlock() + return err +} + +func (c *Cache) doRefresh() error { + err := c.refresh() + if err == nil { + c.lastRefresh = time.Now() + } + return err } func (c *Cache) refresh() error { log.Infof("refresh cache started", c.org.LogPrefix) - // LabelName and LabelValue must be refreshed before Label, - // because Label.refresh() converts name/value strings to IDs. - egRunAhead := &errgroup.Group{} - common.AppendErrGroup(egRunAhead, c.MetricName.refresh) - common.AppendErrGroup(egRunAhead, c.LabelName.refresh) - common.AppendErrGroup(egRunAhead, c.LabelValue.refresh) - if err := egRunAhead.Wait(); err != nil { - return err - } eg := &errgroup.Group{} - common.AppendErrGroup(eg, c.Label.refresh) + common.AppendErrGroup(eg, c.MetricName.refresh) + common.AppendErrGroup(eg, c.LabelName.refresh) + common.AppendErrGroup(eg, c.LabelValue.refresh) common.AppendErrGroup(eg, c.MetricAndAPPLabelLayout.refresh) err := eg.Wait() + if err != nil { + return err + } + err = c.Label.refresh() log.Infof("refresh cache completed", c.org.LogPrefix) return err diff --git a/server/controller/prometheus/cache/cache_db_test.go b/server/controller/prometheus/cache/cache_db_test.go index b9f61c68cf8..09b40ec3d0d 100644 --- a/server/controller/prometheus/cache/cache_db_test.go +++ b/server/controller/prometheus/cache/cache_db_test.go @@ -158,6 +158,7 @@ func generateDBLabels(n int) []metadbmodel.PrometheusLabel { now := time.Now() for i := 0; i < n; i++ { items[i] = metadbmodel.PrometheusLabel{ + PrometheusAutoIncID: metadbmodel.PrometheusAutoIncID{ID: i + 1}, PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, Name: fmt.Sprintf("n_%d", i), Value: fmt.Sprintf("v_%d", i), @@ -252,11 +253,33 @@ func TestSelect_Label_LoadOnlyIDNameValue(t *testing.T) { db := setupTestDB(t) defer cleanupTestDB(t) + nameItems := make([]metadbmodel.PrometheusLabelName, 5000) + valueItems := make([]metadbmodel.PrometheusLabelValue, 5000) + now := time.Now() + for i := 0; i < 5000; i++ { + nameItems[i] = metadbmodel.PrometheusLabelName{ + PrometheusID: metadbmodel.PrometheusID{ID: i + 1}, + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + Name: fmt.Sprintf("n_%d", i), + } + valueItems[i] = metadbmodel.PrometheusLabelValue{ + PrometheusID: metadbmodel.PrometheusID{ID: i + 1}, + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + Value: fmt.Sprintf("v_%d", i), + } + } + require.NoError(t, batchInsert(db, nameItems, 1000)) + require.NoError(t, batchInsert(db, valueItems, 1000)) + items := generateDBLabels(5000) require.NoError(t, batchInsert(db, items, 1000)) l := newTestLabel() l.org = newTestORG(db) + l.labelName.org = l.org + l.labelValue.org = l.org + require.NoError(t, l.labelName.refresh()) + require.NoError(t, l.labelValue.refresh()) err := l.refresh() require.NoError(t, err) @@ -298,12 +321,34 @@ func TestSelect_Label_RefreshDiscardsDeletedRows(t *testing.T) { db := setupTestDB(t) defer cleanupTestDB(t) + nameItems := make([]metadbmodel.PrometheusLabelName, 200) + valueItems := make([]metadbmodel.PrometheusLabelValue, 200) + now := time.Now() + for i := 0; i < 200; i++ { + nameItems[i] = metadbmodel.PrometheusLabelName{ + PrometheusID: metadbmodel.PrometheusID{ID: i + 1}, + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + Name: fmt.Sprintf("n_%d", i), + } + valueItems[i] = metadbmodel.PrometheusLabelValue{ + PrometheusID: metadbmodel.PrometheusID{ID: i + 1}, + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + Value: fmt.Sprintf("v_%d", i), + } + } + require.NoError(t, batchInsert(db, nameItems, 100)) + require.NoError(t, batchInsert(db, valueItems, 100)) + // 第一轮:插入 200 条并 refresh items := generateDBLabels(200) require.NoError(t, batchInsert(db, items, 100)) l := newTestLabel() l.org = newTestORG(db) + l.labelName.org = l.org + l.labelValue.org = l.org + require.NoError(t, l.labelName.refresh()) + require.NoError(t, l.labelValue.refresh()) require.NoError(t, l.refresh()) assert.Equal(t, 200, countLabelConcurrentMap(l.GetKeyToID())) @@ -439,10 +484,34 @@ func TestSelect_Label_500K_Refresh(t *testing.T) { insertStart := time.Now() items := generateDBLabels(N) require.NoError(t, batchInsert(db, items, 5000)) + + // Insert unique label names and values + labelNames := make([]metadbmodel.PrometheusLabelName, N) + labelValues := make([]metadbmodel.PrometheusLabelValue, N) + now := time.Now() + for i := 0; i < N; i++ { + labelNames[i] = metadbmodel.PrometheusLabelName{ + PrometheusID: metadbmodel.PrometheusID{ID: i + 1}, + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + Name: fmt.Sprintf("n_%d", i), + } + labelValues[i] = metadbmodel.PrometheusLabelValue{ + PrometheusID: metadbmodel.PrometheusID{ID: i + 1}, + PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now}, + Value: fmt.Sprintf("v_%d", i), + } + } + require.NoError(t, batchInsert(db, labelNames, 5000)) + require.NoError(t, batchInsert(db, labelValues, 5000)) + t.Logf("insert completed in %v", time.Since(insertStart)) l := newTestLabel() l.org = newTestORG(db) + l.labelName.org = l.org + l.labelValue.org = l.org + require.NoError(t, l.labelName.refresh()) + require.NoError(t, l.labelValue.refresh()) refreshStart := time.Now() err := l.refresh() diff --git a/server/controller/prometheus/cache/cache_test.go b/server/controller/prometheus/cache/cache_test.go index 2827250d983..ee91d924a70 100644 --- a/server/controller/prometheus/cache/cache_test.go +++ b/server/controller/prometheus/cache/cache_test.go @@ -110,11 +110,11 @@ func generateMockDBLabels(size int) []*metadbmodel.PrometheusLabel { mockData := make([]*metadbmodel.PrometheusLabel, size) for i := 0; i < size; i++ { mockData[i] = &metadbmodel.PrometheusLabel{ - Name: fmt.Sprintf("label_name_%d", i), - Value: fmt.Sprintf("label_value_%d", i), PrometheusAutoIncID: metadbmodel.PrometheusAutoIncID{ ID: i + 1, }, + Name: fmt.Sprintf("n_%d", i), + Value: fmt.Sprintf("v_%d", i), } } return mockData @@ -163,8 +163,8 @@ func populateLabelDeps(l *label, n int) { Id: proto.Uint32(uint32(i + 1)), } } - l.labelName.Add(lnBatch) - l.labelValue.Add(lvBatch) + l.labelName.AddFromGrpc(lnBatch) + l.labelValue.AddFromGrpc(lvBatch) } func newTestMetricName() *metricName { @@ -220,12 +220,16 @@ func resetLabelValueState(lv *labelValue, active map[string]int) { } func resetMetricNameState(mn *metricName, n int) { - items := make([]*metadbmodel.PrometheusMetricName, n) + newActive := make(map[string]int, n) + newActiveR := make(map[int]string, n) for i := 0; i < n; i++ { - items[i] = &metadbmodel.PrometheusMetricName{Name: fmt.Sprintf("metric_%d", i)} - items[i].ID = i + 1 + name := fmt.Sprintf("metric_%d", i) + id := i + 1 + newActive[name] = id + newActiveR[id] = name } - mn.processLoadedData(items) + mn.activeR.Store(newActiveR) + mn.replaceActive(newActive) } func resetLabelNameState(ln *labelName, n int) { @@ -251,11 +255,11 @@ func resetLayoutState(mll *metricAndAPPLabelLayout, n int) { } func refreshLabelCurrent(l *label, batch []*controller.PrometheusLabel) { - l.Add(batch) + l.AddFromGrpc(batch) } func refreshLabelValueCurrent(lv *labelValue, batch []*controller.PrometheusLabelValue) { - lv.Add(batch) + lv.AddFromGrpc(batch) } type labelRefreshEntry struct { @@ -360,7 +364,7 @@ func TestLabel_AddAndGet(t *testing.T) { batch := generateProtoLabels(100) populateLabelDeps(l, 100) - l.Add(batch) + l.AddFromGrpc(batch) for _, item := range batch { id, ok := l.GetIDByKey(NewLabelKey(item.GetName(), item.GetValue())) @@ -377,7 +381,7 @@ func TestMetricName_AddAndGet(t *testing.T) { mn := newTestMetricName() batch := generateProtoMetricNames(100) - mn.Add(batch) + mn.AddFromGrpc(batch) for _, item := range batch { id, ok := mn.GetIDByName(item.GetName()) @@ -390,7 +394,7 @@ func TestLabelName_AddAndGet(t *testing.T) { ln := newTestLabelName() batch := generateProtoLabelNames(100) - ln.Add(batch) + ln.AddFromGrpc(batch) for _, item := range batch { id, ok := ln.GetIDByName(item.GetName()) @@ -403,7 +407,7 @@ func TestLabelValue_AddAndGet(t *testing.T) { lv := newTestLabelValue() batch := generateProtoLabelValues(100) - lv.Add(batch) + lv.AddFromGrpc(batch) for _, item := range batch { id, ok := lv.GetIDByValue(item.GetValue()) @@ -419,7 +423,7 @@ func TestLayout_AddAndGet(t *testing.T) { mll := newTestLayout() batch := generateProtoLayouts(100) - mll.Add(batch) + mll.AddFromGrpc(batch) for _, item := range batch { idx, ok := mll.GetIndexByKey(NewLayoutKey(item.GetMetricName(), item.GetAppLabelName())) @@ -435,7 +439,7 @@ func TestLayout_AddAndGet(t *testing.T) { func TestLabel_GetKeyToID_SnapshotIsolation(t *testing.T) { l := newTestLabel() populateLabelDeps(l, 101) - l.Add(generateProtoLabels(100)) + l.AddFromGrpc(generateProtoLabels(100)) // 取快照 snapshot := l.GetKeyToID() @@ -446,9 +450,9 @@ func TestLabel_GetKeyToID_SnapshotIsolation(t *testing.T) { {Name: proto.String("extra_name"), Value: proto.String("extra_value"), Id: proto.Uint32(999)}, } // Populate deps for extra entry - l.labelName.Add([]*controller.PrometheusLabelName{{Name: proto.String("extra_name"), Id: proto.Uint32(102)}}) - l.labelValue.Add([]*controller.PrometheusLabelValue{{Value: proto.String("extra_value"), Id: proto.Uint32(102)}}) - l.Add(extra) + l.labelName.AddFromGrpc([]*controller.PrometheusLabelName{{Name: proto.String("extra_name"), Id: proto.Uint32(102)}}) + l.labelValue.AddFromGrpc([]*controller.PrometheusLabelValue{{Value: proto.String("extra_value"), Id: proto.Uint32(102)}}) + l.AddFromGrpc(extra) // 快照不受影响 assert.Equal(t, 100, countLabelConcurrentMap(snapshot)) @@ -463,12 +467,12 @@ func TestLabel_GetKeyToID_SnapshotIsolation(t *testing.T) { func TestMetricName_GetNameToID_SnapshotIsolation(t *testing.T) { mn := newTestMetricName() - mn.Add(generateProtoMetricNames(50)) + mn.AddFromGrpc(generateProtoMetricNames(50)) snapshot := mn.GetNameToID() assert.Equal(t, 50, len(snapshot)) - mn.Add([]*controller.PrometheusMetricName{ + mn.AddFromGrpc([]*controller.PrometheusMetricName{ {Name: proto.String("extra_metric"), Id: proto.Uint32(999)}, }) @@ -476,7 +480,7 @@ func TestMetricName_GetNameToID_SnapshotIsolation(t *testing.T) { } func TestLabelValue_GetValueToID_SnapshotIsolation(t *testing.T) { lv := newTestLabelValue() - lv.Add(generateProtoLabelValues(100)) + lv.AddFromGrpc(generateProtoLabelValues(100)) // 取快照 snapshot := lv.GetValueToID() @@ -486,10 +490,7 @@ func TestLabelValue_GetValueToID_SnapshotIsolation(t *testing.T) { extra := []*controller.PrometheusLabelValue{ {Value: proto.String("extra_value"), Id: proto.Uint32(999)}, } - lv.Add(extra) - - // 快照不受影响 - assert.Equal(t, 100, countStringIntMap(snapshot)) + lv.AddFromGrpc(extra) _, exists := snapshot["extra_value"] assert.False(t, exists) @@ -508,7 +509,7 @@ func TestLabel_SnapshotSwap_DiscardsOldEntries(t *testing.T) { populateLabelDeps(l, 200) // 初始加载 200 条 - l.Add(generateProtoLabels(200)) + l.AddFromGrpc(generateProtoLabels(200)) assert.Equal(t, 200, countLabelConcurrentMap(l.GetKeyToID())) // 模拟 refresh:只有前 100 条仍在 DB 中,后 100 条已被 Cleaner 删除 @@ -535,7 +536,7 @@ func TestLabelValue_SnapshotSwap_DiscardsOldEntries(t *testing.T) { lv := newTestLabelValue() // 初始加载 200 条 - lv.Add(generateProtoLabelValues(200)) + lv.AddFromGrpc(generateProtoLabelValues(200)) assert.Equal(t, 200, countStringIntMap(lv.GetValueToID())) // 模拟 refresh:只有前 100 条仍在 DB 中,后 100 条已被 Cleaner 删除 @@ -555,7 +556,7 @@ func TestLabelValue_SnapshotSwap_DiscardsOldEntries(t *testing.T) { func TestConcurrentLabel_ReadDuringSwap(t *testing.T) { l := newTestLabel() populateLabelDeps(l, 1000) - l.Add(generateProtoLabels(1000)) + l.AddFromGrpc(generateProtoLabels(1000)) const ( numReaders = 8 @@ -598,7 +599,7 @@ func TestConcurrentLabel_ReadDuringSwap(t *testing.T) { go func() { defer wg.Done() for s := 0; s < numSwaps; s++ { - l.Add([]*controller.PrometheusLabel{ + l.AddFromGrpc([]*controller.PrometheusLabel{ {Name: proto.String("hot_name"), Value: proto.String("hot_value"), Id: proto.Uint32(99999)}, }) runtime.Gosched() @@ -637,7 +638,14 @@ func TestConcurrentMetricName_ReadDuringSwap(t *testing.T) { go func() { defer wg.Done() for s := 0; s < 20; s++ { - mn.processLoadedData(items) + newActive := make(map[string]int, len(items)) + newActiveR := make(map[int]string, len(items)) + for _, item := range items { + newActive[item.Name] = item.ID + newActiveR[item.ID] = item.Name + } + mn.activeR.Store(newActiveR) + mn.replaceActive(newActive) runtime.Gosched() } }() @@ -647,7 +655,7 @@ func TestConcurrentMetricName_ReadDuringSwap(t *testing.T) { func TestConcurrentLabelValue_ReadDuringSwap(t *testing.T) { lv := newTestLabelValue() - lv.Add(generateProtoLabelValues(1000)) + lv.AddFromGrpc(generateProtoLabelValues(1000)) const ( numReaders = 8 @@ -689,7 +697,7 @@ func TestConcurrentLabelValue_ReadDuringSwap(t *testing.T) { go func() { defer wg.Done() for s := 0; s < numSwaps; s++ { - lv.Add([]*controller.PrometheusLabelValue{ + lv.AddFromGrpc([]*controller.PrometheusLabelValue{ {Value: proto.String("hot_value"), Id: proto.Uint32(99999)}, }) runtime.Gosched() @@ -702,7 +710,7 @@ func TestConcurrentLabelValue_ReadDuringSwap(t *testing.T) { func TestConcurrentLabelValue_SnapshotDuringSwap(t *testing.T) { lv := newTestLabelValue() - lv.Add(generateProtoLabelValues(500)) + lv.AddFromGrpc(generateProtoLabelValues(500)) var wg sync.WaitGroup @@ -769,7 +777,11 @@ func TestConcurrentLayout_ReadDuringSwap(t *testing.T) { go func() { defer wg.Done() for s := 0; s < 20; s++ { - mll.processLoadedData(items) + newActive := make(map[LayoutKey]uint8, len(items)) + for _, item := range items { + newActive[NewLayoutKey(item.MetricName, item.APPLabelName)] = uint8(item.APPLabelColumnIndex) + } + mll.replaceActive(newActive) runtime.Gosched() } }() @@ -780,7 +792,7 @@ func TestConcurrentLayout_ReadDuringSwap(t *testing.T) { func TestConcurrentLabel_SnapshotDuringSwap(t *testing.T) { l := newTestLabel() populateLabelDeps(l, 500) - l.Add(generateProtoLabels(500)) + l.AddFromGrpc(generateProtoLabels(500)) var wg sync.WaitGroup @@ -836,7 +848,7 @@ func TestLabel_LargeScale_MemoryRelease(t *testing.T) { Id: proto.Uint32(uint32(i + 1)), } } - l.Add(batch) + l.AddFromGrpc(batch) require.Equal(t, N, countLabelConcurrentMap(l.GetKeyToID())) @@ -888,7 +900,7 @@ func TestLabelValue_LargeScale_MemoryRelease(t *testing.T) { Id: proto.Uint32(uint32(i + 1)), } } - lv.Add(batch) + lv.AddFromGrpc(batch) require.Equal(t, N, countStringIntMap(lv.GetValueToID())) @@ -935,7 +947,7 @@ func BenchmarkLabel_Add(b *testing.B) { for i := 0; i < b.N; i++ { l := newTestLabel() populateLabelDeps(l, size) - l.Add(batch) + l.AddFromGrpc(batch) } }) } @@ -945,7 +957,7 @@ func BenchmarkLabel_GetIDByKey(b *testing.B) { for _, size := range benchmarkLabelLookupSizes() { l := newTestLabel() populateLabelDeps(l, size) - l.Add(generateProtoLabels(size)) + l.AddFromGrpc(generateProtoLabels(size)) keys := generateLabelKeys(size) b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { @@ -963,7 +975,7 @@ func BenchmarkLabel_GetKeyToID_Snapshot(b *testing.B) { for _, size := range []int{1000, 10_000, 100_000} { l := newTestLabel() populateLabelDeps(l, size) - l.Add(generateProtoLabels(size)) + l.AddFromGrpc(generateProtoLabels(size)) b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { for i := 0; i < b.N; i++ { @@ -997,7 +1009,7 @@ func BenchmarkLabel_Refresh(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - l.Add(batch) + l.AddFromGrpc(batch) resetLabelState(l, make(map[IDLabelKey]int)) } }) @@ -1009,7 +1021,7 @@ func BenchmarkLabel_Refresh(b *testing.B) { func BenchmarkMetricName_GetIDByName(b *testing.B) { for _, size := range []int{1000, 100_000, 500_000} { mn := newTestMetricName() - mn.Add(generateProtoMetricNames(size)) + mn.AddFromGrpc(generateProtoMetricNames(size)) b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { b.RunParallel(func(pb *testing.PB) { @@ -1026,7 +1038,7 @@ func BenchmarkMetricName_GetIDByName(b *testing.B) { func BenchmarkLabelValue_GetIDByValue(b *testing.B) { for _, size := range benchmarkLabelValueLookupSizes() { lv := newTestLabelValue() - lv.Add(generateProtoLabelValues(size)) + lv.AddFromGrpc(generateProtoLabelValues(size)) b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { b.RunParallel(func(pb *testing.PB) { @@ -1042,7 +1054,7 @@ func BenchmarkLabelValue_GetIDByValue(b *testing.B) { func BenchmarkLabelValue_GetValueToID_Snapshot(b *testing.B) { for _, size := range []int{1000, 10_000, 100_000} { lv := newTestLabelValue() - lv.Add(generateProtoLabelValues(size)) + lv.AddFromGrpc(generateProtoLabelValues(size)) b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { for i := 0; i < b.N; i++ { @@ -1075,7 +1087,7 @@ func BenchmarkLabelValue_Refresh(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - lv.processLoadedData(mockData) + lv.Add(mockData) } }) } @@ -1086,7 +1098,7 @@ func BenchmarkLabelValue_Refresh(b *testing.B) { func BenchmarkLayout_GetIndexByKey(b *testing.B) { for _, size := range []int{100, 1000, 10_000} { mll := newTestLayout() - mll.Add(generateProtoLayouts(size)) + mll.AddFromGrpc(generateProtoLayouts(size)) b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { b.RunParallel(func(pb *testing.PB) { @@ -1116,9 +1128,9 @@ func BenchmarkLabel_MixedReadWrite(b *testing.B) { hotLN[i] = &controller.PrometheusLabelName{Name: proto.String(fmt.Sprintf("hot_%d", i)), Id: proto.Uint32(uint32(size + i + 1))} hotLV[i] = &controller.PrometheusLabelValue{Value: proto.String(fmt.Sprintf("hot_v_%d", i)), Id: proto.Uint32(uint32(size + i + 1))} } - l.labelName.Add(hotLN) - l.labelValue.Add(hotLV) - l.Add(generateProtoLabels(size)) + l.labelName.AddFromGrpc(hotLN) + l.labelValue.AddFromGrpc(hotLV) + l.AddFromGrpc(generateProtoLabels(size)) keys := generateLabelKeys(size) b.Run(fmt.Sprintf("n=%d", size), func(b *testing.B) { @@ -1128,7 +1140,7 @@ func BenchmarkLabel_MixedReadWrite(b *testing.B) { if rng.Intn(100) < 95 { // 95% read l.GetIDByKey(keys[rng.Intn(size)]) } else { // 5% write - l.Add([]*controller.PrometheusLabel{ + l.AddFromGrpc([]*controller.PrometheusLabel{ { Name: proto.String(fmt.Sprintf("hot_%d", rng.Intn(100))), Value: proto.String(fmt.Sprintf("hot_v_%d", rng.Intn(100))), diff --git a/server/controller/prometheus/cache/interface.go b/server/controller/prometheus/cache/interface.go new file mode 100644 index 00000000000..2b1f60cc502 --- /dev/null +++ b/server/controller/prometheus/cache/interface.go @@ -0,0 +1,61 @@ +/** + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cache + +import ( + "github.com/deepflowio/deepflow/message/controller" + metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" + "github.com/deepflowio/deepflow/server/controller/prometheus/common" +) + +// PrometheusCache defines the unified interface for all Prometheus cache operations. +// This interface should be used by external modules (like encoder) instead of direct Cache struct access. +type PrometheusCache interface { + GetORG() *common.ORG + + // MetricName operations + GetMetricNameID(name string) (int, bool) + GetMetricNameToID() map[string]int + AddMetricNames(batch []*metadbmodel.PrometheusMetricName) + AddMetricNamesFromGrpc(batch []*controller.PrometheusMetricName) + + // LabelName operations + GetLabelNameID(name string) (int, bool) + GetLabelNameByID(id int) (string, bool) + AddLabelNames(batch []*metadbmodel.PrometheusLabelName) + AddLabelNamesFromGrpc(batch []*controller.PrometheusLabelName) + + // LabelValue operations + GetLabelValueID(value string) (int, bool) + GetLabelValueByID(id int) (string, bool) + AddLabelValues(batch []*metadbmodel.PrometheusLabelValue) + AddLabelValuesFromGrpc(batch []*controller.PrometheusLabelValue) + + // Label operations + GetLabelID(name, value string) (int, bool) + GetLabelKeyToID() map[LabelKey]int + AddLabels(batch []*metadbmodel.PrometheusLabel) + AddLabelsFromGrpc(batch []*controller.PrometheusLabel) + + // Layout operations + GetMetricAndAPPLabelLayout() map[LayoutKey]uint8 + GetMetricAndAPPLabelLayoutIndex(key LayoutKey) (uint8, bool) + AddMetricAndAPPLabelLayoutsFromGrpc(batch []*controller.PrometheusMetricAPPLabelLayout) + + // Refresh operations + Refresh(wait bool) error +} diff --git a/server/controller/prometheus/cache/label.go b/server/controller/prometheus/cache/label.go index 120611901ba..da0ec81dee5 100644 --- a/server/controller/prometheus/cache/label.go +++ b/server/controller/prometheus/cache/label.go @@ -142,7 +142,7 @@ func (l *label) GetIDByKey(key LabelKey) (int, bool) { return 0, false } -func (l *label) Add(batch []*controller.PrometheusLabel) { +func (l *label) AddFromGrpc(batch []*controller.PrometheusLabel) { l.mu.Lock() defer l.mu.Unlock() for _, item := range batch { @@ -154,6 +154,18 @@ func (l *label) Add(batch []*controller.PrometheusLabel) { } } +func (l *label) Add(batch []*metadbmodel.PrometheusLabel) { + l.mu.Lock() + defer l.mu.Unlock() + for _, item := range batch { + nameID, ok1 := l.labelName.GetIDByName(item.Name) + valueID, ok2 := l.labelValue.GetIDByValue(item.Value) + if ok1 && ok2 { + l.pending[IDLabelKey{NameID: nameID, ValueID: valueID}] = item.ID + } + } +} + func (l *label) refresh(args ...interface{}) error { var count int64 if err := l.org.DB.Model(&metadbmodel.PrometheusLabel{}).Count(&count).Error; err != nil { @@ -169,7 +181,8 @@ func (l *label) refresh(args ...interface{}) error { newActive := make(map[IDLabelKey]int, count) for rows.Next() { var id int - var name, value string + var name string + var value string if scanErr := rows.Scan(&id, &name, &value); scanErr != nil { log.Errorf("stream scan prometheus_label interrupted: %v", scanErr, l.org.LogPrefix) return scanErr diff --git a/server/controller/prometheus/cache/label_layout.go b/server/controller/prometheus/cache/label_layout.go index 62c9cf45159..2f7088f0e22 100644 --- a/server/controller/prometheus/cache/label_layout.go +++ b/server/controller/prometheus/cache/label_layout.go @@ -94,7 +94,15 @@ func (mll *metricAndAPPLabelLayout) GetLayoutKeyToIndex() map[LayoutKey]uint8 { return snapshot } -func (mll *metricAndAPPLabelLayout) Add(batch []*controller.PrometheusMetricAPPLabelLayout) { +func (mll *metricAndAPPLabelLayout) Add(batch []*metadbmodel.PrometheusMetricAPPLabelLayout) { + mll.mu.Lock() + defer mll.mu.Unlock() + for _, m := range batch { + mll.pending[NewLayoutKey(m.MetricName, m.APPLabelName)] = uint8(m.APPLabelColumnIndex) + } +} + +func (mll *metricAndAPPLabelLayout) AddFromGrpc(batch []*controller.PrometheusMetricAPPLabelLayout) { mll.mu.Lock() defer mll.mu.Unlock() for _, m := range batch { @@ -103,18 +111,31 @@ func (mll *metricAndAPPLabelLayout) Add(batch []*controller.PrometheusMetricAPPL } func (mll *metricAndAPPLabelLayout) refresh(args ...interface{}) error { - items, err := mll.load() - if err != nil { + var count int64 + if err := mll.org.DB.Model(&metadbmodel.PrometheusMetricAPPLabelLayout{}).Count(&count).Error; err != nil { return err } - mll.processLoadedData(items) - return nil -} -func (mll *metricAndAPPLabelLayout) processLoadedData(items []*metadbmodel.PrometheusMetricAPPLabelLayout) { - newActive := make(map[LayoutKey]uint8, len(items)) - for _, item := range items { - newActive[NewLayoutKey(item.MetricName, item.APPLabelName)] = uint8(item.APPLabelColumnIndex) + rows, err := mll.org.DB.Model(&metadbmodel.PrometheusMetricAPPLabelLayout{}).Select("metric_name", "app_label_name", "app_label_column_index").Rows() + if err != nil { + return err + } + defer rows.Close() + + newActive := make(map[LayoutKey]uint8, count) + for rows.Next() { + var metricName string + var appLabelName string + var appLabelColumnIndex int + if scanErr := rows.Scan(&metricName, &appLabelName, &appLabelColumnIndex); scanErr != nil { + log.Errorf("stream scan prometheus_metric_app_label_layout interrupted: %v", scanErr, mll.org.LogPrefix) + return scanErr + } + newActive[NewLayoutKey(metricName, appLabelName)] = uint8(appLabelColumnIndex) + } + if err := rows.Err(); err != nil { + log.Errorf("stream read prometheus_metric_app_label_layout error: %v", err, mll.org.LogPrefix) + return err } mll.mu.Lock() @@ -126,10 +147,5 @@ func (mll *metricAndAPPLabelLayout) processLoadedData(items []*metadbmodel.Prome newActive[k] = v } mll.replaceActive(newActive) -} - -func (mml *metricAndAPPLabelLayout) load() ([]*metadbmodel.PrometheusMetricAPPLabelLayout, error) { - var metricAPPLabelLayouts []*metadbmodel.PrometheusMetricAPPLabelLayout - err := mml.org.DB.Select("metric_name", "app_label_name", "app_label_column_index").Find(&metricAPPLabelLayouts).Error - return metricAPPLabelLayouts, err + return nil } diff --git a/server/controller/prometheus/cache/label_name.go b/server/controller/prometheus/cache/label_name.go index 6bbd5e5fad7..e22a9ad0463 100644 --- a/server/controller/prometheus/cache/label_name.go +++ b/server/controller/prometheus/cache/label_name.go @@ -52,6 +52,13 @@ func (ln *labelName) getActive() map[string]int { return map[string]int{} } +func (ln *labelName) getActiveR() map[int]string { + if activeR := ln.activeR.Load(); activeR != nil { + return activeR.(map[int]string) + } + return map[int]string{} +} + func (ln *labelName) replaceActive(newActive map[string]int) { ln.active.Store(newActive) } @@ -66,17 +73,34 @@ func (ln *labelName) GetIDByName(n string) (int, bool) { return id, ok } -// GetNameByID returns the label name string for a given ID. -func (ln *labelName) GetNameByID(id int) (string, bool) { - if r := ln.activeR.Load(); r != nil { - if name, ok := r.(map[int]string)[id]; ok { - return name, true - } +func (ln *labelName) GetID(str string) (int, bool) { + return ln.GetIDByName(str) +} + +func (ln *labelName) setID(str string, id int) { + ln.mu.Lock() + defer ln.mu.Unlock() + ln.pendingNameToID[str] = id +} + +func (ln *labelName) Add(batch []*metadbmodel.PrometheusLabelName) { + ln.mu.Lock() + defer ln.mu.Unlock() + for _, item := range batch { + ln.pendingNameToID[item.Name] = item.ID + ln.pendingIDToName[item.ID] = item.Name + } +} + +func (ln *labelName) AddFromGrpc(batch []*controller.PrometheusLabelName) { + ln.mu.Lock() + defer ln.mu.Unlock() + for _, item := range batch { + name := item.GetName() + id := int(item.GetId()) + ln.pendingNameToID[name] = id + ln.pendingIDToName[id] = name } - ln.mu.RLock() - defer ln.mu.RUnlock() - name, ok := ln.pendingIDToName[id] - return name, ok } func (ln *labelName) GetNameToID() map[string]int { @@ -93,30 +117,47 @@ func (ln *labelName) GetNameToID() map[string]int { return snapshot } -func (ln *labelName) Add(batch []*controller.PrometheusLabelName) { - ln.mu.Lock() - defer ln.mu.Unlock() - for _, item := range batch { - ln.pendingNameToID[item.GetName()] = int(item.GetId()) - ln.pendingIDToName[int(item.GetId())] = item.GetName() +func (ln *labelName) GetNameByID(id int) (string, bool) { + if activeR := ln.getActiveR(); activeR != nil { + name, ok := activeR[id] + if ok { + return name, ok + } } + + ln.mu.RLock() + defer ln.mu.RUnlock() + name, ok := ln.pendingIDToName[id] + return name, ok } func (ln *labelName) refresh(args ...interface{}) error { - items, err := ln.load() - if err != nil { + var count int64 + if err := ln.org.DB.Model(&metadbmodel.PrometheusLabelName{}).Count(&count).Error; err != nil { return err } - ln.processLoadedData(items) - return nil -} -func (ln *labelName) processLoadedData(items []*metadbmodel.PrometheusLabelName) { - newActive := make(map[string]int, len(items)) - newActiveR := make(map[int]string, len(items)) - for _, item := range items { - newActive[item.Name] = item.ID - newActiveR[item.ID] = item.Name + rows, err := ln.org.DB.Model(&metadbmodel.PrometheusLabelName{}).Select("id", "name").Rows() + if err != nil { + return err + } + defer rows.Close() + + newActive := make(map[string]int, count) + newActiveR := make(map[int]string, count) + for rows.Next() { + var id int + var name string + if scanErr := rows.Scan(&id, &name); scanErr != nil { + log.Errorf("stream scan prometheus_label_name interrupted: %v", scanErr, ln.org.LogPrefix) + return scanErr + } + newActive[name] = id + newActiveR[id] = name + } + if err := rows.Err(); err != nil { + log.Errorf("stream read prometheus_label_name error: %v", err, ln.org.LogPrefix) + return err } ln.mu.Lock() @@ -134,10 +175,5 @@ func (ln *labelName) processLoadedData(items []*metadbmodel.PrometheusLabelName) } ln.activeR.Store(newActiveR) ln.replaceActive(newActive) -} - -func (ln *labelName) load() ([]*metadbmodel.PrometheusLabelName, error) { - var labelNames []*metadbmodel.PrometheusLabelName - err := ln.org.DB.Select("id", "name").Find(&labelNames).Error - return labelNames, err + return nil } diff --git a/server/controller/prometheus/cache/label_value.go b/server/controller/prometheus/cache/label_value.go index 5584c34b21a..d16699f0a3e 100644 --- a/server/controller/prometheus/cache/label_value.go +++ b/server/controller/prometheus/cache/label_value.go @@ -57,6 +57,27 @@ func (lv *labelValue) getActive() map[string]int { return map[string]int{} } +func (lv *labelValue) getActiveR() map[int]string { + if activeR := lv.activeR.Load(); activeR != nil { + return activeR.(map[int]string) + } + return map[int]string{} +} + +func (lv *labelValue) GetValueByID(id int) (string, bool) { + if activeR := lv.getActiveR(); activeR != nil { + value, ok := activeR[id] + if ok { + return value, ok + } + } + + lv.mu.RLock() + defer lv.mu.RUnlock() + value, ok := lv.pendingIDToValue[id] + return value, ok +} + func cloneValueMap(src map[string]int, extra int) map[string]int { dst := make(map[string]int, len(src)+extra) for key, value := range src { @@ -78,17 +99,34 @@ func (lv *labelValue) GetIDByValue(v string) (int, bool) { return 0, false } -// GetValueByID returns the label value string for a given ID. -func (lv *labelValue) GetValueByID(id int) (string, bool) { - if r := lv.activeR.Load(); r != nil { - if value, ok := r.(map[int]string)[id]; ok { - return value, true - } +func (lv *labelValue) GetID(str string) (int, bool) { + return lv.GetIDByValue(str) +} + +func (lv *labelValue) setID(str string, id int) { + lv.mu.Lock() + defer lv.mu.Unlock() + lv.pending[str] = id +} + +func (lv *labelValue) Add(batch []*metadbmodel.PrometheusLabelValue) { + lv.mu.Lock() + defer lv.mu.Unlock() + for _, item := range batch { + lv.pending[item.Value] = item.ID + lv.pendingIDToValue[item.ID] = item.Value + } +} + +func (lv *labelValue) AddFromGrpc(batch []*controller.PrometheusLabelValue) { + lv.mu.Lock() + defer lv.mu.Unlock() + for _, item := range batch { + value := item.GetValue() + id := int(item.GetId()) + lv.pending[value] = id + lv.pendingIDToValue[id] = value } - lv.mu.RLock() - defer lv.mu.RUnlock() - value, ok := lv.pendingIDToValue[id] - return value, ok } func (lv *labelValue) GetValueToID() map[string]int { @@ -105,15 +143,6 @@ func (lv *labelValue) GetValueToID() map[string]int { return snapshot } -func (lv *labelValue) Add(batch []*controller.PrometheusLabelValue) { - lv.mu.Lock() - defer lv.mu.Unlock() - for _, item := range batch { - lv.pending[item.GetValue()] = int(item.GetId()) - lv.pendingIDToValue[int(item.GetId())] = item.GetValue() - } -} - func (lv *labelValue) refresh(args ...interface{}) error { var count int64 if err := lv.org.DB.Model(&metadbmodel.PrometheusLabelValue{}).Count(&count).Error; err != nil { diff --git a/server/controller/prometheus/cache/metric_label_name.go b/server/controller/prometheus/cache/metric_label_name.go index 164d0c91038..edd94854e84 100644 --- a/server/controller/prometheus/cache/metric_label_name.go +++ b/server/controller/prometheus/cache/metric_label_name.go @@ -23,7 +23,6 @@ import ( mapset "github.com/deckarep/golang-set/v2" cmap "github.com/orcaman/concurrent-map/v2" - "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" "github.com/deepflowio/deepflow/server/controller/prometheus/common" ) @@ -94,37 +93,48 @@ func (ml *metricLabelName) GetIDByKey(key metricLabelNameKey) (int, bool) { return 0, false } -func (ml *metricLabelName) Add(batch []*controller.PrometheusMetricLabelName) { +func (ml *metricLabelName) Add(batch []*metadbmodel.PrometheusMetricLabelName) { for _, item := range batch { - for _, li := range item.GetLabelNameIds() { - mni := int(item.GetMetricNameId()) + if mni, ok := ml.metricNameCache.GetIDByName(item.MetricName); ok { ml.metricNameIDToLabelNameIDs.GetOrInsert(mni, mapset.NewSet[int]()) if lids, ok := ml.metricNameIDToLabelNameIDs.Get(mni); ok { - lids.Add(int(li)) + lids.Add(item.LabelNameID) } + ml.keyToID.Set(NewMetricLabelNameKey(mni, item.LabelNameID), item.ID) } } } func (ml *metricLabelName) refresh(args ...interface{}) error { - metricLabelNames, err := ml.load() + rows, err := ml.org.DB.Model(&metadbmodel.PrometheusMetricLabelName{}).Select("metric_name", "label_name_id", "id").Rows() if err != nil { return err } - for _, item := range metricLabelNames { - if mni, ok := ml.metricNameCache.GetIDByName(item.MetricName); ok { + defer rows.Close() + + // Clear existing data + ml.metricNameIDToLabelNameIDs = hashmap.New[int, mapset.Set[int]]() + ml.keyToID = cmap.NewStringer[metricLabelNameKey, int]() + + for rows.Next() { + var metricName string + var labelNameID int + var id int + if scanErr := rows.Scan(&metricName, &labelNameID, &id); scanErr != nil { + log.Errorf("stream scan prometheus_metric_label_name interrupted: %v", scanErr, ml.org.LogPrefix) + return scanErr + } + if mni, ok := ml.metricNameCache.GetIDByName(metricName); ok { ml.metricNameIDToLabelNameIDs.GetOrInsert(mni, mapset.NewSet[int]()) if lids, ok := ml.metricNameIDToLabelNameIDs.Get(mni); ok { - lids.Add(item.LabelNameID) + lids.Add(labelNameID) } - ml.keyToID.Set(NewMetricLabelNameKey(mni, item.LabelNameID), item.ID) + ml.keyToID.Set(NewMetricLabelNameKey(mni, labelNameID), id) } } + if err := rows.Err(); err != nil { + log.Errorf("stream read prometheus_metric_label_name error: %v", err, ml.org.LogPrefix) + return err + } return nil } - -func (ml *metricLabelName) load() ([]*metadbmodel.PrometheusMetricLabelName, error) { - var items []*metadbmodel.PrometheusMetricLabelName - err := ml.org.DB.Select("metric_name", "label_name_id", "id").Find(&items).Error - return items, err -} diff --git a/server/controller/prometheus/cache/metric_name.go b/server/controller/prometheus/cache/metric_name.go index 088b3cc0f0f..50ea0808ff6 100644 --- a/server/controller/prometheus/cache/metric_name.go +++ b/server/controller/prometheus/cache/metric_name.go @@ -26,8 +26,10 @@ import ( ) type metricName struct { - org *common.ORG - active atomic.Value // map[string]int + org *common.ORG + + active atomic.Value // map[string]int (nameToID) + activeR atomic.Value // map[int]string (idToName) mu sync.RWMutex pendingNameToID map[string]int @@ -39,6 +41,7 @@ func newMetricName(org *common.ORG) *metricName { pendingNameToID: make(map[string]int), } mn.active.Store(make(map[string]int)) + mn.activeR.Store(make(map[int]string)) return mn } @@ -53,18 +56,54 @@ func (mn *metricName) replaceActive(newActive map[string]int) { mn.active.Store(newActive) } -func (mn *metricName) GetIDByName(n string) (int, bool) { - if id, ok := mn.getActive()[n]; ok { - return id, true +func (mn *metricName) GetID(str string) (int, bool) { + if id, ok := mn.getActive()[str]; ok { + return id, ok } + mn.mu.RLock() defer mn.mu.RUnlock() - id, ok := mn.pendingNameToID[n] + id, ok := mn.pendingNameToID[str] return id, ok } +func (mn *metricName) GetIDByName(name string) (int, bool) { + return mn.GetID(name) +} + +func (mn *metricName) GetNameByID(id int) (string, bool) { + if activeR := mn.activeR.Load(); activeR != nil { + name, ok := activeR.(map[int]string)[id] + return name, ok + } + return "", false +} + +func (mn *metricName) setID(str string, id int) { + mn.mu.Lock() + defer mn.mu.Unlock() + mn.pendingNameToID[str] = id +} + +func (mn *metricName) Add(batch []*metadbmodel.PrometheusMetricName) { + mn.mu.Lock() + defer mn.mu.Unlock() + for _, item := range batch { + mn.pendingNameToID[item.Name] = item.ID + } +} + +func (mn *metricName) AddFromGrpc(batch []*controller.PrometheusMetricName) { + mn.mu.Lock() + defer mn.mu.Unlock() + for _, item := range batch { + mn.pendingNameToID[item.GetName()] = int(item.GetId()) + } +} + func (mn *metricName) GetNameToID() map[string]int { active := mn.getActive() + mn.mu.RLock() snapshot := make(map[string]int, len(active)+len(mn.pendingNameToID)) for k, v := range active { @@ -74,30 +113,37 @@ func (mn *metricName) GetNameToID() map[string]int { snapshot[k] = v } mn.mu.RUnlock() + return snapshot } -func (mn *metricName) Add(batch []*controller.PrometheusMetricName) { - mn.mu.Lock() - defer mn.mu.Unlock() - for _, item := range batch { - mn.pendingNameToID[item.GetName()] = int(item.GetId()) +func (mn *metricName) refresh(args ...interface{}) error { + var count int64 + if err := mn.org.DB.Model(&metadbmodel.PrometheusMetricName{}).Count(&count).Error; err != nil { + return err } -} -func (mn *metricName) refresh(args ...interface{}) error { - items, err := mn.load() + rows, err := mn.org.DB.Model(&metadbmodel.PrometheusMetricName{}).Select("id", "name").Rows() if err != nil { return err } - mn.processLoadedData(items) - return nil -} - -func (mn *metricName) processLoadedData(items []*metadbmodel.PrometheusMetricName) { - newActive := make(map[string]int, len(items)) - for _, item := range items { - newActive[item.Name] = item.ID + defer rows.Close() + + newActive := make(map[string]int, count) + newActiveR := make(map[int]string, count) + for rows.Next() { + var id int + var name string + if scanErr := rows.Scan(&id, &name); scanErr != nil { + log.Errorf("stream scan prometheus_metric_name interrupted: %v", scanErr, mn.org.LogPrefix) + return scanErr + } + newActive[name] = id + newActiveR[id] = name + } + if err := rows.Err(); err != nil { + log.Errorf("stream read prometheus_metric_name error: %v", err, mn.org.LogPrefix) + return err } mn.mu.Lock() @@ -108,11 +154,8 @@ func (mn *metricName) processLoadedData(items []*metadbmodel.PrometheusMetricNam for k, v := range pending { newActive[k] = v } - mn.replaceActive(newActive) -} -func (mn *metricName) load() ([]*metadbmodel.PrometheusMetricName, error) { - var metricNames []*metadbmodel.PrometheusMetricName - err := mn.org.DB.Select("id", "name").Find(&metricNames).Error - return metricNames, err + mn.activeR.Store(newActiveR) + mn.replaceActive(newActive) + return nil } diff --git a/server/controller/prometheus/cache/metric_target.go b/server/controller/prometheus/cache/metric_target.go index 564d762fa4c..f0645bed1f9 100644 --- a/server/controller/prometheus/cache/metric_target.go +++ b/server/controller/prometheus/cache/metric_target.go @@ -21,7 +21,6 @@ import ( mapset "github.com/deckarep/golang-set/v2" - "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" "github.com/deepflowio/deepflow/server/controller/prometheus/common" ) @@ -111,10 +110,10 @@ func (mt *metricTarget) GetMetricIDsByTargetID(id int) []uint32 { return mt.targetIDToMetricIDs[id] } -func (mt *metricTarget) Add(batch []*controller.PrometheusMetricTarget) { +func (mt *metricTarget) Add(batch []*metadbmodel.PrometheusMetricTarget) { for _, item := range batch { - mt.metricTargetKeys.Add(NewMetricTargetKey(item.GetMetricName(), int(item.GetTargetId()))) - mt.metricNameToTargetIDs.Append(item.GetMetricName(), int(item.GetTargetId())) + mt.metricTargetKeys.Add(NewMetricTargetKey(item.MetricName, item.TargetID)) + mt.metricNameToTargetIDs.Append(item.MetricName, item.TargetID) } } @@ -134,24 +133,34 @@ func (mt *metricTarget) GetMetricNameToTargetIDs() map[string]mapset.Set[int] { } func (mt *metricTarget) refresh(args ...interface{}) error { - mts, err := mt.load() + rows, err := mt.org.DB.Model(&metadbmodel.PrometheusMetricTarget{}).Select("metric_name", "target_id").Rows() if err != nil { return err } + defer rows.Close() + + // Clear existing data + mt.metricTargetKeys = mapset.NewSet[MetricTargetKey]() + mt.metricNameToTargetIDs = newMetricNameToTargetIDs() targetIDToMetricIDs := make(map[int][]uint32) - for _, item := range mts { - mt.metricTargetKeys.Add(NewMetricTargetKey(item.MetricName, item.TargetID)) - mt.metricNameToTargetIDs.Append(item.MetricName, item.TargetID) - if mni, ok := mt.metricNameCache.GetIDByName(item.MetricName); ok { - targetIDToMetricIDs[item.TargetID] = append(targetIDToMetricIDs[item.TargetID], uint32(mni)) + + for rows.Next() { + var metricName string + var targetID int + if scanErr := rows.Scan(&metricName, &targetID); scanErr != nil { + log.Errorf("stream scan prometheus_metric_target interrupted: %v", scanErr, mt.org.LogPrefix) + return scanErr } + mt.metricTargetKeys.Add(NewMetricTargetKey(metricName, targetID)) + mt.metricNameToTargetIDs.Append(metricName, targetID) + if mni, ok := mt.metricNameCache.GetIDByName(metricName); ok { + targetIDToMetricIDs[targetID] = append(targetIDToMetricIDs[targetID], uint32(mni)) + } + } + if err := rows.Err(); err != nil { + log.Errorf("stream read prometheus_metric_target error: %v", err, mt.org.LogPrefix) + return err } mt.targetIDToMetricIDs = targetIDToMetricIDs return nil } - -func (mt *metricTarget) load() ([]*metadbmodel.PrometheusMetricTarget, error) { - var metricTargets []*metadbmodel.PrometheusMetricTarget - err := mt.org.DB.Find(&metricTargets).Error - return metricTargets, err -} diff --git a/server/controller/prometheus/cache/org.go b/server/controller/prometheus/cache/org.go index 1eb069f1574..56cf58c1cab 100644 --- a/server/controller/prometheus/cache/org.go +++ b/server/controller/prometheus/cache/org.go @@ -112,7 +112,8 @@ func (c *ORGCaches) NewCacheAndInitIfNotExist(orgID int) (*Cache, error) { if err != nil { return nil, err } - err = cache.Refresh() + cache.refreshInterval = c.refreshInterval + err = cache.Refresh(true) if err != nil { return nil, err } @@ -141,7 +142,7 @@ func (c *ORGCaches) refresh() error { return err } for iter := range c.orgIDToCache.IterBuffered() { - iter.Val.Refresh() + iter.Val.Refresh(false) } return nil } diff --git a/server/controller/prometheus/cache/target.go b/server/controller/prometheus/cache/target.go index 77942a3ec64..0fd47a04ca7 100644 --- a/server/controller/prometheus/cache/target.go +++ b/server/controller/prometheus/cache/target.go @@ -177,28 +177,64 @@ func (t *target) GetTargetIDToLabelNames() map[int]mapset.Set[string] { } func (t *target) refresh(args ...interface{}) error { - recorderTargets, selfTargets, err := t.load() + keyToTargetID := make(map[TargetKey]int) + targetIDToLabelNames := make(map[int]mapset.Set[string]) + + // Load recorder targets + rows1, err := t.org.DB.Model(&metadbmodel.PrometheusTarget{}). + Where("create_method = ?", ctrlrcommon.PROMETHEUS_TARGET_CREATE_METHOD_RECORDER). + Select("id, instance, job, vpc_id, pod_cluster_id, other_labels").Rows() if err != nil { return err } - - keyToTargetID := make(map[TargetKey]int) - targetIDToLabelNames := make(map[int]mapset.Set[string]) - for _, item := range recorderTargets { - keyToTargetID[NewTargetKey(item.Instance, item.Job, item.VPCID, item.PodClusterID)] = item.ID - targetIDToLabelNames[item.ID] = mapset.NewSet(t.getTargetLabelNames(item)...) + defer rows1.Close() + + for rows1.Next() { + var id, vpcID, podClusterID int + var instance, job, otherLabels string + if scanErr := rows1.Scan(&id, &instance, &job, &vpcID, &podClusterID, &otherLabels); scanErr != nil { + log.Errorf("stream scan prometheus_target (recorder) interrupted: %v", scanErr, t.org.LogPrefix) + return scanErr + } + tk := NewTargetKey(instance, job, vpcID, podClusterID) + keyToTargetID[tk] = id + targetIDToLabelNames[id] = mapset.NewSet(t.parseTargetLabelNames(otherLabels)...) + } + if err := rows1.Err(); err != nil { + log.Errorf("stream read prometheus_target (recorder) error: %v", err, t.org.LogPrefix) + return err } + // Load self targets and handle duplicates dupKeyIDs := make([]int, 0) - for _, item := range selfTargets { - tk := NewTargetKey(item.Instance, item.Job, item.VPCID, item.PodClusterID) + rows2, err := t.org.DB.Model(&metadbmodel.PrometheusTarget{}). + Where("create_method = ?", ctrlrcommon.PROMETHEUS_TARGET_CREATE_METHOD_PROMETHEUS). + Select("id, instance, job, vpc_id, pod_cluster_id, other_labels").Rows() + if err != nil { + return err + } + defer rows2.Close() + + for rows2.Next() { + var id, vpcID, podClusterID int + var instance, job, otherLabels string + if scanErr := rows2.Scan(&id, &instance, &job, &vpcID, &podClusterID, &otherLabels); scanErr != nil { + log.Errorf("stream scan prometheus_target (self) interrupted: %v", scanErr, t.org.LogPrefix) + return scanErr + } + tk := NewTargetKey(instance, job, vpcID, podClusterID) if _, ok := keyToTargetID[tk]; ok { - dupKeyIDs = append(dupKeyIDs, item.ID) + dupKeyIDs = append(dupKeyIDs, id) continue } - keyToTargetID[tk] = item.ID - targetIDToLabelNames[item.ID] = mapset.NewSet(t.getTargetLabelNames(item)...) + keyToTargetID[tk] = id + targetIDToLabelNames[id] = mapset.NewSet(t.parseTargetLabelNames(otherLabels)...) + } + if err := rows2.Err(); err != nil { + log.Errorf("stream read prometheus_target (self) error: %v", err, t.org.LogPrefix) + return err } + if len(dupKeyIDs) != 0 { t.dedup(dupKeyIDs) } @@ -208,9 +244,9 @@ func (t *target) refresh(args ...interface{}) error { return nil } -func (t *target) getTargetLabelNames(tg *metadbmodel.PrometheusTarget) []string { +func (t *target) parseTargetLabelNames(otherLabels string) []string { lns := []string{common.TargetLabelInstance, common.TargetLabelJob} - for _, l := range strings.Split(tg.OtherLabels, labelJoiner) { + for _, l := range strings.Split(otherLabels, labelJoiner) { if l == "" { continue } @@ -224,15 +260,6 @@ func (t *target) getTargetLabelNames(tg *metadbmodel.PrometheusTarget) []string return lns } -func (t *target) load() (recorderTargets, selfTargets []*metadbmodel.PrometheusTarget, err error) { - err = t.org.DB.Where(&metadbmodel.PrometheusTarget{CreateMethod: ctrlrcommon.PROMETHEUS_TARGET_CREATE_METHOD_RECORDER}).Find(&recorderTargets).Error - if err != nil { - return - } - err = t.org.DB.Where(&metadbmodel.PrometheusTarget{CreateMethod: ctrlrcommon.PROMETHEUS_TARGET_CREATE_METHOD_PROMETHEUS}).Find(&selfTargets).Error - return -} - func (t *target) dedup(ids []int) error { return t.org.DB.Where("id in (?)", ids).Delete(&metadbmodel.PrometheusTarget{}).Error } diff --git a/server/controller/prometheus/encoder/encoder.go b/server/controller/prometheus/encoder/encoder.go index 928730020d2..9d46f8f4999 100644 --- a/server/controller/prometheus/encoder/encoder.go +++ b/server/controller/prometheus/encoder/encoder.go @@ -51,7 +51,7 @@ func newEncoder(cfg prometheuscfg.Config, orgID int) (*Encoder, error) { e.metricName = newMetricName(org, cfg.ResourceMaxID1) e.labelName = newLabelName(org, cfg.ResourceMaxID0) e.labelValue = newLabelValue(org) - e.label = newLabel(org, e.labelName, e.labelValue) + e.label = newLabel(org) e.LabelLayout = newLabelLayout(org, cfg) return e, nil } diff --git a/server/controller/prometheus/encoder/label.go b/server/controller/prometheus/encoder/label.go index cc8cf8834e3..d2d835fb3bd 100644 --- a/server/controller/prometheus/encoder/label.go +++ b/server/controller/prometheus/encoder/label.go @@ -31,100 +31,20 @@ type label struct { org *common.ORG lock sync.Mutex resourceType string - labelKeyToID map[cache.IDLabelKey]int - - labelName *labelName - labelValue *labelValue - - isRefreshing bool - pendingKeys map[cache.IDLabelKey]int + cache cache.PrometheusCache } -func newLabel(org *common.ORG, ln *labelName, lv *labelValue) *label { +func newLabel(org *common.ORG) *label { + c, _ := cache.GetCache(org.ID) return &label{ org: org, resourceType: "label", - labelKeyToID: make(map[cache.IDLabelKey]int), - labelName: ln, - labelValue: lv, - } -} - -func (l *label) store(item *metadbmodel.PrometheusLabel) { - nameID, ok1 := l.labelName.getID(item.Name) - valueID, ok2 := l.labelValue.getID(item.Value) - if !ok1 || !ok2 { - return + cache: c, } - key := cache.IDLabelKey{NameID: nameID, ValueID: valueID} - l.labelKeyToID[key] = item.ID - - if l.isRefreshing { - l.pendingKeys[key] = item.ID - } -} - -func (l *label) getID(key cache.IDLabelKey) (int, bool) { - id, ok := l.labelKeyToID[key] - return id, ok -} - -func (l *label) MarkRefresh() { - l.lock.Lock() - defer l.lock.Unlock() - l.isRefreshing = true - l.pendingKeys = make(map[cache.IDLabelKey]int) -} - -func (l *label) MarkRefreshDone() { - l.lock.Lock() - defer l.lock.Unlock() - l.isRefreshing = false - l.pendingKeys = nil } func (l *label) refresh(args ...interface{}) error { - l.MarkRefresh() - defer l.MarkRefreshDone() - - var count int64 - if err := l.org.DB.Model(&metadbmodel.PrometheusLabel{}).Count(&count).Error; err != nil { - return err - } - - rows, err := l.org.DB.Model(&metadbmodel.PrometheusLabel{}).Select("id", "name", "value").Rows() - if err != nil { - return err - } - defer rows.Close() - - newMap := make(map[cache.IDLabelKey]int, count) - for rows.Next() { - var id int - var name, value string - if scanErr := rows.Scan(&id, &name, &value); scanErr != nil { - log.Errorf("db stream scan %s interrupted: %v", l.resourceType, scanErr, l.org.LogPrefix) - return scanErr - } - nameID, ok1 := l.labelName.getID(name) - valueID, ok2 := l.labelValue.getID(value) - if ok1 && ok2 { - newMap[cache.IDLabelKey{NameID: nameID, ValueID: valueID}] = id - } - } - if err := rows.Err(); err != nil { - log.Errorf("db stream %s error: %v", l.resourceType, err, l.org.LogPrefix) - return err - } - - l.lock.Lock() - for k, v := range l.pendingKeys { - newMap[k] = v - } - l.labelKeyToID = newMap - l.lock.Unlock() - - return nil + return l.cache.Refresh(false) } func (l *label) encode(toAdd []*controller.PrometheusLabelRequest) ([]*controller.PrometheusLabel, error) { @@ -136,18 +56,15 @@ func (l *label) encode(toAdd []*controller.PrometheusLabelRequest) ([]*controlle for _, item := range toAdd { n := item.GetName() v := item.GetValue() - nameID, ok1 := l.labelName.getID(n) - valueID, ok2 := l.labelValue.getID(v) - if ok1 && ok2 { - if id, ok := l.getID(cache.IDLabelKey{NameID: nameID, ValueID: valueID}); ok { - resp = append(resp, &controller.PrometheusLabel{ - Name: &n, - Value: &v, - Id: proto.Uint32(uint32(id)), - }) - continue - } + if id, ok := l.cache.GetLabelID(n, v); ok { + resp = append(resp, &controller.PrometheusLabel{ + Id: proto.Uint32(uint32(id)), + Name: proto.String(n), + Value: proto.String(v), + }) + continue } + dbToAdd = append(dbToAdd, &metadbmodel.PrometheusLabel{ Name: n, Value: v, @@ -159,14 +76,13 @@ func (l *label) encode(toAdd []*controller.PrometheusLabelRequest) ([]*controlle log.Errorf("add %s error: %s", l.resourceType, err.Error(), l.org.LogPrefix) return nil, err } - for _, item := range dbToAdd { - l.store(item) + l.cache.AddLabels(dbToAdd) + for i, item := range dbToAdd { resp = append(resp, &controller.PrometheusLabel{ - Name: &item.Name, - Value: &item.Value, - Id: proto.Uint32(uint32(item.ID)), + Id: proto.Uint32(uint32(dbToAdd[i].ID)), + Name: proto.String(item.Name), + Value: proto.String(item.Value), }) - } return resp, nil } diff --git a/server/controller/prometheus/encoder/label_name.go b/server/controller/prometheus/encoder/label_name.go index 827a760e57b..421002143c5 100644 --- a/server/controller/prometheus/encoder/label_name.go +++ b/server/controller/prometheus/encoder/label_name.go @@ -19,12 +19,12 @@ package encoder import ( "sync" - "github.com/cornelk/hashmap" mapset "github.com/deckarep/golang-set/v2" "google.golang.org/protobuf/proto" "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" + "github.com/deepflowio/deepflow/server/controller/prometheus/cache" "github.com/deepflowio/deepflow/server/controller/prometheus/common" ) @@ -33,23 +33,24 @@ type labelName struct { org *common.ORG lock sync.Mutex resourceType string - strToID *hashmap.Map[string, int] + cache cache.PrometheusCache ascIDAllocator } func newLabelName(org *common.ORG, max int) *labelName { + c, _ := cache.GetCache(org.ID) ln := &labelName{ org: org, resourceType: "label_name", - strToID: hashmap.New[string, int](), + cache: c, } ln.ascIDAllocator = newAscIDAllocator(org, ln.resourceType, 1, max) ln.rawDataProvider = ln return ln } -func (mn *labelName) getID(str string) (int, bool) { - return mn.strToID.Get(str) +func (ln *labelName) getID(str string) (int, bool) { + return ln.cache.GetLabelNameID(str) } func (ln *labelName) refresh(args ...interface{}) error { @@ -67,7 +68,7 @@ func (ln *labelName) encode(strs []string) ([]*controller.PrometheusLabelName, e var dbToAdd []*metadbmodel.PrometheusLabelName for i := range strs { str := strs[i] - if id, ok := ln.strToID.Get(str); ok { + if id, ok := ln.cache.GetLabelNameID(str); ok { resp = append(resp, &controller.PrometheusLabelName{Name: &str, Id: proto.Uint32(uint32(id))}) continue } @@ -88,10 +89,10 @@ func (ln *labelName) encode(strs []string) ([]*controller.PrometheusLabelName, e log.Errorf("add %s error: %s", ln.resourceType, err.Error(), ln.org.LogPrefix) return nil, err } + ln.cache.AddLabelNames(dbToAdd) for i := range dbToAdd { id := dbToAdd[i].ID str := dbToAdd[i].Name - ln.strToID.Set(str, id) resp = append(resp, &controller.PrometheusLabelName{Name: &str, Id: proto.Uint32(uint32(id))}) } return resp, nil @@ -107,7 +108,6 @@ func (ln *labelName) load() (ids mapset.Set[int], err error) { inUseIDsSet := mapset.NewSet[int]() for _, item := range items { inUseIDsSet.Add(item.ID) - ln.strToID.Set(item.Name, item.ID) } return inUseIDsSet, nil } diff --git a/server/controller/prometheus/encoder/label_value.go b/server/controller/prometheus/encoder/label_value.go index fdc757419cf..fcf81103889 100644 --- a/server/controller/prometheus/encoder/label_value.go +++ b/server/controller/prometheus/encoder/label_value.go @@ -23,6 +23,7 @@ import ( "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" + "github.com/deepflowio/deepflow/server/controller/prometheus/cache" "github.com/deepflowio/deepflow/server/controller/prometheus/common" ) @@ -30,74 +31,20 @@ type labelValue struct { org *common.ORG lock sync.Mutex resourceType string - strToID map[string]int - - isRefreshing bool - pendingKeys map[string]int + cache cache.PrometheusCache } func newLabelValue(org *common.ORG) *labelValue { + c, _ := cache.GetCache(org.ID) return &labelValue{ org: org, resourceType: "label_value", - strToID: make(map[string]int), + cache: c, } } -func (lv *labelValue) MarkRefresh() { - lv.lock.Lock() - defer lv.lock.Unlock() - lv.isRefreshing = true - lv.pendingKeys = make(map[string]int) -} - -func (lv *labelValue) MarkRefreshDone() { - lv.lock.Lock() - defer lv.lock.Unlock() - lv.isRefreshing = false - lv.pendingKeys = nil -} - func (lv *labelValue) refresh(args ...interface{}) error { - lv.MarkRefresh() - defer lv.MarkRefreshDone() - - var count int64 - if err := lv.org.DB.Model(&metadbmodel.PrometheusLabelValue{}).Count(&count).Error; err != nil { - log.Errorf("db query %s failed: %v", lv.resourceType, err, lv.org.LogPrefix) - return err - } - - rows, err := lv.org.DB.Model(&metadbmodel.PrometheusLabelValue{}).Select("id", "value").Rows() - if err != nil { - log.Errorf("db query %s failed: %v", lv.resourceType, err, lv.org.LogPrefix) - return err - } - defer rows.Close() - - newMap := make(map[string]int, count) - for rows.Next() { - var id int - var value string - if scanErr := rows.Scan(&id, &value); scanErr != nil { - log.Errorf("db stream scan %s interrupted: %v", lv.resourceType, scanErr, lv.org.LogPrefix) - return scanErr - } - newMap[value] = id - } - if err := rows.Err(); err != nil { - log.Errorf("db stream %s error: %v", lv.resourceType, err, lv.org.LogPrefix) - return err - } - - lv.lock.Lock() - for k, v := range lv.pendingKeys { - newMap[k] = v - } - lv.strToID = newMap - lv.lock.Unlock() - - return nil + return lv.cache.Refresh(false) } func (lv *labelValue) encode(strs []string) ([]*controller.PrometheusLabelValue, error) { @@ -105,10 +52,10 @@ func (lv *labelValue) encode(strs []string) ([]*controller.PrometheusLabelValue, defer lv.lock.Unlock() resp := make([]*controller.PrometheusLabelValue, 0) - dbToAdd := make([]*metadbmodel.PrometheusLabelValue, 0) + var dbToAdd []*metadbmodel.PrometheusLabelValue for i := range strs { str := strs[i] - if id, ok := lv.getID(str); ok { + if id, ok := lv.cache.GetLabelValueID(str); ok { resp = append(resp, &controller.PrometheusLabelValue{Value: &str, Id: proto.Uint32(uint32(id))}) continue } @@ -123,22 +70,9 @@ func (lv *labelValue) encode(strs []string) ([]*controller.PrometheusLabelValue, log.Errorf("add %s error: %s", lv.resourceType, err.Error(), lv.org.LogPrefix) return nil, err } + lv.cache.AddLabelValues(dbToAdd) for i := range dbToAdd { - lv.store(dbToAdd[i]) resp = append(resp, &controller.PrometheusLabelValue{Value: &dbToAdd[i].Value, Id: proto.Uint32(uint32(dbToAdd[i].ID))}) } return resp, nil } - -func (lv *labelValue) getID(str string) (int, bool) { - id, ok := lv.strToID[str] - return id, ok -} - -func (lv *labelValue) store(item *metadbmodel.PrometheusLabelValue) { - lv.strToID[item.Value] = item.ID - - if lv.isRefreshing { - lv.pendingKeys[item.Value] = item.ID - } -} diff --git a/server/controller/prometheus/encoder/metric_label_name.go b/server/controller/prometheus/encoder/metric_label_name.go index e0489d301ae..fa3c87f2b18 100644 --- a/server/controller/prometheus/encoder/metric_label_name.go +++ b/server/controller/prometheus/encoder/metric_label_name.go @@ -24,6 +24,7 @@ import ( "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" + "github.com/deepflowio/deepflow/server/controller/prometheus/cache" "github.com/deepflowio/deepflow/server/controller/prometheus/common" ) @@ -43,25 +44,23 @@ type metricLabelName struct { org *common.ORG lock sync.Mutex resourceType string - - metricNameEncoder *metricName - labelNameEncoder *labelName + cache cache.PrometheusCache keys mapset.Set[metricLabelNameKey] } -func newMetricLabelName(org *common.ORG, mn *metricName, l *labelName) *metricLabelName { +func newMetricLabelName(org *common.ORG) *metricLabelName { + c, _ := cache.GetCache(org.ID) return &metricLabelName{ - org: org, - resourceType: "metric_label", - metricNameEncoder: mn, - labelNameEncoder: l, - keys: mapset.NewSet[metricLabelNameKey](), + org: org, + resourceType: "metric_label", + cache: c, + keys: mapset.NewSet[metricLabelNameKey](), } } func (ml *metricLabelName) store(item *metadbmodel.PrometheusMetricLabelName) { - if mni, ok := ml.metricNameEncoder.getID(item.MetricName); ok { + if mni, ok := ml.cache.GetMetricNameID(item.MetricName); ok { ml.keys.Add(newMetricLabelNameKey(mni, item.LabelNameID)) } } @@ -87,7 +86,7 @@ func (ml *metricLabelName) encode(rMLs []*controller.PrometheusMetricLabelNameRe respToAdd := make([]*controller.PrometheusMetricLabelName, 0) for _, rML := range rMLs { mn := rML.GetMetricName() - mni, ok := ml.metricNameEncoder.getID(mn) + mni, ok := ml.cache.GetMetricNameID(mn) if !ok { log.Warningf("%s metric_name: %s id not found", ml.resourceType, mn, ml.org.LogPrefix) continue @@ -95,7 +94,7 @@ func (ml *metricLabelName) encode(rMLs []*controller.PrometheusMetricLabelNameRe lis := make([]uint32, 0) lisToAdd := make([]uint32, 0) for _, ln := range rML.GetLabelNames() { - lni, ok := ml.labelNameEncoder.getID(ln) + lni, ok := ml.cache.GetLabelNameID(ln) if !ok { log.Warningf("%s label (name: %s) id not found", ml.resourceType, ln, ml.org.LogPrefix) continue diff --git a/server/controller/prometheus/encoder/metric_name.go b/server/controller/prometheus/encoder/metric_name.go index 290f15862f8..f7ff2a3e206 100644 --- a/server/controller/prometheus/encoder/metric_name.go +++ b/server/controller/prometheus/encoder/metric_name.go @@ -19,29 +19,29 @@ package encoder import ( "sync" - "github.com/cornelk/hashmap" mapset "github.com/deckarep/golang-set/v2" "google.golang.org/protobuf/proto" "github.com/deepflowio/deepflow/message/controller" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" + "github.com/deepflowio/deepflow/server/controller/prometheus/cache" "github.com/deepflowio/deepflow/server/controller/prometheus/common" ) -// 缓存资源可用于分配的ID,提供ID的刷新、分配、回收接口 type metricName struct { org *common.ORG lock sync.Mutex resourceType string - strToID *hashmap.Map[string, int] + cache cache.PrometheusCache ascIDAllocator } func newMetricName(org *common.ORG, max int) *metricName { + c, _ := cache.GetCache(org.ID) mn := &metricName{ org: org, resourceType: "metric_name", - strToID: hashmap.New[string, int](), + cache: c, } mn.ascIDAllocator = newAscIDAllocator(org, mn.resourceType, 1, max) mn.rawDataProvider = mn @@ -49,7 +49,7 @@ func newMetricName(org *common.ORG, max int) *metricName { } func (mn *metricName) getID(str string) (int, bool) { - return mn.strToID.Get(str) + return mn.cache.GetMetricNameID(str) } func (mn *metricName) refresh(args ...interface{}) error { @@ -67,7 +67,7 @@ func (mn *metricName) encode(strs []string) ([]*controller.PrometheusMetricName, dbToAdd := make([]*metadbmodel.PrometheusMetricName, 0) for i := range strs { str := strs[i] - if id, ok := mn.strToID.Get(str); ok { + if id, ok := mn.cache.GetMetricNameID(str); ok { resp = append(resp, &controller.PrometheusMetricName{Name: &str, Id: proto.Uint32(uint32(id))}) continue } @@ -88,12 +88,14 @@ func (mn *metricName) encode(strs []string) ([]*controller.PrometheusMetricName, log.Errorf("add %s error: %s", mn.resourceType, err.Error(), mn.org.LogPrefix) return nil, err } + + // Update cache using model structs directly to avoid redundant protobuf construction for i := range dbToAdd { id := dbToAdd[i].ID str := dbToAdd[i].Name - mn.strToID.Set(str, id) resp = append(resp, &controller.PrometheusMetricName{Name: &str, Id: proto.Uint32(uint32(id))}) } + mn.cache.AddMetricNames(dbToAdd) return resp, nil } @@ -107,7 +109,7 @@ func (mn *metricName) load() (ids mapset.Set[int], err error) { inUseIDsSet := mapset.NewSet[int]() for _, item := range items { inUseIDsSet.Add(item.ID) - mn.strToID.Set(item.Name, item.ID) + // Cache is refreshed separately, do not set here } return inUseIDsSet, nil } diff --git a/server/controller/prometheus/label.go b/server/controller/prometheus/label.go index 14a9630b7d5..41c5a4308e2 100644 --- a/server/controller/prometheus/label.go +++ b/server/controller/prometheus/label.go @@ -87,7 +87,7 @@ func (s *ORGLabelSynchronizers) assembleFully(version uint32) (*trident.Promethe func (s *ORGLabelSynchronizers) goAssembleFully(args ...interface{}) error { orgIDToResp := args[0].(cmap.ConcurrentMap[int, *trident.PrometheusLabelResponse]) - cache := args[1].(*cache.Cache) + cache := args[1].(cache.PrometheusCache) synchronizer := newLabelSynchronizer(cache) resp, err := synchronizer.assembleFully() if err != nil { @@ -149,7 +149,7 @@ type LabelSynchronizer struct { statsdCounter *statsd.PrometheusLabelIDsCounter } -func newLabelSynchronizer(c *cache.Cache) *LabelSynchronizer { +func newLabelSynchronizer(c cache.PrometheusCache) *LabelSynchronizer { return &LabelSynchronizer{ Synchronizer: newSynchronizer(c), grpcurl: new(GRPCURL), @@ -285,7 +285,7 @@ func (s *LabelSynchronizer) assembleMetricLabel(mls []*trident.MetricLabelReques var rls []*trident.LabelResponse mn := ml.GetMetricName() - mni, ok := s.cache.MetricName.GetIDByName(mn) + mni, ok := s.cache.GetMetricNameID(mn) if !ok { nonMetricNameToCount[mn]++ continue @@ -294,17 +294,17 @@ func (s *LabelSynchronizer) assembleMetricLabel(mls []*trident.MetricLabelReques for _, l := range ml.GetLabels() { ln := l.GetName() lv := l.GetValue() - ni, ok := s.cache.LabelName.GetIDByName(ln) + ni, ok := s.cache.GetLabelNameID(ln) if !ok { nonLabelNames.Add(ln) continue } - vi, ok := s.cache.LabelValue.GetIDByValue(lv) + vi, ok := s.cache.GetLabelValueID(lv) if !ok { nonLabelValues.Add(lv) continue } - id, _ := s.cache.MetricAndAPPLabelLayout.GetIndexByKey(cache.NewLayoutKey(mn, ln)) + id, _ := s.cache.GetMetricAndAPPLabelLayoutIndex(cache.NewLayoutKey(mn, ln)) rls = append(rls, &trident.LabelResponse{ Name: &ln, NameId: proto.Uint32(uint32(ni)), @@ -339,36 +339,36 @@ func (s *LabelSynchronizer) assembleMetricLabel(mls []*trident.MetricLabelReques func (s *LabelSynchronizer) addMetricNameCache(arg ...interface{}) error { mns := arg[0].([]*controller.PrometheusMetricName) - s.cache.MetricName.Add(mns) + s.cache.AddMetricNamesFromGrpc(mns) return nil } func (s *LabelSynchronizer) addLabelNameCache(arg ...interface{}) error { lns := arg[0].([]*controller.PrometheusLabelName) - s.cache.LabelName.Add(lns) + s.cache.AddLabelNamesFromGrpc(lns) return nil } func (s *LabelSynchronizer) addLabelValueCache(arg ...interface{}) error { lvs := arg[0].([]*controller.PrometheusLabelValue) - s.cache.LabelValue.Add(lvs) + s.cache.AddLabelValuesFromGrpc(lvs) return nil } func (s *LabelSynchronizer) addMetricAPPLabelLayoutCache(arg ...interface{}) error { ls := arg[0].([]*controller.PrometheusMetricAPPLabelLayout) - s.cache.MetricAndAPPLabelLayout.Add(ls) + s.cache.AddMetricAndAPPLabelLayoutsFromGrpc(ls) return nil } func (s *LabelSynchronizer) addLabelCache(arg ...interface{}) error { ls := arg[0].([]*controller.PrometheusLabel) - s.cache.Label.Add(ls) + s.cache.AddLabelsFromGrpc(ls) return nil } type dataToEncode struct { - cache *cache.Cache + cache cache.PrometheusCache metricNames mapset.Set[string] labelNames mapset.Set[string] @@ -378,7 +378,7 @@ type dataToEncode struct { metricNameToLabelNames map[string]mapset.Set[string] } -func newDataToEncode(c *cache.Cache) *dataToEncode { +func newDataToEncode(c cache.PrometheusCache) *dataToEncode { return &dataToEncode{ cache: c, @@ -397,26 +397,26 @@ func (d *dataToEncode) cardinality() int { } func (d *dataToEncode) tryAppendMetricName(name string) { - if _, ok := d.cache.MetricName.GetIDByName(name); !ok { + if _, ok := d.cache.GetMetricNameID(name); !ok { d.metricNames.Add(name) } } func (d *dataToEncode) tryAppendLabelName(name string) { - if _, ok := d.cache.LabelName.GetIDByName(name); !ok { + if _, ok := d.cache.GetLabelNameID(name); !ok { d.labelNames.Add(name) } } func (d *dataToEncode) tryAppendLabelValue(value string) { - if _, ok := d.cache.LabelValue.GetIDByValue(value); !ok { + if _, ok := d.cache.GetLabelValueID(value); !ok { d.labelValues.Add(value) } } func (d *dataToEncode) tryAppendMetricAPPLabelLayout(metricName, labelName string) { k := cache.NewLayoutKey(metricName, labelName) - if _, ok := d.cache.MetricAndAPPLabelLayout.GetIndexByKey(k); !ok { + if _, ok := d.cache.GetMetricAndAPPLabelLayoutIndex(k); !ok { d.metricAPPLabelLayouts.Add(k) } } @@ -426,7 +426,7 @@ func (d *dataToEncode) appendMetricAPPLabelLayout(metricName, labelName string) } func (d *dataToEncode) tryAppendLabel(name, value string) { - if _, ok := d.cache.Label.GetIDByKey(cache.NewLabelKey(name, value)); !ok { - d.labels.Add(cache.NewLabelKey(name, value)) + if _, ok := d.cache.GetLabelID(name, value); ok { + return } } diff --git a/server/controller/prometheus/synchronizer.go b/server/controller/prometheus/synchronizer.go index 6a8b3b5ba38..c6160c5ae47 100644 --- a/server/controller/prometheus/synchronizer.go +++ b/server/controller/prometheus/synchronizer.go @@ -58,11 +58,11 @@ type counter struct { type Synchronizer struct { org *common.ORG - cache *cache.Cache + cache cache.PrometheusCache counter *counter } -func newSynchronizer(c *cache.Cache) Synchronizer { +func newSynchronizer(c cache.PrometheusCache) Synchronizer { return Synchronizer{ org: c.GetORG(), cache: c, @@ -73,8 +73,8 @@ func newSynchronizer(c *cache.Cache) Synchronizer { func (s *Synchronizer) assembleMetricLabelFully() ([]*trident.MetricLabelResponse, error) { nonLabelNames := mapset.NewSet[string]() metricNameToAPPLabelNames := make(map[string][]*trident.LabelResponse, 0) - for k, v := range s.cache.MetricAndAPPLabelLayout.GetLayoutKeyToIndex() { - labelNameID, ok := s.cache.LabelName.GetIDByName(k.LabelName) + for k, v := range s.cache.GetMetricAndAPPLabelLayout() { + labelNameID, ok := s.cache.GetLabelNameID(k.LabelName) if !ok { nonLabelNames.Add(k.LabelName) continue @@ -89,7 +89,7 @@ func (s *Synchronizer) assembleMetricLabelFully() ([]*trident.MetricLabelRespons } mLabels := make([]*trident.MetricLabelResponse, 0) - for k, v := range s.cache.MetricName.GetNameToID() { + for k, v := range s.cache.GetMetricNameToID() { metricName := k metricID := v mLabels = append( @@ -110,32 +110,32 @@ func (s *Synchronizer) assembleMetricLabelFully() ([]*trident.MetricLabelRespons func (s *Synchronizer) assembleLabelFully() ([]*trident.LabelResponse, error) { ls := make([]*trident.LabelResponse, 0) - nonLabelNames := mapset.NewSet[string]() - nonLabelValues := mapset.NewSet[string]() - for k := range s.cache.Label.GetKeyToID() { - ni, ok := s.cache.LabelName.GetIDByName(k.Name) - if !ok { - nonLabelNames.Add(k.Name) + nonNameIDs := mapset.NewSet[string]() + nonValueIDs := mapset.NewSet[string]() + for k := range s.cache.GetLabelKeyToID() { + nameID, okN := s.cache.GetLabelNameID(k.Name) + if !okN { + nonNameIDs.Add(k.Name) continue } - vi, ok := s.cache.LabelValue.GetIDByValue(k.Value) - if !ok { - nonLabelValues.Add(k.Value) + valueID, okV := s.cache.GetLabelValueID(k.Value) + if !okV { + nonValueIDs.Add(k.Value) continue } ls = append(ls, &trident.LabelResponse{ Name: &k.Name, Value: &k.Value, - NameId: proto.Uint32(uint32(ni)), - ValueId: proto.Uint32(uint32(vi)), + NameId: proto.Uint32(uint32(nameID)), + ValueId: proto.Uint32(uint32(valueID)), }) s.counter.SendLabelCount++ } - if nonLabelNames.Cardinality() > 0 { - log.Warningf("ids of label names not found, %s", logNotFoundDetail(nonLabelNames.ToSlice()), s.org.LogPrefix) + if nonNameIDs.Cardinality() > 0 { + log.Warningf("ids of label names not found, %s", logNotFoundDetail(nonNameIDs.ToSlice()), s.org.LogPrefix) } - if nonLabelValues.Cardinality() > 0 { - log.Warningf("ids of label values not found, %s", logNotFoundDetail(nonLabelValues.ToSlice()), s.org.LogPrefix) + if nonValueIDs.Cardinality() > 0 { + log.Warningf("ids of label values not found, %s", logNotFoundDetail(nonValueIDs.ToSlice()), s.org.LogPrefix) } return ls, nil } diff --git a/server/controller/tagrecorder/ch_app_label.go b/server/controller/tagrecorder/ch_app_label.go index 213a37c8601..c91c370ff78 100644 --- a/server/controller/tagrecorder/ch_app_label.go +++ b/server/controller/tagrecorder/ch_app_label.go @@ -21,6 +21,7 @@ import ( "github.com/deepflowio/deepflow/server/controller/db/metadb" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" + promcache "github.com/deepflowio/deepflow/server/controller/prometheus/cache" ) type ChAPPLabel struct { @@ -42,13 +43,18 @@ func (l *ChAPPLabel) generateNewData(db *metadb.DB) (map[PrometheusAPPLabelKey]m log.Infof("generate data for %s", l.resourceTypeName, db.LogPrefixORGID) appLabelSlice, ok := l.generateAPPLabelData(db) - - labelNameIDMap, valueNameIDMap, ok := l.generateNameIDData(db) if !ok { return nil, false } - labelRows, err := db.Unscoped().Model(&metadbmodel.PrometheusLabel{}).Select("id", "name", "value").Rows() + promCache, err := promcache.GetCache(db.ORGID) + if err != nil { + log.Errorf("get prometheus cache failed: %v", err) + return nil, false + } + promCache.Refresh(true) + + labelRows, err := db.Unscoped().Model(&metadbmodel.PrometheusLabel{}).Select("name", "value").Rows() if err != nil { log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err), db.LogPrefixORGID) return nil, false @@ -57,17 +63,16 @@ func (l *ChAPPLabel) generateNewData(db *metadb.DB) (map[PrometheusAPPLabelKey]m keyToItem := make(map[PrometheusAPPLabelKey]metadbmodel.ChAPPLabel) for labelRows.Next() { - var id int var labelName, labelValue string - if scanErr := labelRows.Scan(&id, &labelName, &labelValue); scanErr != nil { + if scanErr := labelRows.Scan(&labelName, &labelValue); scanErr != nil { log.Errorf("stream scan %s prometheus_label interrupted: %v", l.resourceTypeName, scanErr, db.LogPrefixORGID) return nil, false } if slices.Contains(appLabelSlice, labelName) { - labelNameID, nameOK := labelNameIDMap[labelName] - labelValueID, valueOK := valueNameIDMap[labelValue] + labelNameID, nameOK := promCache.GetLabelNameID(labelName) + labelValueID, valueOK := promCache.GetLabelValueID(labelValue) if !nameOK || !valueOK { - log.Warningf("label name or value not found in db, labelName: %s, labelValue: %s", labelName, labelValue) + log.Warningf("label name or value not found in cache, labelName: %s, labelValue: %s", labelName, labelValue) continue } keyToItem[PrometheusAPPLabelKey{LabelNameID: labelNameID, LabelValueID: labelValueID}] = metadbmodel.ChAPPLabel{ @@ -114,49 +119,3 @@ func (l *ChAPPLabel) generateAPPLabelData(db *metadb.DB) ([]string, bool) { } return appLabelSlice, true } - -func (l *ChAPPLabel) generateNameIDData(db *metadb.DB) (map[string]int, map[string]int, bool) { - labelNameIDMap := make(map[string]int) - valueNameIDMap := make(map[string]int) - - nameRows, err := db.Unscoped().Model(&metadbmodel.PrometheusLabelName{}).Select("id", "name").Rows() - if err != nil { - log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err), db.LogPrefixORGID) - return nil, nil, false - } - defer nameRows.Close() - for nameRows.Next() { - var id int - var name string - if scanErr := nameRows.Scan(&id, &name); scanErr != nil { - log.Errorf("stream scan %s prometheus_label_name interrupted: %v", l.resourceTypeName, scanErr, db.LogPrefixORGID) - return nil, nil, false - } - labelNameIDMap[name] = id - } - if err := nameRows.Err(); err != nil { - log.Errorf("stream read %s prometheus_label_name error: %v", l.resourceTypeName, err, db.LogPrefixORGID) - return nil, nil, false - } - - valueRows, err := db.Unscoped().Model(&metadbmodel.PrometheusLabelValue{}).Select("id", "value").Rows() - if err != nil { - log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err), db.LogPrefixORGID) - return nil, nil, false - } - defer valueRows.Close() - for valueRows.Next() { - var id int - var value string - if scanErr := valueRows.Scan(&id, &value); scanErr != nil { - log.Errorf("stream scan %s prometheus_label_value interrupted: %v", l.resourceTypeName, scanErr, db.LogPrefixORGID) - return nil, nil, false - } - valueNameIDMap[value] = id - } - if err := valueRows.Err(); err != nil { - log.Errorf("stream read %s prometheus_label_value error: %v", l.resourceTypeName, err, db.LogPrefixORGID) - return nil, nil, false - } - return labelNameIDMap, valueNameIDMap, true -}