diff --git a/rules/engine.go b/rules/engine.go index 04e05ac..ac75075 100644 --- a/rules/engine.go +++ b/rules/engine.go @@ -270,8 +270,8 @@ func (e *baseEngine) addRule(rule DynamicRule, lockPattern string, callback interface{}, options ...RuleOption) { - ruleIndex := e.ruleMgr.addRule(rule) opts := makeRuleOptions(options...) + ruleIndex := e.ruleMgr.addRule(rule, opts) ttl := e.options.lockTimeout if opts.lockTimeout > 0 { ttl = opts.lockTimeout @@ -293,11 +293,7 @@ func (e *baseEngine) addRule(rule DynamicRule, func (e *v3Engine) Run() { e.logger.Info("Rules engine options", zap.Object("options", &e.options), zap.Int("rules", len(e.ruleMgr.rules))) - prefixSlice := []string{} - prefixes := e.ruleMgr.prefixes - // This is a map; used to ensure there are no duplicates - for prefix := range prefixes { - prefixSlice = append(prefixSlice, prefix) + for _, prefix := range e.ruleMgr.getWatcherPrefixes() { logger := e.logger.With(zap.String("prefix", prefix)) w, err := newV3Watcher(e.cl, prefix, logger, e.baseEngine.keyProc, e.options.watchTimeout, e.kvWrapper, e.metrics, e.watcherWrapper, e.options.watchDelay) if err != nil { @@ -314,7 +310,7 @@ func (e *v3Engine) Run() { logger, e.options.crawlMutex, e.options.lockAcquisitionTimeout, - prefixSlice, + e.ruleMgr.getPrioritizedPrefixes(), e.kvWrapper, e.options.syncDelay, e.locker, diff --git a/rules/int_crawler.go b/rules/int_crawler.go index 08cc082..a926c56 100644 --- a/rules/int_crawler.go +++ b/rules/int_crawler.go @@ -162,6 +162,7 @@ func (ic *intCrawler) singleRun(logger *zap.Logger) { ic.rulesProcessedCount = make(map[string]int) queryStart := time.Now() + prioritizedKeys := []string{} for _, prefix := range ic.prefixes { pCtx := SetMethod(ctx, crawlerMethodName+"-"+prefix) resp, err := ic.kv.Get(pCtx, prefix, v3.WithPrefix()) @@ -171,12 +172,15 @@ func (ic *intCrawler) singleRun(logger *zap.Logger) { } for _, kv := range resp.Kvs { values[string(kv.Key)] = string(kv.Value) + // Using a map unsorts the prefixes, keep + // the priority by using a slice + prioritizedKeys = append(prioritizedKeys, string(kv.Key)) } } metrics.CrawlerQueryTime(ic.name, queryStart) metrics.CrawlerValuesCount(ic.name, len(values)) evalStart := time.Now() - ic.processData(values, logger) + ic.processData(values, prioritizedKeys, logger) metrics.CrawlerEvalTime(ic.name, evalStart) ic.metricMutex.Lock() @@ -187,9 +191,10 @@ func (ic *intCrawler) singleRun(logger *zap.Logger) { } logger.Info("Crawler run complete", zap.Duration("time", time.Since(crawlerStart)), zap.Int("values", len(values))) } -func (ic *intCrawler) processData(values map[string]string, logger *zap.Logger) { + +func (ic *intCrawler) processData(values map[string]string, prioritizedKeys []string, logger *zap.Logger) { api := &cacheReadAPI{values: values} - for k := range values { + for _, k := range prioritizedKeys { v := values[k] if ic.isStopping() { return diff --git a/rules/key_processor.go b/rules/key_processor.go index 56d3858..c3291ef 100644 --- a/rules/key_processor.go +++ b/rules/key_processor.go @@ -154,7 +154,7 @@ type keyTask struct { func (bkp *baseKeyProcessor) processKey(key string, value *string, rapi readAPI, logger *zap.Logger, dispatcher workDispatcher, metadata map[string]string, timesEvaluated func(rulesID string)) { logger.Debug("Processing key", zap.String("key", key)) - rules := bkp.rm.getStaticRules(key, value) + rules, prioritized := bkp.rm.getStaticRules(key, value) valueString := "" if value != nil { valueString = *value @@ -168,7 +168,8 @@ func (bkp *baseKeyProcessor) processKey(key string, value *string, rapi readAPI, logger.Error("Error getting keys to evaluate rules", zap.Error(err), zap.Int("rules", len(rules)), zap.Int("keys", len(keys))) return } - for rule, index := range rules { + for _, rule := range prioritized { + index := rules[rule] ruleID := bkp.ruleIDs[index] if timesEvaluated != nil { timesEvaluated(ruleID) @@ -189,7 +190,7 @@ func (bkp *baseKeyProcessor) processKey(key string, value *string, rapi readAPI, } func (bkp *baseKeyProcessor) isWork(key string, value *string, api readAPI) bool { - rules := bkp.rm.getStaticRules(key, value) + rules, _ := bkp.rm.getStaticRules(key, value) for rule := range rules { satisfied, _ := rule.satisfied(api) // #nosec G104 -- Map lookup if satisfied { diff --git a/rules/key_processor_test.go b/rules/key_processor_test.go index 974a5fb..76a1324 100644 --- a/rules/key_processor_test.go +++ b/rules/key_processor_test.go @@ -52,7 +52,8 @@ func TestV3KeyProcessor(t *testing.T) { rule, err := NewEqualsLiteralRule("/test/:key", &value) assert.NoError(t, err) rm := newRuleManager(map[string]constraint{}, false) - rm.addRule(rule) + opts := makeRuleOptions() + rm.addRule(rule, opts) api := newMapReadAPI() api.put("/test/key", value) callbacks := map[int]V3RuleTaskCallback{0: v3DummyCallback} @@ -84,7 +85,8 @@ func TestNewV3KeyProcessor(t *testing.T) { rule, err := NewEqualsLiteralRule("/test/:key", &value) assert.NoError(t, err) rm := newRuleManager(map[string]constraint{}, false) - rm.addRule(rule) + opts := makeRuleOptions() + rm.addRule(rule, opts) api := newMapReadAPI() api.put("/test/key", value) diff --git a/rules/options.go b/rules/options.go index 3977033..c6604b2 100644 --- a/rules/options.go +++ b/rules/options.go @@ -339,12 +339,16 @@ type ruleOptions struct { lockTimeout int contextProvider ContextProvider ruleID string + crawlerOnly bool + priority uint } func makeRuleOptions(options ...RuleOption) ruleOptions { opts := ruleOptions{ lockTimeout: 0, ruleID: defaultRuleID, + crawlerOnly: false, + priority: 0, } for _, opt := range options { opt.apply(&opts) @@ -385,3 +389,24 @@ func RuleID(ruleID string) RuleOption { o.ruleID = ruleID }) } + +// CrawlerOnly makes it so the rule is only +// evaluated by the crawler and is not assigned a watcher +func CrawlerOnly() RuleOption { + return ruleOptionFunction((func(o *ruleOptions) { + o.crawlerOnly = true + })) +} + +// Priority sets the priority for fields +// that are associated with a rule during +// a crawler run. The higher the number, the +// higher the priority. The default is 0, or +// lowest priority. +// Watcher processing will +// still be done unless CrawlerOnly() is used. +func Priority(priority uint) RuleOption { + return ruleOptionFunction((func(o *ruleOptions) { + o.priority = priority + })) +} diff --git a/rules/rule_manager.go b/rules/rule_manager.go index 7855002..e1e1d2d 100644 --- a/rules/rule_manager.go +++ b/rules/rule_manager.go @@ -1,6 +1,7 @@ package rules import ( + "sort" "strings" ) @@ -8,26 +9,32 @@ type ruleManager struct { constraints map[string]constraint currentIndex int rulesBySlashCount map[int]map[DynamicRule]int - prefixes map[string]string - rules []DynamicRule + prefixes map[string]ruleMgrRuleOptions + rules map[DynamicRule]uint enhancedRuleFilter bool } +type ruleMgrRuleOptions struct { + crawlerOnly bool + priority uint +} + func newRuleManager(constraints map[string]constraint, enhancedRuleFilter bool) ruleManager { rm := ruleManager{ rulesBySlashCount: map[int]map[DynamicRule]int{}, - prefixes: map[string]string{}, + prefixes: map[string]ruleMgrRuleOptions{}, constraints: constraints, currentIndex: 0, - rules: []DynamicRule{}, + rules: map[DynamicRule]uint{}, enhancedRuleFilter: enhancedRuleFilter, } return rm } -func (rm *ruleManager) getStaticRules(key string, value *string) map[staticRule]int { +func (rm *ruleManager) getStaticRules(key string, value *string) (map[staticRule]int, []staticRule) { slashCount := strings.Count(key, "/") out := make(map[staticRule]int) + toSort := make(map[staticRule]uint) rules, ok := rm.rulesBySlashCount[slashCount] if ok { for rule, index := range rules { @@ -37,20 +44,22 @@ func (rm *ruleManager) getStaticRules(key string, value *string) map[staticRule] qSat := sRule.qSatisfiable(key, value) if qSat == qTrue || qSat == qMaybe { out[sRule] = index + toSort[sRule] = rm.rules[rule] } } else { if sRule.satisfiable(key, value) { out[sRule] = index + toSort[sRule] = rm.rules[rule] } } } } } - return out + return out, sortRulesByPriority(toSort) } -func (rm *ruleManager) addRule(rule DynamicRule) int { - rm.rules = append(rm.rules, rule) +func (rm *ruleManager) addRule(rule DynamicRule, opts ruleOptions) int { + rm.rules[rule] = opts.priority for _, pattern := range rule.getPatterns() { slashCount := strings.Count(pattern, "/") rules, ok := rm.rulesBySlashCount[slashCount] @@ -61,7 +70,22 @@ func (rm *ruleManager) addRule(rule DynamicRule) int { rules[rule] = rm.currentIndex } for _, prefix := range rule.getPrefixesWithConstraints(rm.constraints) { - rm.prefixes[prefix] = "" + + _, currentPriority := rm.prefixes[prefix] + // if value does not exist in map yet + if !currentPriority { + rm.prefixes[prefix] = ruleMgrRuleOptions{crawlerOnly: opts.crawlerOnly, priority: opts.priority} + } else { + // ensure that no high priority is overwritten + if rm.prefixes[prefix].priority < opts.priority { + rm.prefixes[prefix] = ruleMgrRuleOptions{crawlerOnly: rm.prefixes[prefix].crawlerOnly, priority: opts.priority} + } + // only update crawlerOnly value if new option is false + if !opts.crawlerOnly { + rm.prefixes[prefix] = ruleMgrRuleOptions{crawlerOnly: false, priority: rm.prefixes[prefix].priority} + } + } + } rm.prefixes = reducePrefixes(rm.prefixes) lastIndex := rm.currentIndex @@ -69,27 +93,77 @@ func (rm *ruleManager) addRule(rule DynamicRule) int { return lastIndex } +func (rm *ruleManager) getPrioritizedPrefixes() []string { + out := []string{} + for prefix := range rm.prefixes { + out = append(out, prefix) + } + // sort slice by highest priority value + sort.SliceStable(out, func(i, j int) bool { + return rm.prefixes[out[i]].priority > rm.prefixes[out[j]].priority + }) + return out +} + +func sortRulesByPriority(rules map[staticRule]uint) []staticRule { + out := []staticRule{} + for rule := range rules { + out = append(out, rule) + } + // sort slice by highest priority value + sort.SliceStable(out, func(i, j int) bool { + return rules[out[i]] > rules[out[j]] + }) + return out +} + +func (rm *ruleManager) getWatcherPrefixes() []string { + out := []string{} + for prefix, ruleOpt := range rm.prefixes { + if !ruleOpt.crawlerOnly { + out = append(out, prefix) + } + } + return out +} + // Removes any path prefixes that have other path prefixes as // string prefixes -func reducePrefixes(prefixes map[string]string) map[string]string { - out := map[string]string{} +func reducePrefixes(prefixes map[string]ruleMgrRuleOptions) map[string]ruleMgrRuleOptions { + out := map[string]ruleMgrRuleOptions{} sorted := sortPrefixesByLength(prefixes) for _, prefix := range sorted { add := true - for addedPrefix := range out { + optionsToAdd := prefixes[prefix] + for addedPrefix, addedOptions := range out { if strings.HasPrefix(prefix, addedPrefix) { add = false + optsToUpdate := out[addedPrefix] + // update the addedPrefix to be the + // highest priority of any + // overlapping prefixes + if addedOptions.priority < optionsToAdd.priority { + optsToUpdate.priority = optionsToAdd.priority + out[addedPrefix] = optsToUpdate + } + // if any rule associated with the prefix + // is not crawler only, set crawlerOnly option + // to be false + if !optionsToAdd.crawlerOnly { + optsToUpdate.crawlerOnly = false + out[addedPrefix] = optsToUpdate + } } } if add { - out[prefix] = "" + out[prefix] = optionsToAdd } } return out } // Sorts prefixes shortest to longest -func sortPrefixesByLength(prefixes map[string]string) []string { +func sortPrefixesByLength(prefixes map[string]ruleMgrRuleOptions) []string { out := []string{} for prefix := range prefixes { out = append(out, prefix) diff --git a/rules/rule_manager_test.go b/rules/rule_manager_test.go index c1526d0..aeb2873 100644 --- a/rules/rule_manager_test.go +++ b/rules/rule_manager_test.go @@ -6,39 +6,41 @@ import ( "github.com/stretchr/testify/assert" ) +var defaultOpts = ruleMgrRuleOptions{priority: 0, crawlerOnly: false} + func TestRuleManager(t *testing.T) { for _, erf := range []bool{true, false} { rm := newRuleManager(map[string]constraint{}, erf) rule1, err1 := NewEqualsLiteralRule("/this/is/:a/rule", nil) assert.NoError(t, err1) - rm.addRule(rule1) + opts := makeRuleOptions() + rm.addRule(rule1, opts) rule2, err2 := NewEqualsLiteralRule("/that/is/:a/nother", nil) assert.NoError(t, err2) - rm.addRule(rule2) + rm.addRule(rule2, opts) rule3, err3 := NewEqualsLiteralRule("/this/is/:a", nil) assert.NoError(t, err3) - rm.addRule(rule3) - rules := rm.getStaticRules("/this/is/a/rule", nil) + rm.addRule(rule3, opts) + rules, _ := rm.getStaticRules("/this/is/a/rule", nil) assert.Equal(t, 1, len(rules)) for r, index := range rules { assert.Equal(t, 0, index) assert.True(t, r.satisfiable("/this/is/a/rule", nil)) } - rules = rm.getStaticRules("/nothing", nil) + rules, _ = rm.getStaticRules("/nothing", nil) assert.Equal(t, 0, len(rules)) } } func TestReducePrefixes(t *testing.T) { - prefixes := map[string]string{"/servers/internal/states": "", "/servers/internal": "", "/servers": ""} + prefixes := map[string]ruleMgrRuleOptions{"/servers/internal/states": defaultOpts, "/servers/internal": {priority: 10, crawlerOnly: true}, "/servers": {priority: 0, crawlerOnly: true}} prefixes = reducePrefixes(prefixes) assert.Equal(t, 1, len(prefixes)) - assert.Equal(t, "", prefixes["/servers"]) - + assert.Equal(t, ruleMgrRuleOptions{priority: 10, crawlerOnly: false}, prefixes["/servers"]) } func TestSortPrefixesByLength(t *testing.T) { - prefixes := map[string]string{"/servers/internal": "", "/servers/internal/states": "", "/servers": ""} + prefixes := map[string]ruleMgrRuleOptions{"/servers/internal": defaultOpts, "/servers/internal/states": defaultOpts, "/servers": defaultOpts} sorted := sortPrefixesByLength(prefixes) assert.Equal(t, "/servers/internal/states", sorted[2]) assert.Equal(t, "/servers/internal", sorted[1]) @@ -70,3 +72,87 @@ func TestCombineRuleData(t *testing.T) { compareUnorderedStringArrays(t, testCase.expectedData, combineRuleData(rules, source), "index %d", idx) } } + +func TestGetPrioritizedPrefixes(t *testing.T) { + rm := newRuleManager(map[string]constraint{}, false) + + // Final priority - 300 + // shorter prefix rule, with lower priority than later overlapping rule + rule1, err1 := NewEqualsLiteralRule("/this/is/:a/rule", nil) + assert.NoError(t, err1) + rm.addRule(rule1, makeRuleOptions(Priority(100))) + + // overlapping, longer prefix with a higher priority + rule2, err2 := NewEqualsLiteralRule("/this/is/overlapping/:a", nil) + assert.NoError(t, err2) + rm.addRule(rule2, makeRuleOptions(Priority(300))) + + // Final priority - 200 + rule3, err3 := NewEqualsLiteralRule("/these/are/:a/ruleset", nil) + assert.NoError(t, err3) + rm.addRule(rule3, makeRuleOptions(Priority(200))) + + // same prefix as earlier, largest one should be considered + rule4, err4 := NewEqualsLiteralRule("/these/are/:a", nil) + assert.NoError(t, err4) + rm.addRule(rule4, makeRuleOptions(Priority(50))) + + // Final priority - 0 + // no priority, should be last + rule5, err5 := NewEqualsLiteralRule("/that/is/:a/nother", nil) + assert.NoError(t, err5) + rm.addRule(rule5, makeRuleOptions()) + + // Final priority - 100 + // third tier priority rule + rule6, err6 := NewEqualsLiteralRule("/this/one/is/:a", nil) + assert.NoError(t, err6) + rm.addRule(rule6, makeRuleOptions(Priority(100))) + + assert.Equal(t, []string{"/this/is/", "/these/are/", "/this/one/is/", "/that/is/"}, rm.getPrioritizedPrefixes()) +} + +func TestAddRuleCrawlerOnly(t *testing.T) { + rm := newRuleManager(map[string]constraint{}, false) + + rule1, err1 := NewEqualsLiteralRule("/this/is/:a/rule", nil) + assert.NoError(t, err1) + rm.addRule(rule1, makeRuleOptions(Priority(100), CrawlerOnly())) + + // overlapping, longer prefix with a higher priority + rule2, err2 := NewEqualsLiteralRule("/this/is/overlapping/:a", nil) + assert.NoError(t, err2) + rm.addRule(rule2, makeRuleOptions(Priority(300), CrawlerOnly())) + + assert.True(t, assert.ObjectsAreEqual(map[string]ruleMgrRuleOptions{"/this/is/": {priority: 300, crawlerOnly: true}}, rm.prefixes)) +} + +func TestGetStaticRules(t *testing.T) { + rm := newRuleManager(map[string]constraint{}, false) + + rule1, err1 := NewEqualsLiteralRule("/this/is/:a/rule", nil) + assert.NoError(t, err1) + rm.addRule(rule1, makeRuleOptions(Priority(100))) + + rule2, err2 := NewEqualsLiteralRule("/this/is/overlapping/:a", nil) + assert.NoError(t, err2) + rm.addRule(rule2, makeRuleOptions(Priority(300))) + + rule3, err3 := NewEqualsLiteralRule("/these/are/:a/ruleset", nil) + assert.NoError(t, err3) + rm.addRule(rule3, makeRuleOptions(Priority(50))) + + rule4, err4 := NewEqualsLiteralRule("/these/are/:a/ruleset", nil) + assert.NoError(t, err4) + rm.addRule(rule4, makeRuleOptions(Priority(200))) + + rule5, err5 := NewEqualsLiteralRule("/that/is/:a/nother", nil) + assert.NoError(t, err5) + rm.addRule(rule5, makeRuleOptions()) + + // The 200 priority rule, 3 rule added, should be first in + // the priority slice + index, priority := rm.getStaticRules("/these/are/a/ruleset", nil) + assert.Equal(t, 3, index[priority[0]]) + assert.Equal(t, 2, index[priority[1]]) +} diff --git a/rules/worker.go b/rules/worker.go index 653b2ac..71869cc 100644 --- a/rules/worker.go +++ b/rules/worker.go @@ -75,6 +75,8 @@ func (bw *baseWorker) doWork(loggerPtr **zap.Logger, metricsInfo metricsInfo, lockKey string, ruleID string, source string) { logger := *loggerPtr logger = logger.With(zap.String("ruleID", ruleID), zap.String("mutex", lockKey)) + // TODO remove, used for debugging to see order rules are processed + logger.Info("Processing rule") rule := *rulePtr capi, err1 := bw.api.getCachedAPI(rule.getKeys()) if err1 != nil { diff --git a/v3enginetest/main.go b/v3enginetest/main.go index 90dbeaa..65b57e9 100644 --- a/v3enginetest/main.go +++ b/v3enginetest/main.go @@ -20,11 +20,17 @@ var ( ) const ( - dataPath = "/rulesEngine/data/:id" - blockPath = "/rulesEngine/block/:id" - donePath = "/rulesEngine/done/:id" - doneRuleID = "done" - doneID = "4567" + dataPath = "/rulesEngine/data/:id" + blockPath = "/rulesEngine/block/:id" + donePath = "/rulesEngine/done/:id" + doneCrawlerPathLow = "/rulesEngineCrawlerLow/done/:id" + doneCrawlerPathHigh = "/rulesEngineCrawlerHigh/done/:id" + doneRuleID = "done" + doneRuleIDCrawlerLow = "doneCrawlerLow" + doneRuleIDCrawlerHigh = "doneCrawlerHigh" + doneID = "4567" + doneIDCrawlerLow = "8910" + doneIDCrawlerHigh = "1112" ) type polled struct { @@ -97,7 +103,7 @@ func main() { engine := rules.NewV3Engine(cfg, logger, rules.EngineContextProvider(cpFunc), rules.EngineMetricsCollector(mFunc), - rules.EngineSyncInterval(5), + rules.EngineSyncInterval(10), rules.EngineCrawlMutex("inttest", 5), rules.EngineLockAcquisitionTimeout(5)) mw := &rules.MockWatcherWrapper{ @@ -178,6 +184,36 @@ func main() { check(err) }, rules.RuleID(doneRuleID)) + doneCrawlerRuleLow, err := rules.NewEqualsLiteralRule(doneCrawlerPathLow, &doneFalse) + check(err) + // create a no priority crawler only rule + highPriorityCalled := false + engine.AddRule(doneCrawlerRuleLow, "/rulesEngineCrawlerDoneLow/:id", func(task *rules.V3RuleTask) { + path := task.Attr.Format(doneCrawlerPathLow) + if task.Metadata["source"] != "crawler" { + panic("Crawler only rule not processed by the crawler") + } else if !highPriorityCalled { + panic("High priority crawler rule not called yet") + } + doneTrue := "true" + _, err := kv.Put(task.Context, path, doneTrue) + check(err) + }, rules.RuleID(doneRuleIDCrawlerLow), rules.CrawlerOnly()) + + doneCrawlerRuleHigh, err := rules.NewEqualsLiteralRule(doneCrawlerPathHigh, &doneFalse) + check(err) + // create a high priority crawler only rule + engine.AddRule(doneCrawlerRuleHigh, "/rulesEngineCrawlerDoneHigh/:id", func(task *rules.V3RuleTask) { + highPriorityCalled = true + path := task.Attr.Format(doneCrawlerPathHigh) + if task.Metadata["source"] != "crawler" { + panic("Crawler only rule not processed by the crawler") + } + doneTrue := "true" + _, err := kv.Put(task.Context, path, doneTrue) + check(err) + }, rules.RuleID(doneRuleIDCrawlerHigh), rules.CrawlerOnly(), rules.Priority(100)) + engine.Run() time.Sleep(time.Second) // Write data to be polled to etcd; this will trigger the callback. @@ -200,10 +236,29 @@ func main() { _, err = kv.Put(context.Background(), strings.Replace(donePath, ":id", doneID, 1), doneFalse) check(err) + // Trigger the done crawler low-priority rule + _, err = kv.Put(context.Background(), strings.Replace(doneCrawlerPathLow, ":id", doneIDCrawlerLow, 1), doneFalse) + check(err) + + // Trigger the done crawler high-priority rule + _, err = kv.Put(context.Background(), strings.Replace(doneCrawlerPathHigh, ":id", doneIDCrawlerHigh, 1), doneFalse) + check(err) + // Verify that it ran - tenSecCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - err = cbHandler.WaitForCallback(tenSecCtx, doneRuleID, map[string]string{"id": doneID}) + tenSecCtx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel1() + err = cbHandler.WaitForCallback(tenSecCtx1, doneRuleID, map[string]string{"id": doneID}) + check(err) + + // Verify the crawler rules ran + tenSecCtx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel2() + err = cbHandler.WaitForCallback(tenSecCtx2, doneRuleIDCrawlerHigh, map[string]string{"id": doneIDCrawlerHigh}) check(err) + tenSecCtx3, cancel3 := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel3() + err = cbHandler.WaitForCallback(tenSecCtx3, doneRuleIDCrawlerLow, map[string]string{"id": doneIDCrawlerLow}) + check(err) + _ = engine.Shutdown(ctx) // #nosec G104 -- For testing only }