Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions pkg/runner/partial_dnstap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package runner

import (
"io"
"log/slog"
"testing"
"time"

dnstap "github.com/dnstap/golang-dnstap"
"github.com/miekg/dns"
)

func TestParsePacketMissingTimestamps(t *testing.T) {
edm := newPartialDnstapTestMinimiser()
wire := packedDNSMessage(t, "example.com.")
epoch := time.Unix(0, 0).UTC()

tests := []struct {
name string
isQuery bool
dt *dnstap.Dnstap
}{
{
name: "query timestamp missing",
isQuery: true,
dt: &dnstap.Dnstap{
Message: &dnstap.Message{QueryMessage: wire},
},
},
{
name: "response timestamp missing",
isQuery: false,
dt: &dnstap.Dnstap{
Message: &dnstap.Message{ResponseMessage: wire},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
msg, got := edm.parsePacket(tc.dt, tc.isQuery)
if msg == nil {
t.Fatal("parsePacket returned nil DNS message")
}
if !got.Equal(epoch) {
t.Fatalf("timestamp have: %s, want: %s", got, epoch)
}
})
}
}

func TestNewSessionAllowsMissingSocketMetadata(t *testing.T) {
edm := newPartialDnstapTestMinimiser()
msg := new(dns.Msg)
msg.SetQuestion("example.com.", dns.TypeA)

sd := edm.newSession(&dnstap.Dnstap{
Message: &dnstap.Message{},
}, msg, false, defaultLabelLimit, time.Unix(0, 0).UTC())

if sd.DNSProtocol != nil {
t.Fatalf("DNSProtocol should be nil when SocketProtocol is missing, have: %d", *sd.DNSProtocol)
}
if sd.SourceIPv4 != nil || sd.DestIPv4 != nil ||
sd.SourceIPv6Network != nil || sd.SourceIPv6Host != nil ||
sd.DestIPv6Network != nil || sd.DestIPv6Host != nil {
t.Fatalf("IP fields should stay nil when SocketFamily is missing: %#v", sd)
}
}

func TestFormatDnstapEndpointPortWithoutAddress(t *testing.T) {
port := uint32(12345)
got := formatDnstapEndpoint(nil, &port)
if got != "?:12345" {
t.Fatalf("endpoint have: %s, want: ?:12345", got)
}
}

func newPartialDnstapTestMinimiser() *dnstapMinimiser {
return &dnstapMinimiser{
log: slog.New(slog.NewTextHandler(io.Discard, nil)),
}
}

func packedDNSMessage(t *testing.T, qname string) []byte {
t.Helper()

msg := new(dns.Msg)
msg.SetQuestion(qname, dns.TypeA)
msg.Response = true
wire, err := msg.Pack()
if err != nil {
t.Fatalf("dns message Pack: %s", err)
}
return wire
}
97 changes: 52 additions & 45 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2111,7 +2111,7 @@ func (edm *dnstapMinimiser) newSession(dt *dnstap.Dnstap, msg *dns.Msg, isQuery
sd.ServerID = &sID
}

