package e2e

import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"testing"
"time"

"github.com/Azure/agentbaker/e2e/toolkit"
)

const (
	// cseEventsDir is the directory where CSE task timing events are stored on the VM.
	// This matches EVENTS_LOGGING_DIR defined in both cse_helpers.sh and cse_start.sh.
	// Events are written directly here (not in per-handler subdirectories) — each file
	// is a single-line JSON object named <epoch-ms>.json.
	cseEventsDir = "/var/log/azure/Microsoft.Azure.Extensions.CustomScript/events/"

	// provisionJSONPath is the path to the provision.json file with overall boot timing.
	provisionJSONPath = "/var/log/azure/aks/provision.json"
)

// CSETaskTiming represents the timing of a single CSE task.
type CSETaskTiming struct {
	TaskName  string        // fully qualified task name, e.g. "AKS.CSE.cse_start"
	StartTime time.Time     // task start, parsed from the event's Timestamp field
	EndTime   time.Time     // task end, parsed from the event's OperationId field
	Duration  time.Duration // EndTime - StartTime
	Message   string        // free-form message carried by the event
}

// CSEProvisionTiming represents the overall provisioning timing from provision.json.
// All timing fields are kept as strings exactly as emitted in provision.json;
// no parsing or normalization is applied here.
type CSEProvisionTiming struct {
	ExitCode            string          `json:"ExitCode"`
	ExecDuration        string          `json:"ExecDuration"`
	KernelStartTime     string          `json:"KernelStartTime"`
	CloudInitLocalStart string          `json:"CloudInitLocalStartTime"`
	CloudInitStart      string          `json:"CloudInitStartTime"`
	CloudFinalStart     string          `json:"CloudFinalStartTime"`
	CSEStartTime        string          `json:"CSEStartTime"`
	GuestAgentStartTime string          `json:"GuestAgentStartTime"`
	SystemdSummary      string          `json:"SystemdSummary"`
	BootDatapoints      json.RawMessage `json:"BootDatapoints"` // left undecoded; structure varies
}

// CSETimingReport holds all parsed timing data from a VM.
type CSETimingReport struct {
	Tasks     []CSETaskTiming          // one entry per successfully parsed CSE event
	Provision *CSEProvisionTiming      // nil if provision.json was unavailable or unparsable
	taskIndex map[string]*CSETaskTiming // lazy name→task index, built on first GetTask call
}

// cseEventJSON matches the JSON structure written by logs_to_events() in cse_helpers.sh.
// Despite its name, OperationId stores the task *end* timestamp (not a GUID).
// This is by design: GA (Guest Agent) requires specific field names, and OperationId
// was repurposed to carry the end time. See cse_helpers.sh logs_to_events():
//
//	--arg Timestamp "${startTime}"
//	--arg OperationId "${endTime}"
type cseEventJSON struct {
	Timestamp   string `json:"Timestamp"`
	OperationId string `json:"OperationId"` // end timestamp, not a GUID — see logs_to_events() in cse_helpers.sh
	TaskName    string `json:"TaskName"`
	EventLevel  string `json:"EventLevel"`
	Message     string `json:"Message"`
}

// GetTask returns the timing for a specific task, or nil if not found.
// The name→task index is built lazily on the first call and reused afterwards.
func (r *CSETimingReport) GetTask(name string) *CSETaskTiming {
	if r.taskIndex == nil {
		index := make(map[string]*CSETaskTiming, len(r.Tasks))
		for i := range r.Tasks {
			entry := &r.Tasks[i]
			index[entry.TaskName] = entry
		}
		r.taskIndex = index
	}
	return r.taskIndex[name]
}

// TotalCSEDuration returns the duration of the cse_start task if present,
// or zero when that task is missing from the report.
func (r *CSETimingReport) TotalCSEDuration() time.Duration {
	task := r.GetTask("AKS.CSE.cse_start")
	if task == nil {
		return 0
	}
	return task.Duration
}

