diff --git a/pkg/runner/aggregate_sender.go b/pkg/runner/aggregate_sender.go index 440835e..03c4de4 100644 --- a/pkg/runner/aggregate_sender.go +++ b/pkg/runner/aggregate_sender.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "log/slog" - "math" "net" "net/http" "net/url" @@ -126,10 +125,8 @@ func (as aggregateSender) send(fileName string, ts time.Time, duration time.Dura req.ContentLength = fileSize // Expected by aggrec, e.g: - // Aggregate-Interval: 2023-11-16T09:24:13.487591+01:00/PT1M - minutesFloat := duration.Minutes() - minutes := int(math.Round(minutesFloat)) - req.Header.Add("Aggregate-Interval", fmt.Sprintf("%s/PT%dM", ts.Truncate(time.Minute).Format(time.RFC3339), minutes)) + // Aggregate-Interval: 2023-11-16T09:24:13+01:00/PT45S + req.Header.Add("Aggregate-Interval", fmt.Sprintf("%s/%s", ts.Format(time.RFC3339), iso8601Duration(duration))) as.log.Info("aggregateSender.send", "filename", fileName, "url", histogramURL) startTime := time.Now() @@ -171,3 +168,36 @@ func (as aggregateSender) send(fileName string, ts time.Time, duration time.Dura return nil } + +func iso8601Duration(duration time.Duration) string { + if duration <= 0 { + return "PT0S" + } + + total := int64(duration / time.Second) + nanoseconds := duration % time.Second + + hours := total / 3600 + total %= 3600 + minutes := total / 60 + seconds := total % 60 + + res := "PT" + if hours > 0 { + res += strconv.FormatInt(hours, 10) + "H" + } + if minutes > 0 { + res += strconv.FormatInt(minutes, 10) + "M" + } + if seconds > 0 || nanoseconds > 0 || res == "PT" { + if nanoseconds == 0 { + res += strconv.FormatInt(seconds, 10) + } else { + secondsFloat := float64(seconds) + float64(nanoseconds)/float64(time.Second) + res += strconv.FormatFloat(secondsFloat, 'f', -1, 64) + } + res += "S" + } + + return res +} diff --git a/pkg/runner/aggregate_sender_test.go b/pkg/runner/aggregate_sender_test.go new file mode 100644 index 0000000..732e1e4 --- /dev/null +++ b/pkg/runner/aggregate_sender_test.go @@ -0,0 +1,127 @@ +package runner + +import ( + "crypto/ed25519" + "crypto/rand" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "net/url" + "os" + "testing" + "time" + + "github.com/lestrrat-go/jwx/v2/jwa" + "github.com/lestrrat-go/jwx/v2/jwk" +) + +func TestISO8601Duration(t *testing.T) { + tests := []struct { + name string + in time.Duration + want string + }{ + { + name: "zero", + in: 0, + want: "PT0S", + }, + { + name: "seconds", + in: 45 * time.Second, + want: "PT45S", + }, + { + name: "minutes", + in: time.Minute, + want: "PT1M", + }, + { + name: "mixed", + in: time.Hour + 2*time.Minute + 3*time.Second, + want: "PT1H2M3S", + }, + { + name: "fractional seconds", + in: 1500 * time.Millisecond, + want: "PT1.5S", + }, + { + name: "negative clamps to zero", + in: -time.Second, + want: "PT0S", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if got := iso8601Duration(tc.in); got != tc.want { + t.Fatalf("have: %s, want: %s", got, tc.want) + } + }) + } +} + +func TestAggregateSenderUsesExactIntervalHeader(t *testing.T) { + var gotInterval string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotInterval = r.Header.Get("Aggregate-Interval") + if _, err := io.Copy(io.Discard, r.Body); err != nil { + t.Fatalf("reading request body: %s", err) + } + w.Header().Set("Location", "/stored") + w.WriteHeader(http.StatusCreated) + })) + t.Cleanup(server.Close) + + file, err := os.CreateTemp(t.TempDir(), "aggregate-*.parquet") + if err != nil { + t.Fatalf("CreateTemp: %s", err) + } + if _, err := file.WriteString("payload"); err != nil { + t.Fatalf("write temp aggregate: %s", err) + } + if err := file.Close(); err != nil { + t.Fatalf("close temp aggregate: %s", err) + } + + _, signingKey, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatalf("GenerateKey: %s", err) + } + signingJWK, err := jwk.FromRaw(signingKey) + if err != nil { + t.Fatalf("FromRaw: %s", err) + } + if err := signingJWK.Set(jwk.AlgorithmKey, jwa.EdDSA); err != nil { + t.Fatalf("set Algorithm: %s", err) + } + if err := signingJWK.Set(jwk.KeyIDKey, "aggregate-test-key"); err != nil { + t.Fatalf("set KeyID: %s", err) + } + + aggrecURL, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("parse server URL: %s", err) + } + + edm := &dnstapMinimiser{ + log: slog.New(slog.NewTextHandler(io.Discard, nil)), + httpClientCertStore: newCertStore(), + } + as, err := edm.newAggregateSender(aggrecURL, signingJWK, nil) + if err != nil { + t.Fatalf("newAggregateSender: %s", err) + } + + start := time.Date(2026, 4, 29, 12, 34, 45, 0, time.UTC) + if err := as.send(file.Name(), start, 45*time.Second); err != nil { + t.Fatalf("send: %s", err) + } + + want := "2026-04-29T12:34:45Z/PT45S" + if gotInterval != want { + t.Fatalf("Aggregate-Interval header\n have: %s\n want: %s", gotInterval, want) + } +}