feat: implements svm controller

Signed-off-by: Nilekh Chaudhari <1626598+nilekhc@users.noreply.github.com>
Nilekh Chaudhari
2024-01-04 19:34:05 +00:00
parent 91a7708cdc
commit 9161302e7f
17 changed files with 2337 additions and 65 deletions

View File

@@ -20,11 +20,11 @@ import (
"context"
goerrors "errors"
"fmt"
"k8s.io/controller-manager/pkg/informerfactory"
"reflect"
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -42,10 +42,8 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/controller-manager/controller"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/klog/v2"
c "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/apis/config/scheme"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metrics"
)
@@ -93,36 +91,28 @@ func NewGarbageCollector(
sharedInformers informerfactory.InformerFactory,
informersStarted <-chan struct{},
) (*GarbageCollector, error) {
graphBuilder := NewDependencyGraphBuilder(ctx, metadataClient, mapper, ignoredResources, sharedInformers, informersStarted)
return NewComposedGarbageCollector(ctx, kubeClient, metadataClient, mapper, graphBuilder)
}
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "garbage-collector-controller"})
func NewComposedGarbageCollector(
ctx context.Context,
kubeClient clientset.Interface,
metadataClient metadata.Interface,
mapper meta.ResettableRESTMapper,
graphBuilder *GraphBuilder,
) (*GarbageCollector, error) {
attemptToDelete, attemptToOrphan, absentOwnerCache := graphBuilder.GetGraphResources()
attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
absentOwnerCache := NewReferenceCache(500)
gc := &GarbageCollector{
metadataClient: metadataClient,
restMapper: mapper,
attemptToDelete: attemptToDelete,
attemptToOrphan: attemptToOrphan,
absentOwnerCache: absentOwnerCache,
kubeClient: kubeClient,
eventBroadcaster: eventBroadcaster,
}
gc.dependencyGraphBuilder = &GraphBuilder{
eventRecorder: eventRecorder,
metadataClient: metadataClient,
informersStarted: informersStarted,
restMapper: mapper,
graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
uidToNode: &concurrentUIDToNode{
uidToNode: make(map[types.UID]*node),
},
attemptToDelete: attemptToDelete,
attemptToOrphan: attemptToOrphan,
absentOwnerCache: absentOwnerCache,
sharedInformers: sharedInformers,
ignoredResources: ignoredResources,
metadataClient: metadataClient,
restMapper: mapper,
attemptToDelete: attemptToDelete,
attemptToOrphan: attemptToOrphan,
absentOwnerCache: absentOwnerCache,
kubeClient: kubeClient,
eventBroadcaster: graphBuilder.eventBroadcaster,
dependencyGraphBuilder: graphBuilder,
}
metrics.Register()
@@ -863,3 +853,8 @@ func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerR
func (gc *GarbageCollector) Name() string {
return "garbagecollector"
}
// GetDependencyGraphBuilder returns the graph builder, which is particularly helpful for testing where controllerContext is not available
func (gc *GarbageCollector) GetDependencyGraphBuilder() *GraphBuilder {
return gc.dependencyGraphBuilder
}
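
The refactor above splits construction so the dependency graph builder can be created first and shared with other controllers. A minimal wiring sketch, assuming the usual controller-manager clients, REST mapper, informer factory, and informersStarted channel already exist in the caller (the variable names are illustrative, not taken from this commit):

	graphBuilder := garbagecollector.NewDependencyGraphBuilder(ctx, metadataClient, restMapper, ignoredResources, sharedInformers, informersStarted)
	gc, err := garbagecollector.NewComposedGarbageCollector(ctx, kubeClient, metadataClient, restMapper, graphBuilder)
	if err != nil {
		return fmt.Errorf("failed to create the garbage collector: %w", err)
	}
	// Other controllers (such as the SVM controller below) can reuse the same graph
	// through gc.GetDependencyGraphBuilder() instead of building their own caches.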

View File

@@ -40,7 +40,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/kubernetes/pkg/controller/apis/config/scheme"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
)
@@ -97,7 +97,8 @@ type GraphBuilder struct {
// it is protected by monitorLock.
running bool
eventRecorder record.EventRecorder
eventRecorder record.EventRecorder
eventBroadcaster record.EventBroadcaster
metadataClient metadata.Interface
// monitors are the producer of the graphChanges queue, graphBuilder alters
@@ -134,6 +135,39 @@ func (m *monitor) Run() {
type monitors map[schema.GroupVersionResource]*monitor
func NewDependencyGraphBuilder(
ctx context.Context,
metadataClient metadata.Interface,
mapper meta.ResettableRESTMapper,
ignoredResources map[schema.GroupResource]struct{},
sharedInformers informerfactory.InformerFactory,
informersStarted <-chan struct{},
) *GraphBuilder {
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
absentOwnerCache := NewReferenceCache(500)
graphBuilder := &GraphBuilder{
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "garbage-collector-controller"}),
eventBroadcaster: eventBroadcaster,
metadataClient: metadataClient,
informersStarted: informersStarted,
restMapper: mapper,
graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
uidToNode: &concurrentUIDToNode{
uidToNode: make(map[types.UID]*node),
},
attemptToDelete: attemptToDelete,
attemptToOrphan: attemptToOrphan,
absentOwnerCache: absentOwnerCache,
sharedInformers: sharedInformers,
ignoredResources: ignoredResources,
}
return graphBuilder
}
func (gb *GraphBuilder) controllerFor(logger klog.Logger, resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
handlers := cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges.
@@ -935,3 +969,62 @@ func getAlternateOwnerIdentity(deps []*node, verifiedAbsentIdentity objectRefere
// otherwise return the first alternate identity
return first
}
func (gb *GraphBuilder) GetGraphResources() (
attemptToDelete workqueue.RateLimitingInterface,
attemptToOrphan workqueue.RateLimitingInterface,
absentOwnerCache *ReferenceCache,
) {
return gb.attemptToDelete, gb.attemptToOrphan, gb.absentOwnerCache
}
type Monitor struct {
Store cache.Store
Controller cache.Controller
}
// GetMonitor returns a monitor for the given resource.
// If the monitor is not synced, it will return an error and the monitor to allow the caller to decide whether to retry.
// If the monitor is not found, it will return only an error.
func (gb *GraphBuilder) GetMonitor(ctx context.Context, resource schema.GroupVersionResource) (*Monitor, error) {
gb.monitorLock.RLock()
defer gb.monitorLock.RUnlock()
var monitor *monitor
if m, ok := gb.monitors[resource]; ok {
monitor = m
} else {
for monitorGVR, m := range gb.monitors {
if monitorGVR.Group == resource.Group && monitorGVR.Resource == resource.Resource {
monitor = m
break
}
}
}
if monitor == nil {
return nil, fmt.Errorf("no monitor found for resource %s", resource.String())
}
resourceMonitor := &Monitor{
Store: monitor.store,
Controller: monitor.controller,
}
if !cache.WaitForNamedCacheSync(
gb.Name(),
ctx.Done(),
func() bool {
return monitor.controller.HasSynced()
},
) {
// returning the monitor to allow the caller to decide whether to retry, as it can be synced later
return resourceMonitor, fmt.Errorf("dependency graph for resource %s is not synced", resource.String())
}
return resourceMonitor, nil
}
func (gb *GraphBuilder) Name() string {
return "dependencygraphbuilder"
}
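
A caller-side sketch of the GetMonitor retry contract described in the comment above, assuming ctx, gvr, and a *GraphBuilder named graphBuilder are in scope (this mirrors how the SVM controller uses it, but is illustrative only):

	monitor, err := graphBuilder.GetMonitor(ctx, gvr)
	if err != nil {
		if monitor != nil {
			// The monitor exists but its cache has not synced yet; requeue and retry later.
			return err
		}
		// No monitor at all: the resource is not tracked by the dependency graph.
		return nil
	}
	lastSyncedRV := monitor.Controller.LastSyncResourceVersion() // GC cache watermark
	cachedKeys := monitor.Store.ListKeys()                       // namespace/name keys of cached objects
	_, _ = lastSyncedRV, cachedKeys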

View File

@@ -0,0 +1,284 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storageversionmigrator
import (
"context"
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/metadata"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
svminformers "k8s.io/client-go/informers/storagemigration/v1alpha1"
clientset "k8s.io/client-go/kubernetes"
svmlisters "k8s.io/client-go/listers/storagemigration/v1alpha1"
)
const (
// this name is guaranteed to not be present in the cluster as it is not a valid namespace name
fakeSVMNamespaceName string = "@fake:svm_ns!"
ResourceVersionControllerName string = "resource-version-controller"
)
// ResourceVersionController adds the resource version, obtained by listing in a nonexistent namespace,
// to the SVM status before the migration is initiated. This resource version is later used to check
// the freshness of the GC cache before the migration starts.
type ResourceVersionController struct {
discoveryClient *discovery.DiscoveryClient
metadataClient metadata.Interface
svmListers svmlisters.StorageVersionMigrationLister
svmSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
kubeClient clientset.Interface
mapper meta.ResettableRESTMapper
}
func NewResourceVersionController(
ctx context.Context,
kubeClient clientset.Interface,
discoveryClient *discovery.DiscoveryClient,
metadataClient metadata.Interface,
svmInformer svminformers.StorageVersionMigrationInformer,
mapper meta.ResettableRESTMapper,
) *ResourceVersionController {
logger := klog.FromContext(ctx)
rvController := &ResourceVersionController{
kubeClient: kubeClient,
discoveryClient: discoveryClient,
metadataClient: metadataClient,
svmListers: svmInformer.Lister(),
svmSynced: svmInformer.Informer().HasSynced,
mapper: mapper,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ResourceVersionControllerName),
}
_, _ = svmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
rvController.addSVM(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
rvController.updateSVM(logger, oldObj, newObj)
},
})
return rvController
}
func (rv *ResourceVersionController) addSVM(logger klog.Logger, obj interface{}) {
svm := obj.(*svmv1alpha1.StorageVersionMigration)
logger.V(4).Info("Adding", "svm", klog.KObj(svm))
rv.enqueue(svm)
}
func (rv *ResourceVersionController) updateSVM(logger klog.Logger, oldObj, newObj interface{}) {
oldSVM := oldObj.(*svmv1alpha1.StorageVersionMigration)
newSVM := newObj.(*svmv1alpha1.StorageVersionMigration)
logger.V(4).Info("Updating", "svm", klog.KObj(oldSVM))
rv.enqueue(newSVM)
}
func (rv *ResourceVersionController) enqueue(svm *svmv1alpha1.StorageVersionMigration) {
key, err := controller.KeyFunc(svm)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %w", svm, err))
return
}
rv.queue.Add(key)
}
func (rv *ResourceVersionController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
defer rv.queue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting", "controller", ResourceVersionControllerName)
defer logger.Info("Shutting down", "controller", ResourceVersionControllerName)
if !cache.WaitForNamedCacheSync(ResourceVersionControllerName, ctx.Done(), rv.svmSynced) {
return
}
go wait.UntilWithContext(ctx, rv.worker, time.Second)
<-ctx.Done()
}
func (rv *ResourceVersionController) worker(ctx context.Context) {
for rv.processNext(ctx) {
}
}
func (rv *ResourceVersionController) processNext(ctx context.Context) bool {
eKey, quit := rv.queue.Get()
if quit {
return false
}
defer rv.queue.Done(eKey)
key := eKey.(string)
err := rv.sync(ctx, key)
if err == nil {
rv.queue.Forget(key)
return true
}
klog.FromContext(ctx).V(2).Info("Error syncing SVM resource, retrying", "svm", key, "err", err)
rv.queue.AddRateLimited(key)
return true
}
func (rv *ResourceVersionController) sync(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
startTime := time.Now()
// SVM is a cluster scoped resource so we don't care about the namespace
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
svm, err := rv.svmListers.Get(name)
if apierrors.IsNotFound(err) {
// no work to do, don't fail and requeue
return nil
}
if err != nil {
return err
}
// working with a copy to avoid a race condition between this and the migration controller
toBeProcessedSVM := svm.DeepCopy()
gvr := getGVRFromResource(toBeProcessedSVM)
if IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded) || IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationFailed) {
logger.V(4).Info("Migration has already succeeded or failed previously, skipping", "svm", name)
return nil
}
if len(toBeProcessedSVM.Status.ResourceVersion) != 0 {
logger.V(4).Info("Resource version is already set", "svm", name)
return nil
}
exists, err := rv.resourceExists(gvr)
if err != nil {
return err
}
if !exists {
_, err = rv.kubeClient.StoragemigrationV1alpha1().
StorageVersionMigrations().
UpdateStatus(
ctx,
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
metav1.UpdateOptions{},
)
if err != nil {
return err
}
return nil
}
toBeProcessedSVM.Status.ResourceVersion, err = rv.getLatestResourceVersion(gvr, ctx)
if err != nil {
return err
}
_, err = rv.kubeClient.StoragemigrationV1alpha1().
StorageVersionMigrations().
UpdateStatus(ctx, toBeProcessedSVM, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating status for %s: %w", toBeProcessedSVM.Name, err)
}
logger.V(4).Info("Resource version has been successfully added", "svm", key, "elapsed", time.Since(startTime))
return nil
}
func (rv *ResourceVersionController) getLatestResourceVersion(gvr schema.GroupVersionResource, ctx context.Context) (string, error) {
isResourceNamespaceScoped, err := rv.isResourceNamespaceScoped(gvr)
if err != nil {
return "", err
}
var randomList *metav1.PartialObjectMetadataList
if isResourceNamespaceScoped {
// get the list resourceVersion from a nonexistent namespace for the given GVR
randomList, err = rv.metadataClient.Resource(gvr).
Namespace(fakeSVMNamespaceName).
List(ctx, metav1.ListOptions{
Limit: 1,
})
} else {
randomList, err = rv.metadataClient.Resource(gvr).
List(ctx, metav1.ListOptions{
Limit: 1,
})
}
if err != nil {
// the error here is very abstract; add additional context for better debugging
return "", fmt.Errorf("error getting latest resourceVersion for %s: %w", gvr.String(), err)
}
return randomList.GetResourceVersion(), err
}
func (rv *ResourceVersionController) resourceExists(gvr schema.GroupVersionResource) (bool, error) {
mapperGVRs, err := rv.mapper.ResourcesFor(gvr)
if err != nil {
return false, err
}
for _, mapperGVR := range mapperGVRs {
if mapperGVR.Group == gvr.Group &&
mapperGVR.Version == gvr.Version &&
mapperGVR.Resource == gvr.Resource {
return true, nil
}
}
return false, nil
}
func (rv *ResourceVersionController) isResourceNamespaceScoped(gvr schema.GroupVersionResource) (bool, error) {
resourceList, err := rv.discoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String())
if err != nil {
return false, err
}
for _, resource := range resourceList.APIResources {
if resource.Name == gvr.Resource {
return resource.Namespaced, nil
}
}
return false, fmt.Errorf("resource %q not found", gvr.String())
}
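
A hedged wiring sketch for the controller above (discoveryClient, metadataClient, restMapper, informerFactory, and the informer accessor are assumptions about the caller's environment, not taken from this diff):

	svmInformer := informerFactory.Storagemigration().V1alpha1().StorageVersionMigrations()
	rvController := storageversionmigrator.NewResourceVersionController(
		ctx,
		kubeClient,
		discoveryClient,
		metadataClient,
		svmInformer,
		restMapper,
	)
	go rvController.Run(ctx)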

View File

@@ -0,0 +1,318 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storageversionmigrator
import (
"context"
"encoding/json"
"fmt"
"time"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
svminformers "k8s.io/client-go/informers/storagemigration/v1alpha1"
svmlisters "k8s.io/client-go/listers/storagemigration/v1alpha1"
)
const (
workers = 5
migrationSuccessStatusReason = "StorageVersionMigrationSucceeded"
migrationRunningStatusReason = "StorageVersionMigrationInProgress"
migrationFailedStatusReason = "StorageVersionMigrationFailed"
)
type SVMController struct {
controllerName string
kubeClient kubernetes.Interface
dynamicClient *dynamic.DynamicClient
svmListers svmlisters.StorageVersionMigrationLister
svmSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
restMapper meta.RESTMapper
dependencyGraphBuilder *garbagecollector.GraphBuilder
}
func NewSVMController(
ctx context.Context,
kubeClient kubernetes.Interface,
dynamicClient *dynamic.DynamicClient,
svmInformer svminformers.StorageVersionMigrationInformer,
controllerName string,
mapper meta.ResettableRESTMapper,
dependencyGraphBuilder *garbagecollector.GraphBuilder,
) *SVMController {
logger := klog.FromContext(ctx)
svmController := &SVMController{
kubeClient: kubeClient,
dynamicClient: dynamicClient,
controllerName: controllerName,
svmListers: svmInformer.Lister(),
svmSynced: svmInformer.Informer().HasSynced,
restMapper: mapper,
dependencyGraphBuilder: dependencyGraphBuilder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),
}
_, _ = svmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svmController.addSVM(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
svmController.updateSVM(logger, oldObj, newObj)
},
})
return svmController
}
func (svmc *SVMController) Name() string {
return svmc.controllerName
}
func (svmc *SVMController) addSVM(logger klog.Logger, obj interface{}) {
svm := obj.(*svmv1alpha1.StorageVersionMigration)
logger.V(4).Info("Adding", "svm", klog.KObj(svm))
svmc.enqueue(svm)
}
func (svmc *SVMController) updateSVM(logger klog.Logger, oldObj, newObj interface{}) {
oldSVM := oldObj.(*svmv1alpha1.StorageVersionMigration)
newSVM := newObj.(*svmv1alpha1.StorageVersionMigration)
logger.V(4).Info("Updating", "svm", klog.KObj(oldSVM))
svmc.enqueue(newSVM)
}
func (svmc *SVMController) enqueue(svm *svmv1alpha1.StorageVersionMigration) {
key, err := controller.KeyFunc(svm)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %w", svm, err))
return
}
svmc.queue.Add(key)
}
func (svmc *SVMController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
defer svmc.queue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting", "controller", svmc.controllerName)
defer logger.Info("Shutting down", "controller", svmc.controllerName)
if !cache.WaitForNamedCacheSync(svmc.controllerName, ctx.Done(), svmc.svmSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, svmc.worker, time.Second)
}
<-ctx.Done()
}
func (svmc *SVMController) worker(ctx context.Context) {
for svmc.processNext(ctx) {
}
}
func (svmc *SVMController) processNext(ctx context.Context) bool {
svmKey, quit := svmc.queue.Get()
if quit {
return false
}
defer svmc.queue.Done(svmKey)
key := svmKey.(string)
err := svmc.sync(ctx, key)
if err == nil {
svmc.queue.Forget(key)
return true
}
klog.FromContext(ctx).V(2).Info("Error syncing SVM resource, retrying", "svm", key, "err", err)
svmc.queue.AddRateLimited(key)
return true
}
func (svmc *SVMController) sync(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
startTime := time.Now()
if svmc.dependencyGraphBuilder == nil {
logger.V(4).Info("dependency graph builder is not set. we will skip migration")
return nil
}
// SVM is a cluster scoped resource so we don't care about the namespace
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
svm, err := svmc.svmListers.Get(name)
if apierrors.IsNotFound(err) {
// no work to do, don't fail and requeue
return nil
}
if err != nil {
return err
}
// working with a copy to avoid a race condition between this and the resource version controller
toBeProcessedSVM := svm.DeepCopy()
if IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded) || IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationFailed) {
logger.V(4).Info("Migration has already succeeded or failed previously, skipping", "svm", name)
return nil
}
if len(toBeProcessedSVM.Status.ResourceVersion) == 0 {
logger.V(4).Info("The latest resource version is empty. We will attempt to migrate once the resource version is available.")
return nil
}
gvr := getGVRFromResource(toBeProcessedSVM)
resourceMonitor, err := svmc.dependencyGraphBuilder.GetMonitor(ctx, gvr)
if resourceMonitor != nil {
if err != nil {
// a non-nil monitor indicates that the error is due to the resource not being synced
return fmt.Errorf("dependency graph is not synced, requeuing to attempt again")
}
} else {
// we can't migrate a resource that doesn't exist in the GC
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
StorageVersionMigrations().
UpdateStatus(
ctx,
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
metav1.UpdateOptions{},
)
if err != nil {
return err
}
logger.V(4).Error(fmt.Errorf("error migrating the resource"), "resource does not exist in GC", "gvr", gvr.String())
return nil
}
gcListResourceVersion, err := convertResourceVersionToInt(resourceMonitor.Controller.LastSyncResourceVersion())
if err != nil {
return err
}
listResourceVersion, err := convertResourceVersionToInt(toBeProcessedSVM.Status.ResourceVersion)
if err != nil {
return err
}
if gcListResourceVersion < listResourceVersion {
return fmt.Errorf("GC cache is not up to date, requeuing to attempt again. gcListResourceVersion: %d, listResourceVersion: %d", gcListResourceVersion, listResourceVersion)
}
toBeProcessedSVM, err = svmc.kubeClient.StoragemigrationV1alpha1().
StorageVersionMigrations().
UpdateStatus(
ctx,
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason),
metav1.UpdateOptions{},
)
if err != nil {
return err
}
gvk, err := svmc.restMapper.KindFor(gvr)
if err != nil {
return err
}
typeMeta := metav1.TypeMeta{}
typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind()
data, err := json.Marshal(typeMeta)
if err != nil {
return err
}
// TODO: implement a mechanism to resume migration from the last migrated resource in case of a failure
// process storage migration
for _, gvrKey := range resourceMonitor.Store.ListKeys() {
namespace, name, err := cache.SplitMetaNamespaceKey(gvrKey)
if err != nil {
return err
}
_, err = svmc.dynamicClient.Resource(gvr).
Namespace(namespace).
Patch(ctx,
name,
types.ApplyPatchType,
data,
metav1.PatchOptions{
FieldManager: svmc.controllerName,
},
)
if err != nil {
// in case of NotFound or Conflict, we can stop processing migration for that resource
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
continue
}
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
StorageVersionMigrations().
UpdateStatus(
ctx,
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
metav1.UpdateOptions{},
)
if err != nil {
return err
}
logger.V(4).Error(err, "Failed to migrate the resource", "name", gvrKey, "gvr", gvr.String(), "reason", apierrors.ReasonForError(err))
return nil
// TODO: add retry for scenarios where the API server returns a rate limiting error
}
logger.V(4).Info("Successfully migrated the resource", "name", gvrKey, "gvr", gvr.String())
}
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
StorageVersionMigrations().
UpdateStatus(
ctx,
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason),
metav1.UpdateOptions{},
)
if err != nil {
return err
}
logger.V(4).Info("Finished syncing svm resource", "key", key, "gvr", gvr.String(), "elapsed", time.Since(startTime))
return nil
}
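
The migration itself relies on server-side apply with a patch that contains only TypeMeta: the write changes nothing in the object, yet forces the API server to rewrite it in etcd at the current storage version. A minimal sketch of that patch for a hypothetical GVK (the Secret example and surrounding variable names are assumptions, not from the diff):

	gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Secret"} // hypothetical
	typeMeta := metav1.TypeMeta{}
	typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind()
	data, err := json.Marshal(typeMeta) // {"kind":"Secret","apiVersion":"v1"}
	if err != nil {
		return err
	}
	// A no-op apply patch: no fields change, but the resulting write re-encodes
	// the object under the API server's current storage version.
	_, err = dynamicClient.Resource(gvr).Namespace(namespace).Patch(
		ctx, name, types.ApplyPatchType, data,
		metav1.PatchOptions{FieldManager: controllerName},
	)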

View File

@@ -0,0 +1,84 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storageversionmigrator
import (
"fmt"
"strconv"
"k8s.io/apimachinery/pkg/runtime/schema"
corev1 "k8s.io/api/core/v1"
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func convertResourceVersionToInt(rv string) (int64, error) {
resourceVersion, err := strconv.ParseInt(rv, 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse resource version %q: %w", rv, err)
}
return resourceVersion, nil
}
func getGVRFromResource(svm *svmv1alpha1.StorageVersionMigration) schema.GroupVersionResource {
return schema.GroupVersionResource{
Group: svm.Spec.Resource.Group,
Version: svm.Spec.Resource.Version,
Resource: svm.Spec.Resource.Resource,
}
}
// IsConditionTrue returns true if the StorageVersionMigration has the given condition with a true status.
// It is exported for use in tests.
func IsConditionTrue(svm *svmv1alpha1.StorageVersionMigration, conditionType svmv1alpha1.MigrationConditionType) bool {
return indexOfCondition(svm, conditionType) != -1
}
func indexOfCondition(svm *svmv1alpha1.StorageVersionMigration, conditionType svmv1alpha1.MigrationConditionType) int {
for i, c := range svm.Status.Conditions {
if c.Type == conditionType && c.Status == corev1.ConditionTrue {
return i
}
}
return -1
}
func setStatusConditions(
toBeUpdatedSVM *svmv1alpha1.StorageVersionMigration,
conditionType svmv1alpha1.MigrationConditionType,
reason string,
) *svmv1alpha1.StorageVersionMigration {
if !IsConditionTrue(toBeUpdatedSVM, conditionType) {
if conditionType == svmv1alpha1.MigrationSucceeded || conditionType == svmv1alpha1.MigrationFailed {
runningConditionIdx := indexOfCondition(toBeUpdatedSVM, svmv1alpha1.MigrationRunning)
if runningConditionIdx != -1 {
toBeUpdatedSVM.Status.Conditions[runningConditionIdx].Status = corev1.ConditionFalse
}
}
toBeUpdatedSVM.Status.Conditions = append(toBeUpdatedSVM.Status.Conditions, svmv1alpha1.MigrationCondition{
Type: conditionType,
Status: corev1.ConditionTrue,
LastUpdateTime: metav1.Now(),
Reason: reason,
})
}
return toBeUpdatedSVM
}
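
A usage note on the condition helpers (a sketch, not part of the diff): recording a terminal condition flips any existing Running condition to False before the new condition is appended, so controllers can key off IsConditionTrue for either terminal state.

	svm = setStatusConditions(svm, svmv1alpha1.MigrationRunning, migrationRunningStatusReason)
	// ... migration work happens here ...
	svm = setStatusConditions(svm, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason)
	// The Running condition's Status is now ConditionFalse, and a Succeeded condition
	// with Status ConditionTrue and a fresh LastUpdateTime has been appended.
	done := IsConditionTrue(svm, svmv1alpha1.MigrationSucceeded) || IsConditionTrue(svm, svmv1alpha1.MigrationFailed)
	_ = done // both controllers skip an SVM once either terminal condition is true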