diff --git a/CHANGELOG.md b/CHANGELOG.md index 772af5c845a..9779b0dc517 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New +- **General**: Provide Prometheus metric with indication of total number of triggers per trigger type in `ScaledJob`/`ScaledObject`. ([#3663](https://github.com/kedacore/keda/issues/3663)) - **Azure Service Bus**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920)) ### Improvements diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index 4023a521ac4..ecaacc7c8d8 100644 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -19,6 +19,7 @@ package keda import ( "context" "fmt" + "sync" "time" "github.com/go-logr/logr" @@ -27,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -38,6 +40,7 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util" "github.com/kedacore/keda/v2/pkg/eventreason" + "github.com/kedacore/keda/v2/pkg/metrics" "github.com/kedacore/keda/v2/pkg/scaling" ) @@ -51,12 +54,24 @@ type ScaledJobReconciler struct { GlobalHTTPTimeout time.Duration Recorder record.EventRecorder - scaleHandler scaling.ScaleHandler + scaledJobGenerations *sync.Map + scaleHandler scaling.ScaleHandler +} + +var ( + scaledJobTriggers map[string][]string + scaledJobTriggersLock *sync.Mutex +) + +func init() { + scaledJobTriggers = make(map[string][]string) + scaledJobTriggersLock = &sync.Mutex{} } // SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance. func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler")) + r.scaledJobGenerations = &sync.Map{} return ctrl.NewControllerManagedBy(mgr). WithOptions(options). @@ -90,8 +105,9 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // Check if the ScaledJob instance is marked to be deleted, which is // indicated by the deletion timestamp being set. if scaledJob.GetDeletionTimestamp() != nil { - return ctrl.Result{}, r.finalizeScaledJob(ctx, reqLogger, scaledJob) + return ctrl.Result{}, r.finalizeScaledJob(ctx, reqLogger, scaledJob, req.NamespacedName.String()) } + r.updateTriggerTotals(reqLogger, scaledJob, req.NamespacedName.String()) // ensure finalizer is set on this CR if err := r.ensureFinalizer(ctx, reqLogger, scaledJob); err != nil { @@ -213,11 +229,96 @@ func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context // requestScaleLoop request ScaleLoop handler for the respective ScaledJob func (r *ScaledJobReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { logger.V(1).Info("Starting a new ScaleLoop") - return r.scaleHandler.HandleScalableObject(ctx, scaledJob) + + key, err := cache.MetaNamespaceKeyFunc(scaledJob) + if err != nil { + logger.Error(err, "Error getting key for scaledJob") + return err + } + + if err = r.scaleHandler.HandleScalableObject(ctx, scaledJob); err != nil { + return err + } + + r.scaledJobGenerations.Store(key, scaledJob.Generation) + + return nil } // stopScaleLoop stops ScaleLoop handler for the respective ScaledJob func (r *ScaledJobReconciler) stopScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { logger.V(1).Info("Stopping a ScaleLoop") - return r.scaleHandler.DeleteScalableObject(ctx, scaledJob) + + key, err := cache.MetaNamespaceKeyFunc(scaledJob) + if err != nil { + logger.Error(err, "Error getting key for scaledJob") + return err + } + + if err = r.scaleHandler.DeleteScalableObject(ctx, scaledJob); err != nil { + return err + } + + r.scaledJobGenerations.Delete(key) + return nil +} + +// scaledJobGenerationChanged returns true if ScaledJob's Generation was changed, ie. ScaledJob.Spec was changed +func (r *ScaledJobReconciler) scaledJobGenerationChanged(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (bool, error) { + key, err := cache.MetaNamespaceKeyFunc(scaledJob) + if err != nil { + logger.Error(err, "Error getting key for scaledJob") + return true, err + } + + value, loaded := r.scaledJobGenerations.Load(key) + if loaded { + generation := value.(int64) + if generation == scaledJob.Generation { + return false, nil + } + } + return true, nil +} + +func (r *ScaledJobReconciler) updateTriggerTotals(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, namespacedName string) { + specChanged, err := r.scaledJobGenerationChanged(logger, scaledJob) + if err != nil { + logger.Error(err, "failed to update trigger totals") + return + } + + if !specChanged { + return + } + + scaledJobTriggersLock.Lock() + defer scaledJobTriggersLock.Unlock() + + if triggerTypes, ok := scaledJobTriggers[namespacedName]; ok { + for _, triggerType := range triggerTypes { + metrics.DecrementTriggerTotal(triggerType) + } + } + + triggerTypes := make([]string, len(scaledJob.Spec.Triggers)) + for _, trigger := range scaledJob.Spec.Triggers { + metrics.IncrementTriggerTotal(trigger.Type) + triggerTypes = append(triggerTypes, trigger.Type) + } + + scaledJobTriggers[namespacedName] = triggerTypes +} + +func (r *ScaledJobReconciler) updateTriggerTotalsOnDelete(namespacedName string) { + scaledJobTriggersLock.Lock() + defer scaledJobTriggersLock.Unlock() + + if triggerTypes, ok := scaledJobTriggers[namespacedName]; ok { + for _, triggerType := range triggerTypes { + metrics.DecrementTriggerTotal(triggerType) + } + } + + delete(scaledJobTriggers, namespacedName) } diff --git a/controllers/keda/scaledjob_finalizer.go b/controllers/keda/scaledjob_finalizer.go index f910a1e9984..faf63bb1f7c 100644 --- a/controllers/keda/scaledjob_finalizer.go +++ b/controllers/keda/scaledjob_finalizer.go @@ -32,7 +32,8 @@ const ( ) // finalizeScaledJob runs finalization logic on ScaledJob if there's finalizer -func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { +func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, + namespacedName string) error { if util.Contains(scaledJob.GetFinalizers(), scaledJobFinalizer) { // Run finalization logic for scaledJobFinalizer. If the // finalization logic fails, don't remove the finalizer so @@ -48,6 +49,8 @@ func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr logger.Error(err, "Failed to update ScaledJob after removing a finalizer", "finalizer", scaledJobFinalizer) return err } + + r.updateTriggerTotalsOnDelete(namespacedName) } logger.Info("Successfully finalized ScaledJob") diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index e848a389ba4..5ffbe9f19f1 100644 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -48,6 +48,7 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util" "github.com/kedacore/keda/v2/pkg/eventreason" + "github.com/kedacore/keda/v2/pkg/metrics" "github.com/kedacore/keda/v2/pkg/scaling" kedautil "github.com/kedacore/keda/v2/pkg/util" ) @@ -76,14 +77,22 @@ type ScaledObjectReconciler struct { kubeVersion kedautil.K8sVersion } -// A cache mapping "resource.group" to true or false if we know if this resource is scalable. -var isScalableCache *sync.Map +var ( + // A cache mapping "resource.group" to true or false if we know if this resource is scalable. + isScalableCache *sync.Map + + scaledObjectTriggers map[string][]string + scaledObjectTriggersLock *sync.Mutex +) func init() { // Prefill the cache with some known values for core resources in case of future parallelism to avoid stampeding herd on startup. isScalableCache = &sync.Map{} isScalableCache.Store("deployments.apps", true) isScalableCache.Store("statefulsets.apps", true) + + scaledObjectTriggers = make(map[string][]string) + scaledObjectTriggersLock = &sync.Mutex{} } // SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance. @@ -166,8 +175,9 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request // Check if the ScaledObject instance is marked to be deleted, which is // indicated by the deletion timestamp being set. if scaledObject.GetDeletionTimestamp() != nil { - return ctrl.Result{}, r.finalizeScaledObject(ctx, reqLogger, scaledObject) + return ctrl.Result{}, r.finalizeScaledObject(ctx, reqLogger, scaledObject, req.NamespacedName.String()) } + r.updateTriggerTotals(reqLogger, scaledObject, req.NamespacedName.String()) // ensure finalizer is set on this CR if err := r.ensureFinalizer(ctx, reqLogger, scaledObject); err != nil { @@ -469,3 +479,45 @@ func (r *ScaledObjectReconciler) scaledObjectGenerationChanged(logger logr.Logge } return true, nil } + +func (r *ScaledObjectReconciler) updateTriggerTotals(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, namespacedName string) { + specChanged, err := r.scaledObjectGenerationChanged(logger, scaledObject) + if err != nil { + logger.Error(err, "failed to update trigger totals") + return + } + + if !specChanged { + return + } + + scaledObjectTriggersLock.Lock() + defer scaledObjectTriggersLock.Unlock() + + if triggerTypes, ok := scaledObjectTriggers[namespacedName]; ok { + for _, triggerType := range triggerTypes { + metrics.DecrementTriggerTotal(triggerType) + } + } + + triggerTypes := make([]string, len(scaledObject.Spec.Triggers)) + for _, trigger := range scaledObject.Spec.Triggers { + metrics.IncrementTriggerTotal(trigger.Type) + triggerTypes = append(triggerTypes, trigger.Type) + } + + scaledObjectTriggers[namespacedName] = triggerTypes +} + +func (r *ScaledObjectReconciler) updateTriggerTotalsOnDelete(namespacedName string) { + scaledObjectTriggersLock.Lock() + defer scaledObjectTriggersLock.Unlock() + + if triggerTypes, ok := scaledObjectTriggers[namespacedName]; ok { + for _, triggerType := range triggerTypes { + metrics.DecrementTriggerTotal(triggerType) + } + } + + delete(scaledObjectTriggers, namespacedName) +} diff --git a/controllers/keda/scaledobject_finalizer.go b/controllers/keda/scaledobject_finalizer.go index 258fe7856dc..77a74fdb314 100644 --- a/controllers/keda/scaledobject_finalizer.go +++ b/controllers/keda/scaledobject_finalizer.go @@ -34,7 +34,8 @@ const ( ) // finalizeScaledObject runs finalization logic on ScaledObject if there's finalizer -func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { +func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, + namespacedName string) error { if util.Contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) { // Run finalization logic for scaledObjectFinalizer. If the // finalization logic fails, don't remove the finalizer so @@ -77,6 +78,8 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge logger.Error(err, "Failed to update ScaledObject after removing a finalizer", "finalizer", scaledObjectFinalizer) return err } + + r.updateTriggerTotalsOnDelete(namespacedName) } logger.Info("Successfully finalized ScaledObject") diff --git a/go.mod b/go.mod index c2007603ea0..d83641885d5 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.13.0 + github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.37.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/robfig/cron/v3 v3.0.1 @@ -227,7 +228,6 @@ require ( github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect diff --git a/pkg/metrics/prometheus_metrics.go b/pkg/metrics/adapter_prom_metrics.go similarity index 100% rename from pkg/metrics/prometheus_metrics.go rename to pkg/metrics/adapter_prom_metrics.go diff --git a/pkg/metrics/operator_prom_metrics.go b/pkg/metrics/operator_prom_metrics.go new file mode 100644 index 00000000000..ede0c1f4e7e --- /dev/null +++ b/pkg/metrics/operator_prom_metrics.go @@ -0,0 +1,49 @@ +/* +Copyright 2022 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +var ( + triggerTotalsGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "keda_operator", + Subsystem: "trigger", + Name: "totals", + }, + []string{"type"}, + ) +) + +func init() { + metrics.Registry.MustRegister(triggerTotalsGaugeVec) +} + +func IncrementTriggerTotal(triggerType string) { + if triggerType != "" { + triggerTotalsGaugeVec.WithLabelValues(triggerType).Inc() + } +} + +func DecrementTriggerTotal(triggerType string) { + if triggerType != "" { + triggerTotalsGaugeVec.WithLabelValues(triggerType).Dec() + } +} diff --git a/tests/helper/helper.go b/tests/helper/helper.go index d7e43420d3c..7706d4cc2fa 100644 --- a/tests/helper/helper.go +++ b/tests/helper/helper.go @@ -28,6 +28,8 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "sigs.k8s.io/controller-runtime/pkg/client/config" + + "github.com/kedacore/keda/v2/pkg/generated/clientset/versioned/typed/keda/v1alpha1" ) const ( @@ -55,8 +57,9 @@ var ( ) var ( - KubeClient *kubernetes.Clientset - KubeConfig *rest.Config + KubeClient *kubernetes.Clientset + KedaKubeClient *v1alpha1.KedaV1alpha1Client + KubeConfig *rest.Config ) type ExecutionError struct { @@ -178,6 +181,21 @@ func GetKubernetesClient(t *testing.T) *kubernetes.Clientset { return KubeClient } +func GetKedaKubernetesClient(t *testing.T) *v1alpha1.KedaV1alpha1Client { + if KedaKubeClient != nil && KubeConfig != nil { + return KedaKubeClient + } + + var err error + KubeConfig, err = config.GetConfig() + assert.NoErrorf(t, err, "cannot fetch kube config file - %s", err) + + KedaKubeClient, err = v1alpha1.NewForConfig(KubeConfig) + assert.NoErrorf(t, err, "cannot create keda kubernetes client - %s", err) + + return KedaKubeClient +} + // Creates a new namespace. If it already exists, make sure it is deleted first. func CreateNamespace(t *testing.T, kc *kubernetes.Clientset, nsName string) { DeleteNamespace(t, kc, nsName) diff --git a/tests/internals/prometheus_metrics/prometheus_metrics_test.go b/tests/internals/prometheus_metrics/prometheus_metrics_test.go index 190e548aed3..3db46563a56 100644 --- a/tests/internals/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/internals/prometheus_metrics/prometheus_metrics_test.go @@ -4,12 +4,16 @@ package prometheus_metrics_test import ( + "context" "fmt" "strings" "testing" + promModel "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/stretchr/testify/assert" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" . "github.com/kedacore/keda/v2/tests/helper" ) @@ -23,15 +27,19 @@ var ( deploymentName = fmt.Sprintf("%s-deployment", testName) monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName) scaledObjectName = fmt.Sprintf("%s-so", testName) + cronScaledJobName = fmt.Sprintf("%s-cron-sj", testName) clientName = fmt.Sprintf("%s-client", testName) + serviceName = fmt.Sprintf("%s-service", testName) ) type templateData struct { TestNamespace string DeploymentName string ScaledObjectName string + CronScaledJobName string MonitoredDeploymentName string ClientName string + ServiceName string } const ( @@ -102,6 +110,44 @@ spec: value: '1' ` + cronScaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.CronScaledJobName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 5 + maxReplicaCount: 3 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: cron + metadata: + timezone: Etc/UTC + start: 0 * * * * + end: 1 * * * * + desiredReplicas: '4' + - type: cron + metadata: + timezone: Etc/UTC + start: 1 * * * * + end: 2 * * * * + desiredReplicas: '4' +` + clientTemplate = ` apiVersion: v1 kind: Pod @@ -116,6 +162,21 @@ spec: - sh - -c - "exec tail -f /dev/null"` + + serviceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.ServiceName}} + namespace: keda +spec: + ports: + - name: metrics + port: 8080 + targetPort: 8080 + selector: + app: keda-operator +` ) func TestScaler(t *testing.T) { @@ -133,6 +194,7 @@ func TestScaler(t *testing.T) { "replica count should be 2 after 2 minute") testHPAScalerMetricValue(t) + testTriggerTotalMetric(t, kc, data) // cleanup DeleteKubernetesResources(t, kc, testNamespace, data, templates) @@ -145,18 +207,19 @@ func getTemplateData() (templateData, []Template) { ScaledObjectName: scaledObjectName, MonitoredDeploymentName: monitoredDeploymentName, ClientName: clientName, + ServiceName: serviceName, + CronScaledJobName: cronScaledJobName, }, []Template{ {Name: "deploymentTemplate", Config: deploymentTemplate}, {Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate}, {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, {Name: "clientTemplate", Config: clientTemplate}, + {Name: "serviceTemplate", Config: serviceTemplate}, } } -func testHPAScalerMetricValue(t *testing.T) { - t.Log("--- testing hpa scaler metric value ---") - - out, _, err := ExecCommandOnSpecificPod(t, clientName, testNamespace, "curl --insecure http://keda-metrics-apiserver.keda:9022/metrics") +func fetchAndParsePrometheusMetrics(t *testing.T, cmd string) map[string]*promModel.MetricFamily { + out, _, err := ExecCommandOnSpecificPod(t, clientName, testNamespace, cmd) assert.NoErrorf(t, err, "cannot execute command - %s", err) parser := expfmt.TextParser{} @@ -165,6 +228,14 @@ func testHPAScalerMetricValue(t *testing.T) { family, err := parser.TextToMetricFamilies(reader) assert.NoErrorf(t, err, "cannot parse metrics - %s", err) + return family +} + +func testHPAScalerMetricValue(t *testing.T) { + t.Log("--- testing hpa scaler metric value ---") + + family := fetchAndParsePrometheusMetrics(t, "curl --insecure http://keda-metrics-apiserver.keda:9022/metrics") + if val, ok := family["keda_metrics_adapter_scaler_metrics_value"]; ok { var found bool metrics := val.GetMetric() @@ -182,3 +253,67 @@ func testHPAScalerMetricValue(t *testing.T) { t.Errorf("metric not available") } } + +func testTriggerTotalMetric(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing trigger total metric ---") + testTriggerTotalMetricValue(t, getTriggerTotalsManually(t, kc)) + + KubectlApplyWithTemplate(t, data, "cronScaledJobTemplate", cronScaledJobTemplate) + testTriggerTotalMetricValue(t, getTriggerTotalsManually(t, kc)) + + KubectlDeleteWithTemplate(t, data, "cronScaledJobTemplate", cronScaledJobTemplate) + testTriggerTotalMetricValue(t, getTriggerTotalsManually(t, kc)) +} + +func getTriggerTotalsManually(t *testing.T, kc *kubernetes.Clientset) map[string]int { + kedaKc := GetKedaKubernetesClient(t) + + triggerTotals := make(map[string]int) + + namespaceList, err := kc.CoreV1().Namespaces().List(context.Background(), v1.ListOptions{}) + assert.NoErrorf(t, err, "failed to list namespaces - %s", err) + + for _, namespace := range namespaceList.Items { + scaledObjectList, err := kedaKc.ScaledObjects(namespace.Name).List(context.Background(), v1.ListOptions{}) + assert.NoErrorf(t, err, "failed to list scaledObjects in namespace - %s with err - %s", namespace.Name, err) + + for _, scaledObject := range scaledObjectList.Items { + for _, trigger := range scaledObject.Spec.Triggers { + triggerTotals[trigger.Type]++ + } + } + + scaledJobList, err := kedaKc.ScaledJobs(namespace.Name).List(context.Background(), v1.ListOptions{}) + assert.NoErrorf(t, err, "failed to list scaledJobs in namespace - %s with err - %s", namespace.Name, err) + + for _, scaledJob := range scaledJobList.Items { + for _, trigger := range scaledJob.Spec.Triggers { + triggerTotals[trigger.Type]++ + } + } + } + + return triggerTotals +} + +func testTriggerTotalMetricValue(t *testing.T, expected map[string]int) { + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure http://%s.keda:8080/metrics", serviceName)) + + if val, ok := family["keda_operator_trigger_totals"]; ok { + var found bool + metrics := val.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == "type" { + assert.Equalf(t, float64(expected[*label.Value]), *metric.Gauge.Value, "expected %f got %f for type %s", + float64(expected[*label.Value]), *metric.Gauge.Value, *label.Value) + found = true + } + } + } + assert.Equal(t, true, found) + } else { + t.Errorf("metric not available") + } +}