// LogReport logs all task timings (sorted by start time) followed by the
// total CSE duration and, when available, the provision.json summary.
func (r *CSETimingReport) LogReport(_ context.Context, t interface{ Logf(string, ...any) }) {
	t.Logf("=== CSE Task Timing Report ===")
	t.Logf("%-60s %12s %12s", "Task", "Duration", "Start→End")
	t.Logf("%s", strings.Repeat("-", 90))

	// Work on a copy so the report's task order is left untouched.
	byStart := append([]CSETaskTiming(nil), r.Tasks...)
	sort.Slice(byStart, func(a, b int) bool {
		return byStart[a].StartTime.Before(byStart[b].StartTime)
	})

	for _, entry := range byStart {
		t.Logf("%-60s %10.2fs %s → %s",
			entry.TaskName,
			entry.Duration.Seconds(),
			entry.StartTime.Format("15:04:05.000"),
			entry.EndTime.Format("15:04:05.000"),
		)
	}

	total := r.TotalCSEDuration()
	if total > 0 {
		t.Logf("%s", strings.Repeat("-", 90))
		t.Logf("%-60s %10.2fs", "TOTAL (cse_start)", total.Seconds())
	}

	if r.Provision == nil {
		return
	}
	t.Logf("\n=== Provision Summary ===")
	t.Logf("ExitCode: %s, ExecDuration: %ss", r.Provision.ExitCode, r.Provision.ExecDuration)
	t.Logf("KernelStart: %s, CSEStart: %s, GuestAgent: %s",
		r.Provision.KernelStartTime, r.Provision.CSEStartTime, r.Provision.GuestAgentStartTime)
}

// ExtractCSETimings SSHes into the scenario VM and extracts all CSE task timings.
// Returns an error if no tasks could be parsed, since an empty report would make
// regression detection ineffective.
func ExtractCSETimings(ctx context.Context, s *Scenario) (*CSETimingReport, error) {
report := &CSETimingReport{}

// Read all event JSON files from the CSE events directory, explicitly
// appending a newline after each file so each JSON document is separated.
listCmd := fmt.Sprintf("sudo find %s -name '*.json' -exec sh -c 'cat \"$1\"; echo' _ {} \\;", cseEventsDir)
Comment thread
djsly marked this conversation as resolved.
result, err := execScriptOnVm(ctx, s, s.Runtime.VM, listCmd)
if err != nil {
return nil, fmt.Errorf("failed to read CSE events: %w", err)
}

// Parse event JSON objects using json.Decoder for robustness — handles both
// single-line and multi-line JSON, and doesn't break on embedded newlines.
decoder := json.NewDecoder(strings.NewReader(strings.TrimSpace(result.stdout)))
var parseErrors int
for decoder.More() {
var event cseEventJSON
if err := decoder.Decode(&event); err != nil {
parseErrors++
s.T.Logf("WARNING: failed to decode CSE event JSON: %v", err)
continue
}
if event.TaskName == "" || event.Timestamp == "" || event.OperationId == "" {
continue
}

startTime, err := parseCSETimestamp(event.Timestamp)
Comment thread
djsly marked this conversation as resolved.
if err != nil {
parseErrors++
s.T.Logf("WARNING: failed to parse CSE start timestamp for task %s: %v", event.TaskName, err)
continue
}
endTime, err := parseCSETimestamp(event.OperationId)
Comment thread
djsly marked this conversation as resolved.
if err != nil {
parseErrors++
s.T.Logf("WARNING: failed to parse CSE end timestamp for task %s: %v", event.TaskName, err)
continue
}
Comment thread
djsly marked this conversation as resolved.

report.Tasks = append(report.Tasks, CSETaskTiming{
TaskName: event.TaskName,
StartTime: startTime,
EndTime: endTime,
Duration: endTime.Sub(startTime),
Message: event.Message,
})
}

if parseErrors > 0 {
s.T.Logf("WARNING: %d CSE event parse errors encountered", parseErrors)
}
if len(report.Tasks) == 0 {
return report, fmt.Errorf("no CSE task timings were parsed (%d parse errors)", parseErrors)
}

// Read provision.json for overall boot timing
provResult, err := execScriptOnVm(ctx, s, s.Runtime.VM, fmt.Sprintf("sudo cat %s", provisionJSONPath))
if err == nil && provResult.stdout != "" {
var prov CSEProvisionTiming
if json.Unmarshal([]byte(strings.TrimSpace(provResult.stdout)), &prov) == nil {
report.Provision = &prov
}
}

return report, nil
}

