diff --git a/cmd/thv-operator/app/app.go b/cmd/thv-operator/app/app.go index 4bfb1b52fc..14614a06ce 100644 --- a/cmd/thv-operator/app/app.go +++ b/cmd/thv-operator/app/app.go @@ -12,9 +12,11 @@ import ( "fmt" "log/slog" "os" + "strconv" "strings" "github.com/go-logr/logr" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -42,10 +44,18 @@ var ( setupLog = log.Log.WithName("setup") ) +// envEnableStorageVersionMigrator is the opt-in for the StorageVersionMigrator +// controller. The controller defaults to OFF in this release so the change can +// ship safely without functional impact. Set to "true" (or "1", "t") to enable. +// A follow-up release will flip the default to true alongside the helm chart +// surface and user docs. +const envEnableStorageVersionMigrator = "TOOLHIVE_ENABLE_STORAGE_VERSION_MIGRATOR" + func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(mcpv1alpha1.AddToScheme(scheme)) utilruntime.Must(mcpv1beta1.AddToScheme(scheme)) + utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } @@ -150,10 +160,56 @@ func setupControllersAndWebhooks(mgr ctrl.Manager, imagePullSecretsDefaults imag if err := setupAggregationControllers(mgr, imagePullSecretsDefaults); err != nil { return err } + enabled, err := isStorageVersionMigratorEnabled() + if err != nil { + return err + } + if enabled { + if err := setupStorageVersionMigrator(mgr); err != nil { + return err + } + } else { + setupLog.V(1).Info("StorageVersionMigrator disabled", "envVar", envEnableStorageVersionMigrator) + } //+kubebuilder:scaffold:builder return nil } +// setupStorageVersionMigrator wires the StorageVersionMigrator controller into +// the manager. The controller reconciles status.storedVersions on opted-in +// toolhive.stacklok.dev CRDs so a future operator release can drop deprecated +// versions from spec.versions without orphaning etcd objects. +func setupStorageVersionMigrator(mgr ctrl.Manager) error { + if err := (&controllers.StorageVersionMigratorReconciler{ + Client: mgr.GetClient(), + APIReader: mgr.GetAPIReader(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorder("storageversionmigrator-controller"), + }).SetupWithManager(mgr); err != nil { + return fmt.Errorf("unable to create controller StorageVersionMigrator: %w", err) + } + return nil +} + +// isStorageVersionMigratorEnabled reports whether the StorageVersionMigrator +// controller should be registered. Defaults to false in this release — admins +// must explicitly opt in via TOOLHIVE_ENABLE_STORAGE_VERSION_MIGRATOR=true. +// An unparsable value returns an error so startup fails loudly rather than +// silently disabling the feature an admin asked to turn on. +func isStorageVersionMigratorEnabled() (bool, error) { + value, found := os.LookupEnv(envEnableStorageVersionMigrator) + if !found { + return false, nil + } + enabled, err := strconv.ParseBool(value) + if err != nil { + return false, fmt.Errorf( + "invalid value for %s: %q (expected true/false): %w", + envEnableStorageVersionMigrator, value, err) + } + return enabled, nil +} + // setupGroupRefFieldIndexes sets up field indexing for spec.groupRef on all resource types // that can reference an MCPGroup. This enables efficient lookups by groupRef in controllers. func setupGroupRefFieldIndexes(mgr ctrl.Manager) error { diff --git a/cmd/thv-operator/controllers/storageversionmigrator_controller.go b/cmd/thv-operator/controllers/storageversionmigrator_controller.go new file mode 100644 index 0000000000..24962f872a --- /dev/null +++ b/cmd/thv-operator/controllers/storageversionmigrator_controller.go @@ -0,0 +1,519 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package controllers + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + apitypes "k8s.io/apimachinery/pkg/types" + kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/events" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// Public contract for the StorageVersionMigrator controller. + +// AutoMigrateLabel identifies CRDs that opt in to storage-version migration. +// Will be applied via a kubebuilder marker on each root type in api/v1beta1/ +// in the follow-up PR; no toolhive CRD opts in yet, so the controller is a +// no-op even when the feature flag is set on this release. +const AutoMigrateLabel = "toolhive.stacklok.dev/auto-migrate-storage-version" + +// AutoMigrateValue is the label value that enables migration for a CRD. +const AutoMigrateValue = "true" + +// ToolhiveGroup is the API group the controller is scoped to (belt-and-braces +// filter in addition to the opt-in label). +const ToolhiveGroup = "toolhive.stacklok.dev" + +// EventReasonMigrationSucceeded and EventReasonMigrationFailed are the event +// reasons emitted on the owning CRD when a migration completes or fails. +const ( + EventReasonMigrationSucceeded = "StorageVersionMigrationSucceeded" + EventReasonMigrationFailed = "StorageVersionMigrationFailed" +) + +const ( + defaultMigrationCacheTTL = 1 * time.Hour + defaultListPageSize = 500 + defaultCacheGCInterval = 10 * time.Minute +) + +// errMigrationRetriedDueToConflicts is returned by restoreCRs when at least one +// CR re-store hit a typed Conflict (and no other errors occurred). The caller +// must NOT trim CRD.status.storedVersions in this case: the post-conflict state +// of the affected object is unverified, so reasoning about whether the storage +// re-encode actually happened is unsafe. The next reconcile retries cleanly. +var errMigrationRetriedDueToConflicts = errors.New( + "storage version migration retried due to concurrent writes; storedVersions left unchanged") + +// The wildcard CR RBAC below is intentional. The set of opted-in CRDs isn't +// known at codegen time — it's a per-CRD runtime label decision — so the +// kubebuilder marker can't enumerate kinds. The runtime gate is the +// isManagedCRD check inside Reconcile, which requires both the toolhive +// API group AND the opt-in label. Wildcard RBAC plus isManagedCRD form the +// defence in depth: RBAC bounds the controller to a single API group, and +// the label gate further restricts it to opted-in CRDs. +// +// Chart-consumer note: these markers regenerate role.yaml, so every chart +// install gains get/list/update on toolhive.stacklok.dev/* regardless of +// whether the migrator is opted in via TOOLHIVE_ENABLE_STORAGE_VERSION_MIGRATOR. +// Templating this rule behind a helm conditional is deferred to PR-C alongside +// the rest of the chart surface for the feature flag. + +//+kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch +//+kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions/status,verbs=update;patch +//+kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=*,verbs=get;list;update +//+kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=*/status,verbs=update + +// StorageVersionMigratorReconciler reconciles CustomResourceDefinition objects +// in the toolhive.stacklok.dev group that carry the opt-in +// AutoMigrateLabel=AutoMigrateValue. For each such CRD it re-stores every CR +// at the current storage version by doing a Get + Update on the live object. +// The Update is a full PUT of the unmodified object; the apiserver re-encodes +// the request body at the current storage version, then compares the +// resulting bytes to what's in etcd. When the CR was originally stored at a +// different version (the actual migration scenario) the bytes carry a +// different apiVersion stamp than etcd's record, the comparison fails, and +// the write proceeds — re-encoding the object at the current storage +// version. When the CR is already at the current storage version, the bytes +// match and the apiserver harmlessly elides the write — there was nothing to +// migrate. After all CRs have been processed it patches +// CRD.status.storedVersions down to [] so a future +// release can drop deprecated versions from spec.versions without orphaning +// etcd objects. See https://github.com/kubernetes-sigs/kube-storage-version-migrator/issues/65 +// for the upstream maintainers' explanation of this mechanism. +// +// Disabled by default in this release. Admins opt in operator-wide via +// TOOLHIVE_ENABLE_STORAGE_VERSION_MIGRATOR=true. The helm chart surface and +// the default-on flip land together in a follow-up PR; until then, early +// adopters can set the env var directly through operator.env. +// Per-kind escape hatch: remove the label from the CRD (emergency only — will +// be re-applied by GitOps / helm upgrade). +type StorageVersionMigratorReconciler struct { + // used for CR Update writes and the CRD /status storedVersions patch; + // reads go through APIReader to bypass the informer cache. + client.Client + APIReader client.Reader // live reads for CRDs and CR list pages (bypasses informer) + Scheme *runtime.Scheme // kubebuilder reconciler convention + Recorder events.EventRecorder // MigrationSucceeded / MigrationFailed events on the CRD + PageSize int64 // overridable for tests; zero means defaultListPageSize + CacheGCInterval time.Duration // overridable for tests; zero means defaultCacheGCInterval + cache *migrationCache + // initOnce guards ensureInitialized so the lazy-default writes to PageSize, + // CacheGCInterval, and cache happen exactly once across all callers + // (SetupWithManager, Reconcile, and any future entrypoint). Without it, + // two concurrent callers seeing zero values could race on the field writes. + initOnce sync.Once +} + +// Reconcile runs for each opted-in toolhive.stacklok.dev CRD event. See the +// package-level docs on StorageVersionMigratorReconciler for the full flow. +// Returns a non-nil error to trigger exponential backoff; the CRD watch +// re-enqueues on any status change, so explicit requeue intervals are not used. +func (r *StorageVersionMigratorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx).WithValues("crd", req.Name) + + r.ensureInitialized() + + // Live-read the CRD. Informer cache may lag label or storedVersions updates. + crd := &apiextensionsv1.CustomResourceDefinition{} + if err := r.APIReader.Get(ctx, req.NamespacedName, crd); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("get CRD %s: %w", req.Name, err) + } + + // Re-verify the filter against live state; watch predicate could have + // fired on stale informer data. + if !isManagedCRD(crd) { + return ctrl.Result{}, nil + } + + storageVersion, ok := findStorageVersion(crd) + if !ok { + // CRDs without a storage version are malformed from our perspective; + // log and skip rather than fail (the API server would have rejected + // a CRD without a storage version, so this is unreachable in practice). + logger.Info("CRD has no storage version, skipping", "spec.versions", crd.Spec.Versions) + return ctrl.Result{}, nil + } + + if !isMigrationNeeded(crd, storageVersion) { + return ctrl.Result{}, nil + } + + logger.Info("migrating storage versions", + "storageVersion", storageVersion, + "storedVersions", crd.Status.StoredVersions, + ) + + if err := r.restoreCRs(ctx, crd, storageVersion); err != nil { + // Concurrent-write conflicts are normal steady-state — the migration + // self-heals on the next reconcile. Don't surface them as Warning + // events. Real errors do still get a Warning. + if errors.Is(err, errMigrationRetriedDueToConflicts) { + logger.V(1).Info("storage version migration deferred due to concurrent writes; will retry", + "err", err) + } else { + r.Recorder.Eventf(crd, nil, corev1.EventTypeWarning, EventReasonMigrationFailed, + "RestoreCRs", "storage version migration failed: %v", err) + } + return ctrl.Result{}, fmt.Errorf("re-store CRs for %s: %w", crd.Name, err) + } + + if err := r.patchStoredVersions(ctx, crd, storageVersion); err != nil { + r.Recorder.Eventf(crd, nil, corev1.EventTypeWarning, EventReasonMigrationFailed, + "PatchStoredVersions", "storedVersions patch failed: %v", err) + return ctrl.Result{}, fmt.Errorf("patch storedVersions for %s: %w", crd.Name, err) + } + + r.Recorder.Eventf(crd, nil, corev1.EventTypeNormal, EventReasonMigrationSucceeded, + "Migrate", "storage version migrated to %s", storageVersion) + logger.Info("storage version migration complete", "storageVersion", storageVersion) + return ctrl.Result{}, nil +} + +// SetupWithManager wires the reconciler to watch CRDs using PartialObjectMetadata +// (no full-object cache), filtered on the opt-in label and the toolhive.stacklok.dev +// group. The filter is evaluated twice — once on informer events here, and again +// inside Reconcile after the live APIReader read — because label removals can +// briefly race the informer. +// +// It also registers a Runnable that periodically sweeps expired entries from +// the migration cache so deleted CRs (whose UIDs never recur in subsequent +// list pages and therefore never trigger lazy eviction in has()) don't grow +// the map without bound on long-running operators with high CR churn. +func (r *StorageVersionMigratorReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.ensureInitialized() + + labelSelector, err := labels.Parse(AutoMigrateLabel + "=" + AutoMigrateValue) + if err != nil { + return fmt.Errorf("parse label selector: %w", err) + } + + if err := ctrl.NewControllerManagedBy(mgr). + Named("storageversionmigrator"). + For( + &apiextensionsv1.CustomResourceDefinition{}, + builder.OnlyMetadata, + builder.WithPredicates( + predicate.NewPredicateFuncs(func(obj client.Object) bool { + return labelSelector.Matches(labels.Set(obj.GetLabels())) && + isToolhiveCRDName(obj.GetName()) + }), + predicate.ResourceVersionChangedPredicate{}, + ), + ). + Complete(r); err != nil { + return err + } + + // Periodic cache GC. Registered after Complete so the controller is fully + // wired when the runnable starts. + return mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + ticker := time.NewTicker(r.CacheGCInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + r.cache.gc() + } + } + })) +} + +// ------------------------------------------------------------------ +// Private implementation below. +// ------------------------------------------------------------------ + +// ensureInitialized lazily fills in field defaults. Wrapped in sync.Once so +// concurrent callers (Setup vs. Reconcile vs. any future entrypoint) cannot +// race on the field writes. +func (r *StorageVersionMigratorReconciler) ensureInitialized() { + r.initOnce.Do(func() { + if r.PageSize == 0 { + r.PageSize = defaultListPageSize + } + if r.CacheGCInterval == 0 { + r.CacheGCInterval = defaultCacheGCInterval + } + if r.cache == nil { + r.cache = newMigrationCache(defaultMigrationCacheTTL) + } + }) +} + +// restoreCRs lists all CRs of the CRD's served kind (served version = storageVersion) +// and issues a main-resource Update on each one, forcing the apiserver to +// re-encode the object at the current storage version. +// +// Per-CR error handling: +// - IsNotFound: silently skipped (object deleted between list and update — +// it can't be at the old storage version anymore). +// - IsConflict: silently skipped at the per-CR level, but a function-level +// counter is incremented. After the loop, if any conflicts occurred and no +// other errors did, errMigrationRetriedDueToConflicts is returned so the +// caller leaves storedVersions untouched (the post-conflict state of the +// conflicting object is unverified). +// - All other errors are aggregated and returned. +func (r *StorageVersionMigratorReconciler) restoreCRs( + ctx context.Context, + crd *apiextensionsv1.CustomResourceDefinition, + storageVersion string, +) error { + logger := log.FromContext(ctx) + gvk := schema.GroupVersionKind{ + Group: crd.Spec.Group, + Version: storageVersion, + Kind: crd.Spec.Names.Kind, + } + + listGVK := gvk + listGVK.Kind = crd.Spec.Names.ListKind + + var errs []error + conflicts := 0 + var continueToken string + for { + list := &unstructured.UnstructuredList{} + list.SetGroupVersionKind(listGVK) + listOpts := []client.ListOption{client.Limit(r.PageSize)} + if continueToken != "" { + listOpts = append(listOpts, client.Continue(continueToken)) + } + if err := r.APIReader.List(ctx, list, listOpts...); err != nil { + return fmt.Errorf("list %s: %w", listGVK.String(), err) + } + + if err := meta.EachListItem(list, func(obj runtime.Object) error { + u, ok := obj.(*unstructured.Unstructured) + if !ok { + errs = append(errs, fmt.Errorf("unexpected list item type %T", obj)) + return nil + } + if r.cache.has(crd.Name, u.GetUID(), u.GetResourceVersion()) { + return nil + } + restored, err := r.restoreOne(ctx, gvk, u) + if err != nil { + switch { + case apierrors.IsNotFound(err): + logger.V(1).Info("skip CR — deleted", + "object", client.ObjectKeyFromObject(u), "err", err) + case apierrors.IsConflict(err): + conflicts++ + logger.V(1).Info("skip CR — concurrent write conflict", + "object", client.ObjectKeyFromObject(u), "err", err) + default: + errs = append(errs, fmt.Errorf("re-store %s/%s: %w", + u.GetNamespace(), u.GetName(), err)) + } + return nil + } + r.cache.add(crd.Name, restored.GetUID(), restored.GetResourceVersion()) + return nil + }); err != nil { + errs = append(errs, err) + } + + continueToken = list.GetContinue() + if continueToken == "" { + break + } + } + + if len(errs) == 0 && conflicts > 0 { + return errMigrationRetriedDueToConflicts + } + return kerrors.NewAggregate(errs) +} + +// restoreOne issues a plain Get + Update on the live CR. The apiserver +// re-encodes the request body at the current storage version and compares +// it to etcd's record; when the CR was originally stored at a different +// apiVersion the bytes differ, the write proceeds, and etcd is re-encoded +// at the current storage version. When the CR is already at the current +// storage version the bytes match and the apiserver harmlessly elides the +// write — there was nothing to migrate. The Update goes through the main +// resource, so validating/mutating admission webhooks on the kind see this +// request as part of normal admission flow; only requests that actually +// persist produce downstream state changes. Returns the live object after +// the update so the caller can record its post-update resourceVersion in +// the cache. +func (r *StorageVersionMigratorReconciler) restoreOne( + ctx context.Context, + gvk schema.GroupVersionKind, + original *unstructured.Unstructured, +) (*unstructured.Unstructured, error) { + live := &unstructured.Unstructured{} + live.SetGroupVersionKind(gvk) + if err := r.APIReader.Get(ctx, client.ObjectKeyFromObject(original), live); err != nil { + // IsNotFound is propagated to the caller, which handles it. + return nil, err + } + if err := r.Update(ctx, live); err != nil { + return nil, err + } + return live, nil +} + +// patchStoredVersions overwrites CRD.status.storedVersions to exactly +// [storageVersion], using an optimistic lock on the CRD's resourceVersion so +// a concurrent API-server write rejects the patch and triggers a requeue. +// +// Does NOT use controllerutil.MutateAndPatchStatus (the operator-wide helper +// mandated by .claude/rules/operator.md): the target CRD is an +// apiextensions.k8s.io type co-owned by kube-apiserver — the apiserver +// appends to storedVersions on first write at each version — so the +// optimistic lock is load-bearing here. The helper's plain MergeFrom would +// race with the apiserver's append. +func (r *StorageVersionMigratorReconciler) patchStoredVersions( + ctx context.Context, + crd *apiextensionsv1.CustomResourceDefinition, + storageVersion string, +) error { + // Mutate a copy, not the caller's CRD — per .claude/rules/go-style.md + // "Copy Before Mutating Caller Input". The original serves as the + // merge-patch base; the copy carries the desired state. + updated := crd.DeepCopy() + updated.Status.StoredVersions = []string{storageVersion} + return r.Client.Status().Patch(ctx, updated, + client.MergeFromWithOptions(crd, client.MergeFromWithOptimisticLock{})) +} + +// isManagedCRD returns true if a CRD is opted in to migration: the group matches +// toolhive.stacklok.dev and the opt-in label is set to the expected value. +func isManagedCRD(crd *apiextensionsv1.CustomResourceDefinition) bool { + if crd.Spec.Group != ToolhiveGroup { + return false + } + return crd.GetLabels()[AutoMigrateLabel] == AutoMigrateValue +} + +// isToolhiveCRDName checks whether a CRD name is of the form .toolhive.stacklok.dev, +// which is sufficient to filter at watch time. Reconcile re-verifies via the live CRD. +func isToolhiveCRDName(name string) bool { + return strings.HasSuffix(name, "."+ToolhiveGroup) +} + +// findStorageVersion returns the single version marked storage=true in the CRD spec. +func findStorageVersion(crd *apiextensionsv1.CustomResourceDefinition) (string, bool) { + for _, v := range crd.Spec.Versions { + if v.Storage { + return v.Name, true + } + } + return "", false +} + +// isMigrationNeeded returns true iff status.storedVersions is anything other +// than exactly [storageVersion]. The set of served versions does not affect +// this check — under spec.conversion.strategy=None with identical schemas, +// normal writers cannot reintroduce stale versions to storedVersions, so a +// defensive re-scan based on servedCount has no scenario to defend against. +func isMigrationNeeded( + crd *apiextensionsv1.CustomResourceDefinition, + storageVersion string, +) bool { + stored := crd.Status.StoredVersions + return len(stored) != 1 || stored[0] != storageVersion +} + +// ------------------------------------------------------------------ +// migrationCache: short-lived de-duplication of re-store writes. +// ------------------------------------------------------------------ + +// migrationCache records successfully-migrated (UID, resourceVersion) pairs +// so subsequent reconciles within the TTL window skip already-fresh objects. +// It is a correctness optimization only — a cache miss simply issues a +// redundant (but harmless) Update. +// +// Eviction: lazy on lookup in has(), plus a periodic sweep via gc() driven +// from a manager.Runnable registered in SetupWithManager. The periodic sweep +// is required because lookups never recur for deleted CRs, so without it +// their entries would persist forever. +type migrationCache struct { + mu sync.Mutex + entries map[string]cacheEntry + ttl time.Duration + now func() time.Time +} + +type cacheEntry struct { + resourceVersion string + expiresAt time.Time +} + +func newMigrationCache(ttl time.Duration) *migrationCache { + return &migrationCache{ + entries: make(map[string]cacheEntry), + ttl: ttl, + now: time.Now, + } +} + +func (c *migrationCache) has(crdName string, uid apitypes.UID, resourceVersion string) bool { + key := c.key(crdName, uid) + c.mu.Lock() + defer c.mu.Unlock() + entry, ok := c.entries[key] + if !ok { + return false + } + if c.now().After(entry.expiresAt) { + delete(c.entries, key) + return false + } + return entry.resourceVersion == resourceVersion +} + +func (c *migrationCache) add(crdName string, uid apitypes.UID, resourceVersion string) { + key := c.key(crdName, uid) + c.mu.Lock() + defer c.mu.Unlock() + c.entries[key] = cacheEntry{ + resourceVersion: resourceVersion, + expiresAt: c.now().Add(c.ttl), + } +} + +// gc evicts every expired entry from the cache. Called from a periodic +// manager.Runnable so entries for deleted CRs (whose UIDs never recur in +// subsequent list pages) don't accumulate without bound. +func (c *migrationCache) gc() { + c.mu.Lock() + defer c.mu.Unlock() + now := c.now() + for k, e := range c.entries { + if now.After(e.expiresAt) { + delete(c.entries, k) + } + } +} + +func (*migrationCache) key(crdName string, uid apitypes.UID) string { + return crdName + "|" + string(uid) +} diff --git a/cmd/thv-operator/controllers/storageversionmigrator_controller_test.go b/cmd/thv-operator/controllers/storageversionmigrator_controller_test.go new file mode 100644 index 0000000000..1ca3fa632c --- /dev/null +++ b/cmd/thv-operator/controllers/storageversionmigrator_controller_test.go @@ -0,0 +1,1029 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package controllers + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + apitypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" +) + +// migrationCache + +// fakeClock is an injectable clock for migrationCache TTL tests. +type fakeClock struct { + mu sync.Mutex + now time.Time +} + +func newFakeClock(t time.Time) *fakeClock { return &fakeClock{now: t} } + +func (c *fakeClock) Now() time.Time { + c.mu.Lock() + defer c.mu.Unlock() + return c.now +} + +func (c *fakeClock) Advance(d time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.now = c.now.Add(d) +} + +// newCache builds a migrationCache wired to a fake clock so tests control time. +// All current callers use a 1-hour TTL. +func newCache(t *testing.T) (*migrationCache, *fakeClock) { + t.Helper() + clock := newFakeClock(time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)) + c := &migrationCache{ + entries: make(map[string]cacheEntry), + ttl: time.Hour, + now: clock.Now, + } + return c, clock +} + +func TestMigrationCache_HasReturnsFalseOnEmpty(t *testing.T) { + t.Parallel() + c, _ := newCache(t) + assert.False(t, c.has("crd-a", "uid-1", "rv-100")) +} + +func TestMigrationCache_AddThenHasReturnsTrue(t *testing.T) { + t.Parallel() + c, _ := newCache(t) + c.add("crd-a", "uid-1", "rv-100") + assert.True(t, c.has("crd-a", "uid-1", "rv-100")) +} + +func TestMigrationCache_HasReturnsFalseOnRVMismatch(t *testing.T) { + t.Parallel() + c, _ := newCache(t) + c.add("crd-a", "uid-1", "rv-100") + + // Same CRD and UID but a different RV — caller's CR was updated by some + // other writer after our last cache add. Must be treated as a miss so the + // CR gets re-Updated. + assert.False(t, c.has("crd-a", "uid-1", "rv-200")) + // Entry was a fresh-RV miss, not an expiry, so it must remain in the map. + assert.Len(t, c.entries, 1) +} + +func TestMigrationCache_TTLExpiryLazilyEvictsInHas(t *testing.T) { + t.Parallel() + c, clock := newCache(t) + c.add("crd-a", "uid-1", "rv-100") + require.True(t, c.has("crd-a", "uid-1", "rv-100")) + + clock.Advance(2 * time.Hour) + + assert.False(t, c.has("crd-a", "uid-1", "rv-100"), + "has must return false once the entry's TTL has elapsed") + assert.Empty(t, c.entries, + "expired entries must be removed from the map on lookup, not left to leak") +} + +func TestMigrationCache_AddOverwritesExistingEntry(t *testing.T) { + t.Parallel() + c, clock := newCache(t) + c.add("crd-a", "uid-1", "rv-100") + + // Advance halfway through the TTL, then re-add with a new RV. + clock.Advance(30 * time.Minute) + c.add("crd-a", "uid-1", "rv-200") + + // The new RV should be the cache's record (RV-100 must not match anymore). + assert.False(t, c.has("crd-a", "uid-1", "rv-100")) + assert.True(t, c.has("crd-a", "uid-1", "rv-200")) + + // The expiry should have been refreshed by the second add — going another + // 40 minutes forward (total 70m) must NOT expire the entry, because the + // re-add reset the clock to the 0-of-60m point. + clock.Advance(40 * time.Minute) + assert.True(t, c.has("crd-a", "uid-1", "rv-200"), + "add must refresh expiresAt, otherwise an in-flight CR would be re-walked at exactly the wrong moment") +} + +func TestMigrationCache_GCEvictsOnlyExpiredEntries(t *testing.T) { + t.Parallel() + c, clock := newCache(t) + + c.add("crd-a", "uid-1", "rv-100") + clock.Advance(30 * time.Minute) + c.add("crd-b", "uid-2", "rv-200") + + // Advance so the first entry is expired (90m total) but the second is not + // (60m since its own add, which is exactly TTL — strictly After is false). + clock.Advance(60 * time.Minute) + + c.gc() + + // uid-1 expired and must be evicted. + assert.False(t, c.has("crd-a", "uid-1", "rv-100")) + // uid-2 still inside its TTL window and must survive. + assert.True(t, c.has("crd-b", "uid-2", "rv-200")) + assert.Len(t, c.entries, 1) +} + +// TestMigrationCache_ConcurrentAccess exercises the mutex contract — running +// has/add/gc from many goroutines in parallel under -race must not produce a +// data race or panic. The assertions only verify the loop completed; the value +// of the test is the race detector. +func TestMigrationCache_ConcurrentAccess(t *testing.T) { + t.Parallel() + c, _ := newCache(t) + + const goroutines = 16 + const iterations = 200 + + var wg sync.WaitGroup + wg.Add(goroutines * 3) + + for i := 0; i < goroutines; i++ { + go func(g int) { + defer wg.Done() + for j := 0; j < iterations; j++ { + c.add("crd-a", apitypes.UID(rune('a'+g)), "rv-1") + } + }(i) + go func(g int) { + defer wg.Done() + for j := 0; j < iterations; j++ { + _ = c.has("crd-a", apitypes.UID(rune('a'+g)), "rv-1") + } + }(i) + go func() { + defer wg.Done() + for j := 0; j < iterations; j++ { + c.gc() + } + }() + } + + // Bounded wait so a deadlock fails fast instead of hanging the suite. + done := make(chan struct{}) + go func() { wg.Wait(); close(done) }() + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for cache goroutines to finish — possible deadlock in cache mutex") + } +} + +// TestMigrationCache_KeyIsolation exercises both key-isolation axes: two +// distinct CRDs sharing a UID must not collide, and two distinct UIDs under +// the same CRD must not collide. UID re-use across CRDs is impossible in +// practice (apiserver UIDs are globally unique) but the cache must defend +// against it anyway. +func TestMigrationCache_KeyIsolation(t *testing.T) { + t.Parallel() + c, _ := newCache(t) + + // Cross-CRD, same UID. + c.add("crd-a", "uid-shared", "rv-100") + c.add("crd-b", "uid-shared", "rv-200") + assert.True(t, c.has("crd-a", "uid-shared", "rv-100")) + assert.True(t, c.has("crd-b", "uid-shared", "rv-200")) + assert.False(t, c.has("crd-a", "uid-shared", "rv-200")) + assert.False(t, c.has("crd-b", "uid-shared", "rv-100")) + + // Same CRD, cross UID. + c.add("crd-c", "uid-1", "rv-300") + c.add("crd-c", "uid-2", "rv-400") + assert.True(t, c.has("crd-c", "uid-1", "rv-300")) + assert.True(t, c.has("crd-c", "uid-2", "rv-400")) + assert.False(t, c.has("crd-c", "uid-1", "rv-400")) + assert.False(t, c.has("crd-c", "uid-2", "rv-300")) +} + +// ensureInitialized + +func TestEnsureInitialized_AppliesDefaultsOnZeroValues(t *testing.T) { + t.Parallel() + r := &StorageVersionMigratorReconciler{} + r.ensureInitialized() + assert.Equal(t, int64(defaultListPageSize), r.PageSize) + assert.Equal(t, defaultCacheGCInterval, r.CacheGCInterval) + require.NotNil(t, r.cache) +} + +func TestEnsureInitialized_PreservesExplicitValuesAndIsIdempotent(t *testing.T) { + t.Parallel() + customCache := newMigrationCache(time.Minute) + r := &StorageVersionMigratorReconciler{ + PageSize: 7, + CacheGCInterval: 42 * time.Second, + cache: customCache, + } + r.ensureInitialized() + assert.Equal(t, int64(7), r.PageSize, "non-zero PageSize must not be overwritten by defaults") + assert.Equal(t, 42*time.Second, r.CacheGCInterval, "non-zero CacheGCInterval must not be overwritten") + assert.Same(t, customCache, r.cache, "an existing cache must not be replaced") + + // Idempotent: a second call must not change any field. + r.ensureInitialized() + assert.Equal(t, int64(7), r.PageSize) + assert.Same(t, customCache, r.cache) +} + +// Reconcile early-return paths. Per-CR work paths are covered by the envtest +// suite, which can simulate real apiserver semantics the fake client cannot +// (storage-encoder elision, optimistic-lock 409). + +// reconcileCRD builds a CRD for Reconcile early-return tests. +func reconcileCRD(name, group string, labels map[string]string, versions []apiextensionsv1.CustomResourceDefinitionVersion, stored []string) *apiextensionsv1.CustomResourceDefinition { + return &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: name, Labels: labels, ResourceVersion: "1"}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: group, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Kind: "MCPServer", ListKind: "MCPServerList", Plural: "mcpservers", Singular: "mcpserver", + }, + Versions: versions, + }, + Status: apiextensionsv1.CustomResourceDefinitionStatus{StoredVersions: stored}, + } +} + +func TestReconcile_EarlyReturns(t *testing.T) { + t.Parallel() + + const mcpName = "mcpservers.toolhive.stacklok.dev" + optIn := map[string]string{AutoMigrateLabel: AutoMigrateValue} + servedV1Beta1 := []apiextensionsv1.CustomResourceDefinitionVersion{{Name: "v1beta1", Storage: true, Served: true}} + + tests := []struct { + name string + crd *apiextensionsv1.CustomResourceDefinition // nil ⇒ CRD absent, expect IsNotFound branch + crdName string + }{ + { + name: "CRD not found returns nil without error", + crd: nil, + crdName: "missing.toolhive.stacklok.dev", + }, + { + name: "foreign group is skipped", + crd: reconcileCRD("widgets.example.com", "example.com", optIn, + []apiextensionsv1.CustomResourceDefinitionVersion{{Name: "v1", Storage: true, Served: true}}, + []string{"v1alpha1", "v1"}), + crdName: "widgets.example.com", + }, + { + name: "missing opt-in label is skipped", + crd: reconcileCRD(mcpName, ToolhiveGroup, nil, servedV1Beta1, []string{"v1alpha1", "v1beta1"}), + crdName: mcpName, + }, + { + // Pathological CRD — every version has Storage: false. The apiserver + // would normally reject this at CRD-create time, so envtest can't + // reach this branch. Reconcile must return nil rather than panic. + name: "no storage version is skipped", + crd: reconcileCRD(mcpName, ToolhiveGroup, optIn, + []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1alpha1", Storage: false, Served: true}, + {Name: "v1beta1", Storage: false, Served: true}, + }, + []string{"v1alpha1", "v1beta1"}), + crdName: mcpName, + }, + { + name: "already-clean storedVersions returns early", + crd: reconcileCRD(mcpName, ToolhiveGroup, optIn, servedV1Beta1, []string{"v1beta1"}), + crdName: mcpName, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + var initial []client.Object + if tc.crd != nil { + initial = []client.Object{tc.crd} + } + r := buildFakeReconciler(t, initial, nil) + res, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: apitypes.NamespacedName{Name: tc.crdName}, + }) + require.NoError(t, err, "early-return branch must not surface as a reconcile error") + assert.Equal(t, ctrl.Result{}, res) + }) + } +} + +// isManagedCRD + +func TestIsManagedCRD(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + group string + labels map[string]string + want bool + }{ + {"toolhive group with opt-in label is managed", ToolhiveGroup, map[string]string{AutoMigrateLabel: AutoMigrateValue}, true}, + {"toolhive group with wrong label value is not managed", ToolhiveGroup, map[string]string{AutoMigrateLabel: "false"}, false}, + {"toolhive group with unrelated label is not managed", ToolhiveGroup, map[string]string{"unrelated.example.com/key": "true"}, false}, + {"toolhive group with nil labels map is not managed", ToolhiveGroup, nil, false}, + {"non-toolhive group with opt-in label is not managed", "example.com", map[string]string{AutoMigrateLabel: AutoMigrateValue}, false}, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + crd := &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Labels: tc.labels}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{Group: tc.group}, + } + assert.Equal(t, tc.want, isManagedCRD(crd)) + }) + } +} + +// isToolhiveCRDName + +func TestIsToolhiveCRDName(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + crdName string + expected bool + }{ + {"well-formed toolhive CRD name matches", "mcpservers.toolhive.stacklok.dev", true}, + {"empty string does not match", "", false}, + {"group string with no plural prefix does not match", "toolhive.stacklok.dev", false}, + {"group as bare suffix without leading dot does not match", "footoolhive.stacklok.dev", false}, + {"foreign group does not match", "widgets.example.com", false}, + {"toolhive suffix in the middle of the name does not match", "foo.toolhive.stacklok.dev.example.com", false}, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tc.expected, isToolhiveCRDName(tc.crdName)) + }) + } +} + +// findStorageVersion + +func TestFindStorageVersion(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + versions []apiextensionsv1.CustomResourceDefinitionVersion + wantName string + wantOK bool + }{ + { + "single storage version returns its name", + []apiextensionsv1.CustomResourceDefinitionVersion{{Name: "v1", Storage: true}}, + "v1", true, + }, + { + "multiple versions returns the storage entry", + []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1alpha1", Storage: false}, + {Name: "v1beta1", Storage: true}, + {Name: "v1beta2", Storage: false}, + }, + "v1beta1", true, + }, + { + "no storage version returns empty and false", + []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1alpha1", Storage: false}, + {Name: "v1beta1", Storage: false}, + }, + "", false, + }, + {"empty versions list returns empty and false", nil, "", false}, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + crd := &apiextensionsv1.CustomResourceDefinition{ + Spec: apiextensionsv1.CustomResourceDefinitionSpec{Versions: tc.versions}, + } + got, ok := findStorageVersion(crd) + assert.Equal(t, tc.wantName, got) + assert.Equal(t, tc.wantOK, ok) + }) + } +} + +// isMigrationNeeded + +func TestIsMigrationNeeded(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + storedVersions []string + storageVersion string + want bool + }{ + {"single matching entry is not needed", []string{"v1beta1"}, "v1beta1", false}, + {"single mismatching entry is needed", []string{"v1alpha1"}, "v1beta1", true}, + {"two entries including target is needed", []string{"v1alpha1", "v1beta1"}, "v1beta1", true}, + {"empty list is needed", nil, "v1beta1", true}, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + crd := &apiextensionsv1.CustomResourceDefinition{ + Status: apiextensionsv1.CustomResourceDefinitionStatus{StoredVersions: tc.storedVersions}, + } + assert.Equal(t, tc.want, isMigrationNeeded(crd, tc.storageVersion)) + }) + } +} + +// Shared helpers for restoreCRs / restoreOne / patchStoredVersions tests. + +const ( + testCRGroup = ToolhiveGroup + testCRVersion = "v1beta1" + testCRKind = "TestKind" + testCRListKind = "TestKindList" + testCRPlural = "testkinds" + testCRSingular = "testkind" + testCRDName = testCRPlural + "." + testCRGroup +) + +// testCRGVK is the singular GVK for the synthetic CR kind. +func testCRGVK() schema.GroupVersionKind { + return schema.GroupVersionKind{Group: testCRGroup, Version: testCRVersion, Kind: testCRKind} +} + +// schemeForCRD builds a runtime scheme with apiextensions/v1 plus the +// synthetic CR's singular and list kinds registered as unstructured (the +// fake client requires the kind to be in the scheme on List/Get/Update). +func schemeForCRD(t *testing.T) *runtime.Scheme { + t.Helper() + scheme := runtime.NewScheme() + require.NoError(t, apiextensionsv1.AddToScheme(scheme)) + gvk := testCRGVK() + listGVK := gvk + listGVK.Kind = testCRListKind + scheme.AddKnownTypeWithName(gvk, &unstructured.Unstructured{}) + scheme.AddKnownTypeWithName(listGVK, &unstructured.UnstructuredList{}) + return scheme +} + +// makeTestCRD builds a CRD wired to the synthetic CR kind with the supplied +// status.storedVersions and the opt-in label set. +func makeTestCRD(storedVersions []string) *apiextensionsv1.CustomResourceDefinition { + return &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: testCRDName, + Labels: map[string]string{AutoMigrateLabel: AutoMigrateValue}, + ResourceVersion: "1", + }, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: testCRGroup, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Kind: testCRKind, + ListKind: testCRListKind, + Plural: testCRPlural, + Singular: testCRSingular, + }, + Scope: apiextensionsv1.NamespaceScoped, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: testCRVersion, Storage: true, Served: true}, + }, + }, + Status: apiextensionsv1.CustomResourceDefinitionStatus{ + StoredVersions: storedVersions, + }, + } +} + +// makeTestCR returns an unstructured CR with the synthetic GVK in the +// "default" namespace. UID is derived from name so tests can assert cache +// state by name. +func makeTestCR(name string) *unstructured.Unstructured { + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(testCRGVK()) + u.SetNamespace("default") + u.SetName(name) + u.SetUID(apitypes.UID(name + "-uid")) + u.SetResourceVersion("1") + return u +} + +// buildFakeReconciler constructs a fake-backed reconciler with the supplied +// initial objects (CRD + zero-or-more CRs) and optional interceptor funcs. +// The status subresource is registered for the CRD so Status().Patch works. +func buildFakeReconciler( + t *testing.T, + initialObjects []client.Object, + funcs *interceptor.Funcs, +) *StorageVersionMigratorReconciler { + t.Helper() + scheme := schemeForCRD(t) + builder := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(initialObjects...). + WithStatusSubresource(&apiextensionsv1.CustomResourceDefinition{}) + if funcs != nil { + builder = builder.WithInterceptorFuncs(*funcs) + } + cli := builder.Build() + r := &StorageVersionMigratorReconciler{ + Client: cli, + APIReader: cli, + Scheme: scheme, + Recorder: noopEventRecorder{}, + } + r.ensureInitialized() + return r +} + +// restoreOne + +func TestRestoreOne_HappyPath(t *testing.T) { + t.Parallel() + cr := makeTestCR("obj-1") + r := buildFakeReconciler(t, []client.Object{cr}, nil) + + restored, err := r.restoreOne(context.Background(), testCRGVK(), cr) + + require.NoError(t, err) + require.NotNil(t, restored) + assert.Equal(t, "obj-1", restored.GetName()) + assert.Equal(t, "default", restored.GetNamespace()) + // fake client bumps resourceVersion on Update; assert it changed so the + // caller has a usable post-update RV to record in the cache. + assert.NotEqual(t, "1", restored.GetResourceVersion(), + "fake client must bump RV on Update; if it doesn't, restoreOne's cache add will record a stale RV") +} + +func TestRestoreOne_PropagatesErrors(t *testing.T) { + t.Parallel() + + gr := schema.GroupResource{Group: testCRGroup, Resource: testCRPlural} + + // Per-row: when crPresent is false, the CR is absent from the fake store + // and Get returns NotFound (no interceptor needed). When crPresent is true, + // updateErr is returned from an Update interceptor. + tests := []struct { + name string + crPresent bool + updateErr error + check func(t *testing.T, err error) + }{ + { + name: "Get NotFound propagates", + crPresent: false, + check: func(t *testing.T, err error) { + t.Helper() + require.Error(t, err) + assert.True(t, apierrors.IsNotFound(err), + "Get NotFound must propagate verbatim so restoreCRs can classify it") + }, + }, + { + name: "Update Conflict propagates", + crPresent: true, + updateErr: apierrors.NewConflict(gr, "obj-1", errors.New("injected conflict")), + check: func(t *testing.T, err error) { + t.Helper() + require.Error(t, err) + assert.True(t, apierrors.IsConflict(err), + "Update Conflict must propagate verbatim so restoreCRs can classify it") + }, + }, + { + name: "Update generic error propagates", + crPresent: true, + updateErr: errors.New("injected generic failure"), + check: func(t *testing.T, err error) { + t.Helper() + require.Error(t, err) + assert.False(t, apierrors.IsConflict(err)) + assert.False(t, apierrors.IsNotFound(err)) + assert.Contains(t, err.Error(), "injected generic failure") + }, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + target := makeTestCR("obj-1") + var initial []client.Object + if tc.crPresent { + initial = []client.Object{target} + } + var funcs *interceptor.Funcs + if tc.updateErr != nil { + updateErr := tc.updateErr + funcs = &interceptor.Funcs{ + Update: func(_ context.Context, _ client.WithWatch, _ client.Object, _ ...client.UpdateOption) error { + return updateErr + }, + } + } + r := buildFakeReconciler(t, initial, funcs) + _, err := r.restoreOne(context.Background(), testCRGVK(), target) + tc.check(t, err) + }) + } +} + +// restoreCRs + +func TestRestoreCRs_HappyPathAllUpdatedAndCachePopulated(t *testing.T) { + t.Parallel() + crd := makeTestCRD([]string{"v1alpha1", testCRVersion}) + crs := []client.Object{ + makeTestCR("obj-a"), + makeTestCR("obj-b"), + makeTestCR("obj-c"), + } + objs := append([]client.Object{crd}, crs...) + r := buildFakeReconciler(t, objs, nil) + + err := r.restoreCRs(context.Background(), crd, testCRVersion) + require.NoError(t, err) + + // Every CR's post-Update RV should now be in the cache. + for _, obj := range crs { + u := obj.(*unstructured.Unstructured) + live := &unstructured.Unstructured{} + live.SetGroupVersionKind(testCRGVK()) + require.NoError(t, r.Get(context.Background(), client.ObjectKeyFromObject(u), live)) + assert.True(t, r.cache.has(crd.Name, live.GetUID(), live.GetResourceVersion()), + "successful restoreOne must populate the cache with the post-Update RV") + } +} + +func TestRestoreCRs_EmptyListIsNoop(t *testing.T) { + t.Parallel() + crd := makeTestCRD([]string{"v1alpha1", testCRVersion}) + r := buildFakeReconciler(t, []client.Object{crd}, nil) + + err := r.restoreCRs(context.Background(), crd, testCRVersion) + require.NoError(t, err) + assert.Empty(t, r.cache.entries, "no CRs ⇒ no cache adds") +} + +// TestRestoreCRs_ErrorClassification covers the per-CR error classification +// logic: IsNotFound is silently skipped, IsConflict is counted and surfaces +// as the conflict sentinel iff no other errors occurred, generic errors are +// aggregated, and mixed conflict + generic errors surface the aggregate +// rather than the sentinel. +func TestRestoreCRs_ErrorClassification(t *testing.T) { + t.Parallel() + + gr := schema.GroupResource{Group: testCRGroup, Resource: testCRPlural} + + // Per-row: getErrs/updateErrs map a CR name → the error its Get/Update + // interceptor must return (other names fall through to the fake client). + // check is run against the (err, reconciler) pair after restoreCRs. + tests := []struct { + name string + crNames []string + getErrs map[string]error + updateErrs map[string]error + check func(t *testing.T, err error, r *StorageVersionMigratorReconciler) + }{ + { + name: "per-CR Get NotFound is silently skipped", + crNames: []string{"obj-a", "obj-b"}, + getErrs: map[string]error{"obj-a": apierrors.NewNotFound(gr, "obj-a")}, + check: func(t *testing.T, err error, r *StorageVersionMigratorReconciler) { + t.Helper() + require.NoError(t, err, "IsNotFound on a per-CR Get must not bubble up") + // Cache must contain only obj-b — NotFound must skip the cache add. + assert.Len(t, r.cache.entries, 1, "only the surviving CR may be cached") + }, + }, + { + name: "Conflict counted and sentinel returned", + crNames: []string{"obj-a", "obj-b"}, + updateErrs: map[string]error{"obj-a": apierrors.NewConflict(gr, "obj-a", errors.New("injected"))}, + check: func(t *testing.T, err error, _ *StorageVersionMigratorReconciler) { + t.Helper() + require.Error(t, err, "a swallowed Conflict must surface as a function-level error") + assert.ErrorIs(t, err, errMigrationRetriedDueToConflicts, + "the sentinel must be returned so the caller knows storedVersions is unsafe to trim") + }, + }, + { + name: "generic error is aggregated", + crNames: []string{"obj-a"}, + updateErrs: map[string]error{"obj-a": errors.New("injected generic update failure")}, + check: func(t *testing.T, err error, _ *StorageVersionMigratorReconciler) { + t.Helper() + require.Error(t, err) + assert.NotErrorIs(t, err, errMigrationRetriedDueToConflicts, + "a generic error must NOT be misclassified as the conflict sentinel") + assert.Contains(t, err.Error(), "injected generic update failure") + assert.Contains(t, err.Error(), "obj-a", "aggregated error should name the failing CR") + }, + }, + { + name: "conflicts plus generic errors returns aggregate, not sentinel", + crNames: []string{"obj-conflict", "obj-failure"}, + updateErrs: map[string]error{ + "obj-conflict": apierrors.NewConflict(gr, "obj-conflict", errors.New("injected conflict")), + "obj-failure": errors.New("injected generic failure"), + }, + check: func(t *testing.T, err error, _ *StorageVersionMigratorReconciler) { + t.Helper() + require.Error(t, err) + // When there is at least one non-conflict error the aggregate + // wins — the sentinel is only meaningful in the conflicts-only + // case. + assert.NotErrorIs(t, err, errMigrationRetriedDueToConflicts, + "mixed conflicts+errors must return the aggregate, not the conflict sentinel") + assert.Contains(t, err.Error(), "injected generic failure") + }, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + crd := makeTestCRD([]string{"v1alpha1", testCRVersion}) + objs := []client.Object{crd} + for _, n := range tc.crNames { + objs = append(objs, makeTestCR(n)) + } + funcs := &interceptor.Funcs{ + Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if e, ok := tc.getErrs[key.Name]; ok { + return e + } + return c.Get(ctx, key, obj, opts...) + }, + Update: func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.UpdateOption) error { + if e, ok := tc.updateErrs[obj.GetName()]; ok { + return e + } + return c.Update(ctx, obj, opts...) + }, + } + r := buildFakeReconciler(t, objs, funcs) + err := r.restoreCRs(context.Background(), crd, testCRVersion) + tc.check(t, err, r) + }) + } +} + +func TestRestoreCRs_FirstListFailsReturnsImmediately(t *testing.T) { + t.Parallel() + crd := makeTestCRD([]string{"v1alpha1", testCRVersion}) + funcs := &interceptor.Funcs{ + List: func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) error { + return errors.New("injected list failure") + }, + } + r := buildFakeReconciler(t, []client.Object{crd}, funcs) + + err := r.restoreCRs(context.Background(), crd, testCRVersion) + require.Error(t, err) + // schema.GroupVersionKind.String() formats as "group/version, Kind=kind". + assert.Contains(t, err.Error(), "Kind="+testCRListKind, + "list failures must be wrapped with the list GVK for diagnosability") + assert.Contains(t, err.Error(), "injected list failure") +} + +func TestRestoreCRs_CacheSkipsAlreadyMigratedCRs(t *testing.T) { + t.Parallel() + crd := makeTestCRD([]string{"v1alpha1", testCRVersion}) + cr := makeTestCR("obj-a") + objs := []client.Object{crd, cr} + + // Pre-populate the cache so restoreCRs finds the CR's (UID, RV) on + // lookup and skips the Update altogether. + var updateCount int32 + funcs := &interceptor.Funcs{ + Update: func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.UpdateOption) error { + atomic.AddInt32(&updateCount, 1) + return c.Update(ctx, obj, opts...) + }, + } + r := buildFakeReconciler(t, objs, funcs) + + // The CR's RV in the fake store is "1" after WithObjects. + r.cache.add(crd.Name, cr.GetUID(), "1") + + err := r.restoreCRs(context.Background(), crd, testCRVersion) + require.NoError(t, err) + assert.Equal(t, int32(0), atomic.LoadInt32(&updateCount), + "a fresh cache hit must skip the Update entirely") +} + +func TestRestoreCRs_PaginationWiresContinueTokenThroughOptions(t *testing.T) { + t.Parallel() + crd := makeTestCRD([]string{"v1alpha1", testCRVersion}) + r := buildFakeReconciler(t, []client.Object{crd}, nil) + r.PageSize = 3 + + // Replace APIReader with one that synthesizes pagination so the + // controller's continue-token plumbing actually has something to thread. + listCalls := []listCallRecord{} + r.APIReader = &paginatingFakeReader{ + Reader: r.Client, + records: &listCalls, + } + + err := r.restoreCRs(context.Background(), crd, testCRVersion) + require.NoError(t, err) + + require.Len(t, listCalls, 3, "PageSize=3, total=7 should yield exactly three list calls (3+3+1)") + // First call: Limit=3, no Continue token. + assert.Equal(t, int64(3), listCalls[0].limit) + assert.Empty(t, listCalls[0].continueToken) + // Subsequent calls: Limit=3, Continue token from prior page. + assert.Equal(t, int64(3), listCalls[1].limit) + assert.Equal(t, "page-2", listCalls[1].continueToken) + assert.Equal(t, int64(3), listCalls[2].limit) + assert.Equal(t, "page-3", listCalls[2].continueToken) +} + +// patchStoredVersions + +// TestPatchStoredVersions_SuccessAssertsAllProperties exercises the success +// path and verifies three orthogonal properties in one run: +// - storedVersions is trimmed to exactly [storageVersion] in the fake store, +// - the /status subresource endpoint is hit (and the main-resource Patch is NOT), +// - the patch body carries resourceVersion as a precondition (the marker that +// MergeFromWithOptimisticLock is in effect; plain MergeFrom omits it). +func TestPatchStoredVersions_SuccessAssertsAllProperties(t *testing.T) { + t.Parallel() + crd := makeTestCRD([]string{"v1alpha1", testCRVersion}) + + var mainResourcePatchCalls int32 + var statusSubresourcePatchCalls int32 + var capturedPatchBody []byte + + funcs := &interceptor.Funcs{ + Patch: func(ctx context.Context, c client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + atomic.AddInt32(&mainResourcePatchCalls, 1) + return c.Patch(ctx, obj, patch, opts...) + }, + SubResourcePatch: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error { + if subResourceName == "status" { + atomic.AddInt32(&statusSubresourcePatchCalls, 1) + } + data, err := patch.Data(obj) + if err != nil { + return err + } + capturedPatchBody = data + return c.Status().Patch(ctx, obj, patch, opts...) + }, + } + r := buildFakeReconciler(t, []client.Object{crd}, funcs) + + require.NoError(t, r.patchStoredVersions(context.Background(), crd, testCRVersion)) + + // Subresource routing. + assert.Equal(t, int32(0), atomic.LoadInt32(&mainResourcePatchCalls), + "patchStoredVersions must NOT hit the main-resource Patch endpoint") + assert.Equal(t, int32(1), atomic.LoadInt32(&statusSubresourcePatchCalls), + "patchStoredVersions must hit the /status subresource exactly once") + + // Patch body carries the optimistic-lock precondition and the trimmed list. + require.NotEmpty(t, capturedPatchBody, "interceptor never captured the patch body") + body := string(capturedPatchBody) + assert.Contains(t, body, `"resourceVersion":"1"`, + "optimistic-lock patches must include the source resourceVersion as a precondition") + assert.Contains(t, body, `"storedVersions":["`+testCRVersion+`"]`, + "patch body must overwrite storedVersions to exactly [storageVersion]") + + // storedVersions actually trimmed in the fake store. + live := &apiextensionsv1.CustomResourceDefinition{} + require.NoError(t, r.Get(context.Background(), client.ObjectKey{Name: crd.Name}, live)) + assert.Equal(t, []string{testCRVersion}, live.Status.StoredVersions) +} + +func TestPatchStoredVersions_PropagatesError(t *testing.T) { + t.Parallel() + crd := makeTestCRD([]string{"v1alpha1", testCRVersion}) + + funcs := &interceptor.Funcs{ + SubResourcePatch: func(_ context.Context, _ client.Client, _ string, _ client.Object, _ client.Patch, _ ...client.SubResourcePatchOption) error { + return errors.New("injected patch failure") + }, + } + r := buildFakeReconciler(t, []client.Object{crd}, funcs) + + err := r.patchStoredVersions(context.Background(), crd, testCRVersion) + require.Error(t, err) + assert.Contains(t, err.Error(), "injected patch failure") +} + +// Test doubles used by the orchestration tests above + +// listCallRecord captures the ListOptions of a single List call so the +// pagination test can verify Limit + Continue threading. +type listCallRecord struct { + limit int64 + continueToken string +} + +// paginatingFakeReader satisfies client.Reader by synthesizing three pages +// of test CRs (7 items at PageSize=3 yields 3+3+1). It records each List +// call's options so the test can assert continue-token threading. +type paginatingFakeReader struct { + client.Reader // embedded only so Get satisfies the interface; List is overridden + records *[]listCallRecord +} + +func (p *paginatingFakeReader) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + // Synthesize Get responses for the per-CR Get in restoreOne. Any name + // that looks like one of our synthetic CRs is treated as found. + if u, ok := obj.(*unstructured.Unstructured); ok && strings.HasPrefix(key.Name, "obj-") { + u.SetGroupVersionKind(testCRGVK()) + u.SetNamespace(key.Namespace) + u.SetName(key.Name) + u.SetUID(apitypes.UID(key.Name + "-uid")) + u.SetResourceVersion("1") + return nil + } + return p.Reader.Get(ctx, key, obj, opts...) +} + +func (p *paginatingFakeReader) List(_ context.Context, list client.ObjectList, opts ...client.ListOption) error { + rec := listCallRecord{} + listOpts := &client.ListOptions{} + for _, o := range opts { + o.ApplyToList(listOpts) + } + rec.limit = listOpts.Limit + rec.continueToken = listOpts.Continue + *p.records = append(*p.records, rec) + + ul, ok := list.(*unstructured.UnstructuredList) + if !ok { + return fmt.Errorf("paginatingFakeReader only supports UnstructuredList, got %T", list) + } + + // Synthesize one of three pages based on the continue token. + type page struct { + names []string + next string + } + pages := map[string]page{ + "": {names: []string{"obj-1", "obj-2", "obj-3"}, next: "page-2"}, + "page-2": {names: []string{"obj-4", "obj-5", "obj-6"}, next: "page-3"}, + "page-3": {names: []string{"obj-7"}, next: ""}, + } + pg, found := pages[rec.continueToken] + if !found { + return fmt.Errorf("paginatingFakeReader: unknown continue token %q", rec.continueToken) + } + + items := make([]unstructured.Unstructured, 0, len(pg.names)) + for _, name := range pg.names { + u := makeTestCR(name) + items = append(items, *u) + } + ul.Items = items + ul.SetContinue(pg.next) + return nil +} + +// noopEventRecorder satisfies events.EventRecorder for tests that don't +// assert on emitted events. +type noopEventRecorder struct{} + +func (noopEventRecorder) Eventf(_ runtime.Object, _ runtime.Object, _, _, _, _ string, _ ...any) { +} diff --git a/cmd/thv-operator/test-integration/storageversionmigrator/controller_test.go b/cmd/thv-operator/test-integration/storageversionmigrator/controller_test.go new file mode 100644 index 0000000000..27bd348ed8 --- /dev/null +++ b/cmd/thv-operator/test-integration/storageversionmigrator/controller_test.go @@ -0,0 +1,801 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package storageversionmigrator + +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/events" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/stacklok/toolhive/cmd/thv-operator/controllers" +) + +const ( + toolhiveGroup = controllers.ToolhiveGroup + migrateLabel = controllers.AutoMigrateLabel + migrateValue = controllers.AutoMigrateValue +) + +// crdSpec describes a test CRD fixture. +type crdSpec struct { + Name string + Group string + Kind string + ListKind string + Plural string + Singular string + Versions []versionSpec + Labelled bool + HasStatusOnStored bool +} + +type versionSpec struct { + Name string + Served bool + Storage bool +} + +func buildCRD(s crdSpec) *apiextensionsv1.CustomResourceDefinition { + versions := make([]apiextensionsv1.CustomResourceDefinitionVersion, 0, len(s.Versions)) + for _, v := range s.Versions { + cdv := apiextensionsv1.CustomResourceDefinitionVersion{ + Name: v.Name, + Served: v.Served, + Storage: v.Storage, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "spec": { + Type: "object", + XPreserveUnknownFields: ptrBool(true), + }, + "status": { + Type: "object", + XPreserveUnknownFields: ptrBool(true), + }, + }, + }, + }, + } + if v.Storage && s.HasStatusOnStored { + cdv.Subresources = &apiextensionsv1.CustomResourceSubresources{ + Status: &apiextensionsv1.CustomResourceSubresourceStatus{}, + } + } + versions = append(versions, cdv) + } + + crd := &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: s.Name}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: s.Group, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Kind: s.Kind, + ListKind: s.ListKind, + Plural: s.Plural, + Singular: s.Singular, + }, + Scope: apiextensionsv1.NamespaceScoped, + Versions: versions, + }, + } + if s.Labelled { + crd.Labels = map[string]string{migrateLabel: migrateValue} + } + return crd +} + +func ptrBool(b bool) *bool { return &b } + +// installCRD creates a CRD and waits for the apiserver to publish it so +// unstructured CR creates of that kind will succeed. +func installCRD(c crdSpec) { + crd := buildCRD(c) + Expect(k8sClient.Create(ctx, crd)).To(Succeed()) + + Eventually(func() bool { + live := &apiextensionsv1.CustomResourceDefinition{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: c.Name}, live); err != nil { + return false + } + for _, cond := range live.Status.Conditions { + if cond.Type == apiextensionsv1.Established && cond.Status == apiextensionsv1.ConditionTrue { + return true + } + } + return false + }, time.Second*10, time.Millisecond*200).Should(BeTrue(), "CRD %s never became Established", c.Name) +} + +func deleteCRD(name string) { + crd := &apiextensionsv1.CustomResourceDefinition{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: name}, crd); err != nil { + if apierrors.IsNotFound(err) { + return + } + Fail(fmt.Sprintf("get CRD %s before delete: %v", name, err)) + } + Expect(k8sClient.Delete(ctx, crd)).To(Succeed()) + Eventually(func() bool { + return apierrors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: name}, &apiextensionsv1.CustomResourceDefinition{})) + }, time.Second*30, time.Millisecond*200).Should(BeTrue(), "CRD %s never fully deleted", name) +} + +// setStoredVersions overwrites status.storedVersions, simulating a historical +// state where objects were stored at earlier versions. +func setStoredVersions(crdName string, versions []string) { + Eventually(func() error { + crd := &apiextensionsv1.CustomResourceDefinition{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: crdName}, crd); err != nil { + return err + } + orig := crd.DeepCopy() + crd.Status.StoredVersions = versions + return k8sClient.Status().Patch(ctx, crd, client.MergeFrom(orig)) + }, time.Second*5, time.Millisecond*100).Should(Succeed()) +} + +func getStoredVersions(crdName string) []string { + crd := &apiextensionsv1.CustomResourceDefinition{} + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: crdName}, crd)).To(Succeed()) + return append([]string{}, crd.Status.StoredVersions...) +} + +// createCRs creates count CRs in the default namespace with the given kind +// and a name derived from basename. Returns the created objects so tests can +// assert on them post-reconcile. +func createCRs(gvk schema.GroupVersionKind, basename string, count int) []*unstructured.Unstructured { + out := make([]*unstructured.Unstructured, 0, count) + for i := 0; i < count; i++ { + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(gvk) + u.SetNamespace("default") + u.SetName(fmt.Sprintf("%s-%d", basename, i)) + Expect(unstructured.SetNestedField(u.Object, "placeholder", "spec", "marker")).To(Succeed()) + Expect(k8sClient.Create(ctx, u)).To(Succeed()) + out = append(out, u) + } + return out +} + +// Note on "did the re-store actually fire" verification: +// +// The controller does a plain Get + Update on each CR. When the CR is already +// at the current storage version the apiserver freshly re-encodes the request +// body, sees it matches etcd byte-for-byte, and elides the write — that's +// correct behaviour, not a controller bug, and it means the per-CR RV does +// not bump for an already-clean CR. The dedicated cross-version test +// ("re-encodes CRs that are stored at a prior storage version") proves the +// migration mechanism actually works for objects stored at older versions: +// it stores a CR at v1alpha1, flips storage to v1beta1, and asserts the CR's +// RV bumps after reconcile. +// +// The pagination test additionally verifies the continue-token loop via a +// list-call counter, and the partial-failure test asserts storedVersions is +// not trimmed when any CR re-store fails. + +// newReconciler constructs a StorageVersionMigratorReconciler for a single +// test. Every test has its own instance so the migration cache doesn't leak +// between tests and state is fully explicit. +func newReconciler() *controllers.StorageVersionMigratorReconciler { + return &controllers.StorageVersionMigratorReconciler{ + Client: k8sClient, + APIReader: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: &noopRecorder{}, + } +} + +// reconcile invokes the reconciler once for the given CRD and returns the +// result and error directly — tests assert on both. +func reconcile(r *controllers.StorageVersionMigratorReconciler, crdName string) (ctrl.Result, error) { + return r.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: crdName}}) +} + +var crdCounter int + +func uniqueSuffix() string { + crdCounter++ + return fmt.Sprintf("t%d", crdCounter) +} + +// ------------------------------------------------------------------ +// Tests +// ------------------------------------------------------------------ + +var _ = Describe("StorageVersionMigrator", func() { + Describe("Reconcile", func() { + + It("is a noop when storedVersions is already [storageVersion] and only one version is served", func() { + suf := uniqueSuffix() + spec := crdSpec{ + Name: "noops" + suf + "." + toolhiveGroup, + Group: toolhiveGroup, + Kind: "Noop" + suf, + ListKind: "Noop" + suf + "List", + Plural: "noops" + suf, + Singular: "noop" + suf, + Labelled: true, + HasStatusOnStored: true, + Versions: []versionSpec{ + {Name: "v1beta1", Served: true, Storage: true}, + }, + } + installCRD(spec) + DeferCleanup(func() { deleteCRD(spec.Name) }) + + // envtest leaves storedVersions empty until a write happens. + // Seed it explicitly so the isMigrationNeeded check sees the + // "clean" state we want to exercise. + setStoredVersions(spec.Name, []string{"v1beta1"}) + + _, err := reconcile(newReconciler(), spec.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(getStoredVersions(spec.Name)).To(Equal([]string{"v1beta1"})) + }) + + It("succeeds end-to-end with elided updates when all CRs are already at storage version", func() { + // Orchestration smoke test: lists CRs, calls per-CR Update + // (each elided by the apiserver because etcd already holds + // the storage-version representation), trims storedVersions. + // The cross-version test below is the load-bearing proof + // that the migration mechanism actually re-encodes etcd. + // This spec adds value by: (a) confirming Reconcile drives + // the full restoreCRs + patchStoredVersions sequence against + // a real apiserver, and (b) verifying the list loop ran at + // least once via a list-call counter. + suf := uniqueSuffix() + spec := crdSpec{ + Name: "happies" + suf + "." + toolhiveGroup, + Group: toolhiveGroup, + Kind: "Happy" + suf, + ListKind: "Happy" + suf + "List", + Plural: "happies" + suf, + Singular: "happy" + suf, + Labelled: true, + HasStatusOnStored: true, + Versions: []versionSpec{ + {Name: "v1alpha1", Served: true, Storage: false}, + {Name: "v1beta1", Served: true, Storage: true}, + }, + } + installCRD(spec) + DeferCleanup(func() { deleteCRD(spec.Name) }) + + createCRs( + schema.GroupVersionKind{Group: spec.Group, Version: "v1beta1", Kind: spec.Kind}, + "obj-"+suf, 3, + ) + + setStoredVersions(spec.Name, []string{"v1alpha1", "v1beta1"}) + + counting := &countingAPIReader{Reader: k8sClient, kind: spec.Kind} + r := &controllers.StorageVersionMigratorReconciler{ + Client: k8sClient, + APIReader: counting, + Scheme: k8sClient.Scheme(), + Recorder: &noopRecorder{}, + } + _, err := reconcile(r, spec.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(getStoredVersions(spec.Name)).To(Equal([]string{"v1beta1"})) + Expect(counting.listCalls).To(Equal(1), + "list loop should have run exactly once for 3 CRs under default page size; got %d", + counting.listCalls) + }) + + // Load-bearing proof of the migration mechanism: a CR stored at + // v1alpha1, after the storage version has flipped to v1beta1, must + // have its resourceVersion bumped by reconcile — that's the + // observable evidence the apiserver actually re-encoded the etcd + // document. See the upstream confirmation at + // https://github.com/kubernetes-sigs/kube-storage-version-migrator/issues/65. + It("re-encodes CRs that are stored at a prior storage version", func() { + suf := uniqueSuffix() + crdName := "crossvers" + suf + "." + toolhiveGroup + kind := "CrossVer" + suf + plural := "crossvers" + suf + + versionSchema := func() *apiextensionsv1.CustomResourceValidation { + return &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "spec": {Type: "object", XPreserveUnknownFields: ptrBool(true)}, + "status": {Type: "object", XPreserveUnknownFields: ptrBool(true)}, + }, + }, + } + } + + // Step 1: install CRD with v1alpha1 as the storage version so + // CRs created next are written to etcd as v1alpha1 bytes. + crd := &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: crdName, + Labels: map[string]string{migrateLabel: migrateValue}, + }, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: toolhiveGroup, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Kind: kind, + ListKind: kind + "List", + Plural: plural, + Singular: "crossver" + suf, + }, + Scope: apiextensionsv1.NamespaceScoped, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + {Name: "v1alpha1", Served: true, Storage: true, Schema: versionSchema()}, + {Name: "v1beta1", Served: true, Storage: false, Schema: versionSchema()}, + }, + }, + } + Expect(k8sClient.Create(ctx, crd)).To(Succeed()) + DeferCleanup(func() { deleteCRD(crdName) }) + + Eventually(func() bool { + live := &apiextensionsv1.CustomResourceDefinition{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: crdName}, live); err != nil { + return false + } + for _, c := range live.Status.Conditions { + if c.Type == apiextensionsv1.Established && c.Status == apiextensionsv1.ConditionTrue { + return true + } + } + return false + }, time.Second*10, time.Millisecond*200).Should(BeTrue()) + + // Step 2: create one CR — etcd writes apiVersion: v1alpha1 bytes. + cr := createCRs( + schema.GroupVersionKind{Group: toolhiveGroup, Version: "v1alpha1", Kind: kind}, + "obj-"+suf, 1, + )[0] + + // Step 3: flip storage to v1beta1. + Eventually(func() error { + live := &apiextensionsv1.CustomResourceDefinition{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: crdName}, live); err != nil { + return err + } + orig := live.DeepCopy() + for i := range live.Spec.Versions { + live.Spec.Versions[i].Storage = (live.Spec.Versions[i].Name == "v1beta1") + } + return k8sClient.Patch(ctx, live, client.MergeFrom(orig)) + }, time.Second*10, time.Millisecond*200).Should(Succeed()) + + // Confirm the storage flip settled before proceeding. + Eventually(func() bool { + live := &apiextensionsv1.CustomResourceDefinition{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: crdName}, live); err != nil { + return false + } + for _, v := range live.Spec.Versions { + if v.Name == "v1beta1" && v.Storage { + return true + } + } + return false + }, time.Second*10, time.Millisecond*200).Should(BeTrue()) + + // Step 4: storedVersions reflects the historical v1alpha1 entry. + setStoredVersions(crdName, []string{"v1alpha1", "v1beta1"}) + + // Step 5: snapshot RV before reconcile. + preLive := &unstructured.Unstructured{} + preLive.SetGroupVersionKind(schema.GroupVersionKind{ + Group: toolhiveGroup, Version: "v1beta1", Kind: kind, + }) + Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cr), preLive)).To(Succeed()) + preRV := preLive.GetResourceVersion() + + // Step 6: reconcile with an event-capturing recorder so we can + // verify the public-contract MigrationSucceeded event fires. + fakeRecorder := events.NewFakeRecorder(8) + r := &controllers.StorageVersionMigratorReconciler{ + Client: k8sClient, + APIReader: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: fakeRecorder, + } + _, err := reconcile(r, crdName) + Expect(err).NotTo(HaveOccurred()) + + // Step 7: storedVersions trimmed. + Expect(getStoredVersions(crdName)).To(Equal([]string{"v1beta1"})) + + // Step 8: empirical proof — RV bumped because the cross-version + // Update actually wrote etcd. + postLive := &unstructured.Unstructured{} + postLive.SetGroupVersionKind(preLive.GroupVersionKind()) + Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cr), postLive)).To(Succeed()) + Expect(postLive.GetResourceVersion()).NotTo(Equal(preRV), + "CR %s/%s resourceVersion did not bump (pre=%s post=%s) — cross-version re-store did not write to etcd", + cr.GetNamespace(), cr.GetName(), preRV, postLive.GetResourceVersion()) + + // Step 9: content fidelity — the apiserver's encode-decode + // round-trip across the version flip must preserve spec data. + // createCRs writes spec.marker = "placeholder"; that value + // must still be readable after the cross-version re-encode. + marker, found, err := unstructured.NestedString(postLive.Object, "spec", "marker") + Expect(err).NotTo(HaveOccurred()) + Expect(found).To(BeTrue(), "spec.marker must survive the cross-version re-encode") + Expect(marker).To(Equal("placeholder"), + "spec.marker content must be byte-preserved through the v1alpha1→v1beta1 re-encode") + + // Step 10: public-contract event — operators consuming the CRD's + // Events stream depend on MigrationSucceeded firing on the + // happy path. + Eventually(fakeRecorder.Events, time.Second).Should( + Receive(ContainSubstring(controllers.EventReasonMigrationSucceeded)), + "successful migration must emit a "+controllers.EventReasonMigrationSucceeded+" event") + }) + + It("skips CRDs in foreign API groups", func() { + suf := uniqueSuffix() + spec := crdSpec{ + Name: "outsiders" + suf + ".example.com", + Group: "example.com", + Kind: "Outsider" + suf, + ListKind: "Outsider" + suf + "List", + Plural: "outsiders" + suf, + Singular: "outsider" + suf, + Labelled: true, + HasStatusOnStored: true, + Versions: []versionSpec{ + {Name: "v1alpha1", Served: true, Storage: false}, + {Name: "v1beta1", Served: true, Storage: true}, + }, + } + installCRD(spec) + DeferCleanup(func() { deleteCRD(spec.Name) }) + + setStoredVersions(spec.Name, []string{"v1alpha1", "v1beta1"}) + + _, err := reconcile(newReconciler(), spec.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(getStoredVersions(spec.Name)).To(Equal([]string{"v1alpha1", "v1beta1"}), + "storedVersions must be untouched for foreign-group CRDs") + }) + + It("skips toolhive CRDs missing the opt-in label", func() { + suf := uniqueSuffix() + spec := crdSpec{ + Name: "unlabelled" + suf + "." + toolhiveGroup, + Group: toolhiveGroup, + Kind: "Unlabelled" + suf, + ListKind: "Unlabelled" + suf + "List", + Plural: "unlabelled" + suf, + Singular: "unlabelled" + suf, + Labelled: false, + HasStatusOnStored: true, + Versions: []versionSpec{ + {Name: "v1alpha1", Served: true, Storage: false}, + {Name: "v1beta1", Served: true, Storage: true}, + }, + } + installCRD(spec) + DeferCleanup(func() { deleteCRD(spec.Name) }) + + setStoredVersions(spec.Name, []string{"v1alpha1", "v1beta1"}) + + _, err := reconcile(newReconciler(), spec.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(getStoredVersions(spec.Name)).To(Equal([]string{"v1alpha1", "v1beta1"}), + "storedVersions must be untouched for unlabelled CRDs") + }) + + It("handles pagination across multiple list pages", func() { + suf := uniqueSuffix() + spec := crdSpec{ + Name: "paginated" + suf + "." + toolhiveGroup, + Group: toolhiveGroup, + Kind: "Paginated" + suf, + ListKind: "Paginated" + suf + "List", + Plural: "paginated" + suf, + Singular: "paginated" + suf, + Labelled: true, + HasStatusOnStored: true, + Versions: []versionSpec{ + {Name: "v1alpha1", Served: true, Storage: false}, + {Name: "v1beta1", Served: true, Storage: true}, + }, + } + installCRD(spec) + DeferCleanup(func() { deleteCRD(spec.Name) }) + + // Seven CRs with PageSize=3 forces three pages (3+3+1) and + // exercises the continue-token loop far more cheaply than 501 + // writes against envtest. + createCRs( + schema.GroupVersionKind{Group: spec.Group, Version: "v1beta1", Kind: spec.Kind}, + "obj-"+suf, 7, + ) + setStoredVersions(spec.Name, []string{"v1alpha1", "v1beta1"}) + + // Wrap APIReader to count List calls for this kind. This is the + // only direct proof that the continue-token loop actually ran — + // metadata-only SSAs don't leave a managedFields fingerprint. + counting := &countingAPIReader{Reader: k8sClient, kind: spec.Kind} + r := &controllers.StorageVersionMigratorReconciler{ + Client: k8sClient, + APIReader: counting, + Scheme: k8sClient.Scheme(), + Recorder: &noopRecorder{}, + PageSize: 3, + } + _, err := reconcile(r, spec.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(getStoredVersions(spec.Name)).To(Equal([]string{"v1beta1"})) + + // 7 CRs with PageSize=3 ⇒ exactly 3 list calls (pages of 3+3+1). + // Equal (not >=) pins the loop count so a runaway over-fetch + // would fail the test. + Expect(counting.listCalls).To(Equal(3), + "pagination should have triggered exactly 3 list calls for 7 CRs at pageSize=3; got %d", + counting.listCalls) + }) + + It("does not touch storedVersions when a CR re-store fails", func() { + suf := uniqueSuffix() + spec := crdSpec{ + Name: "failures" + suf + "." + toolhiveGroup, + Group: toolhiveGroup, + Kind: "Failure" + suf, + ListKind: "Failure" + suf + "List", + Plural: "failures" + suf, + Singular: "failure" + suf, + Labelled: true, + HasStatusOnStored: true, + Versions: []versionSpec{ + {Name: "v1alpha1", Served: true, Storage: false}, + {Name: "v1beta1", Served: true, Storage: true}, + }, + } + installCRD(spec) + DeferCleanup(func() { deleteCRD(spec.Name) }) + + crs := createCRs( + schema.GroupVersionKind{Group: spec.Group, Version: "v1beta1", Kind: spec.Kind}, + "obj-"+suf, 3, + ) + setStoredVersions(spec.Name, []string{"v1alpha1", "v1beta1"}) + + failureTarget := client.ObjectKeyFromObject(crs[0]) + failing := &failingUpdateClient{ + Client: k8sClient, + errFn: func(key client.ObjectKey) error { + if key == failureTarget { + return fmt.Errorf("injected update failure for %s", key) + } + return nil + }, + } + fakeRecorder := events.NewFakeRecorder(8) + r := &controllers.StorageVersionMigratorReconciler{ + Client: failing, + APIReader: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: fakeRecorder, + } + _, err := reconcile(r, spec.Name) + Expect(err).To(HaveOccurred(), "reconcile should surface the injected failure") + Expect(err.Error()).To(ContainSubstring(failureTarget.Name)) + + // Critical contract: storedVersions must NOT be trimmed when any + // CR re-store failed. Otherwise the next release's v1alpha1 + // removal would orphan the un-migrated object in etcd. + Expect(getStoredVersions(spec.Name)).To(Equal([]string{"v1alpha1", "v1beta1"})) + + // Public-contract event: a real failure (not a self-healing + // conflict) must emit MigrationFailed so operators can alert. + Eventually(fakeRecorder.Events, time.Second).Should( + Receive(ContainSubstring(controllers.EventReasonMigrationFailed)), + "failed migration must emit a "+controllers.EventReasonMigrationFailed+" event") + }) + + It("leaves storedVersions untouched when a CR re-store hits a Conflict, then trims on retry", func() { + suf := uniqueSuffix() + spec := crdSpec{ + Name: "conflicts" + suf + "." + toolhiveGroup, + Group: toolhiveGroup, + Kind: "Conflict" + suf, + ListKind: "Conflict" + suf + "List", + Plural: "conflicts" + suf, + Singular: "conflict" + suf, + Labelled: true, + HasStatusOnStored: true, + Versions: []versionSpec{ + {Name: "v1alpha1", Served: true, Storage: false}, + {Name: "v1beta1", Served: true, Storage: true}, + }, + } + installCRD(spec) + DeferCleanup(func() { deleteCRD(spec.Name) }) + + crs := createCRs( + schema.GroupVersionKind{Group: spec.Group, Version: "v1beta1", Kind: spec.Kind}, + "obj-"+suf, 2, + ) + setStoredVersions(spec.Name, []string{"v1alpha1", "v1beta1"}) + + conflictTarget := client.ObjectKeyFromObject(crs[0]) + injectConflict := true + gr := schema.GroupResource{Group: spec.Group, Resource: spec.Plural} + conflicting := &failingUpdateClient{ + Client: k8sClient, + errFn: func(key client.ObjectKey) error { + if injectConflict && key == conflictTarget { + return apierrors.NewConflict(gr, key.Name, + fmt.Errorf("injected conflict")) + } + return nil + }, + } + r := &controllers.StorageVersionMigratorReconciler{ + Client: conflicting, + APIReader: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: &noopRecorder{}, + } + + // First pass: conflict swallowed at the per-CR level, but the + // function-level conflict counter trips errMigrationRetriedDueToConflicts + // so storedVersions is left untouched. + _, err := reconcile(r, spec.Name) + Expect(err).To(HaveOccurred(), + "reconcile must return an error when a Conflict was swallowed") + Expect(getStoredVersions(spec.Name)).To(Equal([]string{"v1alpha1", "v1beta1"}), + "storedVersions must not be trimmed on a pass with any swallowed Conflict") + + // Drop the injection and retry. The cache may have absorbed the + // non-conflicting CR's RV from the first pass — that's fine, the + // conflicting one was never recorded in the cache so it'll be + // re-attempted, succeed, and let the storedVersions patch fire. + injectConflict = false + _, err = reconcile(r, spec.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(getStoredVersions(spec.Name)).To(Equal([]string{"v1beta1"})) + }) + + It("does not trim storedVersions when reconcile context is cancelled mid-flight", func() { + // Failure mode this guards against: a future refactor that + // swallows ctx.Err() during the per-CR loop and then trims + // storedVersions anyway would orphan un-migrated objects on + // operator shutdown. The contract is: any error during + // restoreCRs — including context cancellation — leaves + // storedVersions intact for the next reconcile to retry. + suf := uniqueSuffix() + spec := crdSpec{ + Name: "cancels" + suf + "." + toolhiveGroup, + Group: toolhiveGroup, + Kind: "Cancel" + suf, + ListKind: "Cancel" + suf + "List", + Plural: "cancels" + suf, + Singular: "cancel" + suf, + Labelled: true, + HasStatusOnStored: true, + Versions: []versionSpec{ + {Name: "v1alpha1", Served: true, Storage: false}, + {Name: "v1beta1", Served: true, Storage: true}, + }, + } + installCRD(spec) + DeferCleanup(func() { deleteCRD(spec.Name) }) + + createCRs( + schema.GroupVersionKind{Group: spec.Group, Version: "v1beta1", Kind: spec.Kind}, + "obj-"+suf, 3, + ) + setStoredVersions(spec.Name, []string{"v1alpha1", "v1beta1"}) + + // Cancel the context before invoking Reconcile. The list + // call will fail with context.Canceled and restoreCRs will + // bubble it up; patchStoredVersions must NOT run. + cancelCtx, cancel := context.WithCancel(ctx) + cancel() + _, err := newReconciler().Reconcile(cancelCtx, ctrl.Request{ + NamespacedName: types.NamespacedName{Name: spec.Name}, + }) + Expect(err).To(HaveOccurred(), + "reconcile must return an error when context is cancelled before list completes") + Expect(getStoredVersions(spec.Name)).To(Equal([]string{"v1alpha1", "v1beta1"}), + "storedVersions must not be trimmed on a cancelled reconcile — a future operator restart would otherwise orphan un-migrated CRs") + }) + }) +}) + +// ------------------------------------------------------------------ +// Test doubles +// ------------------------------------------------------------------ + +// failingUpdateClient wraps a real client.Client and intercepts Update (and +// Status().Update) for specific object keys. The controller's restoreOne goes +// through Update — so this wrapper is how we inject failures and conflicts. +// +// errFn returns the error to inject for a given key, or nil to let the call +// pass through to the wrapped client. Returning a non-nil error short-circuits +// the call so the underlying object is not modified. +type failingUpdateClient struct { + client.Client + errFn func(key client.ObjectKey) error +} + +func (f *failingUpdateClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + if err := f.errFn(client.ObjectKeyFromObject(obj)); err != nil { + return err + } + return f.Client.Update(ctx, obj, opts...) +} + +func (f *failingUpdateClient) Status() client.SubResourceWriter { + return &failingUpdateStatus{ + inner: f.Client.Status(), + errFn: f.errFn, + } +} + +type failingUpdateStatus struct { + inner client.SubResourceWriter + errFn func(key client.ObjectKey) error +} + +func (s *failingUpdateStatus) Create(ctx context.Context, obj client.Object, sub client.Object, opts ...client.SubResourceCreateOption) error { + return s.inner.Create(ctx, obj, sub, opts...) +} + +func (s *failingUpdateStatus) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error { + if err := s.errFn(client.ObjectKeyFromObject(obj)); err != nil { + return err + } + return s.inner.Update(ctx, obj, opts...) +} + +func (s *failingUpdateStatus) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error { + return s.inner.Patch(ctx, obj, patch, opts...) +} + +func (s *failingUpdateStatus) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...client.SubResourceApplyOption) error { + return s.inner.Apply(ctx, obj, opts...) +} + +// noopRecorder is a minimal events.EventRecorder for direct-Reconcile tests. +type noopRecorder struct{} + +func (*noopRecorder) Eventf(_ runtime.Object, _ runtime.Object, _, _, _, _ string, _ ...any) { +} + +// countingAPIReader wraps a client.Reader and records how many List calls +// targeted a given kind. Used by the pagination test to verify the +// continue-token loop ran as expected. +type countingAPIReader struct { + client.Reader + kind string + listCalls int +} + +func (c *countingAPIReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + if u, ok := list.(*unstructured.UnstructuredList); ok { + // ListKind is "List"; match on the configured kind prefix. + if u.GetKind() == c.kind+"List" { + c.listCalls++ + } + } + return c.Reader.List(ctx, list, opts...) +} diff --git a/cmd/thv-operator/test-integration/storageversionmigrator/suite_test.go b/cmd/thv-operator/test-integration/storageversionmigrator/suite_test.go new file mode 100644 index 0000000000..0e94586912 --- /dev/null +++ b/cmd/thv-operator/test-integration/storageversionmigrator/suite_test.go @@ -0,0 +1,83 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +// Package storageversionmigrator contains envtest-backed integration tests +// for the StorageVersionMigrator controller. +// +// The suite does NOT pre-install any CRDs — each test constructs and installs +// the exact CRD it needs, which keeps scenarios independent and lets us +// exercise edge cases (foreign groups, missing status subresource, etc.) that +// wouldn't be possible with the real toolhive CRD manifests. +// +// The suite also does NOT start a controller manager. Each test constructs its +// own reconciler and calls Reconcile directly. This is more deterministic than +// manager-driven tests (no Eventually() races against the background +// controller) and lets individual tests inject custom clients to exercise +// failure paths. +package storageversionmigrator + +import ( + "context" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "go.uber.org/zap/zapcore" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +var ( + cfg *rest.Config + k8sClient client.Client + testEnv *envtest.Environment + ctx context.Context + cancel context.CancelFunc +) + +func TestStorageVersionMigrator(t *testing.T) { + t.Parallel() + RegisterFailHandler(Fail) + + suiteConfig, reporterConfig := GinkgoConfiguration() + reporterConfig.Verbose = false + reporterConfig.VeryVerbose = false + reporterConfig.FullTrace = false + + RunSpecs(t, "StorageVersionMigrator Controller Integration Test Suite", suiteConfig, reporterConfig) +} + +var _ = BeforeSuite(func() { + logLevel := zapcore.ErrorLevel + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true), zap.Level(logLevel))) + + ctx, cancel = context.WithCancel(context.TODO()) + + By("bootstrapping envtest") + testEnv = &envtest.Environment{ + ErrorIfCRDPathMissing: false, // tests install CRDs on demand + } + + var err error + cfg, err = testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + utilruntime.Must(apiextensionsv1.AddToScheme(scheme.Scheme)) + + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) +}) + +var _ = AfterSuite(func() { + By("tearing down envtest") + cancel() + Expect(testEnv.Stop()).NotTo(HaveOccurred()) +}) diff --git a/deploy/charts/operator/templates/clusterrole/role.yaml b/deploy/charts/operator/templates/clusterrole/role.yaml index a7a8b3f6e4..392730b4a5 100644 --- a/deploy/charts/operator/templates/clusterrole/role.yaml +++ b/deploy/charts/operator/templates/clusterrole/role.yaml @@ -41,6 +41,21 @@ rules: - pods/log verbs: - get +- apiGroups: + - apiextensions.k8s.io + resources: + - customresourcedefinitions + verbs: + - get + - list + - watch +- apiGroups: + - apiextensions.k8s.io + resources: + - customresourcedefinitions/status + verbs: + - patch + - update - apiGroups: - apps resources: @@ -98,27 +113,15 @@ rules: - apiGroups: - toolhive.stacklok.dev resources: - - embeddingservers - - mcpexternalauthconfigs - - mcpgroups - - mcpoidcconfigs - - mcpregistries - - mcpremoteproxies - - mcpservers - - mcptoolconfigs - - mcpwebhookconfigs - - virtualmcpservers + - '*' verbs: - - create - - delete - get - list - - patch - update - - watch - apiGroups: - toolhive.stacklok.dev resources: + - '*/status' - embeddingservers/finalizers - mcpexternalauthconfigs/finalizers - mcpgroups/finalizers @@ -130,6 +133,27 @@ rules: - mcpwebhookconfigs/finalizers verbs: - update +- apiGroups: + - toolhive.stacklok.dev + resources: + - embeddingservers + - mcpexternalauthconfigs + - mcpgroups + - mcpoidcconfigs + - mcpregistries + - mcpremoteproxies + - mcpservers + - mcptoolconfigs + - mcpwebhookconfigs + - virtualmcpservers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - toolhive.stacklok.dev resources: diff --git a/test/e2e/chainsaw/operator/multi-tenancy/setup/assert-rbac-clusterrole.yaml b/test/e2e/chainsaw/operator/multi-tenancy/setup/assert-rbac-clusterrole.yaml index a7a8b3f6e4..392730b4a5 100644 --- a/test/e2e/chainsaw/operator/multi-tenancy/setup/assert-rbac-clusterrole.yaml +++ b/test/e2e/chainsaw/operator/multi-tenancy/setup/assert-rbac-clusterrole.yaml @@ -41,6 +41,21 @@ rules: - pods/log verbs: - get +- apiGroups: + - apiextensions.k8s.io + resources: + - customresourcedefinitions + verbs: + - get + - list + - watch +- apiGroups: + - apiextensions.k8s.io + resources: + - customresourcedefinitions/status + verbs: + - patch + - update - apiGroups: - apps resources: @@ -98,27 +113,15 @@ rules: - apiGroups: - toolhive.stacklok.dev resources: - - embeddingservers - - mcpexternalauthconfigs - - mcpgroups - - mcpoidcconfigs - - mcpregistries - - mcpremoteproxies - - mcpservers - - mcptoolconfigs - - mcpwebhookconfigs - - virtualmcpservers + - '*' verbs: - - create - - delete - get - list - - patch - update - - watch - apiGroups: - toolhive.stacklok.dev resources: + - '*/status' - embeddingservers/finalizers - mcpexternalauthconfigs/finalizers - mcpgroups/finalizers @@ -130,6 +133,27 @@ rules: - mcpwebhookconfigs/finalizers verbs: - update +- apiGroups: + - toolhive.stacklok.dev + resources: + - embeddingservers + - mcpexternalauthconfigs + - mcpgroups + - mcpoidcconfigs + - mcpregistries + - mcpremoteproxies + - mcpservers + - mcptoolconfigs + - mcpwebhookconfigs + - virtualmcpservers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - toolhive.stacklok.dev resources: diff --git a/test/e2e/chainsaw/operator/single-tenancy/setup/assert-rbac-clusterrole.yaml b/test/e2e/chainsaw/operator/single-tenancy/setup/assert-rbac-clusterrole.yaml index a7a8b3f6e4..392730b4a5 100644 --- a/test/e2e/chainsaw/operator/single-tenancy/setup/assert-rbac-clusterrole.yaml +++ b/test/e2e/chainsaw/operator/single-tenancy/setup/assert-rbac-clusterrole.yaml @@ -41,6 +41,21 @@ rules: - pods/log verbs: - get +- apiGroups: + - apiextensions.k8s.io + resources: + - customresourcedefinitions + verbs: + - get + - list + - watch +- apiGroups: + - apiextensions.k8s.io + resources: + - customresourcedefinitions/status + verbs: + - patch + - update - apiGroups: - apps resources: @@ -98,27 +113,15 @@ rules: - apiGroups: - toolhive.stacklok.dev resources: - - embeddingservers - - mcpexternalauthconfigs - - mcpgroups - - mcpoidcconfigs - - mcpregistries - - mcpremoteproxies - - mcpservers - - mcptoolconfigs - - mcpwebhookconfigs - - virtualmcpservers + - '*' verbs: - - create - - delete - get - list - - patch - update - - watch - apiGroups: - toolhive.stacklok.dev resources: + - '*/status' - embeddingservers/finalizers - mcpexternalauthconfigs/finalizers - mcpgroups/finalizers @@ -130,6 +133,27 @@ rules: - mcpwebhookconfigs/finalizers verbs: - update +- apiGroups: + - toolhive.stacklok.dev + resources: + - embeddingservers + - mcpexternalauthconfigs + - mcpgroups + - mcpoidcconfigs + - mcpregistries + - mcpremoteproxies + - mcpservers + - mcptoolconfigs + - mcpwebhookconfigs + - virtualmcpservers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - toolhive.stacklok.dev resources: