Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions operator/api/v1beta1/aistore_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,12 @@ type DaemonSpec struct {
ExternalAccess *ExternalAccessSpec `json:"externalAccess,omitempty"`
}

// IsSoftScaleDown reports whether soft scale-down (maintenance mode) is enabled
// for targets. Returns false when unset.
func (t TargetSpec) IsSoftScaleDown() bool {
return t.SoftScaleDown != nil && *t.SoftScaleDown
}

// ProbeSpec defines optional overrides for Kubernetes probe timing parameters.
// All fields are optional; unset fields use operator defaults.
// SuccessThreshold is intentionally omitted: Kubernetes requires it to be 1 for liveness
Expand Down Expand Up @@ -654,6 +660,15 @@ type TargetSpec struct {
// during node drain or cluster scale-down operations by cloud providers.
// +optional
PodDisruptionBudget *PDBSpec `json:"pdb,omitempty"`

// SoftScaleDown controls the way that nodes are scaled down.
// When set to true, scale down leaves data on the node. This is useful in cases where you expect another pod to be
// immediately scheduled on the node. Note that if this is not the case then any data on the node will become inaccessible.
// When set to false, scale down decommissions nodes, which will either rebalance data off the node or delete it, depending
// on whether rebalance is enabled.
// The default value is false.
// +optional
SoftScaleDown *bool `json:"softScaleDown,omitempty"`
}

// LogSidecarSpec defines a sidecar container to expose AIS logs to K8s
Expand Down Expand Up @@ -944,10 +959,6 @@ func (ais *AIStore) ShouldIncludeClientCert() bool {
return tls.ClientAuthType(*ais.Spec.ConfigToUpdate.Net.HTTP.ClientAuthTLS) > tls.NoClientCert
}

func (ais *AIStore) IsFullyAutoScaling() bool {
return ais.GetTargetSize() == -1 && ais.GetProxySize() == -1
}

