diff --git a/frac/processor/search.go b/frac/processor/search.go index 3c08eae0..56be9f17 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -3,6 +3,7 @@ package processor import ( "context" "math" + "sync" "time" "go.uber.org/zap" @@ -41,6 +42,23 @@ type searchIndex interface { GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, error) } +type LIDsIter interface { + Lids(out []node.LID) []node.LID + Len() int +} + +var lidsBufPool = sync.Pool{ + New: func() any { + return lidsBuf{ + // Currently, we drain up to 4k lids from eval tree, but with proper batching enabled + // we can get as much as whole LID block can have (currently, 64k lids) + lids: make([]node.LID, 0, consts.LIDBlockCap), + } + }, +} + +const maxLidsToDrain = 4096 + func IndexSearch( ctx context.Context, params SearchParams, @@ -107,8 +129,30 @@ func IndexSearch( m.Stop() } + var evalTreeIter func(need int, out lidsBuf) LIDsIter + batchNode, ok := tryConvertToBatchedTree(evalTree) + + if ok { + evalTreeIter = func(need int, _ lidsBuf) LIDsIter { + // batched flow: just get a batch and return + return batchNode.NextBatch() + } + } else { + evalTreeIter = func(need int, buf lidsBuf) LIDsIter { + // iterator flow: buffer LIDs one by one and return a batch + for i := 0; i < need; i++ { + lid := evalTree.Next() + if lid.IsNull() { + break + } + buf = buf.append(lid) + } + return buf + } + } + m = sw.Start("iterate_eval_tree") - total, ids, histogram, aggs, err := iterateEvalTree(ctx, params, index, evalTree, aggSupplier, sw) + total, ids, histogram, aggs, err := iterateEvalTree(ctx, params, index, evalTreeIter, aggSupplier, sw) m.Stop() if err != nil { @@ -170,7 +214,7 @@ func iterateEvalTree( ctx context.Context, params SearchParams, idsIndex idsIndex, - evalTree node.Node, + evalTree 
func(need int, buf lidsBuf) LIDsIter, aggSupplier func() ([]Aggregator, error), sw *stopwatch.Stopwatch, ) (int, seq.IDSources, []uint64, []Aggregator, error) { @@ -192,6 +236,8 @@ func iterateEvalTree( total := 0 ids := seq.IDSources{} var lastID seq.ID + buf := lidsBufPool.Get().(lidsBuf) + defer lidsBufPool.Put(buf) timerEval := sw.Timer("eval_tree_next") timerMID := sw.Timer("get_mid") @@ -199,9 +245,8 @@ func iterateEvalTree( timerAgg := sw.Timer("agg_node_count") var aggs []Aggregator - - for i := 0; ; i++ { - if i&1023 == 0 && util.IsCancelled(ctx) { + for { + if util.IsCancelled(ctx) { return total, ids, histogram, aggs, ctx.Err() } @@ -209,73 +254,117 @@ func iterateEvalTree( if !needMore && !needScanAllRange { break } + need := params.Limit - len(ids) + if needScanAllRange { + need = math.MaxUint32 + } + // limit how much we drain from eval tree for one-by-one flow. ignored for batched flow + need = min(need, maxLidsToDrain) timerEval.Start() - lid := evalTree.Next() + lidBatch := evalTree(need, buf) timerEval.Stop() - if lid.IsNull() { + if lidBatch.Len() == 0 { break } - rawLid := lid.Unpack() - - if needMore || hasHist { - timerMID.Start() - mid := idsIndex.GetMID(seq.LID(rawLid)) - timerMID.Stop() - - if hasHist { - if mid < params.From || mid > params.To { - logger.Error("MID value outside the query range", - zap.Time("from", params.From.Time()), - zap.Time("to", params.To.Time()), - zap.Time("mid", mid.Time())) - continue - } - bucketIndex := uint64(mid)/uint64(histInterval) - histBase - histogram[bucketIndex]++ + + for _, lid := range lidBatch.Lids(buf.lids) { + + needMore = len(ids) < params.Limit + if !needMore && !needScanAllRange { + break } + seqLID := lid.ToSeqLID() + + if needMore || hasHist { + timerMID.Start() + mid := idsIndex.GetMID(seqLID) + timerMID.Stop() + + if hasHist { + if mid < params.From || mid > params.To { + logger.Error("MID value outside the query range", + zap.Time("from", params.From.Time()), + zap.Time("to", 
params.To.Time()), + zap.Time("mid", mid.Time())) + continue + } + bucketIndex := uint64(mid)/uint64(histInterval) - histBase + histogram[bucketIndex]++ + } - if needMore { - timerRID.Start() - rid := idsIndex.GetRID(seq.LID(rawLid)) - timerRID.Stop() + if needMore { + timerRID.Start() + rid := idsIndex.GetRID(seqLID) + timerRID.Stop() - id := seq.ID{MID: mid, RID: rid} + id := seq.ID{MID: mid, RID: rid} - if total == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one - ids = append(ids, seq.IDSource{ID: id}) + if total == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one + ids = append(ids, seq.IDSource{ID: id}) + } + lastID = id } - lastID = id } - } - total++ // increment found counter, use aggNode, calculate histogram and collect ids only if id in borders + total++ // increment found counter, use aggNode, calculate histogram and collect ids only if id in borders - if params.HasAgg() { - if aggs == nil { - var err error - aggs, err = aggSupplier() - if err != nil { - return total, ids, histogram, nil, err + if params.HasAgg() { + if aggs == nil { + var err error + aggs, err = aggSupplier() + if err != nil { + return total, ids, histogram, nil, err + } } - } - timerAgg.Start() - for i := range aggs { - if err := aggs[i].Next(lid); err != nil { - timerAgg.Stop() - return total, ids, histogram, aggs, err + timerAgg.Start() + for i := range aggs { + if err := aggs[i].Next(lid); err != nil { + timerAgg.Stop() + return total, ids, histogram, aggs, err + } } + timerAgg.Stop() } - timerAgg.Stop() } - } return total, ids, histogram, aggs, nil } +// lidsBuf maintains node.LID in slice as is (append order). +// Used to drain batches of LIDs when eval tree doesn't support batching. 
+type lidsBuf struct { + lids []node.LID +} + +func (b lidsBuf) append(x node.LID) lidsBuf { + return lidsBuf{ + lids: append(b.lids, x), + } +} + +func (b lidsBuf) Len() int { + return len(b.lids) +} + +func (b lidsBuf) Lids(_ []node.LID) []node.LID { + return b.lids +} + +func tryConvertToBatchedTree(evalTree node.Node) (node.BatchedNode, bool) { + switch it := evalTree.(type) { + case *lids.IteratorDesc: + return it, true + case *lids.IteratorAsc: + return it, true + default: + return nil, false + } +} + // getLIDsBorders return min and max LID borders (including) for search func getLIDsBorders(params SearchParams, idsIndex idsIndex) (uint32, uint32) { if idsIndex.Len() == 0 { diff --git a/frac/sealed/lids/iterator_asc.go b/frac/sealed/lids/iterator_asc.go index a2fd0c5a..0230ebd2 100644 --- a/frac/sealed/lids/iterator_asc.go +++ b/frac/sealed/lids/iterator_asc.go @@ -103,3 +103,36 @@ func (it *IteratorAsc) NextGeq(nextID node.LID) node.LID { it.lids = it.lids[:0] } } + +func (it *IteratorAsc) NextBatch() node.LIDBatch { + return it.NextBatchGeq(node.NewAscZeroLID()) +} + +func (it *IteratorAsc) NextBatchGeq(nextID node.LID) node.LIDBatch { + for { + for len(it.lids) == 0 { + if !it.tryNextBlock { + return node.NewAscBatch(nil) + } + it.loadNextLIDsBlock() + it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock) + it.counter.AddLIDsCount(len(it.lids)) + } + + // fast path: smallest remaining > nextID => skip entire block + // TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header + if it.lids[0] > nextID.Unpack() { + it.lids = it.lids[:0] + continue + } + + idx := sort.Search(len(it.lids), func(i int) bool { return it.lids[i] > nextID.Unpack() }) - 1 + if idx >= 0 { + batch := it.lids[:idx+1] + it.lids = it.lids[:0] + return node.NewAscBatch(batch) + } + + it.lids = it.lids[:0] + } +} diff --git a/frac/sealed/lids/iterator_desc.go 
b/frac/sealed/lids/iterator_desc.go index 5c4e08d9..a3da6176 100644 --- a/frac/sealed/lids/iterator_desc.go +++ b/frac/sealed/lids/iterator_desc.go @@ -103,3 +103,35 @@ func (it *IteratorDesc) NextGeq(nextID node.LID) node.LID { it.lids = it.lids[:0] } } + +func (it *IteratorDesc) NextBatch() node.LIDBatch { + return it.NextBatchGeq(node.NewDescZeroLID()) +} + +func (it *IteratorDesc) NextBatchGeq(nextID node.LID) node.LIDBatch { + for { + for len(it.lids) == 0 { + if !it.tryNextBlock { + return node.NewDescBatch(nil) + } + it.loadNextLIDsBlock() + it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock) + it.counter.AddLIDsCount(len(it.lids)) + } + + // fast path: last LID < nextLID => skip the entire block + if nextID.Unpack() > it.lids[len(it.lids)-1] { + it.lids = it.lids[:0] + continue + } + + idx := sort.Search(len(it.lids), func(i int) bool { return it.lids[i] >= nextID.Unpack() }) + if idx < len(it.lids) { + batch := it.lids[idx:len(it.lids)] + it.lids = it.lids[:0] + return node.NewDescBatch(batch) + } + + it.lids = it.lids[:0] + } +} diff --git a/node/batch.go b/node/batch.go new file mode 100644 index 00000000..8535b209 --- /dev/null +++ b/node/batch.go @@ -0,0 +1,54 @@ +package node + +import ( + "slices" +) + +// LIDBatch represents a batch of LIDs. lids are stored as uint32 slice and sorted in ascending order regardless of doc order. +// This allows to avoid copying and use reference to LID blocks data. +// Such batches are also logically immutable - we can't append or delete from them, only union or intersect. But we can zero out (reset) them. 
+type LIDBatch struct { + lids []uint32 + desc bool // if doc order is DESC (default order) +} + +// NewDescBatch creates a batch of lids for DESC docs order +func NewDescBatch(lids []uint32) LIDBatch { + return LIDBatch{ + lids: lids, + desc: true, + } +} + +// NewAscBatch creates a batch of lids for ASC docs order +func NewAscBatch(lids []uint32) LIDBatch { + return LIDBatch{ + lids: lids, + desc: false, + } +} + +func (b LIDBatch) Len() int { + return len(b.lids) +} + +func (b LIDBatch) Lids(out []LID) []LID { + if b.desc { + for _, lid := range b.lids { + out = append(out, NewDescLID(lid)) + } + } else { + for _, lid := range slices.Backward(b.lids) { + out = append(out, NewAscLID(lid)) + } + } + + return out +} + +func (b LIDBatch) Reset() LIDBatch { + return LIDBatch{ + lids: b.lids[:0], + desc: b.desc, + } +} diff --git a/node/lid.go b/node/lid.go index db242acc..92809b95 100644 --- a/node/lid.go +++ b/node/lid.go @@ -3,6 +3,8 @@ package node import ( "fmt" "math" + + "github.com/ozontech/seq-db/seq" ) const ( @@ -33,6 +35,14 @@ func NewDescLID(lid uint32) LID { } } +func NewDescZeroLID() LID { + return NewDescLID(0) +} + +func NewAscZeroLID() LID { + return NewAscLID(math.MaxUint32) +} + // NewAscLID returns LIDs for asc sort order func NewAscLID(lid uint32) LID { return LID{ @@ -89,6 +99,10 @@ func (c LID) Unpack() uint32 { return c.lid ^ c.mask } +func (c LID) ToSeqLID() seq.LID { + return seq.LID(c.Unpack()) +} + func (c LID) IsNull() bool { return c.lid == math.MaxUint32 } diff --git a/node/node.go b/node/node.go index 99649c90..5334f218 100644 --- a/node/node.go +++ b/node/node.go @@ -11,6 +11,14 @@ type Node interface { NextGeq(nextID LID) LID } +type BatchedNode interface { + fmt.Stringer + // NextBatch returns next batch. Returns nil when exhausted. + NextBatch() LIDBatch + // NextBatchGeq returns next batch (LIDs >= minLID). Returns nil when exhausted. 
+ NextBatchGeq(nextLID LID) LIDBatch +} + type Sourced interface { fmt.Stringer // for testing // aggregation need source