-
Notifications
You must be signed in to change notification settings - Fork 10
feat: batching execution (single iterator queries) #390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
cheb0
wants to merge
5
commits into
main
Choose a base branch
from
329-batching-1
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
52bc2fa
new batching flow (currently only of single-iterator queries)
cheb0 3a13fa7
max 4096 to batch on one-by-one flow
cheb0 f338fab
unused function
cheb0 421f574
Merge branch 'main' into 329-batching-1
cheb0 da8604a
larger lid buf
cheb0 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ package processor | |
| import ( | ||
| "context" | ||
| "math" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "go.uber.org/zap" | ||
|
|
@@ -41,6 +42,23 @@ type searchIndex interface { | |
| GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, error) | ||
| } | ||
|
|
||
| type LIDsIter interface { | ||
| Lids(out []node.LID) []node.LID | ||
| Len() int | ||
| } | ||
|
|
||
| var lidsBufPool = sync.Pool{ | ||
| New: func() any { | ||
| return lidsBuf{ | ||
| // Currently, we drain up to 4k lids from eval tree, but with proper batching enabled | ||
| // we can get as much as whole LID block can have (currently, 64k lids) | ||
| lids: make([]node.LID, 0, consts.LIDBlockCap), | ||
| } | ||
| }, | ||
| } | ||
|
|
||
| const maxLidsToDrain = 4096 | ||
|
|
||
| func IndexSearch( | ||
| ctx context.Context, | ||
| params SearchParams, | ||
|
|
@@ -60,6 +78,10 @@ func IndexSearch( | |
| return evalLeaf(index, token, sw, stats, minLID, maxLID, params.Order) | ||
| }, | ||
| ) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| m.Stop() | ||
|
|
||
| if err != nil { | ||
|
|
@@ -107,8 +129,30 @@ func IndexSearch( | |
| m.Stop() | ||
| } | ||
|
|
||
| var evalTreeIter func(need int, out lidsBuf) LIDsIter | ||
| batchNode, ok := tryConvertToBatchedTree(evalTree) | ||
|
|
||
| if ok { | ||
| evalTreeIter = func(need int, _ lidsBuf) LIDsIter { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
| // batched flow: juts get a batch and return | ||
| return batchNode.NextBatch() | ||
| } | ||
| } else { | ||
| evalTreeIter = func(need int, buf lidsBuf) LIDsIter { | ||
| // iterator flow: buffer LIDs one by one and return a batch | ||
| for i := 0; i < need; i++ { | ||
| lid := evalTree.Next() | ||
| if lid.IsNull() { | ||
| break | ||
| } | ||
| buf = buf.append(lid) | ||
| } | ||
| return buf | ||
| } | ||
| } | ||
|
|
||
| m = sw.Start("iterate_eval_tree") | ||
| total, ids, histogram, aggs, err := iterateEvalTree(ctx, params, index, evalTree, aggSupplier, sw) | ||
| total, ids, histogram, aggs, err := iterateEvalTree(ctx, params, index, evalTreeIter, aggSupplier, sw) | ||
| m.Stop() | ||
|
|
||
| if err != nil { | ||
|
|
@@ -170,7 +214,7 @@ func iterateEvalTree( | |
| ctx context.Context, | ||
| params SearchParams, | ||
| idsIndex idsIndex, | ||
| evalTree node.Node, | ||
| evalTree func(need int, buf lidsBuf) LIDsIter, | ||
| aggSupplier func() ([]Aggregator, error), | ||
| sw *stopwatch.Stopwatch, | ||
| ) (int, seq.IDSources, []uint64, []Aggregator, error) { | ||
|
|
@@ -192,90 +236,135 @@ func iterateEvalTree( | |
| total := 0 | ||
| ids := seq.IDSources{} | ||
| var lastID seq.ID | ||
| buf := lidsBufPool.Get().(lidsBuf) | ||
| defer lidsBufPool.Put(buf) | ||
|
|
||
| timerEval := sw.Timer("eval_tree_next") | ||
| timerMID := sw.Timer("get_mid") | ||
| timerRID := sw.Timer("get_rid") | ||
| timerAgg := sw.Timer("agg_node_count") | ||
|
|
||
| var aggs []Aggregator | ||
|
|
||
| for i := 0; ; i++ { | ||
| if i&1023 == 0 && util.IsCancelled(ctx) { | ||
| for { | ||
| if util.IsCancelled(ctx) { | ||
| return total, ids, histogram, aggs, ctx.Err() | ||
| } | ||
|
|
||
| needMore := len(ids) < params.Limit | ||
| if !needMore && !needScanAllRange { | ||
| break | ||
| } | ||
| need := params.Limit - len(ids) | ||
| if needScanAllRange { | ||
| need = math.MaxUint32 | ||
| } | ||
| // limit how much we drain from eval tree for one-by-one flow. ignored for batched flow | ||
| need = min(need, maxLidsToDrain) | ||
|
|
||
| timerEval.Start() | ||
| lid := evalTree.Next() | ||
| lidBatch := evalTree(need, buf) | ||
| timerEval.Stop() | ||
|
|
||
| if lid.IsNull() { | ||
| if lidBatch.Len() == 0 { | ||
| break | ||
| } | ||
| rawLid := lid.Unpack() | ||
|
|
||
| if needMore || hasHist { | ||
| timerMID.Start() | ||
| mid := idsIndex.GetMID(seq.LID(rawLid)) | ||
| timerMID.Stop() | ||
|
|
||
| if hasHist { | ||
| if mid < params.From || mid > params.To { | ||
| logger.Error("MID value outside the query range", | ||
| zap.Time("from", params.From.Time()), | ||
| zap.Time("to", params.To.Time()), | ||
| zap.Time("mid", mid.Time())) | ||
| continue | ||
| } | ||
| bucketIndex := uint64(mid)/uint64(histInterval) - histBase | ||
| histogram[bucketIndex]++ | ||
|
|
||
| for _, lid := range lidBatch.Lids(buf.lids) { | ||
|
|
||
| needMore = len(ids) < params.Limit | ||
| if !needMore && !needScanAllRange { | ||
| break | ||
| } | ||
| seqLID := lid.ToSeqLID() | ||
|
|
||
| if needMore || hasHist { | ||
| timerMID.Start() | ||
| mid := idsIndex.GetMID(seqLID) | ||
| timerMID.Stop() | ||
|
|
||
| if hasHist { | ||
| if mid < params.From || mid > params.To { | ||
| logger.Error("MID value outside the query range", | ||
| zap.Time("from", params.From.Time()), | ||
| zap.Time("to", params.To.Time()), | ||
| zap.Time("mid", mid.Time())) | ||
| continue | ||
| } | ||
| bucketIndex := uint64(mid)/uint64(histInterval) - histBase | ||
| histogram[bucketIndex]++ | ||
| } | ||
|
|
||
| if needMore { | ||
| timerRID.Start() | ||
| rid := idsIndex.GetRID(seq.LID(rawLid)) | ||
| timerRID.Stop() | ||
| if needMore { | ||
| timerRID.Start() | ||
| rid := idsIndex.GetRID(seqLID) | ||
| timerRID.Stop() | ||
|
|
||
| id := seq.ID{MID: mid, RID: rid} | ||
| id := seq.ID{MID: mid, RID: rid} | ||
|
|
||
| if total == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one | ||
| ids = append(ids, seq.IDSource{ID: id}) | ||
| if total == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one | ||
| ids = append(ids, seq.IDSource{ID: id}) | ||
| } | ||
| lastID = id | ||
| } | ||
| lastID = id | ||
| } | ||
| } | ||
|
|
||
| total++ // increment found counter, use aggNode, calculate histogram and collect ids only if id in borders | ||
| total++ // increment found counter, use aggNode, calculate histogram and collect ids only if id in borders | ||
|
|
||
| if params.HasAgg() { | ||
| if aggs == nil { | ||
| var err error | ||
| aggs, err = aggSupplier() | ||
| if err != nil { | ||
| return total, ids, histogram, nil, err | ||
| if params.HasAgg() { | ||
| if aggs == nil { | ||
| var err error | ||
| aggs, err = aggSupplier() | ||
| if err != nil { | ||
| return total, ids, histogram, nil, err | ||
| } | ||
| } | ||
| } | ||
|
|
||
| timerAgg.Start() | ||
| for i := range aggs { | ||
| if err := aggs[i].Next(lid); err != nil { | ||
| timerAgg.Stop() | ||
| return total, ids, histogram, aggs, err | ||
| timerAgg.Start() | ||
| for i := range aggs { | ||
| if err := aggs[i].Next(lid); err != nil { | ||
| timerAgg.Stop() | ||
| return total, ids, histogram, aggs, err | ||
| } | ||
| } | ||
| timerAgg.Stop() | ||
| } | ||
| timerAgg.Stop() | ||
| } | ||
|
|
||
| } | ||
|
|
||
| return total, ids, histogram, aggs, nil | ||
| } | ||
|
|
||
| // lidsBuf maintains node.LID in slice as is (append order). | ||
| // Used to drain batches of LIDs when eval tree doesn't support batching. | ||
| type lidsBuf struct { | ||
| lids []node.LID | ||
| } | ||
|
|
||
| func (b lidsBuf) append(x node.LID) lidsBuf { | ||
| return lidsBuf{ | ||
| lids: append(b.lids, x), | ||
| } | ||
| } | ||
|
|
||
| func (b lidsBuf) Len() int { | ||
| return len(b.lids) | ||
| } | ||
|
|
||
| func (b lidsBuf) Lids(_ []node.LID) []node.LID { | ||
| return b.lids | ||
| } | ||
|
|
||
| func tryConvertToBatchedTree(evalTree node.Node) (node.BatchedNode, bool) { | ||
| switch it := evalTree.(type) { | ||
| case *lids.IteratorDesc: | ||
| return it, true | ||
| case *lids.IteratorAsc: | ||
| return it, true | ||
| default: | ||
| return nil, false | ||
| } | ||
| } | ||
|
|
||
| // getLIDsBorders return min and max LID borders (including) for search | ||
| func getLIDsBorders(params SearchParams, idsIndex idsIndex) (uint32, uint32) { | ||
| if idsIndex.Len() == 0 { | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| package node | ||
|
|
||
| import ( | ||
| "slices" | ||
| ) | ||
|
|
||
| // LIDBatch represents a batch of LIDs. lids are stored as uint32 slice and sorted in ascending order regardless of doc order. | ||
| // This allows to avoid copying and use reference to LID blocks data. | ||
| // Such batches are also logically immutable - we can't append or delete from them, only union or intersect. But we can zero out (reset) them. | ||
| type LIDBatch struct { | ||
| lids []uint32 | ||
| desc bool // if doc order is DESC (default order) | ||
| } | ||
|
|
||
| // NewDescBatch creates a batch of lids for DESC docs order | ||
| func NewDescBatch(lids []uint32) LIDBatch { | ||
| return LIDBatch{ | ||
| lids: lids, | ||
| desc: true, | ||
| } | ||
| } | ||
|
|
||
| // NewAscBatch creates a batch of lids for ASC docs order | ||
| func NewAscBatch(lids []uint32) LIDBatch { | ||
| return LIDBatch{ | ||
| lids: lids, | ||
| desc: false, | ||
| } | ||
| } | ||
|
|
||
| func (b LIDBatch) Len() int { | ||
| return len(b.lids) | ||
| } | ||
|
|
||
| func (b LIDBatch) Lids(out []LID) []LID { | ||
| if b.desc { | ||
| for _, lid := range b.lids { | ||
| out = append(out, NewDescLID(lid)) | ||
| } | ||
| } else { | ||
| for _, lid := range slices.Backward(b.lids) { | ||
| out = append(out, NewAscLID(lid)) | ||
| } | ||
| } | ||
|
|
||
| return out | ||
| } | ||
|
|
||
| func (b LIDBatch) Reset() LIDBatch { | ||
| return LIDBatch{ | ||
| lids: b.lids[:0], | ||
| desc: b.desc, | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seem like we already have error check right after m.Stop() call