switch *dt.Message.SocketFamily {
switch dt.Message.GetSocketFamily() {
case dnstap.SocketFamily_INET:
if dt.Message.QueryAddress != nil {
sourceIPInt, err := ipBytesToInt(dt.Message.QueryAddress)
Expand Down Expand Up @@ -2160,7 +2160,10 @@ func (edm *dnstapMinimiser) newSession(dt *dnstap.Dnstap, msg *dns.Msg, isQuery
edm.log.Error("packet is neither INET or INET6")
}

sd.DNSProtocol = (*int32)(dt.Message.SocketProtocol)
if dt.Message.SocketProtocol != nil {
dnsProtocol := int32(dt.Message.GetSocketProtocol())
sd.DNSProtocol = &dnsProtocol
}

return sd
}
Expand Down Expand Up @@ -2503,64 +2506,68 @@ func (edm *dnstapMinimiser) newQnamePublisher(wg *sync.WaitGroup) {
}

func (edm *dnstapMinimiser) parsePacket(dt *dnstap.Dnstap, isQuery bool) (*dns.Msg, time.Time) {
var t time.Time
var err error
var queryAddress, responseAddress string

qa := net.IP(dt.Message.QueryAddress)
ra := net.IP(dt.Message.ResponseAddress)

// Query address: 10.10.10.10:31337, 10.10.10.10:?, ?:31337 or ?
if qa != nil && dt.Message.QueryPort != nil {
queryAddress = qa.String() + ":" + strconv.FormatUint(uint64(*dt.Message.QueryPort), 10)
} else if qa != nil {
queryAddress = qa.String() + ":?"
} else if dt.Message.ResponsePort != nil {
queryAddress = "?:" + strconv.FormatUint(uint64(*dt.Message.QueryPort), 10)
} else {
queryAddress = "?"
}

// Response address: 10.10.10.10:31337, 10.10.10.10:?, ?:31337 or ?
if ra != nil && dt.Message.ResponsePort != nil {
responseAddress = ra.String() + ":" + strconv.FormatUint(uint64(*dt.Message.ResponsePort), 10)
} else if ra != nil {
responseAddress = ra.String() + ":?"
} else if dt.Message.ResponsePort != nil {
responseAddress = "?:" + strconv.FormatUint(uint64(*dt.Message.ResponsePort), 10)
} else {
responseAddress = "?"
if dt.Message == nil {
edm.log.Error("parsePacket: dnstap message is missing")
return nil, time.Unix(0, 0).UTC()
}

queryAddress := formatDnstapEndpoint(dt.Message.QueryAddress, dt.Message.QueryPort)
responseAddress := formatDnstapEndpoint(dt.Message.ResponseAddress, dt.Message.ResponsePort)

msg := new(dns.Msg)
if isQuery {
err = msg.Unpack(dt.Message.QueryMessage)
if err != nil {
edm.log.Error("unable to unpack query message", "error", err, "query_address", queryAddress, "response_address", responseAddress)
msg = nil
}
if *dt.Message.QueryTimeSec > math.MaxInt64 {
edm.log.Error("dt.Message.QueryTimeSec is too large for int64, setting time to 0", "value", *dt.Message.QueryTimeSec)
*dt.Message.QueryTimeSec = 0
*dt.Message.QueryTimeNsec = 0
}
t = time.Unix(int64(*dt.Message.QueryTimeSec), int64(*dt.Message.QueryTimeNsec)).UTC() // #nosec G115 -- Will be zeroed out above if too large, https://github.com/securego/gosec/issues/1212#issuecomment-2739574884
} else {
err = msg.Unpack(dt.Message.ResponseMessage)
if err != nil {
edm.log.Error("unable to unpack response message", "error", err, "query_address", queryAddress, "response_address", responseAddress)
msg = nil
}
if *dt.Message.ResponseTimeSec > math.MaxInt64 {
edm.log.Error("dt.Message.ResponseTimeSec is too large for int64, setting time to 0", "value", *dt.Message.ResponseTimeSec)
*dt.Message.ResponseTimeSec = 0
*dt.Message.ResponseTimeNsec = 0
}
t = time.Unix(int64(*dt.Message.ResponseTimeSec), int64(*dt.Message.ResponseTimeNsec)).UTC() // #nosec G115 -- Will be zeroed out above if too large, https://github.com/securego/gosec/issues/1212#issuecomment-2739574884
t := edm.dnstapTimestamp(dt.Message.QueryTimeSec, dt.Message.QueryTimeNsec, "dt.Message.QueryTimeSec")
return msg, t
}

err = msg.Unpack(dt.Message.ResponseMessage)
if err != nil {
edm.log.Error("unable to unpack response message", "error", err, "query_address", queryAddress, "response_address", responseAddress)
msg = nil
}
t := edm.dnstapTimestamp(dt.Message.ResponseTimeSec, dt.Message.ResponseTimeNsec, "dt.Message.ResponseTimeSec")
return msg, t
}

func formatDnstapEndpoint(ipBytes []byte, port *uint32) string {
ip, ok := netip.AddrFromSlice(ipBytes)
if ok && port != nil {
return ip.String() + ":" + strconv.FormatUint(uint64(*port), 10)
}
if ok {
return ip.String() + ":?"
}
if port != nil {
return "?:" + strconv.FormatUint(uint64(*port), 10)
}
return "?"
}

func (edm *dnstapMinimiser) dnstapTimestamp(sec *uint64, nsec *uint32, fieldName string) time.Time {
if sec == nil {
edm.log.Error(fieldName + " is missing, setting time to 0")
return time.Unix(0, 0).UTC()
}
if *sec > math.MaxInt64 {
edm.log.Error(fieldName+" is too large for int64, setting time to 0", "value", *sec)
return time.Unix(0, 0).UTC()
}

var nsecValue uint32
if nsec != nil {
nsecValue = *nsec
}

return time.Unix(int64(*sec), int64(nsecValue)).UTC() // #nosec G115 -- sec is checked above and nsec is uint32.
}

func ipBytesToInt(ip4Bytes []byte) (uint32, error) {
ip, ok := netip.AddrFromSlice(ip4Bytes)
if !ok {
Expand Down