Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: Provide Prometheus metric with indication of total number of triggers per trigger type in `ScaledJob`/`ScaledObject`. ([#3663](https://github.com/kedacore/keda/issues/3663))
- **Azure Service Bus**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920))

### Improvements
Expand Down
109 changes: 105 additions & 4 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package keda
import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
Expand All @@ -27,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -38,6 +40,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/metrics"
"github.com/kedacore/keda/v2/pkg/scaling"
)

Expand All @@ -51,12 +54,24 @@ type ScaledJobReconciler struct {
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder

scaleHandler scaling.ScaleHandler
scaledJobGenerations *sync.Map
scaleHandler scaling.ScaleHandler
}

var (
scaledJobTriggers map[string][]string
scaledJobTriggersLock *sync.Mutex
)

func init() {
scaledJobTriggers = make(map[string][]string)
scaledJobTriggersLock = &sync.Mutex{}
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))
r.scaledJobGenerations = &sync.Map{}

return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
Expand Down Expand Up @@ -90,8 +105,9 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// Check if the ScaledJob instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
if scaledJob.GetDeletionTimestamp() != nil {
return ctrl.Result{}, r.finalizeScaledJob(ctx, reqLogger, scaledJob)
return ctrl.Result{}, r.finalizeScaledJob(ctx, reqLogger, scaledJob, req.NamespacedName.String())
}
r.updateTriggerTotals(reqLogger, scaledJob, req.NamespacedName.String())

// ensure finalizer is set on this CR
if err := r.ensureFinalizer(ctx, reqLogger, scaledJob); err != nil {
Expand Down Expand Up @@ -213,11 +229,96 @@ func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context
// requestScaleLoop request ScaleLoop handler for the respective ScaledJob
func (r *ScaledJobReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
logger.V(1).Info("Starting a new ScaleLoop")
return r.scaleHandler.HandleScalableObject(ctx, scaledJob)

key, err := cache.MetaNamespaceKeyFunc(scaledJob)
if err != nil {
logger.Error(err, "Error getting key for scaledJob")
return err
}

if err = r.scaleHandler.HandleScalableObject(ctx, scaledJob); err != nil {
return err
}

r.scaledJobGenerations.Store(key, scaledJob.Generation)

return nil
}

// stopScaleLoop stops ScaleLoop handler for the respective ScaledJob
func (r *ScaledJobReconciler) stopScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
logger.V(1).Info("Stopping a ScaleLoop")
return r.scaleHandler.DeleteScalableObject(ctx, scaledJob)

key, err := cache.MetaNamespaceKeyFunc(scaledJob)
if err != nil {
logger.Error(err, "Error getting key for scaledJob")
return err
}

if err = r.scaleHandler.DeleteScalableObject(ctx, scaledJob); err != nil {
return err
}

r.scaledJobGenerations.Delete(key)
return nil
}

// scaledJobGenerationChanged returns true if ScaledJob's Generation was changed, ie. ScaledJob.Spec was changed
func (r *ScaledJobReconciler) scaledJobGenerationChanged(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(scaledJob)
if err != nil {
logger.Error(err, "Error getting key for scaledJob")
return true, err
}

value, loaded := r.scaledJobGenerations.Load(key)
if loaded {
generation := value.(int64)
if generation == scaledJob.Generation {
return false, nil
}
}
return true, nil
}

func (r *ScaledJobReconciler) updateTriggerTotals(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, namespacedName string) {
specChanged, err := r.scaledJobGenerationChanged(logger, scaledJob)
if err != nil {
logger.Error(err, "failed to update trigger totals")
return
}

if !specChanged {
return
}

scaledJobTriggersLock.Lock()
defer scaledJobTriggersLock.Unlock()

if triggerTypes, ok := scaledJobTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

triggerTypes := make([]string, len(scaledJob.Spec.Triggers))
for _, trigger := range scaledJob.Spec.Triggers {
metrics.IncrementTriggerTotal(trigger.Type)
triggerTypes = append(triggerTypes, trigger.Type)
}

scaledJobTriggers[namespacedName] = triggerTypes
}

func (r *ScaledJobReconciler) updateTriggerTotalsOnDelete(namespacedName string) {
scaledJobTriggersLock.Lock()
defer scaledJobTriggersLock.Unlock()

if triggerTypes, ok := scaledJobTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

delete(scaledJobTriggers, namespacedName)
}
5 changes: 4 additions & 1 deletion controllers/keda/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ const (
)

