Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
119 commits
Select commit Hold shift + click to select a range
f97ec5f
[dbnode] Reduce index query allocations
robskillington Oct 22, 2020
00462c1
Merge branch 'master' into r/reduce-index-query-allocs
robskillington Oct 22, 2020
17b7398
Use new LRU cache and instrument
robskillington Oct 23, 2020
a6f05a7
Revert go.mod
robskillington Oct 23, 2020
c8683af
Use read only bitmap by default in FST segment
robskillington Oct 25, 2020
a0f1801
Implement read only bitmap for close to zero alloc operations on post…
robskillington Oct 26, 2020
5208869
Add tests
robskillington Oct 27, 2020
a76c6c4
Fix tests
robskillington Oct 28, 2020
9592410
Fix few issues
robskillington Oct 28, 2020
a0ce92f
Fix multi segments builder
robskillington Oct 28, 2020
6786415
Fix fieldsAndTermsIter
robskillington Oct 28, 2020
dd58315
Fix checker
robskillington Oct 28, 2020
cfac389
Merge branch 'master' into r/reduce-index-query-allocs
arnikola Oct 30, 2020
fd8e4e5
Fix multi-intersect searches that start with empty posting lists
robskillington Nov 1, 2020
f2688e0
Refactor read only bitmap range to own datastructure
robskillington Nov 2, 2020
c1fbb80
Add ability for RO postings lists to be enabled by env var
robskillington Nov 3, 2020
e1b5360
Merge branch 'master' into r/reduce-index-query-allocs
robskillington Nov 3, 2020
861fcdf
Fix integration test kickoff
robskillington Nov 3, 2020
6396865
Fix multi bitmap iterator and also implement field filtering for aggr…
robskillington Nov 3, 2020
8d74ee1
Fix field filtering based on restrict by tags query
robskillington Nov 3, 2020
8da22f2
Close iterator just once
robskillington Nov 3, 2020
4ea2ec6
Fix integration test
robskillington Nov 3, 2020
dbf3148
Add pooling of regexp searcher and multi bitmap iterators
robskillington Nov 4, 2020
339b4e7
Cache loaded vellum FST structs and reuse readers
robskillington Nov 4, 2020
91afefa
Add generated map types
robskillington Nov 4, 2020
8d989e2
Fix unlock
robskillington Nov 4, 2020
9160bd4
Merge branch 'master' into r/reduce-index-query-allocs
robskillington Nov 4, 2020
2108f7c
Merge branch 'master' into r/reduce-index-query-allocs
arnikola Nov 5, 2020
7a2683a
Merge branch 'master' into r/reduce-index-query-allocs
arnikola Nov 5, 2020
ee07598
Reuse containers used for mutable postings lists
robskillington Nov 9, 2020
0805169
Merge branch 'r/reduce-index-query-allocs' of github.com:m3db/m3 into…
robskillington Nov 9, 2020
d0cc13f
Apply capacity fix
robskillington Nov 9, 2020
fd2af0c
Add concurrency for queries with lost of segments and pool postings l…
robskillington Nov 10, 2020
099ecd7
Merge branch 'master' into r/reduce-index-query-allocs
robskillington Nov 10, 2020
d82a47d
Use separate pools for querying segments separately
robskillington Nov 10, 2020
e113d5b
Fix build
robskillington Nov 11, 2020
9d61cb2
Wire up new series index backoff to runtime options
robskillington Nov 12, 2020
dd38ead
[dbnode] Refactor wide query path (#2826)
arnikola Nov 11, 2020
a3bd18a
[dbnode] Introduce Aggregator type (#2840)
linasm Nov 11, 2020
3b5c0ff
[coordinator] Set default namespace tag to avoid colliding with commo…
robskillington Nov 12, 2020
24d79f8
Merge branch 'master' into r/reduce-index-query-allocs
robskillington Nov 12, 2020
567dd45
Add read through caching of searches to segments (including foreground)
robskillington Nov 14, 2020
3f46cfa
Merge branch 'r/reduce-index-query-allocs' of github.com:m3db/m3 into…
robskillington Nov 14, 2020
31d5816
Fix typo
robskillington Nov 14, 2020
7da3dd5
Set postings list pooling
robskillington Nov 15, 2020
1084aa8
Propagate postings list cache
robskillington Nov 16, 2020
03203b7
Use independent postings list cache for searches
robskillington Nov 16, 2020
36a1690
Use concurrent cache and striped context finalizers
robskillington Nov 18, 2020
0af1add
Use const for array size
robskillington Nov 18, 2020
a41e893
Remove test unsafe
robskillington Nov 18, 2020
ef5dd15
Rename to slot index
robskillington Nov 18, 2020
7f50c75
Add more realtime stats to number of mutable segments
robskillington Nov 18, 2020
8beb1a1
Reduce contention for add documents and query cache key building
robskillington Nov 19, 2020
6057969
Only acquire lock during stats update/read
robskillington Nov 19, 2020
4af3aba
Fix race with statsNoLock
robskillington Nov 19, 2020
178f02f
Always lock even if using non-concurrent builder
robskillington Nov 19, 2020
ccf9035
Force cold writes enabled
robskillington Dec 8, 2020
0483b50
Force cold writes enabled
robskillington Dec 8, 2020
2bbf6ba
Use fast IntersectsAny code path for aggregate queries
robskillington Dec 8, 2020
09eeb3b
Merge branch 'r/reduce-index-query-allocs' of github.com:m3db/m3 into…
robskillington Dec 8, 2020
4dbe593
Remove field not present for storage options
robskillington Dec 8, 2020
4cb4029
Fix non readonly postings
robskillington Dec 9, 2020
0b4db71
Fix non readonly postings
robskillington Dec 9, 2020
f668a84
Fix intersect array
robskillington Dec 9, 2020
a8eaf9a
Add in-memory block index block to replace per time window block, pha…
robskillington Jan 14, 2021
78eaa8b
Create the in memory block earlier
robskillington Jan 16, 2021
5f93d0c
Avoid indexing already indexed timeseries
robskillington Jan 20, 2021
7ce03c0
Add instrumentation
robskillington Jan 20, 2021
416dbfc
Fix build
robskillington Jan 20, 2021
71d54ca
Always query the in-memory block
robskillington Jan 20, 2021
95ca25f
Only run a compaction on a background segment if active block starts …
robskillington Jan 22, 2021
9fd2d4e
Don't require acquiring other locks from maybeBackgroundCompactWithLock
robskillington Jan 22, 2021
e98f77e
Add parallel background compaction
robskillington Jan 23, 2021
9857326
Add wg.done()
robskillington Jan 23, 2021
23e5ff2
Allocate number of background compactors
robskillington Jan 23, 2021
23f6548
Use up to num cpu / 2 for background compaction threads
robskillington Jan 23, 2021
1fb1850
Use num CPU minus one for background compactors
robskillington Jan 23, 2021
a333625
Cast num CPU minus one to int
robskillington Jan 23, 2021
ecbd20e
Use default compaction level of 0-500k
robskillington Jan 23, 2021
fe9eab3
Sort largest to smallest segments when using multi segments builder
robskillington Jan 23, 2021
d41284d
Use maps solely for determining existence for compaction purposes or not
robskillington Jan 24, 2021
e969766
Use indexedBloomFilterByTimeLock lock
robskillington Jan 24, 2021
c25a7bc
Fix cast to interface
robskillington Jan 24, 2021
976bf85
Allocation indexedSnapshot at creation
robskillington Jan 24, 2021
6284b7b
Fix build
robskillington Jan 24, 2021
0aed2f5
Allocate backgroundCompactIndexedSnapshot when creating mutable segments
robskillington Jan 25, 2021
b961dea
Remove lock acquiring to check indexed snapshot
robskillington Jan 25, 2021
16a1c54
Call NotifySealedBlocks out of lock and only do the full check of whe…
robskillington Jan 25, 2021
193c4f3
Separate GCing background segments from foreground segments
robskillington Jan 25, 2021
cdd8795
Use separate number of compactors per type of background compaction
robskillington Jan 25, 2021
5eb19b6
Alloc GC segments each background GC segment run, also do not mutate …
robskillington Jan 27, 2021
ccb3035
Reduce insert lock contention by spreading over many queues per CPU
robskillington Jan 29, 2021
de3ce9a
4x the queues per CPU core
robskillington Jan 29, 2021
834efa6
Use series lookup as source of truth for if indexed or not and if sho…
robskillington Feb 7, 2021
3699014
Only run background GC when sealed blocks updated
robskillington Feb 8, 2021
4934faf
Remove purge expired series warning
robskillington Feb 8, 2021
55632a9
Use o(1) lookup for negative offsets for postings IDs in multisegment…
robskillington Feb 9, 2021
3b8c1d0
Load term FSTs up front to avoid slow queries during compactions
robskillington Feb 10, 2021
1c86639
Fix CI tests
robskillington Feb 26, 2021
33590cc
Always consider the in-memory block
robskillington Feb 26, 2021
6a0aa10
Filter instead of shrink slice
robskillington Mar 8, 2021
9421bae
Only decrement readerwrite count if marking as success
robskillington Mar 10, 2021
daed491
Always re-resolve entry when checking if should GC series
robskillington Mar 12, 2021
3bc3b81
Merge commit 'e0bc12aeda246f2c39852d44ddb59a11cb3d9d31' into r/reduce…
robskillington Mar 29, 2021
3de1690
Avoid index block ticking while holding index write lock
robskillington Apr 8, 2021
da1aeb4
Rerun cached searches when replacing background segments to avoid nee…
robskillington Apr 6, 2021
0738d68
Add concurrent execution of cached searches
robskillington Apr 6, 2021
1f8a510
Initialize worker pool
robskillington Apr 6, 2021
9cf50ee
Reuse cached postings when populating cached searches
robskillington Apr 10, 2021
e151cbc
Fix search query proto
robskillington Apr 10, 2021
b886052
Fix missing postings for registry value
robskillington Apr 10, 2021
a867722
Add a postings cache TTL of 15min, also sort conjunction query for de…
robskillington Apr 12, 2021
b8f4d82
Fix build
robskillington Apr 12, 2021
34a8594
Restore simple LRU cache for accuracy when compacting segments
robskillington Apr 13, 2021
61e6185
Shard postings list cache
robskillington Apr 13, 2021
5e03570
Fix reconstructed searches not being executed
robskillington Apr 15, 2021
734d67f
Do not pool multi-bitmap iterators
robskillington Apr 16, 2021
43c0026
Add test for populate worker
robskillington Apr 16, 2021
6379d0b
If tag name is a graphite tag then do not allocate each time
robskillington May 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,21 @@ func NewIngester(
}
})

