From b3e64405d32f14d0f4088b94823a17dd69dbb37e Mon Sep 17 00:00:00 2001 From: Johan Lindh Date: Thu, 30 Apr 2026 14:04:44 +0200 Subject: [PATCH] feat(runner): use atomic reload snapshots --- pkg/runner/atomic_reload_test.go | 406 +++++++++++++++++++++++++++++ pkg/runner/cryptopan_extra_test.go | 122 +++++++++ pkg/runner/runner.go | 326 ++++++++++++----------- pkg/runner/runner_test.go | 100 +++---- pkg/runner/test_helpers_test.go | 60 +++++ 5 files changed, 814 insertions(+), 200 deletions(-) create mode 100644 pkg/runner/atomic_reload_test.go create mode 100644 pkg/runner/cryptopan_extra_test.go create mode 100644 pkg/runner/test_helpers_test.go diff --git a/pkg/runner/atomic_reload_test.go b/pkg/runner/atomic_reload_test.go new file mode 100644 index 0000000..87dd6d3 --- /dev/null +++ b/pkg/runner/atomic_reload_test.go @@ -0,0 +1,406 @@ +package runner + +import ( + "io" + "log/slog" + "net/netip" + "sync" + "sync/atomic" + "testing" + + dnstap "github.com/dnstap/golang-dnstap" + "github.com/miekg/dns" +) + +// The tests in this file exercise the lock-free reload paths introduced in +// this change: ignored-IP and ignored-question lookups read +// atomic.Pointer snapshots on the hot path with no mutex, and reload +// writers atomic.Store fresh values. They are designed to fail under +// `go test -race` if a future change accidentally reintroduces unsynchronised +// access - for example, by replacing the atomic.Pointer with a bare +// pointer field. +// +// They do *not* try to assert what value a reader sees mid-reload (that +// is intentionally racy at the value level, just not at the memory-model +// level); they only assert that the readers and the writer can run +// concurrently without panicking and without the race detector flagging +// the access. + +// TestConcurrentIgnoredClientIPsReload reloads the ignored client IP set +// while a fleet of readers calls clientIPIsIgnored. 
Each reader uses a +// mix of IPv4 and IPv6 addresses, including some that may or may not be +// in the set depending on which reload was most recent. +// +// Run under -race to catch any unsynchronised access to the IPSet pointer +// or the CIDR count. +func TestConcurrentIgnoredClientIPsReload(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + edm, err := newDnstapMinimiser(logger, defaultTC) + if err != nil { + t.Fatalf("newDnstapMinimiser: %s", err) + } + t.Cleanup(func() { cleanupTestMinimiser(edm) }) + + // Prime the set so readers don't all hit the early-return nil path. + edm.conf.IgnoredClientIPsFile = "testdata/ignored-client-ips.valid1" + if err := edm.setIgnoredClientIPs(); err != nil { + t.Fatalf("initial setIgnoredClientIPs: %s", err) + } + + // We alternate between two valid files plus the empty file (which + // stores nil), so readers exercise both the populated- and nil- + // snapshot paths. + files := []string{ + "testdata/ignored-client-ips.valid1", + "testdata/ignored-client-ips.valid2", + "testdata/ignored-client-ips.empty", + } + + addrs := []netip.Addr{ + netip.MustParseAddr("127.0.0.1"), + netip.MustParseAddr("127.0.0.3"), + netip.MustParseAddr("10.10.8.5"), + netip.MustParseAddr("198.51.100.10"), + netip.MustParseAddr("::1"), + netip.MustParseAddr("::3"), + netip.MustParseAddr("2001:db8:0010:0011::10"), + } + + var ( + stop atomic.Bool + wg sync.WaitGroup + ) + + // Start readers. Each reader spins clientIPIsIgnored across the + // address mix. We discard the result - what matters is that the call + // returns and -race observes no unsynchronised reads. 
+ const numReaders = 8 + wg.Add(numReaders) + for r := range numReaders { + go func(seed int) { + defer wg.Done() + i := seed + for !stop.Load() { + addr := addrs[i%len(addrs)] + dt := &dnstap.Dnstap{ + Message: &dnstap.Message{ + QueryAddress: addr.AsSlice(), + }, + } + _ = edm.clientIPIsIgnored(dt) + _ = edm.getNumIgnoredClientCIDRs() + i++ + } + }(r) + } + + // Single writer: rotate the configured file and call + // setIgnoredClientIPs. We do a fixed number of rotations rather than + // running for a wall-clock duration so the test is deterministic + // under load and slow CI runners. + const rotations = 200 + for i := range rotations { + edm.conf.IgnoredClientIPsFile = files[i%len(files)] + if err := edm.setIgnoredClientIPs(); err != nil { + stop.Store(true) + wg.Wait() + t.Fatalf("rotation %d: setIgnoredClientIPs(%s): %s", i, edm.conf.IgnoredClientIPsFile, err) + } + } + + stop.Store(true) + wg.Wait() +} + +// TestConcurrentIgnoredQuestionsReload mirrors the IP test above but for +// the DAWG-backed ignored-question set, which is stored in an +// atomic.Pointer[dawgFinderHolder]. The wrapper exists because dawg.Finder +// is an interface and atomic.Pointer wants a concrete type - see the +// design note on the dnstapMinimiser struct in runner.go. +// +// As with the IP test the assertion is purely "no race, no panic". A +// future change that, say, reintroduced ignoredQuestionsMutex without +// updating readers would either deadlock (test would time out) or race +// (race detector would fail). +func TestConcurrentIgnoredQuestionsReload(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + edm, err := newDnstapMinimiser(logger, defaultTC) + if err != nil { + t.Fatalf("newDnstapMinimiser: %s", err) + } + t.Cleanup(func() { cleanupTestMinimiser(edm) }) + + // Prime so readers exercise the non-nil snapshot branch initially. 
+ edm.conf.IgnoredQuestionNamesFile = "testdata/ignored-question-names.valid1.dawg" + if err := edm.setIgnoredQuestionNames(); err != nil { + t.Fatalf("initial setIgnoredQuestionNames: %s", err) + } + + files := []string{ + "testdata/ignored-question-names.valid1.dawg", + "testdata/ignored-question-names.valid2.dawg", + "testdata/ignored-question-names.empty.dawg", // empty maps to nil holder + } + + questions := []string{ + "example.com.", + "www.example.net.", + "www.example.org.", + "www.example.edu.", + "unrelated.invalid.", + } + + var ( + stop atomic.Bool + wg sync.WaitGroup + ) + + const numReaders = 8 + wg.Add(numReaders) + for r := range numReaders { + go func(seed int) { + defer wg.Done() + i := seed + for !stop.Load() { + m := new(dns.Msg) + m.SetQuestion(questions[i%len(questions)], dns.TypeA) + _ = edm.questionIsIgnored(m) + i++ + } + }(r) + } + + const rotations = 200 + for i := range rotations { + edm.conf.IgnoredQuestionNamesFile = files[i%len(files)] + if err := edm.setIgnoredQuestionNames(); err != nil { + stop.Store(true) + wg.Wait() + t.Fatalf("rotation %d: setIgnoredQuestionNames(%s): %s", i, edm.conf.IgnoredQuestionNamesFile, err) + } + } + + stop.Store(true) + wg.Wait() +} + +// TestConcurrentSetCryptopanReload exercises the lock-free Crypto-PAn +// rotation path: workers Load the cryptopan pointer and the generation +// counter on the hot path, while a writer rotates the key. The atomic +// store for the pointer plus the atomic add for the generation must +// synchronise so a worker that sees the new generation also observes the +// new pointer (otherwise it would Purge its cache and then immediately +// re-fill it from the *old* cryptopan, defeating the rotation). +// +// We don't assert the exact ordering here - that is what -race plus the +// memory-model guarantees of atomic.Store/atomic.Add are for. 
What we +// do assert is the strict invariant: every observed generation bump +// corresponds to a non-nil cryptopan, and the pointer never reverts to +// nil mid-run. +func TestConcurrentSetCryptopanReload(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + edm, err := newDnstapMinimiser(logger, defaultTC) + if err != nil { + t.Fatalf("newDnstapMinimiser: %s", err) + } + t.Cleanup(func() { cleanupTestMinimiser(edm) }) + + var ( + stop atomic.Bool + wg sync.WaitGroup + ) + + const numReaders = 4 + wg.Add(numReaders) + for range numReaders { + go func() { + defer wg.Done() + for !stop.Load() { + cpn := edm.cryptopan.Load() + gen := edm.cryptopanGen.Load() + // The minimiser construction installs an initial + // instance and the writer below only ever stores + // non-nil pointers via setCryptopan - so observing nil + // at any point is a contract violation. + if cpn == nil { + t.Errorf("cryptopan pointer observed as nil at gen=%d", gen) + return + } + } + }() + } + + // Rotate the cryptopan instance several times. setCryptopan is + // expensive (argon2 KDF, ~100–200ms per call) so we keep the count + // modest - the readers still spin tens of thousands of Loads in + // that window, which is plenty for the race detector to catch any + // regression. The salt stays constant; only the key changes so each + // call installs a distinct instance. + const rotations = 20 + for i := range rotations { + key := "rotation-key-" + // Avoid a strconv import dependency for this test by composing + // short keys. The exact value doesn't matter - only that it + // changes each iteration. 
+ key += string(rune('a' + (i % 26))) + key += string(rune('a' + ((i / 26) % 26))) + if err := edm.setCryptopan(key, defaultTC.CryptopanKeySalt, defaultTC.CryptopanAddressEntries); err != nil { + stop.Store(true) + wg.Wait() + t.Fatalf("rotation %d: setCryptopan: %s", i, err) + } + } + + stop.Store(true) + wg.Wait() + + // After the writer has finished the generation must reflect every + // successful rotation - strictly monotonic, no skipped/dropped + // increments. The +1 accounts for the setCryptopan call inside + // newDnstapMinimiser. + wantGen := uint64(rotations + 1) + if got := edm.cryptopanGen.Load(); got != wantGen { + t.Fatalf("final cryptopanGen have: %d, want: %d", got, wantGen) + } +} + +// TestQuestionIsIgnoredMultipleQuestions documents the explicit "any +// matches" policy in questionIsIgnored when a DNS message carries more +// than one question. The runner.go comment states: "if there happens to +// be multiple questions in the packet we consider the message ignored if +// any of them matches" - but no existing test exercises a multi-question +// message, so a future refactor that, say, only inspected msg.Question[0] +// would silently regress with no test failure. +// +// In practice DNS messages with QDCOUNT > 1 are extremely rare and most +// recursors reject them, but the code intentionally handles the case; +// this test pins the behaviour. +func TestQuestionIsIgnoredMultipleQuestions(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + edm, err := newDnstapMinimiser(logger, defaultTC) + if err != nil { + t.Fatalf("newDnstapMinimiser: %s", err) + } + t.Cleanup(func() { cleanupTestMinimiser(edm) }) + + edm.conf.IgnoredQuestionNamesFile = "testdata/ignored-question-names.valid1.dawg" + if err := edm.setIgnoredQuestionNames(); err != nil { + t.Fatalf("setIgnoredQuestionNames: %s", err) + } + + // example.com. is in valid1.dawg as an exact match (see existing + // TestIgnoredQuestionNamesValid). 
We pair it with a name that is NOT + // ignored, in both orders, to make sure the loop scans past + // non-matching questions and does not short-circuit on the first + // entry. + tests := []struct { + name string + questions []string + want bool + }{ + { + name: "single non-matching question", + questions: []string{"unrelated.invalid."}, + want: false, + }, + { + name: "matching question first", + questions: []string{"example.com.", "unrelated.invalid."}, + want: true, + }, + { + name: "matching question second", + questions: []string{"unrelated.invalid.", "example.com."}, + want: true, + }, + { + name: "no matches in any of multiple questions", + questions: []string{"unrelated.invalid.", "another.invalid."}, + want: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + m := new(dns.Msg) + // SetQuestion only handles a single question; build the + // slice directly to model an unusual multi-question packet. + m.Question = make([]dns.Question, len(tc.questions)) + for i, q := range tc.questions { + m.Question[i] = dns.Question{ + Name: q, + Qtype: dns.TypeA, + Qclass: dns.ClassINET, + } + } + + if got := edm.questionIsIgnored(m); got != tc.want { + t.Fatalf("questionIsIgnored(%v) have: %t, want: %t", tc.questions, got, tc.want) + } + }) + } +} + +// TestClientIPIsIgnoredEmptyQueryAddress documents the deliberate +// "fail-closed" behaviour for unparseable QueryAddress slices when an +// ignore list is active: the packet is treated as ignored and an error +// counter is incremented. Existing TestIgnoredClientIPsInvalidClient +// covers a 5-byte slice; this complements it with the nil and empty-slice +// cases, which take the same code path through netip.AddrFromSlice but +// are easy to overlook in future refactors that try to "optimise" the +// nil check. 
+// +// The behaviour matters because production EDM applies the ignore list +// before any further parsing - silently allowing a packet with no +// QueryAddress through would defeat operator policy. +func TestClientIPIsIgnoredEmptyQueryAddress(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + edm, err := newDnstapMinimiser(logger, defaultTC) + if err != nil { + t.Fatalf("newDnstapMinimiser: %s", err) + } + t.Cleanup(func() { cleanupTestMinimiser(edm) }) + + // Active list - exercise the fail-closed path. + edm.conf.IgnoredClientIPsFile = "testdata/ignored-client-ips.valid1" + if err := edm.setIgnoredClientIPs(); err != nil { + t.Fatalf("setIgnoredClientIPs: %s", err) + } + + cases := []struct { + name string + addr []byte + }{ + {"nil QueryAddress", nil}, + {"zero-length QueryAddress", []byte{}}, + // already covered by TestIgnoredClientIPsInvalidClient but + // included here for symmetry / regression-safety in this file. + {"odd-length QueryAddress", make([]byte, 7)}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + dt := &dnstap.Dnstap{Message: &dnstap.Message{QueryAddress: tc.addr}} + if got := edm.clientIPIsIgnored(dt); !got { + t.Fatalf("with active ignore list: have: %t, want: true (fail-closed)", got) + } + }) + } + + // With NO active list, an unparseable QueryAddress must NOT be + // treated as ignored - the early-return on a nil ipset is what + // allows the rest of the pipeline to handle (or log) the malformed + // packet itself rather than silently dropping it. 
+ edm.conf.IgnoredClientIPsFile = "testdata/ignored-client-ips.empty" + if err := edm.setIgnoredClientIPs(); err != nil { + t.Fatalf("setIgnoredClientIPs(empty): %s", err) + } + for _, tc := range cases { + t.Run("inactive/"+tc.name, func(t *testing.T) { + dt := &dnstap.Dnstap{Message: &dnstap.Message{QueryAddress: tc.addr}} + if got := edm.clientIPIsIgnored(dt); got { + t.Fatalf("with no ignore list: have: %t, want: false", got) + } + }) + } +} + diff --git a/pkg/runner/cryptopan_extra_test.go b/pkg/runner/cryptopan_extra_test.go new file mode 100644 index 0000000..afcebde --- /dev/null +++ b/pkg/runner/cryptopan_extra_test.go @@ -0,0 +1,122 @@ +package runner + +import ( + "io" + "log/slog" + "testing" +) + +// TestSetCryptopanBumpsGeneration verifies the contract that runMinimiser +// workers rely on: every successful setCryptopan call must increment +// edm.cryptopanGen by exactly one and atomic.Store a new cryptopan +// pointer. Workers compare cryptopanGen against their last-seen value to +// know when to Purge their local Crypto-PAn cache; if the generation +// didn't strictly advance on each rotation, +// stale entries from the previous key would silently leak through. +func TestSetCryptopanBumpsGeneration(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + edm, err := newDnstapMinimiser(logger, defaultTC) + if err != nil { + t.Fatalf("unable to setup edm: %s", err) + } + t.Cleanup(func() { cleanupTestMinimiser(edm) }) + + // newDnstapMinimiser called setCryptopan once during construction; the + // generation we observe here is therefore the post-construction + // baseline, not zero. We only care about strict monotonic advancement + // per call, so capture the baseline and compare deltas. 
+ baselineGen := edm.cryptopanGen.Load() + baselinePtr := edm.cryptopan.Load() + if baselinePtr == nil { + t.Fatalf("cryptopan pointer should be non-nil after newDnstapMinimiser") + } + + const rotations = 5 + for i := 1; i <= rotations; i++ { + // Use a different key each time so we'd notice if the cryptopan + // pointer was being reused (cryptopan.New produces a new instance + // per call, so identical-key calls also produce distinct pointers + // - but varying the key catches accidental short-circuit + // optimisations more obviously). + key := "rotation-key-" + string(rune('0'+i)) + if err := edm.setCryptopan(key, defaultTC.CryptopanKeySalt, defaultTC.CryptopanAddressEntries); err != nil { + t.Fatalf("rotation %d: setCryptopan failed: %s", i, err) + } + + gotGen := edm.cryptopanGen.Load() + wantGen := baselineGen + uint64(i) + if gotGen != wantGen { + t.Fatalf("rotation %d: cryptopanGen have: %d, want: %d", i, gotGen, wantGen) + } + + gotPtr := edm.cryptopan.Load() + if gotPtr == nil { + t.Fatalf("rotation %d: cryptopan pointer should not be nil", i) + } + if gotPtr == baselinePtr { + t.Fatalf("rotation %d: cryptopan pointer was not replaced (still equal to baseline)", i) + } + } +} + +// TestSetCryptopanCacheEntriesArgumentIgnored documents (and locks in) that +// the cacheEntries argument has been intentionally demoted to a no-op since +// Tier-2: caches are now owned per-worker by runMinimiser, and +// setCryptopan only swaps the cryptopan instance and bumps the generation. +// If a future change accidentally re-introduced shared cache state on +// setCryptopan it would re-introduce the contention this refactor was +// meant to remove, so we pin the contract here. 
+func TestSetCryptopanCacheEntriesArgumentIgnored(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + edm, err := newDnstapMinimiser(logger, defaultTC) + if err != nil { + t.Fatalf("unable to setup edm: %s", err) + } + t.Cleanup(func() { cleanupTestMinimiser(edm) }) + + // Wildly different cacheEntries values - including 0 (which used to + // disable caching) and a very large value - must all behave the same + // from setCryptopan's perspective: bump generation, swap pointer, do + // not touch any per-worker cache state (there is none on edm itself). + for _, n := range []int{0, 1, 1_000, 1_000_000} { + genBefore := edm.cryptopanGen.Load() + err := edm.setCryptopan(defaultTC.CryptopanKey, defaultTC.CryptopanKeySalt, n) + if err != nil { + t.Fatalf("setCryptopan(cacheEntries=%d) failed: %s", n, err) + } + if got := edm.cryptopanGen.Load(); got != genBefore+1 { + t.Fatalf("setCryptopan(cacheEntries=%d): gen have: %d, want: %d", n, got, genBefore+1) + } + } +} + +// TestGetCryptopanAESKeyDeterministic locks in the key-derivation contract +// that operators depend on: identical (key, salt) must produce identical +// AES bytes across runs and process restarts (i.e. argon2 is deterministic +// for a given parameter set). Operators rely on this so that on-disk data +// pseudonymised before a restart can still be correlated against data +// pseudonymised after - provided the configured key/salt did not change. +func TestGetCryptopanAESKeyDeterministic(t *testing.T) { + const key = "operator-key" + const salt = "operator-salt-aabbccdd" + + first := getCryptopanAESKey(key, salt) + second := getCryptopanAESKey(key, salt) + + if len(first) != 32 { + t.Fatalf("aes key length have: %d, want: 32", len(first)) + } + if string(first) != string(second) { + t.Fatalf("getCryptopanAESKey not deterministic for the same input") + } + + // And differing inputs must produce different keys, otherwise the + // derivation would be pointless. 
We don't audit the Argon2 strength + // here - only that two trivially distinct inputs disagree. + if string(first) == string(getCryptopanAESKey(key+"!", salt)) { + t.Fatalf("getCryptopanAESKey returned same bytes for differing keys") + } + if string(first) == string(getCryptopanAESKey(key, salt+"!")) { + t.Fatalf("getCryptopanAESKey returned same bytes for differing salts") + } +} diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 17cb068..8ac09c1 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -27,6 +27,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -428,25 +429,19 @@ func getCryptopanAESKey(key string, salt string) []byte { } func (edm *dnstapMinimiser) setCryptopan(key string, salt string, cacheEntries int) error { - var cpnCache *lru.Cache[netip.Addr, netip.Addr] - var err error - - if cacheEntries != 0 { - cpnCache, err = lru.New[netip.Addr, netip.Addr](cacheEntries) - if err != nil { - return fmt.Errorf("setCryptopan: unable to create cache: %w", err) - } - } + // cacheEntries is the per-worker LRU size. The caches themselves are + // owned by each minimiser worker (see runMinimiser); setCryptopan only + // installs the cryptopan instance and signals workers to purge their + // caches on the next call. + _ = cacheEntries cpn, err := createCryptopan(key, salt) if err != nil { return fmt.Errorf("setCryptopan: unable to create cryptopan: %w", err) } - edm.cryptopanMutex.Lock() - edm.cryptopan = cpn - edm.cryptopanCache = cpnCache - edm.cryptopanMutex.Unlock() + edm.cryptopan.Store(cpn) + edm.cryptopanGen.Add(1) return nil } @@ -710,7 +705,7 @@ func (edm *dnstapMinimiser) setupMQTT() { os.Exit(1) } - // Connect to the broker - this will return immediately after initiating the connection process + // Connect to the broker - this will return immediately after initiating the connection process. 
edm.autopahoWg.Add(1) go edm.runAutoPaho(autopahoCm, mqttJWK, mqttFileQueue != nil) } @@ -734,15 +729,7 @@ func (edm *dnstapMinimiser) setIgnoredQuestionNames() error { conf := edm.getConfig() if conf.IgnoredQuestionNamesFile == "" { - edm.ignoredQuestionsMutex.Lock() - if edm.ignoredQuestions != nil { - err := edm.ignoredQuestions.Close() - if err != nil { - edm.log.Error("setIgnoredQuestionNames: failed closing edm.ignoredQuestions for unset filename", "error", err) - } - edm.ignoredQuestions = nil - } - edm.ignoredQuestionsMutex.Unlock() + edm.ignoredQuestions.Store(nil) edm.log.Info("setIgnoredQuestionNames: DNS question ignore list unset", "filename", conf.IgnoredQuestionNamesFile, "num_names", 0) return nil } @@ -751,15 +738,7 @@ func (edm *dnstapMinimiser) setIgnoredQuestionNames() error { if err != nil { if errors.Is(err, errEmptyDawgFile) { // Treat the same as unset filename - edm.ignoredQuestionsMutex.Lock() - if edm.ignoredQuestions != nil { - err := edm.ignoredQuestions.Close() - if err != nil { - edm.log.Error("setIgnoredQuestionNames: failed closing edm.ignoredQuestions for unset filename", "error", err) - } - edm.ignoredQuestions = nil - } - edm.ignoredQuestionsMutex.Unlock() + edm.ignoredQuestions.Store(nil) edm.log.Info("setIgnoredQuestionNames: DNS question ignore list empty", "filename", conf.IgnoredQuestionNamesFile, "num_names", 0) return nil @@ -767,22 +746,15 @@ func (edm *dnstapMinimiser) setIgnoredQuestionNames() error { return fmt.Errorf("setIgnoredQuestionNames: unable to load dawg file '%s': %w", conf.IgnoredQuestionNamesFile, err) } - // We only use the dawg file if there exists at least one name - // in it. Since the file can be empty we must also be prepared to set - // our edm field to nil so we do not keep using an old list. 
- edm.ignoredQuestionsMutex.Lock() - if edm.ignoredQuestions != nil { - err = edm.ignoredQuestions.Close() - if err != nil { - edm.log.Error("setIgnoredQuestionNames: failed closing edm.ignoredQuestions", "error", err) - } - } + // We only use the dawg file if there exists at least one name in it. + // Atomic-pointer swap; deliberately do NOT Close the old finder here + // as that would race with hot-path readers still holding it. Instead, + // reloads leave the old finder for existing readers. if dawgFinder.NumAdded() > 0 { - edm.ignoredQuestions = dawgFinder + edm.ignoredQuestions.Store(&dawgFinderHolder{finder: dawgFinder}) } else { - edm.ignoredQuestions = nil + edm.ignoredQuestions.Store(nil) } - edm.ignoredQuestionsMutex.Unlock() if dawgFinder.NumAdded() > 0 { edm.log.Info("setIgnoredQuestionNames: DNS question ignore list loaded", "filename", conf.IgnoredQuestionNamesFile, "num_names", dawgFinder.NumAdded()) @@ -797,10 +769,8 @@ func (edm *dnstapMinimiser) setIgnoredClientIPs() error { conf := edm.getConfig() if conf.IgnoredClientIPsFile == "" { - edm.ignoredClientsIPSetMutex.Lock() - edm.ignoredClientsIPSet = nil - edm.ignoredClientCIDRsParsed = 0 - edm.ignoredClientsIPSetMutex.Unlock() + edm.ignoredClientsIPSet.Store(nil) + edm.ignoredClientCIDRsParsed.Store(0) edm.log.Info("setIgnoredClientIPs: DNS client ignore list unset", "filename", conf.IgnoredClientIPsFile, "num_cidrs", 0) return nil } @@ -846,10 +816,8 @@ func (edm *dnstapMinimiser) setIgnoredClientIPs() error { } } - edm.ignoredClientsIPSetMutex.Lock() - edm.ignoredClientsIPSet = ipset - edm.ignoredClientCIDRsParsed = numCIDRs - edm.ignoredClientsIPSetMutex.Unlock() + edm.ignoredClientsIPSet.Store(ipset) + edm.ignoredClientCIDRsParsed.Store(numCIDRs) if ipset != nil { edm.log.Info("setIgnoredClientIPs: DNS client ignore list loaded", "filename", conf.IgnoredClientIPsFile, "num_cidrs", numCIDRs) @@ -861,10 +829,7 @@ func (edm *dnstapMinimiser) setIgnoredClientIPs() error { } func (edm *dnstapMinimiser) 
getNumIgnoredClientCIDRs() uint64 { - edm.ignoredClientsIPSetMutex.RLock() - defer edm.ignoredClientsIPSetMutex.RUnlock() - - return edm.ignoredClientCIDRsParsed + return edm.ignoredClientCIDRsParsed.Load() } func (edm *dnstapMinimiser) fsEventWatcher() { @@ -1426,11 +1391,15 @@ type dnstapMinimiser struct { configer edmConfiger conf config confMutex sync.RWMutex - inputChannel chan []byte // the channel expected to be passed to dnstap ReadInto() - log *slog.Logger // any information logging is sent here - cryptopan *cryptopan.Cryptopan // used for pseudonymising IP addresses - cryptopanCache *lru.Cache[netip.Addr, netip.Addr] - cryptopanMutex sync.RWMutex // Mutex for protecting updates cryptopan at runtime + inputChannel chan []byte // the channel expected to be passed to dnstap ReadInto() + log *slog.Logger // any information logging is sent here + + // Cryptopan instance is held in an atomic.Pointer so the hot path + // reads it without locking. setCryptopan swaps the pointer and + // bumps cryptopanGen; per-worker caches compare their last-seen + // generation against this and Purge when it changes. + cryptopan atomic.Pointer[cryptopan.Cryptopan] + cryptopanGen atomic.Uint64 promReg *prometheus.Registry promCryptopanCacheHit prometheus.Counter promCryptopanCacheEvicted prometheus.Counter @@ -1458,11 +1427,17 @@ type dnstapMinimiser struct { autopahoCtx context.Context autopahoCancel context.CancelFunc autopahoWg sync.WaitGroup - ignoredClientsIPSet *netipx.IPSet - ignoredClientCIDRsParsed uint64 - ignoredClientsIPSetMutex sync.RWMutex // Mutex for protecting updates to ignored client IPs at runtime - ignoredQuestions dawg.Finder - ignoredQuestionsMutex sync.RWMutex + // Hot-path lookups (clientIPIsIgnored, questionIsIgnored) read these + // without locking. Reload writers atomic.Store a fresh value; the + // dawgFinderHolder wrapper is needed because dawg.Finder is an + // interface and atomic.Pointer wants a concrete type. 
The previous + // dawg finder is left for the GC to reclaim - calling Close() on + // swap would race with hot-path readers still holding the old + // pointer. With reloads being rare and the underlying mmap small, + // the bounded leak is acceptable. + ignoredClientsIPSet atomic.Pointer[netipx.IPSet] + ignoredClientCIDRsParsed atomic.Uint64 + ignoredQuestions atomic.Pointer[dawgFinderHolder] fsWatcher *fsnotify.Watcher fsWatcherFuncs map[string][]func() error fsWatcherMutex sync.RWMutex @@ -1620,19 +1595,44 @@ func newDnstapMinimiser(logger *slog.Logger, edmConf edmConfiger) (*dnstapMinimi return edm, nil } +// dawgFinderHolder is a tiny concrete-type wrapper so dawg.Finder (which is +// an interface) can be stored in an atomic.Pointer. Used for the +// ignoredQuestions atomic snapshot. +type dawgFinderHolder struct { + finder dawg.Finder +} + +// wkdSnapshot is the read-side view that hot-path callers need: the current +// DAWG finder plus the modtime that goes into wkdUpdate so the collector can +// detect post-rotation refresh. Stored in wellKnownDomainsTracker.snap as +// an atomic.Pointer so lookup() reads it without locking. +type wkdSnapshot struct { + dawgFinder dawg.Finder + dawgModTime time.Time +} + type wellKnownDomainsTracker struct { - mutex sync.RWMutex - wellKnownDomainsData + // snap holds the current DAWG + modtime. Replaced atomically when + // the dawg file rotates; readers in lookup() do a single Load. + snap atomic.Pointer[wkdSnapshot] + + // m is the per-dawgIndex histogram aggregator. It is read & written + // only by dataCollector (the same goroutine that calls + // rotateTracker), so it needs no lock. + m map[int]*histogramData + rotationTime time.Time + dawgIsRotated bool + updateCh chan wkdUpdate - dawgModTime time.Time retryCh chan wkdUpdate stop chan struct{} retryerDone chan struct{} } +// wellKnownDomainsData is the snapshot rotateTracker hands to the histogram +// writer. 
dawgFinder is the *previous* DAWG (used to look up names back into +// strings), m is the previous bucket map, dawgIsRotated marks the boundary. type wellKnownDomainsData struct { - // Store a pointer to histogramData so we can assign to it without - // "cannot assign to struct field in map" issues m map[int]*histogramData rotationTime time.Time dawgFinder dawg.Finder @@ -1640,17 +1640,15 @@ type wellKnownDomainsData struct { } func newWellKnownDomainsTracker(dawgFinder dawg.Finder, dawgModTime time.Time) (*wellKnownDomainsTracker, error) { - return &wellKnownDomainsTracker{ - wellKnownDomainsData: wellKnownDomainsData{ - m: map[int]*histogramData{}, - dawgFinder: dawgFinder, - }, + wkd := &wellKnownDomainsTracker{ + m: map[int]*histogramData{}, updateCh: make(chan wkdUpdate, 10000), retryCh: make(chan wkdUpdate, 10000), - dawgModTime: dawgModTime, stop: make(chan struct{}), retryerDone: make(chan struct{}), - }, nil + } + wkd.snap.Store(&wkdSnapshot{dawgFinder: dawgFinder, dawgModTime: dawgModTime}) + return wkd, nil } // Try to find a domain name string match in DAWG data and return the index as @@ -1692,12 +1690,9 @@ type wkdUpdate struct { } func (wkd *wellKnownDomainsTracker) lookup(msg *dns.Msg) (int, bool, time.Time) { - wkd.mutex.RLock() - defer wkd.mutex.RUnlock() - - dawgIndex, suffixMatch := getDawgIndex(wkd.dawgFinder, msg.Question[0].Name) - - return dawgIndex, suffixMatch, wkd.dawgModTime + snap := wkd.snap.Load() + dawgIndex, suffixMatch := getDawgIndex(snap.dawgFinder, msg.Question[0].Name) + return dawgIndex, suffixMatch, snap.dawgModTime } func (wkd *wellKnownDomainsTracker) updateRetryer(edm *dnstapMinimiser, wg *sync.WaitGroup) { @@ -1712,7 +1707,7 @@ func (wkd *wellKnownDomainsTracker) updateRetryer(edm *dnstapMinimiser, wg *sync dawgIndex, suffixMatch, dawgModTime := wkd.lookup(wu.msg) if dawgIndex == dawgNotFound { - edm.log.Info("ignoring wkd update because name does not exist in updated wkd tracker", "update_dawg_modtime", wkd.dawgModTime, 
"wkd_dawg_modtime", wkd.dawgModTime) + edm.log.Info("ignoring wkd update because name does not exist in updated wkd tracker", "update_dawg_modtime", wu.dawgModTime, "wkd_dawg_modtime", dawgModTime) continue } @@ -1792,30 +1787,33 @@ func (wkd *wellKnownDomainsTracker) rotateTracker(edm *dnstapMinimiser, dawgFile return nil, fmt.Errorf("rotateTracker: unable to stat dawgFile '%s': %w", dawgFile, err) } - if fileInfo.ModTime() != wkd.dawgModTime { + curSnap := wkd.snap.Load() + if fileInfo.ModTime() != curSnap.dawgModTime { dawgFinder, err = dawg.Load(dawgFile) if err != nil { return nil, fmt.Errorf("rotateTracker: dawg.Load(): %w", err) } dawgFileChanged = true - edm.log.Info("dawg file modification changed, will reload file", "prev_time", wkd.dawgModTime, "cur_time", fileInfo.ModTime()) + edm.log.Info("dawg file modification changed, will reload file", "prev_time", curSnap.dawgModTime, "cur_time", fileInfo.ModTime()) } - prevWKD := &wellKnownDomainsData{} - - // Swap the map in use so we can write parquet data outside of the write lock - wkd.mutex.Lock() - prevWKD.m = wkd.m - prevWKD.dawgFinder = wkd.dawgFinder + // rotateTracker runs in the dataCollector goroutine, which is also + // the only writer of wkd.m (see the case wu := <-wkd.updateCh branch). + // No lock needed for the map swap. The DAWG snapshot is a separate + // atomic Store so hot-path lookup() callers see a consistent view. 
+ prevWKD := &wellKnownDomainsData{ + m: wkd.m, + dawgFinder: curSnap.dawgFinder, + rotationTime: rotationTime, + } wkd.m = map[int]*histogramData{} if dawgFileChanged { - wkd.dawgFinder = dawgFinder - wkd.dawgModTime = fileInfo.ModTime() + wkd.snap.Store(&wkdSnapshot{ + dawgFinder: dawgFinder, + dawgModTime: fileInfo.ModTime(), + }) prevWKD.dawgIsRotated = true } - wkd.mutex.Unlock() - - prevWKD.rotationTime = rotationTime return prevWKD, nil } @@ -1868,49 +1866,44 @@ func (edm *dnstapMinimiser) qnameSeen(msg *dns.Msg, seenQnameLRU *lru.Cache[stri } func (edm *dnstapMinimiser) clientIPIsIgnored(dt *dnstap.Dnstap) bool { - // edm.ignoredClientsIPSet can be modified at runtime so wrap everything - // in a RO lock - edm.ignoredClientsIPSetMutex.RLock() - defer edm.ignoredClientsIPSetMutex.RUnlock() - - if edm.ignoredClientsIPSet != nil { - clientIP, ok := netip.AddrFromSlice(dt.Message.QueryAddress) - if !ok { - // If we have a list of clients to - // ignore but are not able to - // understand the QueryAddress lets err - // on the side of caution and ignore - // such packets as well while making - // noise in logs so it can be investigated - edm.log.Error("unable to parse QueryAddress for ignore-checking, ignoring dnstap packet to be safe, please investigate") - edm.promClientIPIgnoredError.Inc() - return true - } - - if edm.ignoredClientsIPSet.Contains(clientIP) { - edm.promClientIPIgnored.Inc() - return true - } + // Atomic snapshot - no lock on the hot path. Reload writers + // atomic.Store the new IPSet; readers see either old or new value + // per Load. + ipset := edm.ignoredClientsIPSet.Load() + if ipset == nil { + return false + } + clientIP, ok := netip.AddrFromSlice(dt.Message.QueryAddress) + if !ok { + // If we have a list of clients to ignore but are not able to + // understand the QueryAddress let's err on the side of caution + // and ignore such packets as well while making noise in logs + // so it can be investigated. 
+ edm.log.Error("unable to parse QueryAddress for ignore-checking, ignoring dnstap packet to be safe, please investigate") + edm.promClientIPIgnoredError.Inc() + return true + } + if ipset.Contains(clientIP) { + edm.promClientIPIgnored.Inc() + return true } return false } func (edm *dnstapMinimiser) questionIsIgnored(msg *dns.Msg) bool { - // edm.ignoredQuestions can be modified at runtime so wrap everything - // in a RO lock - edm.ignoredQuestionsMutex.RLock() - defer edm.ignoredQuestionsMutex.RUnlock() - - if edm.ignoredQuestions != nil { - // While uncommon, if there happens to be multiple questions in - // the packet we will consider the message ignored if any of them matches the - // ignore list. - for _, question := range msg.Question { - dawgIndex, _ := getDawgIndex(edm.ignoredQuestions, question.Name) - if dawgIndex != dawgNotFound { - edm.promQuestionNameIgnored.Inc() - return true - } + // Atomic snapshot - no lock on the hot path. See clientIPIsIgnored + // for the rationale. + holder := edm.ignoredQuestions.Load() + if holder == nil { + return false + } + // While uncommon, if there happens to be multiple questions in the + // packet we consider the message ignored if any of them matches. + for _, question := range msg.Question { + dawgIndex, _ := getDawgIndex(holder.finder, question.Name) + if dawgIndex != dawgNotFound { + edm.promQuestionNameIgnored.Inc() + return true } } return false @@ -1927,6 +1920,22 @@ func (edm *dnstapMinimiser) runMinimiser(minimiserID int, wg *sync.WaitGroup, se // startConf is used for things that do not handle reconfiguration at runtime startConf := edm.getConfig() + // Per-worker Crypto-PAn cache. Each worker holds its own LRU so the + // pseudonymise hot path takes no shared lock. cryptopanLastGen tracks + // the last cryptopan-instance generation we saw; when setCryptopan + // installs a new key it bumps edm.cryptopanGen and we Purge here so + // no stale (old-key) entries leak through. 
+ var cryptopanCache *lru.Cache[netip.Addr, netip.Addr] + if startConf.CryptopanAddressEntries != 0 { + var lerr error + cryptopanCache, lerr = lru.New[netip.Addr, netip.Addr](startConf.CryptopanAddressEntries) + if lerr != nil { + edm.log.Error("runMinimiser: unable to create per-worker cryptopan cache", "error", lerr, "minimiser_id", minimiserID) + return + } + } + cryptopanLastGen := edm.cryptopanGen.Load() + // conf is meant to be dynamically modified if the config changes at runtime conf := edm.getConfig() @@ -1971,7 +1980,15 @@ minimiserLoop: dangerRealClientIP := make([]byte, len(dt.Message.QueryAddress)) copy(dangerRealClientIP, dt.Message.QueryAddress) - edm.pseudonymiseDnstap(dt) + // Detect cryptopan key rotation; purge our local cache so + // no IPs anonymised under the old key bleed through. + if gen := edm.cryptopanGen.Load(); gen != cryptopanLastGen { + if cryptopanCache != nil { + cryptopanCache.Purge() + } + cryptopanLastGen = gen + } + edm.pseudonymiseDnstap(dt, edm.cryptopan.Load(), cryptopanCache) msg, timestamp := edm.parsePacket(dt, isQuery) @@ -2791,30 +2808,29 @@ func certPoolFromFile(fileName string) (*x509.CertPool, error) { return certPool, nil } -// Pseudonymise IP address fields in a dnstap message -func (edm *dnstapMinimiser) pseudonymiseDnstap(dt *dnstap.Dnstap) { +// Pseudonymise IP address fields in a dnstap message. cache is the caller's +// per-worker LRU; cpn is a snapshot of the current cryptopan instance taken +// once per frame so QueryAddress and ResponseAddress see the same key. Both +// are passed in so this hot path takes no shared lock. +func (edm *dnstapMinimiser) pseudonymiseDnstap(dt *dnstap.Dnstap, cpn *cryptopan.Cryptopan, cache *lru.Cache[netip.Addr, netip.Addr]) { var err error - - // Lock is used here because the cryptopan instance can get updated at runtime. 
- edm.cryptopanMutex.RLock() - if dt.Message.QueryAddress != nil { - dt.Message.QueryAddress, err = edm.pseudonymiseIP(dt.Message.QueryAddress) + dt.Message.QueryAddress, err = edm.pseudonymiseIP(dt.Message.QueryAddress, cpn, cache) if err != nil { edm.log.Error("pseudonymiseDnstap: unable to parse dt.Message.QueryAddress", "error", err) } } if dt.Message.ResponseAddress != nil { - dt.Message.ResponseAddress, err = edm.pseudonymiseIP(dt.Message.ResponseAddress) + dt.Message.ResponseAddress, err = edm.pseudonymiseIP(dt.Message.ResponseAddress, cpn, cache) if err != nil { edm.log.Error("pseudonymiseDnstap: unable to parse dt.Message.ResponseAddress", "error", err) } } - edm.cryptopanMutex.RUnlock() } -// Pseudonymise IP address, even on error the returned []byte is usable (zeroed address) -func (edm *dnstapMinimiser) pseudonymiseIP(ipBytes []byte) ([]byte, error) { +// Pseudonymise IP address, even on error the returned []byte is usable (zeroed address). +// Caller passes the per-worker cache and the cryptopan snapshot; nil cache disables caching. 
+func (edm *dnstapMinimiser) pseudonymiseIP(ipBytes []byte, cpn *cryptopan.Cryptopan, cache *lru.Cache[netip.Addr, netip.Addr]) ([]byte, error) { addr, ok := netip.AddrFromSlice(ipBytes) if !ok { // Replace address with zeroes since we do not know if @@ -2825,15 +2841,15 @@ func (edm *dnstapMinimiser) pseudonymiseIP(ipBytes []byte) ([]byte, error) { var pseudonymisedAddr netip.Addr var cacheHit bool - if edm.cryptopanCache != nil { - pseudonymisedAddr, cacheHit = edm.cryptopanCache.Get(addr) + if cache != nil { + pseudonymisedAddr, cacheHit = cache.Get(addr) } if cacheHit { edm.promCryptopanCacheHit.Inc() } else { // Not in cache or cache disabled, calculate the pseudonymised IP - pseudonymisedAddr, ok = netip.AddrFromSlice(edm.cryptopan.Anonymize(addr.AsSlice())) + pseudonymisedAddr, ok = netip.AddrFromSlice(cpn.Anonymize(addr.AsSlice())) if !ok { // Replace address with zeroes here as well // since we do not know if the contained junk @@ -2847,8 +2863,8 @@ func (edm *dnstapMinimiser) pseudonymiseIP(ipBytes []byte) ([]byte, error) { // IPv4 addresses in our system so call Unmap() on it. pseudonymisedAddr = pseudonymisedAddr.Unmap() - if edm.cryptopanCache != nil { - evicted := edm.cryptopanCache.Add(addr, pseudonymisedAddr) + if cache != nil { + evicted := cache.Add(addr, pseudonymisedAddr) if evicted { edm.promCryptopanCacheEvicted.Inc() } @@ -2932,7 +2948,7 @@ collectorLoop: // to do a new lookup against the new dawg to make sure // we have the correct index number (or if it is even // present in the new dawg). 
- if wu.dawgModTime != wkd.dawgModTime { + if wu.dawgModTime != wkd.snap.Load().dawgModTime { if !retryChannelClosed { wkd.retryCh <- wu } else { diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go index 2bbe84b..1bd4689 100644 --- a/pkg/runner/runner_test.go +++ b/pkg/runner/runner_test.go @@ -159,7 +159,7 @@ func TestWKD(t *testing.T) { for _, test := range wkdDawgIndexTests { m := new(dns.Msg) m.SetQuestion(test.domain, dns.TypeA) - i, suffixMatch := getDawgIndex(wkd.dawgFinder, m.Question[0].Name) + i, suffixMatch := getDawgIndex(wkd.snap.Load().dawgFinder, m.Question[0].Name) if test.found && i == dawgNotFound { t.Fatalf("%s: expected match %s, but was not found", test.name, test.domain) @@ -486,7 +486,7 @@ func TestIgnoredClientIPsEmpty(t *testing.T) { expectedValidNumCIDRs := 2 // Make sure we actually got anything loaded from the file with content - if edm.ignoredClientsIPSet == nil { + if edm.ignoredClientsIPSet.Load() == nil { t.Fatalf("edm.ignoredClientsIPSet parsed from '%s' should not be nil", testdataFile) } if edm.getNumIgnoredClientCIDRs() < 1 { @@ -507,8 +507,8 @@ func TestIgnoredClientIPsEmpty(t *testing.T) { t.Fatalf("unexpected number of CIDRs parsed from '%s': have: %d, want: %d", testdataFile, edm.getNumIgnoredClientCIDRs(), expectedNumCIDRs) } - if edm.ignoredClientsIPSet != nil { - t.Fatalf("edm.ignoredClientsIPSet should be nil, have: %#v", edm.ignoredClientsIPSet) + if got := edm.ignoredClientsIPSet.Load(); got != nil { + t.Fatalf("edm.ignoredClientsIPSet should be nil, have: %#v", got) } ipLookupTests := []struct { @@ -693,8 +693,8 @@ func TestIgnoredQuestionNamesValid(t *testing.T) { t.Fatalf("unable to parse testdata: %s", err) } - if edm.ignoredQuestions.NumAdded() != expectedNumNames { - t.Fatalf("unexpected number of names parsed from '%s': have: %d, want: %d", testdataFile1, edm.ignoredQuestions.NumAdded(), expectedNumNames) + if edm.ignoredQuestions.Load().finder.NumAdded() != expectedNumNames { + 
t.Fatalf("unexpected number of names parsed from '%s': have: %d, want: %d", testdataFile1, edm.ignoredQuestions.Load().finder.NumAdded(), expectedNumNames) } questionLookupTests := []struct { @@ -761,8 +761,8 @@ func TestIgnoredQuestionNamesValid(t *testing.T) { t.Fatalf("unable to parse testdata: %s", err) } - if edm.ignoredQuestions.NumAdded() != expectedNumNames { - t.Fatalf("unexpected number of names parsed from '%s': have: %d, want: %d", testdataFile2, edm.ignoredQuestions.NumAdded(), expectedNumNames) + if edm.ignoredQuestions.Load().finder.NumAdded() != expectedNumNames { + t.Fatalf("unexpected number of names parsed from '%s': have: %d, want: %d", testdataFile2, edm.ignoredQuestions.Load().finder.NumAdded(), expectedNumNames) } questionLookupTests2 := []struct { @@ -843,8 +843,8 @@ func TestIgnoredQuestionNamesEmpty(t *testing.T) { // Magic value counted by hand expectedNumNames := 2 - if edm.ignoredQuestions.NumAdded() != expectedNumNames { - t.Fatalf("unexpected number of names parsed from '%s': have: %d, want: %d", testdataFile, edm.ignoredQuestions.NumAdded(), expectedNumNames) + if edm.ignoredQuestions.Load().finder.NumAdded() != expectedNumNames { + t.Fatalf("unexpected number of names parsed from '%s': have: %d, want: %d", testdataFile, edm.ignoredQuestions.Load().finder.NumAdded(), expectedNumNames) } testdataFile = "testdata/ignored-question-names.empty.dawg" @@ -854,7 +854,7 @@ func TestIgnoredQuestionNamesEmpty(t *testing.T) { t.Fatalf("unable to parse testdata: %s", err) } - if edm.ignoredQuestions != nil { + if edm.ignoredQuestions.Load() != nil { t.Fatalf("edm.ignoredQuestions should be nil: have: %#v", edm.ignoredQuestions) } @@ -924,8 +924,8 @@ func TestIgnoredQuestionNamesUnset(t *testing.T) { // Magic value counted by hand expectedNumNames := 2 - if edm.ignoredQuestions.NumAdded() != expectedNumNames { - t.Fatalf("unexpected number of names parsed from '%s': have: %d, want: %d", testdataFile, edm.ignoredQuestions.NumAdded(), 
expectedNumNames) + if edm.ignoredQuestions.Load().finder.NumAdded() != expectedNumNames { + t.Fatalf("unexpected number of names parsed from '%s': have: %d, want: %d", testdataFile, edm.ignoredQuestions.Load().finder.NumAdded(), expectedNumNames) } // Now set an empty filename @@ -935,7 +935,7 @@ func TestIgnoredQuestionNamesUnset(t *testing.T) { t.Fatalf("unable to parse testdata: %s", err) } - if edm.ignoredQuestions != nil { + if edm.ignoredQuestions.Load() != nil { t.Fatalf("edm.ignoredQuestions should be nil: have: %#v", edm.ignoredQuestions) } @@ -1265,14 +1265,14 @@ func TestPseudonymiseDnstap(t *testing.T) { t.Fatalf("unable to setup edm: %s", err) } - if edm.cryptopanCache != nil { - if edm.cryptopanCache.Len() != 0 { - t.Fatalf("there should be no entries in newly initialised cryptopan cache but it contains items: %d", edm.cryptopanCache.Len()) + if edm.testCryptopanCache() != nil { + if edm.testCryptopanCache().Len() != 0 { + t.Fatalf("there should be no entries in newly initialised cryptopan cache but it contains items: %d", edm.testCryptopanCache().Len()) } } - edm.pseudonymiseDnstap(dt4) - edm.pseudonymiseDnstap(dt6) + edm.testPseudonymiseDnstap(dt4) + edm.testPseudonymiseDnstap(dt6) pseudoQueryAddr4, ok := netip.AddrFromSlice(dt4.Message.QueryAddress) if !ok { @@ -1328,13 +1328,13 @@ func TestPseudonymiseDnstap(t *testing.T) { t.Fatalf("pseudonymised IPv6 resp address %s is not the expected address %s", pseudoRespAddr6, expectedPseudoRespAddr6) } - if edm.cryptopanCache != nil { - if edm.cryptopanCache.Len() == 0 { + if edm.testCryptopanCache() != nil { + if edm.testCryptopanCache().Len() == 0 { t.Fatalf("there should be entries in the cryptopan cache but it is empty") } // Verify the entry in the cache is the same as the one we got back - cachedPseudoQueryAddr4, ok := edm.cryptopanCache.Get(origQueryAddr4) + cachedPseudoQueryAddr4, ok := edm.testCryptopanCache().Get(origQueryAddr4) if !ok { t.Fatalf("unable to lookup IPv4 query address %s in 
cache", origQueryAddr4) } @@ -1342,7 +1342,7 @@ func TestPseudonymiseDnstap(t *testing.T) { t.Fatalf("cached pseudonymised IPv4 query address %s is not the same as the calculated address %s", cachedPseudoQueryAddr4, pseudoQueryAddr4) } - cachedPseudoRespAddr4, ok := edm.cryptopanCache.Get(origRespAddr4) + cachedPseudoRespAddr4, ok := edm.testCryptopanCache().Get(origRespAddr4) if !ok { t.Fatalf("unable to lookup IPv4 response address %s in cache", origRespAddr4) } @@ -1350,7 +1350,7 @@ func TestPseudonymiseDnstap(t *testing.T) { t.Fatalf("cached pseudonymised IPv4 response address %s is not the same as the calculated address %s", cachedPseudoRespAddr4, pseudoRespAddr4) } - cachedPseudoQueryAddr6, ok := edm.cryptopanCache.Get(origQueryAddr6) + cachedPseudoQueryAddr6, ok := edm.testCryptopanCache().Get(origQueryAddr6) if !ok { t.Fatalf("unable to lookup IPv6 query address %s in cache", origQueryAddr6) } @@ -1358,7 +1358,7 @@ func TestPseudonymiseDnstap(t *testing.T) { t.Fatalf("cached pseudonymised IPv6 query address %s is not the same as the calculated address %s", cachedPseudoQueryAddr6, pseudoQueryAddr6) } - cachedPseudoRespAddr6, ok := edm.cryptopanCache.Get(origRespAddr6) + cachedPseudoRespAddr6, ok := edm.testCryptopanCache().Get(origRespAddr6) if !ok { t.Fatalf("unable to lookup IPv6 response address %s in cache", origRespAddr6) } @@ -1367,13 +1367,13 @@ func TestPseudonymiseDnstap(t *testing.T) { } } - if edm.cryptopanCache != nil { - t.Logf("number of pseudonymisation cache entries before reset: %d", edm.cryptopanCache.Len()) + if edm.testCryptopanCache() != nil { + t.Logf("number of pseudonymisation cache entries before reset: %d", edm.testCryptopanCache().Len()) } - if edm.cryptopanCache != nil { - for _, key := range edm.cryptopanCache.Keys() { - value, ok := edm.cryptopanCache.Get(key) + if edm.testCryptopanCache() != nil { + for _, key := range edm.testCryptopanCache().Keys() { + value, ok := edm.testCryptopanCache().Get(key) if !ok { t.Fatalf("unable 
to extract value for key before reset: %s", key) } @@ -1388,9 +1388,13 @@ func TestPseudonymiseDnstap(t *testing.T) { t.Fatalf("unable to call edm.SetCryptopan: %s", err) } - if edm.cryptopanCache != nil { - if edm.cryptopanCache.Len() != 0 { - t.Fatalf("there should be no cache entries in replaced cryptopan cache but it contains items: %d", edm.cryptopanCache.Len()) + // Mirror the per-worker cache purge that runMinimiser would do on + // detecting a cryptopan generation change. + edm.testResetCryptopanCache() + + if edm.testCryptopanCache() != nil { + if edm.testCryptopanCache().Len() != 0 { + t.Fatalf("there should be no cache entries in replaced cryptopan cache but it contains items: %d", edm.testCryptopanCache().Len()) } } @@ -1400,8 +1404,8 @@ func TestPseudonymiseDnstap(t *testing.T) { dt6.Message.QueryAddress = origQueryAddr6.AsSlice() dt6.Message.ResponseAddress = origRespAddr6.AsSlice() - edm.pseudonymiseDnstap(dt4) - edm.pseudonymiseDnstap(dt6) + edm.testPseudonymiseDnstap(dt4) + edm.testPseudonymiseDnstap(dt6) pseudoQueryAddrUpdated4, ok := netip.AddrFromSlice(dt4.Message.QueryAddress) if !ok { @@ -1462,10 +1466,10 @@ func TestPseudonymiseDnstap(t *testing.T) { t.Fatalf("updated pseudonymised IPv6 resp address %s is not the expected address %s", pseudoRespAddrUpdated6, expectedPseudoRespAddrUpdated6) } - if edm.cryptopanCache != nil { - t.Logf("number of pseudonymisation cache entries before end: %d", edm.cryptopanCache.Len()) - for _, key := range edm.cryptopanCache.Keys() { - value, ok := edm.cryptopanCache.Get(key) + if edm.testCryptopanCache() != nil { + t.Logf("number of pseudonymisation cache entries before end: %d", edm.testCryptopanCache().Len()) + for _, key := range edm.testCryptopanCache().Keys() { + value, ok := edm.testCryptopanCache().Get(key) if !ok { t.Fatalf("unable to extract value for key before end: %s", key) } @@ -1480,14 +1484,20 @@ func TestPseudonymiseDnstap(t *testing.T) { t.Fatalf("unable to call edm.SetCryptopan with 0 cache 
size: %s", err) } + // Mirror the per-worker cache purge + disable that runMinimiser would + // do in production: drop the existing test cache and zero the config + // so testCryptopanCache returns nil (uncached path). + edm.testResetCryptopanCache() + edm.conf.CryptopanAddressEntries = 0 + // Reset the addresses and pseudonymise again with the updated key dt4.Message.QueryAddress = origQueryAddr4.AsSlice() dt4.Message.ResponseAddress = origRespAddr4.AsSlice() dt6.Message.QueryAddress = origQueryAddr6.AsSlice() dt6.Message.ResponseAddress = origRespAddr6.AsSlice() - edm.pseudonymiseDnstap(dt4) - edm.pseudonymiseDnstap(dt6) + edm.testPseudonymiseDnstap(dt4) + edm.testPseudonymiseDnstap(dt6) uncachedPseudoQueryAddr4, ok := netip.AddrFromSlice(dt4.Message.QueryAddress) if !ok { @@ -1568,7 +1578,7 @@ func BenchmarkPseudonymiseDnstapWithCache4(b *testing.B) { ResponseAddress: origRespAddr4.AsSlice(), }, } - edm.pseudonymiseDnstap(dt4) + edm.testPseudonymiseDnstap(dt4) } } @@ -1600,7 +1610,7 @@ func BenchmarkPseudonymiseDnstapWithoutCache4(b *testing.B) { ResponseAddress: origRespAddr4.AsSlice(), }, } - edm.pseudonymiseDnstap(dt4) + edm.testPseudonymiseDnstap(dt4) } } @@ -1629,7 +1639,7 @@ func BenchmarkPseudonymiseDnstapWithCache6(b *testing.B) { ResponseAddress: origRespAddr6.AsSlice(), }, } - edm.pseudonymiseDnstap(dt6) + edm.testPseudonymiseDnstap(dt6) } } @@ -1661,7 +1671,7 @@ func BenchmarkPseudonymiseDnstapWithoutCache6(b *testing.B) { ResponseAddress: origRespAddr6.AsSlice(), }, } - edm.pseudonymiseDnstap(dt6) + edm.testPseudonymiseDnstap(dt6) } } diff --git a/pkg/runner/test_helpers_test.go b/pkg/runner/test_helpers_test.go new file mode 100644 index 0000000..7cccb4c --- /dev/null +++ b/pkg/runner/test_helpers_test.go @@ -0,0 +1,60 @@ +package runner + +import ( + "net/netip" + "sync" + + dnstap "github.com/dnstap/golang-dnstap" + lru "github.com/hashicorp/golang-lru/v2" +) + +// testPseudonymiseDnstap is the test-side equivalent of the producer +// hot path. 
pseudonymiseDnstap now takes the per-worker +// cache and cryptopan snapshot as parameters; tests don't run inside a +// real worker so they need their own cache. We give each *dnstapMinimiser +// instance one shared cache via a sync.Map keyed by the minimiser +// pointer, so repeated test calls accumulate hits like the old API did. +// +// This is purely a test convenience - production code does not use it. +var testCryptopanCaches sync.Map // map[*dnstapMinimiser]*lru.Cache[netip.Addr, netip.Addr] + +func (edm *dnstapMinimiser) testPseudonymiseDnstap(dt *dnstap.Dnstap) { + cache := edm.testCryptopanCache() + edm.pseudonymiseDnstap(dt, edm.cryptopan.Load(), cache) +} + +// testCryptopanCache returns the shared per-edm-instance cache, creating +// it lazily so callers don't have to set it up. The cache size is read from +// conf.CryptopanAddressEntries; 0 disables caching, mirroring production behaviour. +func (edm *dnstapMinimiser) testCryptopanCache() *lru.Cache[netip.Addr, netip.Addr] { + conf := edm.getConfig() + if conf.CryptopanAddressEntries == 0 { + return nil + } + if c, ok := testCryptopanCaches.Load(edm); ok { + return c.(*lru.Cache[netip.Addr, netip.Addr]) + } + c, err := lru.New[netip.Addr, netip.Addr](conf.CryptopanAddressEntries) + if err != nil { + panic(err) + } + actual, _ := testCryptopanCaches.LoadOrStore(edm, c) + return actual.(*lru.Cache[netip.Addr, netip.Addr]) +} + +// testResetCryptopanCache drops the test-side cache for edm. Used by +// tests after setCryptopan to mirror the per-worker Purge that +// runMinimiser does on cryptopanGen change. +func (edm *dnstapMinimiser) testResetCryptopanCache() { + testCryptopanCaches.Delete(edm) +} + +func cleanupTestMinimiser(edm *dnstapMinimiser) { + if edm.stop != nil { + edm.stop() + } + if edm.fsWatcher != nil { + _ = edm.fsWatcher.Close() + edm.fsWatcher = nil + } +}