// finalizeScaledJob runs finalization logic on ScaledJob if there's finalizer
func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob,
namespacedName string) error {
if util.Contains(scaledJob.GetFinalizers(), scaledJobFinalizer) {
// Run finalization logic for scaledJobFinalizer. If the
// finalization logic fails, don't remove the finalizer so
Expand All @@ -48,6 +49,8 @@ func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr
logger.Error(err, "Failed to update ScaledJob after removing a finalizer", "finalizer", scaledJobFinalizer)
return err
}

r.updateTriggerTotalsOnDelete(namespacedName)
}

logger.Info("Successfully finalized ScaledJob")
Expand Down
58 changes: 55 additions & 3 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/metrics"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand Down Expand Up @@ -76,14 +77,22 @@ type ScaledObjectReconciler struct {
kubeVersion kedautil.K8sVersion
}

// A cache mapping "resource.group" to true or false if we know if this resource is scalable.
var isScalableCache *sync.Map
var (
// A cache mapping "resource.group" to true or false if we know if this resource is scalable.
isScalableCache *sync.Map

scaledObjectTriggers map[string][]string
scaledObjectTriggersLock *sync.Mutex
)

func init() {
// Prefill the cache with some known values for core resources in case of future parallelism to avoid stampeding herd on startup.
isScalableCache = &sync.Map{}
isScalableCache.Store("deployments.apps", true)
isScalableCache.Store("statefulsets.apps", true)

scaledObjectTriggers = make(map[string][]string)
scaledObjectTriggersLock = &sync.Mutex{}
}

// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
Expand Down Expand Up @@ -166,8 +175,9 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Check if the ScaledObject instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
if scaledObject.GetDeletionTimestamp() != nil {
return ctrl.Result{}, r.finalizeScaledObject(ctx, reqLogger, scaledObject)
return ctrl.Result{}, r.finalizeScaledObject(ctx, reqLogger, scaledObject, req.NamespacedName.String())
}
r.updateTriggerTotals(reqLogger, scaledObject, req.NamespacedName.String())

// ensure finalizer is set on this CR
if err := r.ensureFinalizer(ctx, reqLogger, scaledObject); err != nil {
Expand Down Expand Up @@ -469,3 +479,45 @@ func (r *ScaledObjectReconciler) scaledObjectGenerationChanged(logger logr.Logge
}
return true, nil
}

func (r *ScaledObjectReconciler) updateTriggerTotals(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, namespacedName string) {
specChanged, err := r.scaledObjectGenerationChanged(logger, scaledObject)
if err != nil {
logger.Error(err, "failed to update trigger totals")
return
}

if !specChanged {
return
}

scaledObjectTriggersLock.Lock()
defer scaledObjectTriggersLock.Unlock()

if triggerTypes, ok := scaledObjectTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

triggerTypes := make([]string, len(scaledObject.Spec.Triggers))
for _, trigger := range scaledObject.Spec.Triggers {
metrics.IncrementTriggerTotal(trigger.Type)
triggerTypes = append(triggerTypes, trigger.Type)
}

scaledObjectTriggers[namespacedName] = triggerTypes
}

func (r *ScaledObjectReconciler) updateTriggerTotalsOnDelete(namespacedName string) {
scaledObjectTriggersLock.Lock()
defer scaledObjectTriggersLock.Unlock()

if triggerTypes, ok := scaledObjectTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

delete(scaledObjectTriggers, namespacedName)
}
5 changes: 4 additions & 1 deletion controllers/keda/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ const (
)

// finalizeScaledObject runs finalization logic on ScaledObject if there's finalizer
func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject,
namespacedName string) error {
if util.Contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
// Run finalization logic for scaledObjectFinalizer. If the
// finalization logic fails, don't remove the finalizer so
Expand Down Expand Up @@ -77,6 +78,8 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge
logger.Error(err, "Failed to update ScaledObject after removing a finalizer", "finalizer", scaledObjectFinalizer)
return err
}

r.updateTriggerTotalsOnDelete(namespacedName)
}

logger.Info("Successfully finalized ScaledObject")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.13.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.37.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/robfig/cron/v3 v3.0.1
Expand Down Expand Up @@ -227,7 +228,6 @@ require (
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
Expand Down
49 changes: 49 additions & 0 deletions pkg/metrics/operator_prom_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2022 The KEDA Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
triggerTotalsGaugeVec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "keda_operator",
Subsystem: "trigger",
Name: "totals",
},
[]string{"type"},
)
)

func init() {
metrics.Registry.MustRegister(triggerTotalsGaugeVec)
}

func IncrementTriggerTotal(triggerType string) {
if triggerType != "" {
triggerTotalsGaugeVec.WithLabelValues(triggerType).Inc()
}
}

func DecrementTriggerTotal(triggerType string) {
if triggerType != "" {
triggerTotalsGaugeVec.WithLabelValues(triggerType).Dec()
}
}
Loading