diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index c7e16c2b8c..483543d1f5 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -961,6 +961,35 @@ The following encryption methods are supported: | `tls` | Enable TLS | | `mtls` | Enable mutual TLS | +## Metrics Export + +The `metrics_export` configuration enables exporting Prometheus metrics via OTLP to an OpenTelemetry Collector, independently of distributed tracing. + +```yaml +metrics_export: + type: otlp/http + address: localhost:4318 + export_interval_ms: 5000 + service_name: opa + encryption: "off" + allow_insecure_tls: false + tls_ca_cert_file: /path/to/ca.pem + tls_cert_file: /path/to/cert.pem + tls_private_key_file: /path/to/key.pem +``` + +| Field | Type | Required | Description | +| ------------------------------------- | -------- | -------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------- | +| `metrics_export.type` | `string` | No | `"otlp/grpc"` or `"otlp/http"`. Omit (or `""`) to disable metrics export. | +| `metrics_export.address` | `string` | No (default: `localhost:4317` if `type` is `otlp/grpc`, `localhost:4318` if `type` is `otlp/http`) | Address of the OpenTelemetry Collector endpoint. | +| `metrics_export.export_interval_ms` | `int` | No (default: `60000`) | Interval between metric exports in milliseconds. Must be > 0. | +| `metrics_export.service_name` | `string` | No (default: `opa`) | Logical name of the service reported in exported metrics. | +| `metrics_export.encryption` | `string` | No (default: `off`) | Configures TLS: `off`, `tls`, or `mtls`. | +| `metrics_export.allow_insecure_tls` | `bool` | No (default: `false`) | Allow insecure TLS. | +| `metrics_export.tls_ca_cert_file` | `string` | No | The path to the root CA certificate. | +| `metrics_export.tls_cert_file` | `string` | No (unless `encryption` equals `mtls`) | The path to the client certificate to authenticate with. | +| `metrics_export.tls_private_key_file` | `string` | No (unless `tls_cert_file` provided) | The path to the private key of the client certificate. | + ## Disk Storage The `storage` configuration key allows for enabling, and configuring, the diff --git a/go.mod b/go.mod index c9079bb3eb..a8ae2f1dec 100644 --- a/go.mod +++ b/go.mod @@ -35,13 +35,18 @@ require ( github.com/vektah/gqlparser/v2 v2.5.32 github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 github.com/yashtewari/glob-intersection v0.2.0 + go.opentelemetry.io/contrib/bridges/prometheus v0.67.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 go.opentelemetry.io/otel v1.42.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0 go.opentelemetry.io/otel/sdk v1.42.0 + go.opentelemetry.io/otel/sdk/metric v1.42.0 go.opentelemetry.io/otel/trace v1.42.0 + go.opentelemetry.io/proto/otlp v1.9.0 go.uber.org/automaxprocs v1.6.0 go.yaml.in/yaml/v3 v3.0.4 golang.org/x/net v0.52.0 @@ -99,8 +104,8 @@ require ( github.com/olekukonko/ll v0.0.9 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/common v0.66.1 // indirect - github.com/prometheus/procfs v0.17.0 // indirect + github.com/prometheus/common v0.67.5 // indirect + github.com/prometheus/procfs v0.20.1 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect @@ -114,8 +119,7 @@ require ( github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel/metric v1.42.0 // indirect - go.opentelemetry.io/proto/otlp v1.9.0 // indirect - go.yaml.in/yaml/v2 v2.4.2 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect golang.org/x/crypto v0.49.0 // indirect golang.org/x/mod v0.33.0 // indirect golang.org/x/sys v0.42.0 // indirect diff --git a/go.sum b/go.sum index 7c46db17e0..3ba700fb03 100644 --- a/go.sum +++ b/go.sum @@ -158,10 +158,10 @@ github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= -github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= -github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= -github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0= -github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= +github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= +github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= +github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc= +github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -222,10 +222,16 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/bridges/prometheus v0.67.0 h1:dkBzNEAIKADEaFnuESzcXvpd09vxvDZsOjx11gjUqLk= +go.opentelemetry.io/contrib/bridges/prometheus v0.67.0/go.mod h1:Z5RIwRkZgauOIfnG5IpidvLpERjhTninpP1dTG2jTl4= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 h1:OyrsyzuttWTSur2qN/Lm0m2a8yqyIjUVBZcxFPuXq2o= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0/go.mod h1:C2NGBr+kAB4bk3xtMXfZ94gqFDtg/GkI7e9zqGh5Beg= go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0 h1:MdKucPl/HbzckWWEisiNqMPhRrAOQX8r4jTuGr636gk= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0/go.mod h1:RolT8tWtfHcjajEH5wFIZ4Dgh5jpPdFXYV9pTAk/qjc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0 h1:H7O6RlGOMTizyl3R08Kn5pdM06bnH8oscSj7o11tmLA= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0/go.mod h1:mBFWu/WOVDkWWsR7Tx7h6EpQB8wsv7P0Yrh0Pb7othc= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0 h1:THuZiwpQZuHPul65w4WcwEnkX2QIuMT+UFoOrygtoJw= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0/go.mod h1:J2pvYM5NGHofZ2/Ru6zw/TNWnEQp5crgyDeSrYpXkAw= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0 h1:zWWrB1U6nqhS/k6zYB74CjRpuiitRtLLi68VcgmOEto= @@ -246,8 +252,8 @@ go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= -go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/internal/distributedtracing/distributedtracing.go b/internal/distributedtracing/distributedtracing.go index ecdbf3be71..4eb9a3d1a0 100644 --- a/internal/distributedtracing/distributedtracing.go +++ b/internal/distributedtracing/distributedtracing.go @@ -7,10 +7,7 @@ package distributedtracing import ( "context" "crypto/tls" - "crypto/x509" - "errors" "fmt" - "os" "strings" "time" @@ -25,6 +22,7 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.7.0" "google.golang.org/grpc/credentials" + "github.com/open-policy-agent/opa/internal/tlsutil" "github.com/open-policy-agent/opa/v1/config" "github.com/open-policy-agent/opa/v1/logging" "github.com/open-policy-agent/opa/v1/util" @@ -112,36 +110,31 @@ func Init(ctx context.Context, raw []byte, id string) (*otlptrace.Exporter, *tra return nil, nil, nil, nil } - certificate, err := loadCertificate(distributedTracingConfig.TLSCertFile, distributedTracingConfig.TLSCertPrivateKeyFile) + certificate, err := tlsutil.LoadCertificate(distributedTracingConfig.TLSCertFile, distributedTracingConfig.TLSCertPrivateKeyFile) if err != nil { return nil, nil, nil, err } - certPool, err := loadCertPool(distributedTracingConfig.TLSCACertFile) + certPool, err := tlsutil.LoadCertPool(distributedTracingConfig.TLSCACertFile) + if err != nil { + return nil, nil, nil, err + } + + tlsConfig, err := tlsutil.BuildTLSConfig(distributedTracingConfig.EncryptionScheme, *distributedTracingConfig.EncryptionSkipVerify, certificate, certPool) if err != nil { return nil, nil, nil, err } var traceExporter *otlptrace.Exporter if strings.EqualFold(distributedTracingConfig.Type, "grpc") { - tlsOption, err := grpcTLSOption(distributedTracingConfig.EncryptionScheme, *distributedTracingConfig.EncryptionSkipVerify, certificate, certPool) - if err != nil { - return nil, nil, nil, err - } - traceExporter = otlptracegrpc.NewUnstarted( otlptracegrpc.WithEndpoint(distributedTracingConfig.Address), - tlsOption, + grpcTLSOption(distributedTracingConfig.EncryptionScheme, tlsConfig), ) } else if strings.EqualFold(distributedTracingConfig.Type, "http") { - tlsOption, err := httpTLSOption(distributedTracingConfig.EncryptionScheme, *distributedTracingConfig.EncryptionSkipVerify, certificate, certPool) - if err != nil { - return nil, nil, nil, err - } - traceExporter = otlptracehttp.NewUnstarted( otlptracehttp.WithEndpoint(distributedTracingConfig.Address), - tlsOption, + httpTLSOption(distributedTracingConfig.EncryptionScheme, tlsConfig), ) } @@ -301,71 +294,18 @@ func (c *distributedTracingConfig) validateAndInjectDefaults() error { return nil } -func loadCertificate(tlsCertFile, tlsPrivateKeyFile string) (*tls.Certificate, error) { - - if tlsCertFile != "" && tlsPrivateKeyFile != "" { - cert, err := tls.LoadX509KeyPair(tlsCertFile, tlsPrivateKeyFile) - if err != nil { - return nil, err - } - return &cert, nil - } - - if tlsCertFile != "" || tlsPrivateKeyFile != "" { - return nil, errors.New("distributed_tracing.tls_cert_file and distributed_tracing.tls_private_key_file must be specified together") - } - - return nil, nil -} - -func loadCertPool(tlsCACertFile string) (*x509.CertPool, error) { - if tlsCACertFile == "" { - return nil, nil - } - - caCertPEM, err := os.ReadFile(tlsCACertFile) - if err != nil { - return nil, fmt.Errorf("read CA cert file: %v", err) - } - pool := x509.NewCertPool() - if ok := pool.AppendCertsFromPEM(caCertPEM); !ok { - return nil, fmt.Errorf("failed to parse CA cert %q", tlsCACertFile) - } - return pool, nil -} - -func grpcTLSOption(encryptionScheme string, encryptionSkipVerify bool, cert *tls.Certificate, certPool *x509.CertPool) (otlptracegrpc.Option, error) { +func grpcTLSOption(encryptionScheme string, tlsConfig *tls.Config) otlptracegrpc.Option { if encryptionScheme == "off" { - return otlptracegrpc.WithInsecure(), nil - } - tlsConfig := &tls.Config{ - RootCAs: certPool, - InsecureSkipVerify: encryptionSkipVerify, - } - if encryptionScheme == "mtls" { - if cert == nil { - return nil, errors.New("distributed_tracing.tls_cert_file required but not supplied") - } - tlsConfig.Certificates = []tls.Certificate{*cert} + return otlptracegrpc.WithInsecure() } - return otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)), nil + return otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)) } -func httpTLSOption(encryptionScheme string, encryptionSkipVerify bool, cert *tls.Certificate, certPool *x509.CertPool) (otlptracehttp.Option, error) { +func httpTLSOption(encryptionScheme string, tlsConfig *tls.Config) otlptracehttp.Option { if encryptionScheme == "off" { - return otlptracehttp.WithInsecure(), nil - } - tlsConfig := &tls.Config{ - RootCAs: certPool, - InsecureSkipVerify: encryptionSkipVerify, - } - if encryptionScheme == "mtls" { - if cert == nil { - return nil, errors.New("distributed_tracing.tls_cert_file required but not supplied") - } - tlsConfig.Certificates = []tls.Certificate{*cert} + return otlptracehttp.WithInsecure() } - return otlptracehttp.WithTLSClientConfig(tlsConfig), nil + return otlptracehttp.WithTLSClientConfig(tlsConfig) } type errorHandler struct { diff --git a/internal/distributedtracing/distributedtracing_test.go b/internal/distributedtracing/distributedtracing_test.go new file mode 100644 index 0000000000..1e2c02dfb2 --- /dev/null +++ b/internal/distributedtracing/distributedtracing_test.go @@ -0,0 +1,18 @@ +// Copyright 2026 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package distributedtracing + +import "testing" + +func TestInitNoTypeReturnsAllNil(t *testing.T) { + raw := []byte(`{"distributed_tracing": {}}`) + exp, tp, res, err := Init(t.Context(), raw, "test") + if err != nil { + t.Fatal(err) + } + if exp != nil || tp != nil || res != nil { + t.Fatal("expected all nil when type is not set") + } +} diff --git a/internal/metricsexport/metricsexport.go b/internal/metricsexport/metricsexport.go new file mode 100644 index 0000000000..7de247f287 --- /dev/null +++ b/internal/metricsexport/metricsexport.go @@ -0,0 +1,197 @@ +// Copyright 2024 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package metricsexport + +import ( + "context" + "crypto/tls" + "fmt" + "strings" + "time" + + prometheus_client "github.com/prometheus/client_golang/prometheus" + otelprometheus "go.opentelemetry.io/contrib/bridges/prometheus" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.7.0" + "google.golang.org/grpc/credentials" + + "github.com/open-policy-agent/opa/internal/tlsutil" + "github.com/open-policy-agent/opa/v1/config" + "github.com/open-policy-agent/opa/v1/util" +) + +const ( + defaultGRPCAddress = "localhost:4317" + defaultHTTPAddress = "localhost:4318" + defaultExportIntervalMs = 60000 + defaultServiceName = "opa" + defaultEncryptionScheme = "off" +) + +type metricsExportConfig struct { + Type string `json:"type,omitempty"` + Address string `json:"address,omitempty"` + ExportIntervalMs *int `json:"export_interval_ms,omitempty"` + ServiceName string `json:"service_name,omitempty"` + EncryptionScheme string `json:"encryption,omitempty"` + EncryptionSkipVerify *bool `json:"allow_insecure_tls,omitempty"` + TLSCertFile string `json:"tls_cert_file,omitempty"` + TLSCertPrivateKeyFile string `json:"tls_private_key_file,omitempty"` + TLSCACertFile string `json:"tls_ca_cert_file,omitempty"` +} + +var supportedEncryptionScheme = map[string]struct{}{ + "off": {}, "tls": {}, "mtls": {}, +} + +func (c *metricsExportConfig) validateAndInjectDefaults() error { + switch strings.ToLower(c.Type) { + case "", "otlp/grpc", "otlp/http": // OK + default: + return fmt.Errorf("unknown metrics_export.type %q, must be \"otlp/grpc\", \"otlp/http\" or \"\" (unset)", c.Type) + } + + if c.Address == "" { + switch strings.ToLower(c.Type) { + case "otlp/grpc": + c.Address = defaultGRPCAddress + case "otlp/http": + c.Address = defaultHTTPAddress + } + } + + if c.ServiceName == "" { + c.ServiceName = defaultServiceName + } + + if c.ExportIntervalMs == nil { + v := defaultExportIntervalMs + c.ExportIntervalMs = &v + } + if *c.ExportIntervalMs <= 0 { + return fmt.Errorf("metrics_export.export_interval_ms must be a positive value, got %d", *c.ExportIntervalMs) + } + + if c.EncryptionScheme == "" { + c.EncryptionScheme = defaultEncryptionScheme + } + if _, ok := supportedEncryptionScheme[c.EncryptionScheme]; !ok { + return fmt.Errorf("unsupported metrics_export.encryption %q", c.EncryptionScheme) + } + + if c.EncryptionSkipVerify == nil { + v := false + c.EncryptionSkipVerify = &v + } + + return nil +} + +func parseMetricsExportConfig(raw []byte) (*metricsExportConfig, error) { + if raw == nil { + return &metricsExportConfig{}, nil + } + var cfg metricsExportConfig + if err := util.Unmarshal(raw, &cfg); err != nil { + return nil, err + } + if err := cfg.validateAndInjectDefaults(); err != nil { + return nil, err + } + return &cfg, nil +} + +func grpcTLSOption(encryptionScheme string, tlsConfig *tls.Config) otlpmetricgrpc.Option { + if encryptionScheme == "off" { + return otlpmetricgrpc.WithInsecure() + } + return otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)) +} + +func httpTLSOption(encryptionScheme string, tlsConfig *tls.Config) otlpmetrichttp.Option { + if encryptionScheme == "off" { + return otlpmetrichttp.WithInsecure() + } + return otlpmetrichttp.WithTLSClientConfig(tlsConfig) +} + +// Init initializes metrics export based on the provided configuration. +// If the type is empty or the gatherer is nil, it returns nil. +func Init(ctx context.Context, raw []byte, id string, gatherer prometheus_client.Gatherer) (*metric.MeterProvider, error) { + parsedConfig, err := config.ParseConfig(raw, id) + if err != nil { + return nil, err + } + + cfg, err := parseMetricsExportConfig(parsedConfig.MetricsExport) + if err != nil { + return nil, err + } + + if cfg.Type == "" || gatherer == nil { + return nil, nil + } + + certificate, err := tlsutil.LoadCertificate(cfg.TLSCertFile, cfg.TLSCertPrivateKeyFile) + if err != nil { + return nil, err + } + + certPool, err := tlsutil.LoadCertPool(cfg.TLSCACertFile) + if err != nil { + return nil, err + } + + tlsConfig, err := tlsutil.BuildTLSConfig(cfg.EncryptionScheme, *cfg.EncryptionSkipVerify, certificate, certPool) + if err != nil { + return nil, err + } + + var metricExporter metric.Exporter + + if strings.EqualFold(cfg.Type, "otlp/grpc") { + opts := []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithEndpoint(cfg.Address), + grpcTLSOption(cfg.EncryptionScheme, tlsConfig), + } + metricExporter, err = otlpmetricgrpc.New(ctx, opts...) + } else { + opts := []otlpmetrichttp.Option{ + otlpmetrichttp.WithEndpoint(cfg.Address), + httpTLSOption(cfg.EncryptionScheme, tlsConfig), + } + metricExporter, err = otlpmetrichttp.New(ctx, opts...) + } + if err != nil { + return nil, fmt.Errorf("create OTLP metric exporter: %w", err) + } + + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceNameKey.String(cfg.ServiceName), + ), + ) + if err != nil { + return nil, err + } + + interval := time.Duration(*cfg.ExportIntervalMs) * time.Millisecond + producer := otelprometheus.NewMetricProducer(otelprometheus.WithGatherer(gatherer)) + + reader := metric.NewPeriodicReader(metricExporter, + metric.WithInterval(interval), + metric.WithProducer(producer), + ) + + mp := metric.NewMeterProvider( + metric.WithResource(res), + metric.WithReader(reader), + ) + + return mp, nil +} diff --git a/internal/metricsexport/metricsexport_test.go b/internal/metricsexport/metricsexport_test.go new file mode 100644 index 0000000000..d4f048e87f --- /dev/null +++ b/internal/metricsexport/metricsexport_test.go @@ -0,0 +1,126 @@ +// Copyright 2024 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package metricsexport + +import ( + "testing" + + prometheus_client "github.com/prometheus/client_golang/prometheus" +) + +func TestParseMetricsExportConfigDefaults(t *testing.T) { + cfg, err := parseMetricsExportConfig(nil) + if err != nil { + t.Fatal(err) + } + if cfg.Type != "" { + t.Fatal("expected empty type for nil config") + } +} + +func TestParseMetricsExportConfigOTLPGRPC(t *testing.T) { + raw := []byte(`{"type": "otlp/grpc"}`) + cfg, err := parseMetricsExportConfig(raw) + if err != nil { + t.Fatal(err) + } + if cfg.Address != "localhost:4317" { + t.Fatalf("expected default gRPC address, got %s", cfg.Address) + } +} + +func TestParseMetricsExportConfigOTLPHTTP(t *testing.T) { + raw := []byte(`{"type": "otlp/http"}`) + cfg, err := parseMetricsExportConfig(raw) + if err != nil { + t.Fatal(err) + } + if cfg.Address != "localhost:4318" { + t.Fatalf("expected default HTTP address, got %s", cfg.Address) + } +} + +func TestParseMetricsExportConfigCustomInterval(t *testing.T) { + raw := []byte(`{"type": "otlp/grpc", "export_interval_ms": 30000}`) + cfg, err := parseMetricsExportConfig(raw) + if err != nil { + t.Fatal(err) + } + if cfg.ExportIntervalMs == nil || *cfg.ExportIntervalMs != 30000 { + t.Fatalf("expected export_interval_ms to be 30000, got %v", *cfg.ExportIntervalMs) + } +} + +func TestValidateMetricsExportInvalidType(t *testing.T) { + raw := []byte(`{"type": "unknown"}`) + _, err := parseMetricsExportConfig(raw) + if err == nil { + t.Fatal("expected error for invalid type") + } +} + +func TestValidateMetricsExportIntervalZero(t *testing.T) { + raw := []byte(`{"type": "otlp/grpc", "export_interval_ms": 0}`) + _, err := parseMetricsExportConfig(raw) + if err == nil { + t.Fatal("expected error when export_interval_ms is 0") + } +} + +func TestValidateMetricsExportIntervalNegative(t *testing.T) { + raw := []byte(`{"type": "otlp/grpc", "export_interval_ms": -1}`) + _, err := parseMetricsExportConfig(raw) + if err == nil { + t.Fatal("expected error when export_interval_ms is negative") + } +} + +func TestInitDisabled(t *testing.T) { + raw := []byte(`{}`) + mp, err := Init(t.Context(), raw, "test", prometheus_client.NewRegistry()) + if err != nil { + t.Fatal(err) + } + if mp != nil { + t.Fatal("expected nil MeterProvider when type is empty") + } +} + +func TestInitNilGatherer(t *testing.T) { + raw := []byte(`{"metrics_export": {"type": "otlp/grpc"}}`) + mp, err := Init(t.Context(), raw, "test", nil) + if err != nil { + t.Fatal(err) + } + if mp != nil { + t.Fatal("expected nil MeterProvider when gatherer is nil") + } +} + +func TestInitGRPCWithGatherer(t *testing.T) { + registry := prometheus_client.NewRegistry() + raw := []byte(`{"metrics_export": {"type": "otlp/grpc"}}`) + mp, err := Init(t.Context(), raw, "test", registry) + if err != nil { + t.Fatal(err) + } + if mp == nil { + t.Fatal("expected non-nil MeterProvider") + } + _ = mp.Shutdown(t.Context()) +} + +func TestInitHTTPWithGatherer(t *testing.T) { + registry := prometheus_client.NewRegistry() + raw := []byte(`{"metrics_export": {"type": "otlp/http"}}`) + mp, err := Init(t.Context(), raw, "test", registry) + if err != nil { + t.Fatal(err) + } + if mp == nil { + t.Fatal("expected non-nil MeterProvider") + } + _ = mp.Shutdown(t.Context()) +} diff --git a/internal/prometheus/prometheus.go b/internal/prometheus/prometheus.go index 9f75e7ba67..496387ccaa 100644 --- a/internal/prometheus/prometheus.go +++ b/internal/prometheus/prometheus.go @@ -67,6 +67,11 @@ func New(inner metrics.Metrics, logger loggerFunc, httpRequestBuckets []float64) } } +// Gatherer returns the underlying prometheus.Gatherer for exporting metrics via OTLP. +func (p *Provider) Gatherer() prometheus.Gatherer { + return p.registry +} + // RegisterEndpoints registers `/metrics` endpoint func (p *Provider) RegisterEndpoints(registrar func(path, method string, handler http.Handler)) { registrar("/metrics/alloc_bytes", http.MethodGet, http.HandlerFunc(allocHandler)) diff --git a/internal/tlsutil/tlsutil.go b/internal/tlsutil/tlsutil.go new file mode 100644 index 0000000000..65fff463a4 --- /dev/null +++ b/internal/tlsutil/tlsutil.go @@ -0,0 +1,68 @@ +// Copyright 2024 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package tlsutil + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "os" +) + +// LoadCertificate loads a TLS certificate from the given cert and key files. +// If both are empty, it returns nil. If only one is provided, it returns an error. +func LoadCertificate(certFile, keyFile string) (*tls.Certificate, error) { + if certFile != "" && keyFile != "" { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, err + } + return &cert, nil + } + + if certFile != "" || keyFile != "" { + return nil, errors.New("tls_cert_file and tls_private_key_file must be specified together") + } + + return nil, nil +} + +// LoadCertPool loads a certificate pool from the given CA cert file. +// If the file is empty, it returns nil. +func LoadCertPool(caCertFile string) (*x509.CertPool, error) { + if caCertFile == "" { + return nil, nil + } + + caCertPEM, err := os.ReadFile(caCertFile) + if err != nil { + return nil, fmt.Errorf("read CA cert file: %v", err) + } + pool := x509.NewCertPool() + if ok := pool.AppendCertsFromPEM(caCertPEM); !ok { + return nil, fmt.Errorf("failed to parse CA cert %q", caCertFile) + } + return pool, nil +} + +// BuildTLSConfig creates a *tls.Config based on the encryption scheme. +// For "off", it returns nil. For "mtls", a certificate is required. +func BuildTLSConfig(scheme string, skipVerify bool, cert *tls.Certificate, pool *x509.CertPool) (*tls.Config, error) { + if scheme == "off" { + return nil, nil + } + tlsConfig := &tls.Config{ + RootCAs: pool, + InsecureSkipVerify: skipVerify, + } + if scheme == "mtls" { + if cert == nil { + return nil, errors.New("tls_cert_file required but not supplied") + } + tlsConfig.Certificates = []tls.Certificate{*cert} + } + return tlsConfig, nil +} diff --git a/internal/tlsutil/tlsutil_test.go b/internal/tlsutil/tlsutil_test.go new file mode 100644 index 0000000000..6b476d2993 --- /dev/null +++ b/internal/tlsutil/tlsutil_test.go @@ -0,0 +1,24 @@ +// Copyright 2024 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package tlsutil + +import "testing" + +func TestBuildTLSConfigOff(t *testing.T) { + tlsConfig, err := BuildTLSConfig("off", false, nil, nil) + if err != nil { + t.Fatal(err) + } + if tlsConfig != nil { + t.Fatal("expected nil tls config for 'off' encryption") + } +} + +func TestBuildTLSConfigMTLSNoCert(t *testing.T) { + _, err := BuildTLSConfig("mtls", false, nil, nil) + if err == nil { + t.Fatal("expected error for mtls without cert") + } +} diff --git a/v1/config/config.go b/v1/config/config.go index cfec875fd1..1dc16e9f62 100644 --- a/v1/config/config.go +++ b/v1/config/config.go @@ -104,6 +104,7 @@ type Config struct { NDBuiltinCache bool `json:"nd_builtin_cache,omitempty"` PersistenceDirectory *string `json:"persistence_directory,omitempty"` DistributedTracing json.RawMessage `json:"distributed_tracing,omitempty"` + MetricsExport json.RawMessage `json:"metrics_export,omitempty"` Server *ServerConfig `json:"server,omitempty"` Storage *StorageConfig `json:"storage,omitempty"` Extra map[string]json.RawMessage `json:"-"` @@ -274,6 +275,10 @@ func (c *Config) Clone() *Config { clone.DistributedTracing = make(json.RawMessage, len(c.DistributedTracing)) copy(clone.DistributedTracing, c.DistributedTracing) } + if c.MetricsExport != nil { + clone.MetricsExport = make(json.RawMessage, len(c.MetricsExport)) + copy(clone.MetricsExport, c.MetricsExport) + } if c.DefaultDecision != nil { s := *c.DefaultDecision diff --git a/v1/runtime/runtime.go b/v1/runtime/runtime.go index 11eb61cc90..3de65e885d 100644 --- a/v1/runtime/runtime.go +++ b/v1/runtime/runtime.go @@ -28,12 +28,14 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/propagation" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.uber.org/automaxprocs/maxprocs" "github.com/open-policy-agent/opa/internal/compiler" "github.com/open-policy-agent/opa/internal/config" internal_tracing "github.com/open-policy-agent/opa/internal/distributedtracing" internal_logging "github.com/open-policy-agent/opa/internal/logging" + internal_metrics "github.com/open-policy-agent/opa/internal/metricsexport" "github.com/open-policy-agent/opa/internal/pathwatcher" "github.com/open-policy-agent/opa/internal/prometheus" "github.com/open-policy-agent/opa/internal/ref" @@ -344,6 +346,7 @@ type Runtime struct { metrics *prometheus.Provider versionChecker versioncheck.Checker traceExporter *otlptrace.Exporter + meterProvider *sdkmetric.MeterProvider loadedPathsResult *initload.LoadPathsResult serverStatus ServerStatus @@ -490,6 +493,11 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) { if err != nil { return nil, fmt.Errorf("config error: %w", err) } + + meterProvider, err := internal_metrics.Init(ctx, config, params.ID, metrics.Gatherer()) + if err != nil { + return nil, fmt.Errorf("config error: %w", err) + } if tracerProvider != nil { params.DistributedTracingOpts = tracing.NewOptions( otelhttp.WithTracerProvider(tracerProvider), @@ -563,6 +571,7 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) { versionChecker: versionChecker, serverStatus: ServerNotStarted, traceExporter: traceExporter, + meterProvider: meterProvider, loadedPathsResult: loaded, } @@ -1019,6 +1028,12 @@ func (rt *Runtime) gracefulServerShutdown(s *server.Server) error { } } + if rt.meterProvider != nil { + if err := rt.meterProvider.Shutdown(ctx); err != nil { + rt.logger.WithFields(map[string]any{"err": err}).Error("Failed to shutdown OpenTelemetry meter provider gracefully.") + } + } + // Close storage if it implements the storage.Closer interface if closer, ok := rt.Store.(storage.Closer); ok { if err := closer.Close(ctx); err != nil { diff --git a/v1/test/e2e/metricsexport/otlpmetrics_grpc_test.go b/v1/test/e2e/metricsexport/otlpmetrics_grpc_test.go new file mode 100644 index 0000000000..9d853866cb --- /dev/null +++ b/v1/test/e2e/metricsexport/otlpmetrics_grpc_test.go @@ -0,0 +1,129 @@ +// Copyright 2024 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package metricsexport + +import ( + "context" + "net" + "net/http" + "sync" + "testing" + "time" + + "github.com/open-policy-agent/opa/v1/runtime" + "github.com/open-policy-agent/opa/v1/test/e2e" + colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + "google.golang.org/grpc" +) + +// mockOTLPGRPCCollector is a lightweight mock OTLP gRPC collector that records +// incoming ExportMetricsServiceRequest messages. +type mockOTLPGRPCCollector struct { + colmetricpb.UnimplementedMetricsServiceServer + mu sync.Mutex + requests []*colmetricpb.ExportMetricsServiceRequest + server *grpc.Server + addr string +} + +func newMockOTLPGRPCCollector(t *testing.T) *mockOTLPGRPCCollector { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + c := &mockOTLPGRPCCollector{ + server: grpc.NewServer(), + addr: lis.Addr().String(), + } + colmetricpb.RegisterMetricsServiceServer(c.server, c) + + go func() { + _ = c.server.Serve(lis) + }() + + return c +} + +func (c *mockOTLPGRPCCollector) Export(_ context.Context, req *colmetricpb.ExportMetricsServiceRequest) (*colmetricpb.ExportMetricsServiceResponse, error) { + c.mu.Lock() + c.requests = append(c.requests, req) + c.mu.Unlock() + return &colmetricpb.ExportMetricsServiceResponse{}, nil +} + +func (c *mockOTLPGRPCCollector) getRequests() []*colmetricpb.ExportMetricsServiceRequest { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]*colmetricpb.ExportMetricsServiceRequest, len(c.requests)) + copy(out, c.requests) + return out +} + +func (c *mockOTLPGRPCCollector) stop() { + c.server.GracefulStop() +} + +// metricNames extracts all metric names from the collected requests. +func (c *mockOTLPGRPCCollector) metricNames() map[string]bool { + names := make(map[string]bool) + for _, req := range c.getRequests() { + for _, rm := range req.GetResourceMetrics() { + for _, sm := range rm.GetScopeMetrics() { + for _, m := range sm.GetMetrics() { + names[m.GetName()] = true + } + } + } + } + return names +} + +// address returns the host:port of the gRPC collector. +func (c *mockOTLPGRPCCollector) address() string { + return c.addr +} + +func TestOTLPMetricsExportGRPC(t *testing.T) { + collector := newMockOTLPGRPCCollector(t) + defer collector.stop() + + testServerParams := e2e.NewAPIServerTestParams() + testServerParams.ConfigOverrides = []string{ + "metrics_export.type=otlp/grpc", + "metrics_export.address=" + collector.address(), + "metrics_export.export_interval_ms=500", + } + testServerParams.Logging = runtime.LoggingConfig{Level: "error"} + + e2e.WithRuntime(t, e2e.TestRuntimeOpts{}, testServerParams, func(rt *e2e.TestRuntime) { + // Make a few requests to OPA to generate HTTP metrics. + for range 3 { + resp, err := http.Get(rt.URL() + "/v1/data") + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + } + + // Wait for the periodic reader to export. The interval is 500ms, + // so we poll for up to 5 seconds. + var names map[string]bool + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + names = collector.metricNames() + if names["http_request_duration_seconds"] { + break + } + time.Sleep(250 * time.Millisecond) + } + + if !names["http_request_duration_seconds"] { + t.Fatalf("expected http_request_duration_seconds metric to be exported, got: %v", names) + } + }) +} diff --git a/v1/test/e2e/metricsexport/otlpmetrics_http_test.go b/v1/test/e2e/metricsexport/otlpmetrics_http_test.go new file mode 100644 index 0000000000..13f80a6359 --- /dev/null +++ b/v1/test/e2e/metricsexport/otlpmetrics_http_test.go @@ -0,0 +1,128 @@ +// Copyright 2024 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package metricsexport + +import ( + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/open-policy-agent/opa/v1/runtime" + "github.com/open-policy-agent/opa/v1/test/e2e" + colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + "google.golang.org/protobuf/proto" +) + +// mockOTLPHTTPCollector is a lightweight mock OTLP HTTP collector that records +// incoming ExportMetricsServiceRequest messages. +type mockOTLPHTTPCollector struct { + mu sync.Mutex + requests []*colmetricpb.ExportMetricsServiceRequest + server *httptest.Server +} + +func newMockOTLPHTTPCollector() *mockOTLPHTTPCollector { + c := &mockOTLPHTTPCollector{} + mux := http.NewServeMux() + mux.HandleFunc("/v1/metrics", c.handleMetrics) + c.server = httptest.NewServer(mux) + return c +} + +func (c *mockOTLPHTTPCollector) handleMetrics(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + req := &colmetricpb.ExportMetricsServiceRequest{} + if err := proto.Unmarshal(body, req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + c.mu.Lock() + c.requests = append(c.requests, req) + c.mu.Unlock() + + w.WriteHeader(http.StatusOK) +} + +func (c *mockOTLPHTTPCollector) getRequests() []*colmetricpb.ExportMetricsServiceRequest { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]*colmetricpb.ExportMetricsServiceRequest, len(c.requests)) + copy(out, c.requests) + return out +} + +func (c *mockOTLPHTTPCollector) stop() { + c.server.Close() +} + +// metricNames extracts all metric names from the collected requests. +func (c *mockOTLPHTTPCollector) metricNames() map[string]bool { + names := make(map[string]bool) + for _, req := range c.getRequests() { + for _, rm := range req.GetResourceMetrics() { + for _, sm := range rm.GetScopeMetrics() { + for _, m := range sm.GetMetrics() { + names[m.GetName()] = true + } + } + } + } + return names +} + +// address returns the host:port portion of the collector URL (no scheme). +func (c *mockOTLPHTTPCollector) address() string { + // Strip "http://" prefix + return c.server.URL[len("http://"):] +} + +func TestOTLPMetricsExportHTTP(t *testing.T) { + collector := newMockOTLPHTTPCollector() + defer collector.stop() + + testServerParams := e2e.NewAPIServerTestParams() + testServerParams.ConfigOverrides = []string{ + "metrics_export.type=otlp/http", + "metrics_export.address=" + collector.address(), + "metrics_export.export_interval_ms=500", + } + testServerParams.Logging = runtime.LoggingConfig{Level: "error"} + + e2e.WithRuntime(t, e2e.TestRuntimeOpts{}, testServerParams, func(rt *e2e.TestRuntime) { + // Make a few requests to OPA to generate HTTP metrics. + for range 3 { + resp, err := http.Get(rt.URL() + "/v1/data") + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + } + + // Wait for the periodic reader to export. The interval is 500ms, + // so we poll for up to 5 seconds. + var names map[string]bool + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + names = collector.metricNames() + if names["http_request_duration_seconds"] { + break + } + time.Sleep(250 * time.Millisecond) + } + + if !names["http_request_duration_seconds"] { + t.Fatalf("expected http_request_duration_seconds metric to be exported, got: %v", names) + } + }) +}