Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 137 additions & 25 deletions server/controller/prometheus/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/deepflowio/deepflow/message/controller"
metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model"
"github.com/deepflowio/deepflow/server/controller/prometheus/common"
"github.com/deepflowio/deepflow/server/libs/logger"
)
Expand All @@ -40,8 +41,11 @@ type Cache struct {
org *common.ORG
ctx context.Context

canRefresh chan bool
refreshInterval time.Duration
lastRefresh time.Time

refreshing bool
refreshCond *sync.Cond

MetricName *metricName
LabelName *labelName
Expand All @@ -62,52 +66,160 @@ func newCache(orgID int) (*Cache, error) {
lv := newLabelValue(org)
c := &Cache{
org: org,
canRefresh: make(chan bool, 1),
refreshCond: sync.NewCond(&sync.Mutex{}),
MetricName: mn,
LabelName: ln,
LabelValue: lv,
MetricAndAPPLabelLayout: newMetricAndAPPLabelLayout(org),
Label: newLabel(org, ln, lv),
}
c.canRefresh <- true
return c, nil
}

func (c *Cache) GetORG() *common.ORG {
return c.org
}

func (c *Cache) Refresh() (err error) {
LOOP:
for {
select {
case <-c.canRefresh:
err = c.refresh()
c.canRefresh <- true
break LOOP
default:
time.Sleep(time.Second)
log.Infof("last refresh cache not completed now", c.org.LogPrefix)
func (c *Cache) GetMetricNameID(name string) (int, bool) {
return c.MetricName.GetID(name)
}

func (c *Cache) SetMetricNameID(name string, id int) {
c.MetricName.setID(name, id)
}

func (c *Cache) GetLabelNameID(name string) (int, bool) {
return c.LabelName.GetID(name)
}

func (c *Cache) GetLabelValueID(value string) (int, bool) {
return c.LabelValue.GetID(value)
}

func (c *Cache) GetLabelID(name, value string) (int, bool) {
return c.Label.GetIDByKey(NewLabelKey(name, value))
}

func (c *Cache) GetLabelKeyToID() map[LabelKey]int {
return c.Label.GetKeyToID()
}

func (c *Cache) GetLabelNameByID(id int) (string, bool) {
return c.LabelName.GetNameByID(id)
}

func (c *Cache) GetLabelValueByID(id int) (string, bool) {
return c.LabelValue.GetValueByID(id)
}

func (c *Cache) GetMetricNameToID() map[string]int {
return c.MetricName.GetNameToID()
}

func (c *Cache) GetMetricAndAPPLabelLayout() map[LayoutKey]uint8 {
return c.MetricAndAPPLabelLayout.GetLayoutKeyToIndex()
}

func (c *Cache) GetMetricAndAPPLabelLayoutIndex(key LayoutKey) (uint8, bool) {
return c.MetricAndAPPLabelLayout.GetIndexByKey(key)
}

func (c *Cache) AddMetricAndAPPLabelLayoutsFromGrpc(batch []*controller.PrometheusMetricAPPLabelLayout) {
c.MetricAndAPPLabelLayout.AddFromGrpc(batch)
}

func (c *Cache) SetLabelNameID(name string, id int) {
c.LabelName.setID(name, id)
}

func (c *Cache) SetLabelValueID(value string, id int) {
c.LabelValue.setID(value, id)
}

func (c *Cache) AddMetricNames(batch []*metadbmodel.PrometheusMetricName) {
c.MetricName.Add(batch)
}

func (c *Cache) AddMetricNamesFromGrpc(batch []*controller.PrometheusMetricName) {
c.MetricName.AddFromGrpc(batch)
}

func (c *Cache) AddLabelNames(batch []*metadbmodel.PrometheusLabelName) {
c.LabelName.Add(batch)
}

func (c *Cache) AddLabelNamesFromGrpc(batch []*controller.PrometheusLabelName) {
c.LabelName.AddFromGrpc(batch)
}

func (c *Cache) AddLabelValues(batch []*metadbmodel.PrometheusLabelValue) {
c.LabelValue.Add(batch)
}

func (c *Cache) AddLabelValuesFromGrpc(batch []*controller.PrometheusLabelValue) {
c.LabelValue.AddFromGrpc(batch)
}

func (c *Cache) AddLabels(batch []*metadbmodel.PrometheusLabel) {
c.Label.Add(batch)
}

func (c *Cache) AddLabelsFromGrpc(batch []*controller.PrometheusLabel) {
c.Label.AddFromGrpc(batch)
}

func (c *Cache) Refresh(wait bool) error {
c.refreshCond.L.Lock()
if c.refreshing {
if wait {
// wait for refresh to complete
for c.refreshing {
c.refreshCond.Wait()
}
c.refreshCond.L.Unlock()
return nil
}
c.refreshCond.L.Unlock()
return nil
}
return

if !wait && c.refreshInterval > 0 && !c.lastRefresh.IsZero() && time.Since(c.lastRefresh) < c.refreshInterval {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

上次优化将 refreshInterval 默认值由 60s 改成 300s 了,cache 变成公用层在没有增量更新的情况下,应该需要改回 60s

c.refreshCond.L.Unlock()
return nil
}

c.refreshing = true
c.refreshCond.L.Unlock()

err := c.doRefresh()

c.refreshCond.L.Lock()
c.refreshing = false
c.refreshCond.Broadcast()
c.refreshCond.L.Unlock()
return err
}

func (c *Cache) doRefresh() error {
err := c.refresh()
if err == nil {
c.lastRefresh = time.Now()
}
return err
}

func (c *Cache) refresh() error {
log.Infof("refresh cache started", c.org.LogPrefix)
// LabelName and LabelValue must be refreshed before Label,
// because Label.refresh() converts name/value strings to IDs.
egRunAhead := &errgroup.Group{}
common.AppendErrGroup(egRunAhead, c.MetricName.refresh)
common.AppendErrGroup(egRunAhead, c.LabelName.refresh)
common.AppendErrGroup(egRunAhead, c.LabelValue.refresh)
if err := egRunAhead.Wait(); err != nil {
return err
}
eg := &errgroup.Group{}
common.AppendErrGroup(eg, c.Label.refresh)
common.AppendErrGroup(eg, c.MetricName.refresh)
common.AppendErrGroup(eg, c.LabelName.refresh)
common.AppendErrGroup(eg, c.LabelValue.refresh)
common.AppendErrGroup(eg, c.MetricAndAPPLabelLayout.refresh)
err := eg.Wait()
if err != nil {
return err
}
err = c.Label.refresh()
log.Infof("refresh cache completed", c.org.LogPrefix)
return err

Expand Down
69 changes: 69 additions & 0 deletions server/controller/prometheus/cache/cache_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func generateDBLabels(n int) []metadbmodel.PrometheusLabel {
now := time.Now()
for i := 0; i < n; i++ {
items[i] = metadbmodel.PrometheusLabel{
PrometheusAutoIncID: metadbmodel.PrometheusAutoIncID{ID: i + 1},
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
Name: fmt.Sprintf("n_%d", i),
Value: fmt.Sprintf("v_%d", i),
Expand Down Expand Up @@ -252,11 +253,33 @@ func TestSelect_Label_LoadOnlyIDNameValue(t *testing.T) {
db := setupTestDB(t)
defer cleanupTestDB(t)

nameItems := make([]metadbmodel.PrometheusLabelName, 5000)
valueItems := make([]metadbmodel.PrometheusLabelValue, 5000)
now := time.Now()
for i := 0; i < 5000; i++ {
nameItems[i] = metadbmodel.PrometheusLabelName{
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
Name: fmt.Sprintf("n_%d", i),
}
valueItems[i] = metadbmodel.PrometheusLabelValue{
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
Value: fmt.Sprintf("v_%d", i),
}
}
require.NoError(t, batchInsert(db, nameItems, 1000))
require.NoError(t, batchInsert(db, valueItems, 1000))

items := generateDBLabels(5000)
require.NoError(t, batchInsert(db, items, 1000))

l := newTestLabel()
l.org = newTestORG(db)
l.labelName.org = l.org
l.labelValue.org = l.org
require.NoError(t, l.labelName.refresh())
require.NoError(t, l.labelValue.refresh())

err := l.refresh()
require.NoError(t, err)
Expand Down Expand Up @@ -298,12 +321,34 @@ func TestSelect_Label_RefreshDiscardsDeletedRows(t *testing.T) {
db := setupTestDB(t)
defer cleanupTestDB(t)

nameItems := make([]metadbmodel.PrometheusLabelName, 200)
valueItems := make([]metadbmodel.PrometheusLabelValue, 200)
now := time.Now()
for i := 0; i < 200; i++ {
nameItems[i] = metadbmodel.PrometheusLabelName{
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
Name: fmt.Sprintf("n_%d", i),
}
valueItems[i] = metadbmodel.PrometheusLabelValue{
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
Value: fmt.Sprintf("v_%d", i),
}
}
require.NoError(t, batchInsert(db, nameItems, 100))
require.NoError(t, batchInsert(db, valueItems, 100))

// 第一轮:插入 200 条并 refresh
items := generateDBLabels(200)
require.NoError(t, batchInsert(db, items, 100))

l := newTestLabel()
l.org = newTestORG(db)
l.labelName.org = l.org
l.labelValue.org = l.org
require.NoError(t, l.labelName.refresh())
require.NoError(t, l.labelValue.refresh())
require.NoError(t, l.refresh())
assert.Equal(t, 200, countLabelConcurrentMap(l.GetKeyToID()))

Expand Down Expand Up @@ -439,10 +484,34 @@ func TestSelect_Label_500K_Refresh(t *testing.T) {
insertStart := time.Now()
items := generateDBLabels(N)
require.NoError(t, batchInsert(db, items, 5000))

// Insert unique label names and values
labelNames := make([]metadbmodel.PrometheusLabelName, N)
labelValues := make([]metadbmodel.PrometheusLabelValue, N)
now := time.Now()
for i := 0; i < N; i++ {
labelNames[i] = metadbmodel.PrometheusLabelName{
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
Name: fmt.Sprintf("n_%d", i),
}
labelValues[i] = metadbmodel.PrometheusLabelValue{
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
Value: fmt.Sprintf("v_%d", i),
}
}
require.NoError(t, batchInsert(db, labelNames, 5000))
require.NoError(t, batchInsert(db, labelValues, 5000))

t.Logf("insert completed in %v", time.Since(insertStart))

l := newTestLabel()
l.org = newTestORG(db)
l.labelName.org = l.org
l.labelValue.org = l.org
require.NoError(t, l.labelName.refresh())
require.NoError(t, l.labelValue.refresh())

refreshStart := time.Now()
err := l.refresh()
Expand Down
Loading
Loading