scope := opts.InstrumentOptions.MetricsScope()
metrics, err := newCarbonIngesterMetrics(scope)
if err != nil {
return nil, err
}

ingester := &ingester{
downsamplerAndWriter: downsamplerAndWriter,
opts: opts,
logger: opts.InstrumentOptions.Logger(),
tagOpts: tagOpts,
metrics: newCarbonIngesterMetrics(
opts.InstrumentOptions.MetricsScope()),
lineResourcesPool: resourcePool,
metrics: metrics,
lineResourcesPool: resourcePool,
}

// No need to retain watch as NamespaceWatcher.Close() will handle closing any watches
// generated by creating listeners.
clusterNamespacesWatcher.RegisterListener(ingester)
Expand Down Expand Up @@ -170,7 +176,7 @@ func (i *ingester) OnUpdate(clusterNamespaces m3.ClusterNamespaces) {
return
}

compiledRules, err := i.compileRulesWithLock(*rules)
compiledRules, err := compileRules(*rules)
if err != nil {
i.logger.Error("failed to compile latest rules. continuing to use existing carbon ingestion "+
"rules", zap.Error(err))
Expand Down Expand Up @@ -401,10 +407,8 @@ func (i *ingester) writeWithOptions(
return err
}

err = i.downsamplerAndWriter.Write(
ctx, tags, resources.datapoints, xtime.Second, nil, opts,
)

err = i.downsamplerAndWriter.Write(ctx, tags, resources.datapoints,
xtime.Second, nil, opts)
if err != nil {
i.logger.Error("err writing carbon metric",
zap.String("name", string(resources.name)), zap.Error(err))
Expand All @@ -419,18 +423,26 @@ func (i *ingester) Close() {
// We don't maintain any state in-between connections so there is nothing to do here.
}

func newCarbonIngesterMetrics(m tally.Scope) carbonIngesterMetrics {
return carbonIngesterMetrics{
success: m.Counter("success"),
err: m.Counter("error"),
malformed: m.Counter("malformed"),
}
type carbonIngesterMetrics struct {
success tally.Counter
err tally.Counter
malformed tally.Counter
ingestLatency tally.Histogram
writeLatency tally.Histogram
}

type carbonIngesterMetrics struct {
success tally.Counter
err tally.Counter
malformed tally.Counter
func newCarbonIngesterMetrics(scope tally.Scope) (carbonIngesterMetrics, error) {
buckets, err := ingest.NewLatencyBuckets()
if err != nil {
return carbonIngesterMetrics{}, err
}
return carbonIngesterMetrics{
success: scope.Counter("success"),
err: scope.Counter("error"),
malformed: scope.Counter("malformed"),
writeLatency: scope.SubScope("write").Histogram("latency", buckets.WriteLatencyBuckets),
ingestLatency: scope.SubScope("ingest").Histogram("latency", buckets.IngestLatencyBuckets),
}, nil
}

// GenerateTagsFromName accepts a carbon metric name and blows it up into a list of
Expand Down Expand Up @@ -519,7 +531,7 @@ func generateTagsFromName(
// Note that only one rule will be applied per metric and rules are applied
// such that the first one that matches takes precedence. As a result we need
// to make sure to maintain the order of the rules when we generate the compiled ones.
func (i *ingester) compileRulesWithLock(rules CarbonIngesterRules) ([]ruleAndRegex, error) {
func compileRules(rules CarbonIngesterRules) ([]ruleAndRegex, error) {
var compiledRules []ruleAndRegex
for _, rule := range rules.Rules {
compiled, err := regexp.Compile(rule.Pattern)
Expand Down
8 changes: 4 additions & 4 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func TestIngesterNoStaticRules(t *testing.T) {
var expectationErr error
mockDownsamplerAndWriter, found := newMockDownsamplerAndWriter(ctrl, func(mappingRules []downsample.AutoMappingRule) {
if len(mappingRules) != 1 {
expectationErr = errors.New(fmt.Sprintf("expected: len(DownsampleMappingRules) == 1, got: %v", len(mappingRules)))
expectationErr = fmt.Errorf("expected: len(DownsampleMappingRules) == 1, got: %v", len(mappingRules))
}
policies := mappingRules[0].Policies

Expand All @@ -350,7 +350,7 @@ func TestIngesterNoStaticRules(t *testing.T) {
}
expectedPolicy := policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour)
if ok := expectedPolicy == policies[0]; !ok {
expectationErr = errors.New(fmt.Sprintf("expected storage policy: %+v, got: %+v", expectedPolicy, policies[0]))
expectationErr = fmt.Errorf("expected storage policy: %+v, got: %+v", expectedPolicy, policies[0])
}
})

Expand Down Expand Up @@ -416,11 +416,11 @@ func TestIngesterNoStaticRules(t *testing.T) {
}
expectedPolicy := policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour)
if ok := expectedPolicy == policies[0]; !ok {
expectationErr = errors.New(fmt.Sprintf("expected storage policy: %+v, got: %+v", expectedPolicy, policies[0]))
expectationErr = fmt.Errorf("expected storage policy: %+v, got: %+v", expectedPolicy, policies[0])
}
expectedPolicy = policy.NewStoragePolicy(1*time.Minute, xtime.Second, 168*time.Hour)
if ok := expectedPolicy == policies[1]; !ok {
expectationErr = errors.New(fmt.Sprintf("expected storage policy: %+v, got: %+v", expectedPolicy, policies[1]))
expectationErr = fmt.Errorf("expected storage policy: %+v, got: %+v", expectedPolicy, policies[1])
}
})

Expand Down
90 changes: 90 additions & 0 deletions src/cmd/services/m3coordinator/ingest/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingest

import (
"time"

"github.com/uber-go/tally"
)

// LatencyBuckets are a set of latency buckets useful for measuring latencies.
type LatencyBuckets struct {
WriteLatencyBuckets tally.DurationBuckets
IngestLatencyBuckets tally.DurationBuckets
}

// NewLatencyBuckets returns write and ingest latency buckets useful for
// measuring ingest latency (i.e. time from datapoint/sample created to time
// ingested) and write latency (i.e. time from received a sample from remote
// source to completion of that write locally).
func NewLatencyBuckets() (LatencyBuckets, error) {
upTo1sBuckets, err := tally.LinearDurationBuckets(0, 100*time.Millisecond, 10)
if err != nil {
return LatencyBuckets{}, err
}

upTo10sBuckets, err := tally.LinearDurationBuckets(time.Second, 500*time.Millisecond, 18)
if err != nil {
return LatencyBuckets{}, err
}

upTo60sBuckets, err := tally.LinearDurationBuckets(10*time.Second, 5*time.Second, 11)
if err != nil {
return LatencyBuckets{}, err
}

upTo60mBuckets, err := tally.LinearDurationBuckets(0, 5*time.Minute, 12)
if err != nil {
return LatencyBuckets{}, err
}
upTo60mBuckets = upTo60mBuckets[1:] // Remove the first 0s to get 5 min aligned buckets
Comment thread
robskillington marked this conversation as resolved.

upTo6hBuckets, err := tally.LinearDurationBuckets(time.Hour, 30*time.Minute, 12)
if err != nil {
return LatencyBuckets{}, err
}

upTo24hBuckets, err := tally.LinearDurationBuckets(6*time.Hour, time.Hour, 19)
if err != nil {
return LatencyBuckets{}, err
}
upTo24hBuckets = upTo24hBuckets[1:] // Remove the first 6h to get 1 hour aligned buckets
Comment thread
robskillington marked this conversation as resolved.

var writeLatencyBuckets tally.DurationBuckets
writeLatencyBuckets = append(writeLatencyBuckets, upTo1sBuckets...)
writeLatencyBuckets = append(writeLatencyBuckets, upTo10sBuckets...)
writeLatencyBuckets = append(writeLatencyBuckets, upTo60sBuckets...)
writeLatencyBuckets = append(writeLatencyBuckets, upTo60mBuckets...)

var ingestLatencyBuckets tally.DurationBuckets
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo1sBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo10sBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60sBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60mBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo6hBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo24hBuckets...)

return LatencyBuckets{
WriteLatencyBuckets: writeLatencyBuckets,
IngestLatencyBuckets: ingestLatencyBuckets,
}, nil
}
49 changes: 49 additions & 0 deletions src/cmd/services/m3coordinator/ingest/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingest

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestLatencyBuckets(t *testing.T) {
buckets, err := NewLatencyBuckets()
require.NoError(t, err)

// NB(r): Bucket length is tested just to sanity check how many buckets we are creating
require.Equal(t, 50, len(buckets.WriteLatencyBuckets.AsDurations()))

// NB(r): Bucket values are tested to sanity check they look right
expected := "[0s 100ms 200ms 300ms 400ms 500ms 600ms 700ms 800ms 900ms 1s 1.5s 2s 2.5s 3s 3.5s 4s 4.5s 5s 5.5s 6s 6.5s 7s 7.5s 8s 8.5s 9s 9.5s 10s 15s 20s 25s 30s 35s 40s 45s 50s 55s 1m0s 5m0s 10m0s 15m0s 20m0s 25m0s 30m0s 35m0s 40m0s 45m0s 50m0s 55m0s]"
actual := fmt.Sprintf("%v", buckets.WriteLatencyBuckets.AsDurations())
require.Equal(t, expected, actual)

// NB(r): Bucket length is tested just to sanity check how many buckets we are creating
require.Equal(t, 80, len(buckets.IngestLatencyBuckets.AsDurations()))

// NB(r): Bucket values are tested to sanity check they look right
expected = "[0s 100ms 200ms 300ms 400ms 500ms 600ms 700ms 800ms 900ms 1s 1.5s 2s 2.5s 3s 3.5s 4s 4.5s 5s 5.5s 6s 6.5s 7s 7.5s 8s 8.5s 9s 9.5s 10s 15s 20s 25s 30s 35s 40s 45s 50s 55s 1m0s 5m0s 10m0s 15m0s 20m0s 25m0s 30m0s 35m0s 40m0s 45m0s 50m0s 55m0s 1h0m0s 1h30m0s 2h0m0s 2h30m0s 3h0m0s 3h30m0s 4h0m0s 4h30m0s 5h0m0s 5h30m0s 6h0m0s 6h30m0s 7h0m0s 8h0m0s 9h0m0s 10h0m0s 11h0m0s 12h0m0s 13h0m0s 14h0m0s 15h0m0s 16h0m0s 17h0m0s 18h0m0s 19h0m0s 20h0m0s 21h0m0s 22h0m0s 23h0m0s 24h0m0s]"
Comment thread
robskillington marked this conversation as resolved.
actual = fmt.Sprintf("%v", buckets.IngestLatencyBuckets.AsDurations())
require.Equal(t, expected, actual)
}
18 changes: 14 additions & 4 deletions src/dbnode/integration/index_single_node_high_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -377,7 +378,7 @@ func testIndexSingleNodeHighConcurrency(
// Now check all of them are individually indexed.
var (
fetchWg sync.WaitGroup
notIndexedErrs []error
notIndexedErrs []string
notIndexedLock sync.Mutex
)
for i := 0; i < opts.concurrencyEnqueueWorker; i++ {
Expand All @@ -401,10 +402,18 @@ func testIndexSingleNodeHighConcurrency(
found := isIndexed(t, session, md.ID(), id, tags)
return found
}, 30*time.Second)

// Final check to get the corrersponding error/mismatch.
indexed, err := isIndexedChecked(t, session, md.ID(), id, tags)
if !indexed {
err := fmt.Errorf("not indexed series: i=%d, j=%d", i, j)
if err != nil {
err = fmt.Errorf("not indexed: i=%d, j=%d, err=%v", i, j, err)
} else {
err = fmt.Errorf("not indexed: i=%d, j=%d, err=none", i, j)
}

notIndexedLock.Lock()
notIndexedErrs = append(notIndexedErrs, err)
notIndexedErrs = append(notIndexedErrs, err.Error())
notIndexedLock.Unlock()
}
})
Expand All @@ -414,7 +423,8 @@ func testIndexSingleNodeHighConcurrency(
fetchWg.Wait()

require.Equal(t, 0, len(notIndexedErrs),
fmt.Sprintf("not indexed errors: %v", notIndexedErrs[:min(5, len(notIndexedErrs))]))
fmt.Sprintf("not indexed errors: [%v]",
strings.Join(notIndexedErrs[:min(5, len(notIndexedErrs))], ", ")))
}

log.Info("data indexing verify done", zap.Duration("took", time.Since(start)))
Expand Down
1 change: 0 additions & 1 deletion src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1585,7 +1585,6 @@ func withEncodingAndPoolingOptions(
SetInstrumentOptions(iopts)).
SetFSTSegmentOptions(
opts.IndexOptions().FSTSegmentOptions().
SetPostingsListPool(postingsList).
SetInstrumentOptions(iopts).
SetContextPool(opts.ContextPool())).
SetSegmentBuilderOptions(
Expand Down
Loading