func (ais *AIStore) IsTargetAutoScaling() bool {
if ais.Spec.Size != nil {
if *ais.Spec.Size == -1 {
Expand Down
10 changes: 10 additions & 0 deletions operator/api/v1beta1/aistore_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,14 @@ var _ = Describe("AIStore", func() {
Entry("disabled", AIStore{Spec: AIStoreSpec{}}, false),
)
})

Describe("TargetSpec.IsSoftScaleDown", func() {
DescribeTable("should return the correct value", func(t TargetSpec, want bool) {
Expect(t.IsSoftScaleDown()).To(Equal(want))
},
Entry("nil field (defaults to false)", TargetSpec{}, false),
Entry("explicitly false", TargetSpec{SoftScaleDown: aisapc.Ptr(false)}, false),
Entry("explicitly true", TargetSpec{SoftScaleDown: aisapc.Ptr(true)}, true),
)
})
})
1 change: 1 addition & 0 deletions operator/api/v1beta1/aistore_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func validateProxyUpdate(prev, ais *AIStore) error {
func validateTargetUpdate(prev, ais *AIStore) error {
allowDaemonSpecUpdates(&prev.Spec.TargetSpec.DaemonSpec, &ais.Spec.TargetSpec.DaemonSpec)
prev.Spec.TargetSpec.PodDisruptionBudget = ais.Spec.TargetSpec.PodDisruptionBudget
prev.Spec.TargetSpec.SoftScaleDown = ais.Spec.TargetSpec.SoftScaleDown
if !equality.Semantic.DeepEqual(ais.Spec.TargetSpec, prev.Spec.TargetSpec) {
diff := deep.Equal(ais.Spec.TargetSpec, prev.Spec.TargetSpec)
webhooklog.Info(fmt.Sprintf("Differences found in target spec: [%s]", strings.Join(diff, ", ")))
Expand Down
5 changes: 5 additions & 0 deletions operator/api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions operator/config/base/crd/ais.nvidia.com_aistores.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6117,6 +6117,15 @@ spec:
format: int32
minimum: -1
type: integer
softScaleDown:
description: |-
SoftScaleDown controls the way that nodes are scaled down.
When set to true, scale down leaves data on the node. This is useful in cases where you expect another pod to be
immediately scheduled on the node. Note that if this is not the case then any data on the node will become inaccessible.
When set to false, scale down decommissions nodes, which will either rebalance data off the node or delete it, depending
on whether rebalance is enabled.
The default value is false.
type: boolean
tolerations:
description: Tolerations - list of tolerations for AIS Daemon
pod
Expand Down
9 changes: 9 additions & 0 deletions operator/helm/ais-operator/templates/aistore-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6105,6 +6105,15 @@ spec:
format: int32
minimum: -1
type: integer
softScaleDown:
description: |-
SoftScaleDown controls the way that nodes are scaled down.
When set to true, scale down leaves data on the node. This is useful in cases where you expect another pod to be
immediately scheduled on the node. Note that if this is not the case then any data on the node will become inaccessible.
When set to false, scale down decommissions nodes, which will either rebalance data off the node or delete it, depending
on whether rebalance is enabled.
The default value is false.
type: boolean
tolerations:
description: Tolerations - list of tolerations for AIS Daemon pod
items:
Expand Down
49 changes: 30 additions & 19 deletions operator/pkg/controllers/target_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (r *AIStoreReconciler) resolveStatefulSetScaling(ctx context.Context, ais *
}
logger.Info("Scaling up target statefulset to match AIS cluster spec size", "desiredSize", expectedSize)
} else if expectedSize < currentSize {
// Wait for decommission to complete before scaling the StatefulSet down
// If applicable, wait for decommission to complete before scaling the StatefulSet down
if ready, scaleErr := r.isReadyToScaleDown(ctx, ais, currentSize); scaleErr != nil || !ready {
return scaleErr
}
Expand All @@ -260,6 +260,9 @@ func (r *AIStoreReconciler) isReadyToScaleDown(ctx context.Context, ais *aisv1.A
if err != nil {
return
}
if ais.Spec.TargetSpec.IsSoftScaleDown() {
return true, nil
}
// If any targets are still in the smap as decommissioning, delay scaling
for _, targetNode := range smap.Tmap {
if smap.InMaintOrDecomm(targetNode.ID()) && !smap.InMaint(targetNode) {
Expand All @@ -281,7 +284,10 @@ func (r *AIStoreReconciler) startTargetScaling(ctx context.Context, ais *aisv1.A
return r.scaleUpLB(ctx, ais)
}

// Otherwise - scale down.
err := r.scaleDownLB(ctx, ais, ss)
if err != nil {
return err
}
// Ensure rebalance is enabled before decommissioning so data can migrate
// off the targets being decommissioned.
if err := r.enableRebalanceCondition(ctx, ais); err != nil {
Expand All @@ -290,15 +296,11 @@ func (r *AIStoreReconciler) startTargetScaling(ctx context.Context, ais *aisv1.A
if err := r.handleConfigState(ctx, ais, true /*force*/); err != nil {
return err
}
err := r.scaleDownLB(ctx, ais, ss)
if err != nil {
return err
}
// Decommission target through AIS API
return r.decommissionTargets(ctx, ais, *ss.Spec.Replicas)
// Prepare targets for scale down either via maintenance mode or decommission.
return r.prepareTargetsForScaleDown(ctx, ais, *ss.Spec.Replicas)
}

func (r *AIStoreReconciler) decommissionTargets(ctx context.Context, ais *aisv1.AIStore, actualSize int32) error {
func (r *AIStoreReconciler) prepareTargetsForScaleDown(ctx context.Context, ais *aisv1.AIStore, actualSize int32) error {
logger := logf.FromContext(ctx)
apiClient, err := r.clientManager.GetClient(ctx, ais)
if err != nil {
Expand All @@ -308,10 +310,10 @@ func (r *AIStoreReconciler) decommissionTargets(ctx context.Context, ais *aisv1.
if err != nil {
return err
}
logger.Info("Decommissioning targets", "Smap version", smap)
logger.Info("Preparing targets for scale down", "Smap version", smap)
for idx := actualSize; idx > ais.GetTargetSize(); idx-- {
podName := target.PodName(ais, idx-1)
logger.Info("Attempting to decommission target", "podName", podName)
logger.Info("Attempting to prepare target for scale down", "podName", podName)
node, err := findAISNodeByPodName(smap.Tmap, podName)
if err != nil {
// If target is not in the cluster map, fetch the pod and inspect state.
Expand All @@ -320,25 +322,34 @@ func (r *AIStoreReconciler) decommissionTargets(ctx context.Context, ais *aisv1.
pod, podErr := r.k8sClient.GetPod(ctx, types.NamespacedName{Name: podName, Namespace: ais.Namespace})
switch {
case k8serrors.IsNotFound(podErr):
logger.Info("Target pod not found, skipping decommission", "podName", podName)
logger.Info("Target pod not found, skipping scale-down preparation", "podName", podName)
continue
case podErr != nil:
return fmt.Errorf("failed to get pod %s: %w", podName, podErr)
case isPodUnschedulable(pod):
logger.Info("Target pod is unschedulable, skipping decommission", "podName", podName)
logger.Info("Target pod is unschedulable, skipping scale-down preparation", "podName", podName)
continue
case isPodInCrashLoopBackOff(pod):
logger.Info("Target pod is in CrashLoopBackOff, skipping decommission", "podName", podName)
logger.Info("Target pod is in CrashLoopBackOff, skipping scale-down preparation", "podName", podName)
continue
}
return fmt.Errorf("waiting for target %s to register in smap", podName)
}
if !smap.InMaintOrDecomm(node.ID()) {
logger.Info("Decommissioning target", "nodeID", node.ID())
_, err = apiClient.DecommissionNode(&aisapc.ActValRmNode{DaemonID: node.ID(), RmUserData: true})
if err != nil {
logger.Error(err, "Failed to decommission node", "nodeID", node.ID())
return err
if ais.Spec.TargetSpec.IsSoftScaleDown() {
logger.Info("Putting target in maintenance mode", "nodeID", node.ID())
_, err = apiClient.StartMaintenance(&aisapc.ActValRmNode{DaemonID: node.ID(), SkipRebalance: true})
if err != nil {
logger.Error(err, "Failed to put node in maintenance", "nodeID", node.ID())
return err
}
} else {
logger.Info("Decommissioning target", "nodeID", node.ID())
_, err = apiClient.DecommissionNode(&aisapc.ActValRmNode{DaemonID: node.ID(), RmUserData: true})
if err != nil {
logger.Error(err, "Failed to decommission node", "nodeID", node.ID())
return err
}
}
} else {
logger.Info("AIS target is already in decommissioning state", "nodeID", node.ID())
Expand Down
Loading