diff --git a/operator/api/v1beta1/aistore_types.go b/operator/api/v1beta1/aistore_types.go index 64f67716..1f4ab5e4 100644 --- a/operator/api/v1beta1/aistore_types.go +++ b/operator/api/v1beta1/aistore_types.go @@ -604,6 +604,19 @@ type DaemonSpec struct { ExternalAccess *ExternalAccessSpec `json:"externalAccess,omitempty"` } +// ScaleDownMode defines the behavior when scaling down targets. +type ScaleDownMode string + +const ( + // ScaleDownModeDecommission rebalances data off the node (or deletes it if + // rebalance is disabled) before removing the target. + ScaleDownModeDecommission ScaleDownMode = "decommission" + // ScaleDownModeRetain leaves data on the node by putting the target into + // maintenance mode. Useful when another pod is expected to be immediately + // scheduled; otherwise data may become inaccessible. + ScaleDownModeRetain ScaleDownMode = "retain" +) + // ProbeSpec defines optional overrides for Kubernetes probe timing parameters. // All fields are optional; unset fields use operator defaults. // SuccessThreshold is intentionally omitted: Kubernetes requires it to be 1 for liveness @@ -654,6 +667,24 @@ type TargetSpec struct { // during node drain or cluster scale-down operations by cloud providers. // +optional PodDisruptionBudget *PDBSpec `json:"pdb,omitempty"` + + // ScaleDownMode controls how targets are scaled down. + // "decommission" (default) rebalances data off the node or deletes it, depending on whether rebalance is enabled. + // "retain" leaves data on the node by putting the target into maintenance mode. This is useful when you expect + // another pod to be immediately scheduled on the node. Note that if this is not the case then any data on the node + // will become inaccessible. + // +kubebuilder:validation:Enum=decommission;retain + // +kubebuilder:default:=decommission + // +optional + ScaleDownMode *ScaleDownMode `json:"scaleDownMode,omitempty"` +} + +// GetScaleDownMode returns the effective scale-down mode, defaulting to decommission. +func (t TargetSpec) GetScaleDownMode() ScaleDownMode { + if t.ScaleDownMode == nil { + return ScaleDownModeDecommission + } + return *t.ScaleDownMode } // LogSidecarSpec defines a sidecar container to expose AIS logs to K8s @@ -944,10 +975,6 @@ func (ais *AIStore) ShouldIncludeClientCert() bool { return tls.ClientAuthType(*ais.Spec.ConfigToUpdate.Net.HTTP.ClientAuthTLS) > tls.NoClientCert } -func (ais *AIStore) IsFullyAutoScaling() bool { - return ais.GetTargetSize() == -1 && ais.GetProxySize() == -1 -} - func (ais *AIStore) IsTargetAutoScaling() bool { if ais.Spec.Size != nil { if *ais.Spec.Size == -1 { diff --git a/operator/api/v1beta1/aistore_types_test.go b/operator/api/v1beta1/aistore_types_test.go index db94fc14..6b9d6a2a 100644 --- a/operator/api/v1beta1/aistore_types_test.go +++ b/operator/api/v1beta1/aistore_types_test.go @@ -288,4 +288,14 @@ var _ = Describe("AIStore", func() { Entry("disabled", AIStore{Spec: AIStoreSpec{}}, false), ) }) + + Describe("TargetSpec.GetScaleDownMode", func() { + DescribeTable("should return the correct value", func(t TargetSpec, want ScaleDownMode) { + Expect(t.GetScaleDownMode()).To(Equal(want)) + }, + Entry("nil field (defaults to decommission)", TargetSpec{}, ScaleDownModeDecommission), + Entry("explicitly decommission", TargetSpec{ScaleDownMode: aisapc.Ptr(ScaleDownModeDecommission)}, ScaleDownModeDecommission), + Entry("explicitly retain", TargetSpec{ScaleDownMode: aisapc.Ptr(ScaleDownModeRetain)}, ScaleDownModeRetain), + ) + }) }) diff --git a/operator/api/v1beta1/aistore_webhook.go b/operator/api/v1beta1/aistore_webhook.go index 40a8ebbb..5a39b1c3 100644 --- a/operator/api/v1beta1/aistore_webhook.go +++ b/operator/api/v1beta1/aistore_webhook.go @@ -292,6 +292,7 @@ func validateProxyUpdate(prev, ais *AIStore) error { func validateTargetUpdate(prev, ais *AIStore) error { allowDaemonSpecUpdates(&prev.Spec.TargetSpec.DaemonSpec, &ais.Spec.TargetSpec.DaemonSpec) prev.Spec.TargetSpec.PodDisruptionBudget = ais.Spec.TargetSpec.PodDisruptionBudget + prev.Spec.TargetSpec.ScaleDownMode = ais.Spec.TargetSpec.ScaleDownMode if !equality.Semantic.DeepEqual(ais.Spec.TargetSpec, prev.Spec.TargetSpec) { diff := deep.Equal(ais.Spec.TargetSpec, prev.Spec.TargetSpec) webhooklog.Info(fmt.Sprintf("Differences found in target spec: [%s]", strings.Join(diff, ", "))) diff --git a/operator/api/v1beta1/zz_generated.deepcopy.go b/operator/api/v1beta1/zz_generated.deepcopy.go index 822551ab..1ab39fb6 100644 --- a/operator/api/v1beta1/zz_generated.deepcopy.go +++ b/operator/api/v1beta1/zz_generated.deepcopy.go @@ -2202,6 +2202,11 @@ func (in *TargetSpec) DeepCopyInto(out *TargetSpec) { *out = new(PDBSpec) (*in).DeepCopyInto(*out) } + if in.ScaleDownMode != nil { + in, out := &in.ScaleDownMode, &out.ScaleDownMode + *out = new(ScaleDownMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TargetSpec. diff --git a/operator/config/base/crd/ais.nvidia.com_aistores.yaml b/operator/config/base/crd/ais.nvidia.com_aistores.yaml index cc440e0f..bccfddcd 100644 --- a/operator/config/base/crd/ais.nvidia.com_aistores.yaml +++ b/operator/config/base/crd/ais.nvidia.com_aistores.yaml @@ -5872,6 +5872,18 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + scaleDownMode: + default: decommission + description: |- + ScaleDownMode controls how targets are scaled down. + "decommission" (default) rebalances data off the node or deletes it, depending on whether rebalance is enabled. + "retain" leaves data on the node by putting the target into maintenance mode. This is useful when you expect + another pod to be immediately scheduled on the node. Note that if this is not the case then any data on the node + will become inaccessible. + enum: + - decommission + - retain + type: string securityContext: description: SecurityContext defines pod-level security attributes and common container settings for AIS proxy and target pods. diff --git a/operator/helm/ais-operator/templates/aistore-crd.yaml b/operator/helm/ais-operator/templates/aistore-crd.yaml index b23459c0..53ae0cf4 100644 --- a/operator/helm/ais-operator/templates/aistore-crd.yaml +++ b/operator/helm/ais-operator/templates/aistore-crd.yaml @@ -5860,6 +5860,18 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + scaleDownMode: + default: decommission + description: |- + ScaleDownMode controls how targets are scaled down. + "decommission" (default) rebalances data off the node or deletes it, depending on whether rebalance is enabled. + "retain" leaves data on the node by putting the target into maintenance mode. This is useful when you expect + another pod to be immediately scheduled on the node. Note that if this is not the case then any data on the node + will become inaccessible. + enum: + - decommission + - retain + type: string securityContext: description: SecurityContext defines pod-level security attributes and common container settings for AIS proxy and target pods. diff --git a/operator/pkg/controllers/target_controllers.go b/operator/pkg/controllers/target_controllers.go index 63ce7ad3..6ec7a6e4 100644 --- a/operator/pkg/controllers/target_controllers.go +++ b/operator/pkg/controllers/target_controllers.go @@ -236,7 +236,7 @@ func (r *AIStoreReconciler) resolveStatefulSetScaling(ctx context.Context, ais * } logger.Info("Scaling up target statefulset to match AIS cluster spec size", "desiredSize", expectedSize) } else if expectedSize < currentSize { - // Wait for decommission to complete before scaling the StatefulSet down + // If applicable, wait for decommission to complete before scaling the StatefulSet down if ready, scaleErr := r.isReadyToScaleDown(ctx, ais, currentSize); scaleErr != nil || !ready { return scaleErr } @@ -260,6 +260,9 @@ func (r *AIStoreReconciler) isReadyToScaleDown(ctx context.Context, ais *aisv1.A if err != nil { return } + if ais.Spec.TargetSpec.GetScaleDownMode() == aisv1.ScaleDownModeRetain { + return true, nil + } // If any targets are still in the smap as decommissioning, delay scaling for _, targetNode := range smap.Tmap { if smap.InMaintOrDecomm(targetNode.ID()) && !smap.InMaint(targetNode) { @@ -281,7 +284,10 @@ func (r *AIStoreReconciler) startTargetScaling(ctx context.Context, ais *aisv1.A return r.scaleUpLB(ctx, ais) } - // Otherwise - scale down. + err := r.scaleDownLB(ctx, ais, ss) + if err != nil { + return err + } // Ensure rebalance is enabled before decommissioning so data can migrate // off the targets being decommissioned. if err := r.enableRebalanceCondition(ctx, ais); err != nil { @@ -290,15 +296,11 @@ func (r *AIStoreReconciler) startTargetScaling(ctx context.Context, ais *aisv1.A if err := r.handleConfigState(ctx, ais, true /*force*/); err != nil { return err } - err := r.scaleDownLB(ctx, ais, ss) - if err != nil { - return err - } - // Decommission target through AIS API - return r.decommissionTargets(ctx, ais, *ss.Spec.Replicas) + // Prepare targets for scale down either via maintenance mode or decommission. + return r.prepareTargetsForScaleDown(ctx, ais, *ss.Spec.Replicas) } -func (r *AIStoreReconciler) decommissionTargets(ctx context.Context, ais *aisv1.AIStore, actualSize int32) error { +func (r *AIStoreReconciler) prepareTargetsForScaleDown(ctx context.Context, ais *aisv1.AIStore, actualSize int32) error { logger := logf.FromContext(ctx) apiClient, err := r.clientManager.GetClient(ctx, ais) if err != nil { @@ -308,10 +310,10 @@ func (r *AIStoreReconciler) decommissionTargets(ctx context.Context, ais *aisv1. if err != nil { return err } - logger.Info("Decommissioning targets", "Smap version", smap) + logger.Info("Preparing targets for scale down", "Smap version", smap) for idx := actualSize; idx > ais.GetTargetSize(); idx-- { podName := target.PodName(ais, idx-1) - logger.Info("Attempting to decommission target", "podName", podName) + logger.Info("Attempting to prepare target for scale down", "podName", podName) node, err := findAISNodeByPodName(smap.Tmap, podName) if err != nil { // If target is not in the cluster map, fetch the pod and inspect state. @@ -320,25 +322,34 @@ func (r *AIStoreReconciler) decommissionTargets(ctx context.Context, ais *aisv1. pod, podErr := r.k8sClient.GetPod(ctx, types.NamespacedName{Name: podName, Namespace: ais.Namespace}) switch { case k8serrors.IsNotFound(podErr): - logger.Info("Target pod not found, skipping decommission", "podName", podName) + logger.Info("Target pod not found, skipping scale-down preparation", "podName", podName) continue case podErr != nil: return fmt.Errorf("failed to get pod %s: %w", podName, podErr) case isPodUnschedulable(pod): - logger.Info("Target pod is unschedulable, skipping decommission", "podName", podName) + logger.Info("Target pod is unschedulable, skipping scale-down preparation", "podName", podName) continue case isPodInCrashLoopBackOff(pod): - logger.Info("Target pod is in CrashLoopBackOff, skipping decommission", "podName", podName) + logger.Info("Target pod is in CrashLoopBackOff, skipping scale-down preparation", "podName", podName) continue } return fmt.Errorf("waiting for target %s to register in smap", podName) } if !smap.InMaintOrDecomm(node.ID()) { - logger.Info("Decommissioning target", "nodeID", node.ID()) - _, err = apiClient.DecommissionNode(&aisapc.ActValRmNode{DaemonID: node.ID(), RmUserData: true}) - if err != nil { - logger.Error(err, "Failed to decommission node", "nodeID", node.ID()) - return err + if ais.Spec.TargetSpec.GetScaleDownMode() == aisv1.ScaleDownModeRetain { + logger.Info("Putting target in maintenance mode", "nodeID", node.ID()) + _, err = apiClient.StartMaintenance(&aisapc.ActValRmNode{DaemonID: node.ID(), SkipRebalance: true}) + if err != nil { + logger.Error(err, "Failed to put node in maintenance", "nodeID", node.ID()) + return err + } + } else { + logger.Info("Decommissioning target", "nodeID", node.ID()) + _, err = apiClient.DecommissionNode(&aisapc.ActValRmNode{DaemonID: node.ID(), RmUserData: true}) + if err != nil { + logger.Error(err, "Failed to decommission node", "nodeID", node.ID()) + return err + } } } else { logger.Info("AIS target is already in decommissioning state", "nodeID", node.ID()) diff --git a/operator/pkg/controllers/target_controllers_test.go b/operator/pkg/controllers/target_controllers_test.go index 43e82d58..28d0b53d 100644 --- a/operator/pkg/controllers/target_controllers_test.go +++ b/operator/pkg/controllers/target_controllers_test.go @@ -7,6 +7,8 @@ package controllers import ( "context" + "github.com/NVIDIA/aistore/api/apc" + aismeta "github.com/NVIDIA/aistore/core/meta" aisv1 "github.com/ais-operator/api/v1beta1" aisclient "github.com/ais-operator/pkg/client" "github.com/ais-operator/pkg/resources/target" @@ -14,8 +16,10 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "go.uber.org/mock/gomock" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/client-go/tools/events" ctrl "sigs.k8s.io/controller-runtime" @@ -123,3 +127,186 @@ var _ = Describe("prepareTargetForRollout", func() { Expect(requeue).To(BeFalse()) }) }) + +var _ = Describe("scaleDownMode", func() { + var ( + r *AIStoreReconciler + ais *aisv1.AIStore + mockCtrl *gomock.Controller + apiClient *mocks.MockAIStoreClientInterface + namespace string + ctx = context.TODO() + ) + + BeforeEach(func() { + namespace = "ais-test-" + rand.String(10) + Expect(k8sClient.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}})).To(Succeed()) + + ais = &aisv1.AIStore{ + ObjectMeta: metav1.ObjectMeta{Name: "ais", Namespace: namespace}, + Spec: aisv1.AIStoreSpec{ + InitImage: "init:latest", + NodeImage: "node:latest", + ProxySpec: aisv1.DaemonSpec{ + Size: apc.Ptr[int32](1), + ServiceSpec: aisv1.ServiceSpec{ + ServicePort: intstr.FromInt32(51080), + PublicPort: intstr.FromInt32(51081), + IntraControlPort: intstr.FromInt32(51082), + IntraDataPort: intstr.FromInt32(51083), + }, + }, + TargetSpec: aisv1.TargetSpec{ + DaemonSpec: aisv1.DaemonSpec{ + Size: apc.Ptr[int32](2), + ServiceSpec: aisv1.ServiceSpec{ + ServicePort: intstr.FromInt32(51080), + PublicPort: intstr.FromInt32(51081), + IntraControlPort: intstr.FromInt32(51082), + IntraDataPort: intstr.FromInt32(51083), + }, + }, + Mounts: []aisv1.Mount{{Path: "/data"}}, + }, + StateStorage: &aisv1.StateStorage{ + HostPath: &aisv1.StateHostPathConfig{Prefix: "/ais"}, + }, + }, + } + Expect(k8sClient.Create(ctx, ais)).To(Succeed()) + + tmpClient := aisclient.NewClient(k8sClient, k8sClient.Scheme()) + mockCtrl = gomock.NewController(GinkgoT()) + apiClient = mocks.NewMockAIStoreClientInterface(mockCtrl) + clientManager := mocks.NewMockAISClientManagerInterface(mockCtrl) + clientManager.EXPECT().GetClient(gomock.Any(), gomock.Any()).Return(apiClient, nil).AnyTimes() + r = NewAISReconciler(tmpClient, &events.FakeRecorder{}, ctrl.Log, clientManager) + }) + + AfterEach(func() { + mockCtrl.Finish() + }) + + makeSS := func(replicas int32) *appsv1.StatefulSet { + return &appsv1.StatefulSet{ + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + } + } + + Describe("isReadyToScaleDown", func() { + Context("when scaleDownMode is decommission (default)", func() { + It("queries the cluster map to check decommission state", func() { + smap := &aismeta.Smap{ + Tmap: aismeta.NodeMap{}, + } + apiClient.EXPECT().GetClusterMap().Return(smap, nil) + + // 2 targets in smap but currentSize is 3, so not all active → ready + ready, err := r.isReadyToScaleDown(ctx, ais, 3) + Expect(err).NotTo(HaveOccurred()) + Expect(ready).To(BeTrue()) + }) + + It("delays scaling when all targets are still active", func() { + t1 := &aismeta.Snode{DaeID: "t1", DaeType: apc.Target, ControlNet: aismeta.NetInfo{Hostname: "ais-target-0"}} + t2 := &aismeta.Snode{DaeID: "t2", DaeType: apc.Target, ControlNet: aismeta.NetInfo{Hostname: "ais-target-1"}} + t3 := &aismeta.Snode{DaeID: "t3", DaeType: apc.Target, ControlNet: aismeta.NetInfo{Hostname: "ais-target-2"}} + smap := &aismeta.Smap{ + Tmap: aismeta.NodeMap{"t1": t1, "t2": t2, "t3": t3}, + } + apiClient.EXPECT().GetClusterMap().Return(smap, nil) + + ready, err := r.isReadyToScaleDown(ctx, ais, 3) + Expect(err).NotTo(HaveOccurred()) + Expect(ready).To(BeFalse()) + }) + }) + + Context("when scaleDownMode is retain", func() { + BeforeEach(func() { + ais.Spec.TargetSpec.ScaleDownMode = apc.Ptr(aisv1.ScaleDownModeRetain) + Expect(k8sClient.Update(ctx, ais)).To(Succeed()) + }) + + It("is ready when the target above the desired size is in maintenance", func() { + t3 := &aismeta.Snode{DaeID: "t3", DaeType: apc.Target, ControlNet: aismeta.NetInfo{Hostname: "ais-target-2"}, Flags: aismeta.SnodeMaint} + smap := &aismeta.Smap{Tmap: aismeta.NodeMap{"t3": t3}} + apiClient.EXPECT().GetClusterMap().Return(smap, nil) + + ready, err := r.isReadyToScaleDown(ctx, ais, 3) + Expect(err).NotTo(HaveOccurred()) + Expect(ready).To(BeTrue()) + }) + + It("is ready immediately without checking maintenance state", func() { + t3 := &aismeta.Snode{DaeID: "t3", DaeType: apc.Target, ControlNet: aismeta.NetInfo{Hostname: "ais-target-2"}} + smap := &aismeta.Smap{Tmap: aismeta.NodeMap{"t3": t3}} + apiClient.EXPECT().GetClusterMap().Return(smap, nil) + + ready, err := r.isReadyToScaleDown(ctx, ais, 3) + Expect(err).NotTo(HaveOccurred()) + Expect(ready).To(BeTrue()) + }) + }) + }) + + Describe("startTargetScaling", func() { + BeforeEach(func() { + // Pre-set rebalance condition so enableRebalanceCondition is a no-op + ais.SetCondition(aisv1.ConditionReadyRebalance) + Expect(k8sClient.Status().Update(ctx, ais)).To(Succeed()) + }) + + Context("when scaleDownMode is decommission (default)", func() { + It("decommissions targets with RmUserData=true", func() { + ss := makeSS(3) + + apiClient.EXPECT().SetClusterConfigUsingMsg(gomock.Any(), false).Return(nil) + + t3 := &aismeta.Snode{DaeID: "t3", DaeType: apc.Target, ControlNet: aismeta.NetInfo{Hostname: "ais-target-2"}} + smap := &aismeta.Smap{ + Tmap: aismeta.NodeMap{"t3": t3}, + } + apiClient.EXPECT().GetClusterMap().Return(smap, nil) + + apiClient.EXPECT().DecommissionNode(gomock.Any()).DoAndReturn(func(act *apc.ActValRmNode) (string, error) { + Expect(act.RmUserData).To(BeTrue()) + return "xid", nil + }) + + err := r.startTargetScaling(ctx, ais, ss) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("when scaleDownMode is retain", func() { + BeforeEach(func() { + ais.Spec.TargetSpec.ScaleDownMode = apc.Ptr(aisv1.ScaleDownModeRetain) + Expect(k8sClient.Update(ctx, ais)).To(Succeed()) + }) + + It("puts targets in maintenance with SkipRebalance=true", func() { + ss := makeSS(3) + + apiClient.EXPECT().SetClusterConfigUsingMsg(gomock.Any(), false).Return(nil) + + t3 := &aismeta.Snode{DaeID: "t3", DaeType: apc.Target, ControlNet: aismeta.NetInfo{Hostname: "ais-target-2"}} + smap := &aismeta.Smap{ + Tmap: aismeta.NodeMap{"t3": t3}, + } + apiClient.EXPECT().GetClusterMap().Return(smap, nil) + + apiClient.EXPECT().StartMaintenance(gomock.Any()).DoAndReturn(func(act *apc.ActValRmNode) (string, error) { + Expect(act.SkipRebalance).To(BeTrue()) + Expect(act.DaemonID).To(Equal("t3")) + return "xid", nil + }) + + err := r.startTargetScaling(ctx, ais, ss) + Expect(err).NotTo(HaveOccurred()) + }) + }) + }) +})