From 74748b8bdcaba65b21cdbb7e4a928b8d35d14845 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Wed, 22 Apr 2026 10:49:59 +0400 Subject: [PATCH 1/3] batch processing for iterateEvalTree --- frac/active_index.go | 14 +++ frac/fraction_test.go | 9 +- frac/processor/hist_map.go | 46 ++++++++ frac/processor/search.go | 186 +++++++++++++++++++-------------- frac/sealed/seqids/provider.go | 15 +++ frac/sealed_index.go | 15 +++ 6 files changed, 205 insertions(+), 80 deletions(-) create mode 100644 frac/processor/hist_map.go diff --git a/frac/active_index.go b/frac/active_index.go index 27e4e464..0b014998 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -167,11 +167,25 @@ func (p *activeIDsIndex) GetMID(lid seq.LID) seq.MID { return seq.MID(p.mids[restoredLID]) } +func (p *activeIDsIndex) GetMIDs(lids []node.LID, out []seq.MID) []seq.MID { + for _, lid := range lids { + out = append(out, p.GetMID(lid.ToSeqLID())) + } + return out +} + func (p *activeIDsIndex) GetRID(lid seq.LID) seq.RID { restoredLID := p.inverser.Revert(uint32(lid)) return seq.RID(p.rids[restoredLID]) } +func (p *activeIDsIndex) GetRIDs(lids []node.LID, out []seq.RID) []seq.RID { + for _, lid := range lids { + out = append(out, p.GetRID(lid.ToSeqLID())) + } + return out +} + func (p *activeIDsIndex) Len() int { return p.inverser.Len() } diff --git a/frac/fraction_test.go b/frac/fraction_test.go index ec5f3d85..c6c82117 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1592,6 +1592,12 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { for _, ord := range orders { histBuckets := make(map[string]uint64) for _, doc := range testDocs { + if doc.timestamp.Before(fromTime) { + continue + } + if doc.timestamp.After(midTime) { + continue + } if doc.service == "database" && doc.level == 3 { bucketTime := doc.timestamp.Truncate(time.Second) bucketKey := bucketTime.Format(time.RFC3339Nano) @@ -1601,7 +1607,8 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { searchParams := s.query( "service:database AND level:3", - withTo(toTime.Format(time.RFC3339Nano)), + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(midTime.Format(time.RFC3339Nano)), withHist(1000)) searchParams.Order = ord diff --git a/frac/processor/hist_map.go b/frac/processor/hist_map.go new file mode 100644 index 00000000..ca08bc72 --- /dev/null +++ b/frac/processor/hist_map.go @@ -0,0 +1,46 @@ +package processor + +import "github.com/ozontech/seq-db/seq" + +// HistMap is an optimized array-based map for histogram. +type HistMap struct { + buckets []uint64 + start seq.MID + interval seq.MID + base uint64 +} + +func NewHistMap(from, to seq.MID, intervalMillis uint64) HistMap { + interval := seq.MillisToMID(intervalMillis) + base := uint64(from) / uint64(interval) + size := uint64(to)/uint64(interval) - base + 1 + return HistMap{ + buckets: make([]uint64, size), + start: from - from%interval, + interval: interval, + base: base, + } +} + +func (h *HistMap) Update(mids []seq.MID) { + // TODO(cheb0): unroll/vectorize/whatever when we optimize everything else + for _, mid := range mids { + bucketIndex := uint64(mid)/uint64(h.interval) - h.base + h.buckets[bucketIndex]++ + } +} + +func (h HistMap) ToMap() map[seq.MID]uint64 { + if len(h.buckets) == 0 { + return nil + } + res := make(map[seq.MID]uint64, len(h.buckets)) + bucket := h.start + for _, cnt := range h.buckets { + if cnt > 0 { + res[bucket] = cnt + } + bucket += h.interval + } + return res +} diff --git a/frac/processor/search.go b/frac/processor/search.go index 56be9f17..ad8721b0 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -26,7 +26,9 @@ type idsIndex interface { // LessOrEqual checks if seq.ID in LID position less or equal searched seq.ID, i.e. seqID(lid) <= id LessOrEqual(lid seq.LID, id seq.ID) bool GetMID(seq.LID) seq.MID + GetMIDs(lids []node.LID, out []seq.MID) []seq.MID GetRID(seq.LID) seq.RID + GetRIDs(lids []node.LID, out []seq.RID) []seq.RID Len() int } @@ -152,7 +154,7 @@ func IndexSearch( } m = sw.Start("iterate_eval_tree") - total, ids, histogram, aggs, err := iterateEvalTree(ctx, params, index, evalTreeIter, aggSupplier, sw) + total, ids, histMap, aggs, err := iterateEvalTree(ctx, params, index, evalTreeIter, aggSupplier, sw) m.Stop() if err != nil { @@ -186,7 +188,7 @@ func IndexSearch( IDs: ids, Aggs: aggsResult, Total: uint64(total), - Histogram: convertHistToMap(params, histogram), + Histogram: histMap.ToMap(), } stats.UpdateMetrics() @@ -194,22 +196,6 @@ func IndexSearch( return qpr, nil } -func convertHistToMap(params SearchParams, hist []uint64) map[seq.MID]uint64 { - if len(hist) == 0 { - return nil - } - res := make(map[seq.MID]uint64, len(hist)) - histIntervalMID := seq.MillisToMID(params.HistInterval) - bucket := params.From - params.From%histIntervalMID - for _, cnt := range hist { - if cnt > 0 { - res[bucket] = cnt - } - bucket += histIntervalMID - } - return res -} - func iterateEvalTree( ctx context.Context, params SearchParams, @@ -217,20 +203,13 @@ func iterateEvalTree( evalTree func(need int, buf lidsBuf) LIDsIter, aggSupplier func() ([]Aggregator, error), sw *stopwatch.Stopwatch, -) (int, seq.IDSources, []uint64, []Aggregator, error) { +) (int, seq.IDSources, HistMap, []Aggregator, error) { hasHist := params.HasHist() needScanAllRange := params.IsScanAllRequest() - var ( - histBase uint64 - histogram []uint64 - histInterval seq.MID - ) + var hist HistMap if hasHist { - histInterval = seq.MillisToMID(params.HistInterval) - histBase = uint64(params.From) / uint64(histInterval) - histSize := uint64(params.To)/uint64(histInterval) - histBase + 1 - histogram = make([]uint64, histSize) + hist = NewHistMap(params.From, params.To, params.HistInterval) } total := 0 @@ -238,100 +217,149 @@ func iterateEvalTree( var lastID seq.ID buf := lidsBufPool.Get().(lidsBuf) defer lidsBufPool.Put(buf) + mids := make([]seq.MID, 0, 4096) + rids := make([]seq.RID, 0, 4096) timerEval := sw.Timer("eval_tree_next") timerMID := sw.Timer("get_mid") + filterMIDs := sw.Timer("filter_mids") + updateHist := sw.Timer("update_hist") timerRID := sw.Timer("get_rid") timerAgg := sw.Timer("agg_node_count") var aggs []Aggregator for { if util.IsCancelled(ctx) { - return total, ids, histogram, aggs, ctx.Err() + return total, ids, hist, aggs, ctx.Err() } needMore := len(ids) < params.Limit if !needMore && !needScanAllRange { break } - need := params.Limit - len(ids) + needLids := params.Limit - len(ids) if needScanAllRange { - need = math.MaxUint32 + needLids = math.MaxUint32 } // limit how much we drain from eval tree for one-by-one flow. ignored for batched flow - need = min(need, maxLidsToDrain) + needLids = min(needLids, maxLidsToDrain) timerEval.Start() - lidBatch := evalTree(need, buf) + lidBatch := evalTree(needLids, buf) timerEval.Stop() if lidBatch.Len() == 0 { break } - for _, lid := range lidBatch.Lids(buf.lids) { + lidsSlice := lidBatch.Lids(buf.lids) - needMore = len(ids) < params.Limit - if !needMore && !needScanAllRange { - break - } - seqLID := lid.ToSeqLID() - - if needMore || hasHist { - timerMID.Start() - mid := idsIndex.GetMID(seqLID) - timerMID.Stop() - - if hasHist { - if mid < params.From || mid > params.To { - logger.Error("MID value outside the query range", - zap.Time("from", params.From.Time()), - zap.Time("to", params.To.Time()), - zap.Time("mid", mid.Time())) - continue - } - bucketIndex := uint64(mid)/uint64(histInterval) - histBase - histogram[bucketIndex]++ - } + needMids := min(params.Limit-len(ids), len(lidsSlice)) + if hasHist { + // need to fetch mids for all lids for hist + needMids = len(lidsSlice) + } + + // Get MIDs + if needMids > 0 { + timerMID.Start() + mids = idsIndex.GetMIDs(lidsSlice[0:needMids], mids[:0]) + timerMID.Stop() + } - if needMore { - timerRID.Start() - rid := idsIndex.GetRID(seqLID) - timerRID.Stop() + // Filter out-of-range MIDs (only for hists) + if hasHist { + filterMIDs.Start() + mids, lidsSlice = filterOutOfRangeMIDs(params, mids, lidsSlice) + filterMIDs.Stop() + } - id := seq.ID{MID: mid, RID: rid} + // Get RIDs + // compute number of ids we can get here, since some MIDs might have been filtered out + needIds := min(params.Limit-len(ids), len(lidsSlice)) + if needIds > 0 { + timerRID.Start() + rids = idsIndex.GetRIDs(lidsSlice[0:needIds], rids[:0]) + timerRID.Stop() + } - if total == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one - ids = append(ids, seq.IDSource{ID: id}) - } - lastID = id - } + // Fill IDs for search + for i := 0; i < needIds; i++ { + id := seq.ID{MID: mids[i], RID: rids[i]} + + if i == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one + ids = append(ids, seq.IDSource{ID: id}) } + lastID = id + } - total++ // increment found counter, use aggNode, calculate histogram and collect ids only if id in borders + // Update hist map + if hasHist { + updateHist.Start() + hist.Update(mids) + updateHist.Stop() + } - if params.HasAgg() { - if aggs == nil { - var err error - aggs, err = aggSupplier() - if err != nil { - return total, ids, histogram, nil, err - } + // Update aggregators + if params.HasAgg() { + if aggs == nil { + var err error + aggs, err = aggSupplier() // sw timer is activated inside aggSupplier + if err != nil { + return total, ids, hist, nil, err } + } - timerAgg.Start() - for i := range aggs { + timerAgg.Start() + for i := range aggs { + for _, lid := range lidsSlice { if err := aggs[i].Next(lid); err != nil { timerAgg.Stop() - return total, ids, histogram, aggs, err + return total, ids, hist, aggs, err } } - timerAgg.Stop() } + timerAgg.Stop() + } + + total += len(lidsSlice) + } + + return total, ids, hist, aggs, nil +} + +func filterOutOfRangeMIDs(params SearchParams, mids []seq.MID, lidsSlice []node.LID) ([]seq.MID, []node.LID) { + // most of the time we will never filter out any MIDs, therefore it's faster just to loop through and exit + needFilter := false + for i := 0; i < len(mids); i++ { + // TODO(cheb0): filter with arrow? + if mids[i] < params.From || mids[i] > params.To { + needFilter = true + break } } - return total, ids, histogram, aggs, nil + if needFilter { + writeIdx := 0 + filteredOut := 0 + for i := 0; i < len(mids); i++ { + if mids[i] < params.From || mids[i] > params.To { + logger.Error("MID value outside the query range", + zap.Time("from", params.From.Time()), + zap.Time("to", params.To.Time()), + zap.Time("mid", mids[i].Time())) + filteredOut++ + continue + } else { + lidsSlice[writeIdx] = lidsSlice[i] + mids[writeIdx] = mids[i] + writeIdx++ + } + } + lidsSlice = lidsSlice[0 : len(lidsSlice)-filteredOut] + mids = mids[0 : len(mids)-filteredOut] + } + return mids, lidsSlice } // lidsBuf maintains node.LID in slice as is (append order). diff --git a/frac/sealed/seqids/provider.go b/frac/sealed/seqids/provider.go index 18a1c57c..1e3290a6 100644 --- a/frac/sealed/seqids/provider.go +++ b/frac/sealed/seqids/provider.go @@ -3,6 +3,7 @@ package seqids import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" ) @@ -53,6 +54,20 @@ func (p *Provider) MID(lid seq.LID) (seq.MID, error) { return seq.MID(p.midCache.GetValByLID(uint32(lid))), nil } +func (p *Provider) MIDs(lids []node.LID, out []seq.MID) ([]seq.MID, error) { + for _, lid := range lids { + rawLid := lid.Unpack() + blockIdx := p.table.GetIDBlockIndexByLID(rawLid) + if p.midCache.blockIndex != int(blockIdx) { + if err := p.fillMIDs(blockIdx, p.midCache); err != nil { + return nil, err + } + } + out = append(out, seq.MID(p.midCache.GetValByLID(rawLid))) + } + return out, nil +} + func (p *Provider) fillMIDs(blockIndex uint32, dst *unpackCache) error { if dst.blockIndex != int(blockIndex) { block, err := p.loader.GetMIDsBlock(blockIndex, dst.values[:0]) diff --git a/frac/sealed_index.go b/frac/sealed_index.go index 7c62713c..ef044c69 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -150,6 +150,14 @@ func (ii *sealedIDsIndex) GetMID(lid seq.LID) seq.MID { return mid } +func (ii *sealedIDsIndex) GetMIDs(lids []node.LID, out []seq.MID) []seq.MID { + mids, err := ii.provider.MIDs(lids, out) + if err != nil { + logger.Panic("get mids error", zap.String("frac", ii.fracName), zap.Int("lids_count", len(lids)), zap.Error(err)) + } + return mids +} + func (ii *sealedIDsIndex) GetRID(lid seq.LID) seq.RID { rid, err := ii.provider.RID(lid) if err != nil { @@ -158,6 +166,13 @@ func (ii *sealedIDsIndex) GetRID(lid seq.LID) seq.RID { return rid } +func (ii *sealedIDsIndex) GetRIDs(lids []node.LID, out []seq.RID) []seq.RID { + for _, lid := range lids { + out = append(out, ii.GetRID(lid.ToSeqLID())) + } + return out +} + func (ii *sealedIDsIndex) docPos(lid seq.LID) seq.DocPos { pos, err := ii.provider.DocPos(lid) if err != nil { From afb477fdd5288964566e19ac56a2c5b7d023ada4 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Thu, 23 Apr 2026 15:44:11 +0400 Subject: [PATCH 2/3] linter issues --- frac/active_index.go | 8 ++++---- frac/sealed_index.go | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/frac/active_index.go b/frac/active_index.go index 0b014998..09072e37 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -167,8 +167,8 @@ func (p *activeIDsIndex) GetMID(lid seq.LID) seq.MID { return seq.MID(p.mids[restoredLID]) } -func (p *activeIDsIndex) GetMIDs(lids []node.LID, out []seq.MID) []seq.MID { - for _, lid := range lids { +func (p *activeIDsIndex) GetMIDs(lidsBatch []node.LID, out []seq.MID) []seq.MID { + for _, lid := range lidsBatch { out = append(out, p.GetMID(lid.ToSeqLID())) } return out @@ -179,8 +179,8 @@ func (p *activeIDsIndex) GetRID(lid seq.LID) seq.RID { return seq.RID(p.rids[restoredLID]) } -func (p *activeIDsIndex) GetRIDs(lids []node.LID, out []seq.RID) []seq.RID { - for _, lid := range lids { +func (p *activeIDsIndex) GetRIDs(lidsBatch []node.LID, out []seq.RID) []seq.RID { + for _, lid := range lidsBatch { out = append(out, p.GetRID(lid.ToSeqLID())) } return out diff --git a/frac/sealed_index.go b/frac/sealed_index.go index ef044c69..a45962d4 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -150,10 +150,10 @@ func (ii *sealedIDsIndex) GetMID(lid seq.LID) seq.MID { return mid } -func (ii *sealedIDsIndex) GetMIDs(lids []node.LID, out []seq.MID) []seq.MID { - mids, err := ii.provider.MIDs(lids, out) +func (ii *sealedIDsIndex) GetMIDs(lidsBatch []node.LID, out []seq.MID) []seq.MID { + mids, err := ii.provider.MIDs(lidsBatch, out) if err != nil { - logger.Panic("get mids error", zap.String("frac", ii.fracName), zap.Int("lids_count", len(lids)), zap.Error(err)) + logger.Panic("get mids error", zap.String("frac", ii.fracName), zap.Int("lids_count", len(lidsBatch)), zap.Error(err)) } return mids } @@ -166,8 +166,8 @@ func (ii *sealedIDsIndex) GetRID(lid seq.LID) seq.RID { return rid } -func (ii *sealedIDsIndex) GetRIDs(lids []node.LID, out []seq.RID) []seq.RID { - for _, lid := range lids { +func (ii *sealedIDsIndex) GetRIDs(lidsBatch []node.LID, out []seq.RID) []seq.RID { + for _, lid := range lidsBatch { out = append(out, ii.GetRID(lid.ToSeqLID())) } return out From b62a56b98df57d3c40d82c2b9a51e0cc6e84bebf Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Thu, 23 Apr 2026 16:05:51 +0400 Subject: [PATCH 3/3] sync.pool for all buffers --- frac/processor/search.go | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/frac/processor/search.go b/frac/processor/search.go index ad8721b0..8429655e 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -49,12 +49,23 @@ type LIDsIter interface { Len() int } -var lidsBufPool = sync.Pool{ +type searchBuffers struct { + lids lidsBuf + mids []seq.MID + rids []seq.RID +} + +var searchBuffersPool = sync.Pool{ New: func() any { - return lidsBuf{ + lidsBuf := lidsBuf{ + lids: make([]node.LID, 0, consts.LIDBlockCap), + } + return searchBuffers{ // Currently, we drain up to 4k lids from eval tree, but with proper batching enabled // we can get as much as whole LID block can have (currently, 64k lids) - lids: make([]node.LID, 0, consts.LIDBlockCap), + lids: lidsBuf, + mids: make([]seq.MID, 0, consts.LIDBlockCap), + rids: make([]seq.RID, 0, consts.LIDBlockCap), } }, } @@ -215,10 +226,11 @@ func iterateEvalTree( total := 0 ids := seq.IDSources{} var lastID seq.ID - buf := lidsBufPool.Get().(lidsBuf) - defer lidsBufPool.Put(buf) - mids := make([]seq.MID, 0, 4096) - rids := make([]seq.RID, 0, 4096) + buffers := searchBuffersPool.Get().(searchBuffers) + defer searchBuffersPool.Put(buffers) + mids := buffers.mids + rids := buffers.rids + lidsBuffer := buffers.lids timerEval := sw.Timer("eval_tree_next") timerMID := sw.Timer("get_mid") @@ -245,14 +257,14 @@ func iterateEvalTree( needLids = min(needLids, maxLidsToDrain) timerEval.Start() - lidBatch := evalTree(needLids, buf) + lidBatch := evalTree(needLids, lidsBuffer) timerEval.Stop() if lidBatch.Len() == 0 { break } - lidsSlice := lidBatch.Lids(buf.lids) + lidsSlice := lidBatch.Lids(lidsBuffer.lids) needMids := min(params.Limit-len(ids), len(lidsSlice)) if hasHist {