// parseCSETimestamp parses the timestamp format used by logs_to_events:
// "YYYY-MM-DD HH:MM:SS.mmm", with a fallback for values lacking milliseconds.
func parseCSETimestamp(s string) (time.Time, error) {
	for _, layout := range [...]string{
		"2006-01-02 15:04:05.000",
		"2006-01-02 15:04:05",
	} {
		if parsed, err := time.Parse(layout, s); err == nil {
			return parsed, nil
		}
	}
	return time.Time{}, fmt.Errorf("cannot parse CSE timestamp %q", s)
}
// CSETimingThresholds defines maximum acceptable durations for CSE tasks.
type CSETimingThresholds struct {
	// TaskThresholds maps task name suffixes to maximum duration.
	// Task names are matched by suffix to allow flexible matching
	// (e.g., "installDebPackageFromFile" matches "AKS.CSE.installkubelet.installDebPackageFromFile").
	// When several suffixes match the same task, the longest (most specific)
	// suffix wins — see ValidateCSETimings.
	TaskThresholds map[string]time.Duration

	// TotalCSEThreshold is the maximum acceptable total CSE duration.
	// Set to 0 to skip the total-duration check.
	TotalCSEThreshold time.Duration

	// DefaultTaskThreshold is the threshold applied to any task that exceeds it
	// but has no specific entry in TaskThresholds. This ensures that ALL slow tasks
	// appear as sub-tests in ADO Pipeline Analytics, even newly added ones.
	// Tasks below this threshold are silently skipped.
	// Set to 0 to disable dynamic tracking.
	DefaultTaskThreshold time.Duration
}

