Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 137 additions & 48 deletions frac/processor/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package processor
import (
"context"
"math"
"sync"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -41,6 +42,23 @@ type searchIndex interface {
GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, error)
}

// LIDsIter is a drained batch of LIDs, produced either by a batched eval
// tree flow (node.LIDBatch) or by buffering LIDs one by one (lidsBuf).
type LIDsIter interface {
	// Lids returns the LIDs of this batch. NOTE(review): implementations
	// differ — node.LIDBatch appends into out, lidsBuf ignores out and
	// returns its own slice; callers must use the returned slice.
	Lids(out []node.LID) []node.LID
	// Len reports the number of LIDs in the batch.
	Len() int
}

// lidsBufPool recycles drain buffers used by the one-by-one eval-tree flow,
// so each search does not re-allocate a full-capacity LID buffer.
// NOTE(review): the pool stores lidsBuf by value, so every Put boxes the
// struct into an interface and allocates; consider pooling *lidsBuf instead.
var lidsBufPool = sync.Pool{
	New: func() any {
		return lidsBuf{
			// Currently we drain up to 4k lids from the eval tree, but with proper
			// batching enabled we can get as many as a whole LID block can hold
			// (currently, 64k lids).
			lids: make([]node.LID, 0, consts.LIDBlockCap),
		}
	},
}

// maxLidsToDrain caps how many LIDs a single one-by-one drain pass may pull
// from the eval tree; it is ignored by the batched flow.
const maxLidsToDrain = 4096

func IndexSearch(
ctx context.Context,
params SearchParams,
Expand All @@ -60,6 +78,10 @@ func IndexSearch(
return evalLeaf(index, token, sw, stats, minLID, maxLID, params.Order)
},
)
if err != nil {
return nil, err
}

m.Stop()

