Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,9 +1280,15 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) {
WriteTimeout: 31 * time.Second,
}

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
err := pprofServer.ListenAndServe()
logger.Error("pprofServer failed", "error", err)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Error("pprofServer error", "error", err)
}
}()

metricsMux := http.NewServeMux()
Expand All @@ -1296,13 +1302,15 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) {

// Setup custom promHandler since we want to use our per-edm registry
metricsMux.Handle("/metrics", promhttp.InstrumentMetricHandler(edm.promReg, promhttp.HandlerFor(edm.promReg, promhttp.HandlerOpts{Registry: edm.promReg})))
wg.Add(1)
go func() {
defer wg.Done()
err := metricsServer.ListenAndServe()
logger.Error("metricsServer failed", "error", err)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Error("metricsServer error", "error", err)
}
}()

var wg sync.WaitGroup

// Write histogram file to an outbox dir where it will get picked up by
// the histogram sender. Upon being sent it will be moved to the sent dir.
dataDir := startConf.DataDir
Expand Down Expand Up @@ -1393,7 +1401,11 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) {
}
edm.reloadMinimiserMutex.Unlock()

// Start dnstap.Input
// Start dnstap.Input. The golang-dnstap library does not provide a
// stop/close mechanism for ReadInto, so we cannot track this goroutine
// in the WaitGroup without blocking shutdown indefinitely. The
// inputChannel is intentionally left unclosed because ReadInto sends
// to it and would panic; the OS reclaims the goroutine on process exit.
go dti.ReadInto(edm.inputChannel)

// Wait here until all instances of runMinimiser() is done
Expand All @@ -1411,6 +1423,16 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) {
edm.autopahoCancel()
}

// Gracefully shutdown HTTP servers before waiting for workers.
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := pprofServer.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) {
edm.log.Error("pprofServer shutdown error", "error", err)
}
if err := metricsServer.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) {
edm.log.Error("metricsServer shutdown error", "error", err)
}

// Wait for all workers to exit
edm.log.Info("Run: waiting for other workers to exit")
wg.Wait()
Expand Down