// ValidateCSETimings extracts CSE task timings from the VM, logs them, and validates
// against thresholds. Each threshold check runs as a t.Run() sub-test so that ADO
// Pipeline Analytics (via gotestsum → JUnit XML → PublishTestResults) can track
// individual CSE task pass/fail and duration trends over time.
func ValidateCSETimings(ctx context.Context, s *Scenario, thresholds CSETimingThresholds) *CSETimingReport {
s.T.Helper()
defer toolkit.LogStep(s.T, "validating CSE task timings")()

// Unwrap the underlying *testing.T from the toolkit logger wrapper
// so we can use t.Run() for sub-tests (ADO Pipeline Analytics tracking).
tRunner := toolkit.UnwrapTestingT(s.T)
if tRunner == nil {
s.T.Fatalf("ValidateCSETimings requires *testing.T for sub-test support, got %T", s.T)
}

report, err := ExtractCSETimings(ctx, s)
if err != nil {
s.T.Fatalf("failed to extract CSE timings: %v", err)
}

// Always log the full timing report
report.LogReport(ctx, s.T)

// Fail if no tasks were parsed — an empty report makes regression detection ineffective
if len(report.Tasks) == 0 {
s.T.Fatalf("no CSE task timings were parsed; cannot validate performance thresholds")
}

// Fail hard if the critical cse_start task is missing — without it TotalCSEDuration()
// returns 0 and the total duration threshold check would silently pass.
if report.GetTask("AKS.CSE.cse_start") == nil {
s.T.Fatalf("expected AKS.CSE.cse_start task not found in timing report; cannot validate total CSE duration")
}

// Validate total CSE duration as a sub-test for ADO tracking
if thresholds.TotalCSEThreshold > 0 {
tRunner.Run("TotalCSEDuration", func(t *testing.T) {
totalDuration := report.TotalCSEDuration()
t.Logf("total CSE duration: %s (threshold: %s)", totalDuration, thresholds.TotalCSEThreshold)
if totalDuration > thresholds.TotalCSEThreshold {
toolkit.LogDuration(ctx, totalDuration, thresholds.TotalCSEThreshold,
fmt.Sprintf("CSE total duration %s exceeds threshold %s", totalDuration, thresholds.TotalCSEThreshold))
t.Errorf("CSE total duration %s exceeds threshold %s", totalDuration, thresholds.TotalCSEThreshold)
}
})
}
Comment thread
djsly marked this conversation as resolved.

// Validate individual task thresholds — each as a sub-test for ADO tracking.
// ADO Test Analytics will show per-task pass/fail trends and flag regressions.
// Sort suffixes by length descending so longer (more specific) suffixes match first,
// making matching deterministic when multiple suffixes could match the same task.
sortedSuffixes := make([]string, 0, len(thresholds.TaskThresholds))
for suffix := range thresholds.TaskThresholds {
sortedSuffixes = append(sortedSuffixes, suffix)
}
sort.Slice(sortedSuffixes, func(i, j int) bool {
return len(sortedSuffixes[i]) > len(sortedSuffixes[j])
})

matchedTasks := make(map[string]bool)
matchedSuffixes := make(map[string]bool)
for _, task := range report.Tasks {
for _, suffix := range sortedSuffixes {
maxDuration := thresholds.TaskThresholds[suffix]
if strings.HasSuffix(task.TaskName, suffix) {
matchedTasks[task.TaskName] = true
matchedSuffixes[suffix] = true
task := task
suffix := suffix
maxDuration := maxDuration
Comment thread
djsly marked this conversation as resolved.
// Include sanitized task name to avoid collisions when multiple tasks match different suffixes
shortTask := task.TaskName
if idx := strings.LastIndex(shortTask, "."); idx >= 0 {
shortTask = shortTask[idx+1:]
}
testName := suffix
if shortTask != suffix {
testName = fmt.Sprintf("%s/%s", shortTask, suffix)
}
tRunner.Run(fmt.Sprintf("Task_%s", testName), func(t *testing.T) {
t.Logf("task %s duration: %s (threshold: %s)", task.TaskName, task.Duration, maxDuration)
if task.Duration > maxDuration {
toolkit.LogDuration(ctx, task.Duration, maxDuration,
fmt.Sprintf("CSE task %s took %s (threshold: %s)", task.TaskName, task.Duration, maxDuration))
t.Errorf("CSE task %s took %s, exceeds threshold %s", task.TaskName, task.Duration, maxDuration)
}
})
break
}
}
}
Comment thread
djsly marked this conversation as resolved.

// Verify all configured threshold suffixes matched at least one task.
// This catches task renames or removals that would silently disable regression checks.
for _, suffix := range sortedSuffixes {
if !matchedSuffixes[suffix] {
s.T.Errorf("threshold suffix %q did not match any CSE task — task may have been renamed or removed", suffix)
}
}

// Dynamic tracking: create sub-tests for any task that exceeds DefaultTaskThreshold
// but wasn't matched by a specific threshold above. This ensures newly added CSE tasks
// automatically appear in ADO Pipeline Analytics without code changes.
if thresholds.DefaultTaskThreshold > 0 {
for _, task := range report.Tasks {
if matchedTasks[task.TaskName] {
continue
}
if task.Duration < thresholds.DefaultTaskThreshold {
continue
}
task := task
// Extract short name: "AKS.CSE.foo.bar" → "bar", or use full name if no dots
shortName := task.TaskName
if idx := strings.LastIndex(shortName, "."); idx >= 0 {
shortName = shortName[idx+1:]
}
defaultThreshold := thresholds.DefaultTaskThreshold
tRunner.Run(fmt.Sprintf("Task_%s", shortName), func(t *testing.T) {
Comment on lines +337 to +343
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using only the last dot-segment as the sub-test name can cause collisions when different tasks share the same suffix (e.g., multiple ...setup tasks). Go’s test runner will auto-disambiguate duplicate names, but ADO analytics can become harder to interpret. Consider incorporating a deterministic, sanitized full task name (or a longer stable prefix) into Task_... to keep tracking unambiguous.

Suggested change
// Extract short name: "AKS.CSE.foo.bar" → "bar", or use full name if no dots
shortName := task.TaskName
if idx := strings.LastIndex(shortName, "."); idx >= 0 {
shortName = shortName[idx+1:]
}
defaultThreshold := thresholds.DefaultTaskThreshold
tRunner.Run(fmt.Sprintf("Task_%s", shortName), func(t *testing.T) {
// Use a deterministic sanitized full task name to avoid collisions between
// tasks that share the same suffix, e.g. multiple "...setup" tasks.
sanitizedTaskName := strings.NewReplacer(".", "_", "/", "_", " ", "_", ":", "_", "-", "_").Replace(task.TaskName)
defaultThreshold := thresholds.DefaultTaskThreshold
tRunner.Run(fmt.Sprintf("Task_%s", sanitizedTaskName), func(t *testing.T) {

Copilot uses AI. Check for mistakes.
t.Logf("task %s duration: %s (default threshold: %s — no specific threshold configured)",
Comment thread
djsly marked this conversation as resolved.
task.TaskName, task.Duration, defaultThreshold)
if task.Duration > defaultThreshold {
t.Errorf("CSE task %s took %s, exceeds default threshold %s (consider adding a specific threshold)",
task.TaskName, task.Duration, defaultThreshold)
}
})
}
}

return report
}