if err != nil {
Expand Down Expand Up @@ -107,8 +129,30 @@ func IndexSearch(
m.Stop()
}

var evalTreeIter func(need int, out lidsBuf) LIDsIter
batchNode, ok := tryConvertToBatchedTree(evalTree)

if ok {
evalTreeIter = func(need int, _ lidsBuf) LIDsIter {
// batched flow: just get a batch and return
return batchNode.NextBatch()
}
} else {
evalTreeIter = func(need int, buf lidsBuf) LIDsIter {
// iterator flow: buffer LIDs one by one and return a batch
for i := 0; i < need; i++ {
lid := evalTree.Next()
if lid.IsNull() {
break
}
buf = buf.append(lid)
}
return buf
}
}

m = sw.Start("iterate_eval_tree")
total, ids, histogram, aggs, err := iterateEvalTree(ctx, params, index, evalTree, aggSupplier, sw)
total, ids, histogram, aggs, err := iterateEvalTree(ctx, params, index, evalTreeIter, aggSupplier, sw)
m.Stop()

if err != nil {
Expand Down Expand Up @@ -170,7 +214,7 @@ func iterateEvalTree(
ctx context.Context,
params SearchParams,
idsIndex idsIndex,
evalTree node.Node,
evalTree func(need int, buf lidsBuf) LIDsIter,
aggSupplier func() ([]Aggregator, error),
sw *stopwatch.Stopwatch,
) (int, seq.IDSources, []uint64, []Aggregator, error) {
Expand All @@ -192,90 +236,135 @@ func iterateEvalTree(
total := 0
ids := seq.IDSources{}
var lastID seq.ID
buf := lidsBufPool.Get().(lidsBuf)
defer lidsBufPool.Put(buf)

timerEval := sw.Timer("eval_tree_next")
timerMID := sw.Timer("get_mid")
timerRID := sw.Timer("get_rid")
timerAgg := sw.Timer("agg_node_count")

var aggs []Aggregator

for i := 0; ; i++ {
if i&1023 == 0 && util.IsCancelled(ctx) {
for {
if util.IsCancelled(ctx) {
return total, ids, histogram, aggs, ctx.Err()
}

needMore := len(ids) < params.Limit
if !needMore && !needScanAllRange {
break
}
need := params.Limit - len(ids)
if needScanAllRange {
need = math.MaxUint32
}
// limit how much we drain from eval tree for one-by-one flow. ignored for batched flow
need = min(need, maxLidsToDrain)

timerEval.Start()
lid := evalTree.Next()
lidBatch := evalTree(need, buf)
timerEval.Stop()

if lid.IsNull() {
if lidBatch.Len() == 0 {
break
}
rawLid := lid.Unpack()

if needMore || hasHist {
timerMID.Start()
mid := idsIndex.GetMID(seq.LID(rawLid))
timerMID.Stop()

if hasHist {
if mid < params.From || mid > params.To {
logger.Error("MID value outside the query range",
zap.Time("from", params.From.Time()),
zap.Time("to", params.To.Time()),
zap.Time("mid", mid.Time()))
continue
}
bucketIndex := uint64(mid)/uint64(histInterval) - histBase
histogram[bucketIndex]++

for _, lid := range lidBatch.Lids(buf.lids) {

needMore = len(ids) < params.Limit
if !needMore && !needScanAllRange {
break
}
seqLID := lid.ToSeqLID()

if needMore || hasHist {
timerMID.Start()
mid := idsIndex.GetMID(seqLID)
timerMID.Stop()

if hasHist {
if mid < params.From || mid > params.To {
logger.Error("MID value outside the query range",
zap.Time("from", params.From.Time()),
zap.Time("to", params.To.Time()),
zap.Time("mid", mid.Time()))
continue
}
bucketIndex := uint64(mid)/uint64(histInterval) - histBase
histogram[bucketIndex]++
}

if needMore {
timerRID.Start()
rid := idsIndex.GetRID(seq.LID(rawLid))
timerRID.Stop()
if needMore {
timerRID.Start()
rid := idsIndex.GetRID(seqLID)
timerRID.Stop()

id := seq.ID{MID: mid, RID: rid}
id := seq.ID{MID: mid, RID: rid}

if total == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one
ids = append(ids, seq.IDSource{ID: id})
if total == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one
ids = append(ids, seq.IDSource{ID: id})
}
lastID = id
}
lastID = id
}
}

total++ // increment found counter, use aggNode, calculate histogram and collect ids only if id in borders
total++ // increment found counter, use aggNode, calculate histogram and collect ids only if id in borders

if params.HasAgg() {
if aggs == nil {
var err error
aggs, err = aggSupplier()
if err != nil {
return total, ids, histogram, nil, err
if params.HasAgg() {
if aggs == nil {
var err error
aggs, err = aggSupplier()
if err != nil {
return total, ids, histogram, nil, err
}
}
}

timerAgg.Start()
for i := range aggs {
if err := aggs[i].Next(lid); err != nil {
timerAgg.Stop()
return total, ids, histogram, aggs, err
timerAgg.Start()
for i := range aggs {
if err := aggs[i].Next(lid); err != nil {
timerAgg.Stop()
return total, ids, histogram, aggs, err
}
}
timerAgg.Stop()
}
timerAgg.Stop()
}

}

return total, ids, histogram, aggs, nil
}

// lidsBuf holds node.LID values in plain append order. It serves as the
// drain buffer when the eval tree does not support batching.
type lidsBuf struct {
	lids []node.LID
}

// append returns a lidsBuf whose contents are the previous contents plus x.
func (b lidsBuf) append(x node.LID) lidsBuf {
	b.lids = append(b.lids, x)
	return b
}

// Len reports how many LIDs are currently buffered.
func (b lidsBuf) Len() int {
	return len(b.lids)
}

// Lids returns the buffered LIDs as-is; the scratch argument is ignored.
func (b lidsBuf) Lids(_ []node.LID) []node.LID {
	return b.lids
}

// tryConvertToBatchedTree reports whether evalTree supports batched
// draining, returning it as a node.BatchedNode when it does.
func tryConvertToBatchedTree(evalTree node.Node) (node.BatchedNode, bool) {
	if it, ok := evalTree.(*lids.IteratorDesc); ok {
		return it, true
	}
	if it, ok := evalTree.(*lids.IteratorAsc); ok {
		return it, true
	}
	return nil, false
}

// getLIDsBorders return min and max LID borders (including) for search
func getLIDsBorders(params SearchParams, idsIndex idsIndex) (uint32, uint32) {
if idsIndex.Len() == 0 {
Expand Down
33 changes: 33 additions & 0 deletions frac/sealed/lids/iterator_asc.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,36 @@ func (it *IteratorAsc) NextGeq(nextID node.LID) node.LID {
it.lids = it.lids[:0]
}
}

// NextBatch returns the next batch of LIDs with no lower bound, i.e. starting
// from the zero ASC LID.
func (it *IteratorAsc) NextBatch() node.LIDBatch {
	return it.NextBatchGeq(node.NewAscZeroLID())
}

// NextBatchGeq drains the iterator's current LID block and returns the next
// batch of LIDs positioned at or after nextID. An empty batch signals that
// the iterator is exhausted.
// NOTE(review): comparisons below are on raw (unpacked) values and the batch
// keeps values <= nextID.Unpack() — presumably ASC packing inverts the raw
// ordering so this is "Geq" in document order; verify against node.LID packing.
func (it *IteratorAsc) NextBatchGeq(nextID node.LID) node.LIDBatch {
	for {
		// Refill the working slice from the next LIDs block once it is drained.
		for len(it.lids) == 0 {
			if !it.tryNextBlock {
				// No more blocks: report exhaustion via an empty batch.
				return node.NewAscBatch(nil)
			}
			it.loadNextLIDsBlock()
			it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock)
			it.counter.AddLIDsCount(len(it.lids))
		}

		// fast path: smallest remaining > nextID => skip entire block
		// TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header
		if it.lids[0] > nextID.Unpack() {
			it.lids = it.lids[:0]
			continue
		}

		// Binary-search the last index with value <= nextID.Unpack(); everything
		// up to and including it becomes the batch.
		idx := sort.Search(len(it.lids), func(i int) bool { return it.lids[i] > nextID.Unpack() }) - 1
		if idx >= 0 {
			batch := it.lids[:idx+1]
			it.lids = it.lids[:0]
			return node.NewAscBatch(batch)
		}

		it.lids = it.lids[:0]
	}
}
37 changes: 37 additions & 0 deletions frac/sealed/lids/iterator_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,40 @@ func (it *IteratorDesc) NextGeq(nextID node.LID) node.LID {
it.lids = it.lids[:0]
}
}

// NextBatch returns the next batch of LIDs with no lower bound, i.e. starting
// from the zero DESC LID.
func (it *IteratorDesc) NextBatch() node.LIDBatch {
	return it.NextBatchGeq(node.NewDescZeroLID())
}

// NextBatchGeq drains the iterator's current LID block and returns the next
// batch of LIDs positioned at or after nextID. An empty batch signals that
// the iterator is exhausted.
//
// Fix: the original performed the "skip entire block" comparison twice in a
// row (`nextID.Unpack() > last` followed by the identical check against
// `it.lids[len(it.lids)-1]`); the second copy was dead code and is removed.
func (it *IteratorDesc) NextBatchGeq(nextID node.LID) node.LIDBatch {
	for {
		// Refill the working slice from the next LIDs block once it is drained.
		for len(it.lids) == 0 {
			if !it.tryNextBlock {
				// No more blocks: report exhaustion via an empty batch.
				return node.NewDescBatch(nil)
			}
			it.loadNextLIDsBlock()
			it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock)
			it.counter.AddLIDsCount(len(it.lids))
		}

		// fast path: largest remaining raw LID < nextID => skip the entire block
		if nextID.Unpack() > it.lids[len(it.lids)-1] {
			it.lids = it.lids[:0]
			continue
		}

		// Binary-search the first index with value >= nextID.Unpack(); the tail
		// from there becomes the batch. The fast path above guarantees such an
		// index exists, so the final reset below is defensive only.
		idx := sort.Search(len(it.lids), func(i int) bool { return it.lids[i] >= nextID.Unpack() })
		if idx < len(it.lids) {
			batch := it.lids[idx:]
			it.lids = it.lids[:0]
			return node.NewDescBatch(batch)
		}

		it.lids = it.lids[:0]
	}
}
54 changes: 54 additions & 0 deletions node/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package node

import (
"slices"
)

// LIDBatch is a batch of LIDs. Raw lids are kept as a uint32 slice sorted in
// ascending order regardless of document order, which lets callers reference
// LID-block data directly instead of copying it. Batches are logically
// immutable — no append or delete, only union/intersect — though they can be
// zeroed out via Reset.
type LIDBatch struct {
	lids []uint32
	desc bool // if doc order is DESC (default order)
}

// NewDescBatch wraps lids into a batch for DESC docs order.
func NewDescBatch(lids []uint32) LIDBatch {
	return LIDBatch{lids: lids, desc: true}
}

// NewAscBatch wraps lids into a batch for ASC docs order.
func NewAscBatch(lids []uint32) LIDBatch {
	return LIDBatch{lids: lids, desc: false}
}

// Len reports the number of LIDs in the batch.
func (b LIDBatch) Len() int {
	return len(b.lids)
}

// Lids appends the batch contents to out, packing each raw value into a LID
// of the matching direction. DESC batches are walked front-to-back, ASC
// batches back-to-front.
func (b LIDBatch) Lids(out []LID) []LID {
	if b.desc {
		for _, lid := range b.lids {
			out = append(out, NewDescLID(lid))
		}
		return out
	}
	for _, lid := range slices.Backward(b.lids) {
		out = append(out, NewAscLID(lid))
	}
	return out
}

// Reset returns a batch with the same backing storage truncated to length zero.
func (b LIDBatch) Reset() LIDBatch {
	return LIDBatch{lids: b.lids[:0], desc: b.desc}
}
Loading
Loading