From ee9ace14c2dcbd59e51c05f7c8ef4e8b5d1b8e29 Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Thu, 12 Nov 2020 16:21:00 -0800 Subject: [PATCH] add storage version garbage collector --- .../app/controllermanager.go | 6 + cmd/kube-controller-manager/app/core.go | 10 + .../storageversiongc/gc_controller.go | 296 ++++++++++++++++++ 3 files changed, 312 insertions(+) create mode 100644 pkg/controller/storageversiongc/gc_controller.go diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 0c0b460d932..5bc022dd2f0 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -36,9 +36,11 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + genericfeatures "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/mux" + utilfeature "k8s.io/apiserver/pkg/util/feature" cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" @@ -424,6 +426,10 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc controllers["ttl-after-finished"] = startTTLAfterFinishedController controllers["root-ca-cert-publisher"] = startRootCACertPublisher controllers["ephemeral-volume"] = startEphemeralVolumeController + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) && + utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) { + controllers["storage-version-gc"] = startStorageVersionGCController + } return controllers } diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 4c9a841f5ae..832fcddac10 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -56,6 +56,7 @@ import ( replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" + "k8s.io/kubernetes/pkg/controller/storageversiongc" ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl" "k8s.io/kubernetes/pkg/controller/ttlafterfinished" "k8s.io/kubernetes/pkg/controller/volume/attachdetach" @@ -674,3 +675,12 @@ func getNodeCIDRMaskSizes(clusterCIDRs []*net.IPNet, maskSizeIPv4, maskSizeIPv6 } return nodeMaskCIDRs } + +func startStorageVersionGCController(ctx ControllerContext) (http.Handler, bool, error) { + go storageversiongc.NewStorageVersionGC( + ctx.ClientBuilder.ClientOrDie("storage-version-garbage-collector"), + ctx.InformerFactory.Coordination().V1().Leases(), + ctx.InformerFactory.Internal().V1alpha1().StorageVersions(), + ).Run(ctx.Stop) + return nil, true, nil +} diff --git a/pkg/controller/storageversiongc/gc_controller.go b/pkg/controller/storageversiongc/gc_controller.go new file mode 100644 index 00000000000..f53ff693973 --- /dev/null +++ b/pkg/controller/storageversiongc/gc_controller.go @@ -0,0 +1,296 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversiongc + +import ( + "context" + "fmt" + "time" + + apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1" + coordinationv1 "k8s.io/api/coordination/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + apiserverinternalinformers "k8s.io/client-go/informers/apiserverinternal/v1alpha1" + coordinformers "k8s.io/client-go/informers/coordination/v1" + "k8s.io/client-go/kubernetes" + coordlisters "k8s.io/client-go/listers/coordination/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/controlplane" + + "k8s.io/klog/v2" +) + +// Controller watches kube-apiserver leases and storageversions, and delete stale +// storage version entries and objects. +type Controller struct { + kubeclientset kubernetes.Interface + + leaseLister coordlisters.LeaseLister + leasesSynced cache.InformerSynced + + storageVersionSynced cache.InformerSynced + + leaseQueue workqueue.RateLimitingInterface + storageVersionQueue workqueue.RateLimitingInterface +} + +// NewStorageVersionGC creates a new Controller. +func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller { + c := &Controller{ + kubeclientset: clientset, + leaseLister: leaseInformer.Lister(), + leasesSynced: leaseInformer.Informer().HasSynced, + storageVersionSynced: storageVersionInformer.Informer().HasSynced, + leaseQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"), + storageVersionQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"), + } + + leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: c.onDeleteLease, + }) + // use the default resync period from the informer + storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.onAddStorageVersion, + UpdateFunc: c.onUpdateStorageVersion, + }) + + return c +} + +// Run starts one worker. +func (c *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.leaseQueue.ShutDown() + defer c.storageVersionQueue.ShutDown() + defer klog.Infof("Shutting down storage version garbage collector") + + klog.Infof("Starting storage version garbage collector") + + if !cache.WaitForCacheSync(stopCh, c.leasesSynced, c.storageVersionSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + + // Identity lease deletion and storageversion update don't happen too often. Start one + // worker for each of them. + // runLeaseWorker handles legit identity lease deletion, while runStorageVersionWorker + // handles storageversion creation/update with non-existing id. The latter should rarely + // happen. It's okay for the two workers to conflict on update. + go wait.Until(c.runLeaseWorker, time.Second, stopCh) + go wait.Until(c.runStorageVersionWorker, time.Second, stopCh) + + <-stopCh +} + +func (c *Controller) runLeaseWorker() { + for c.processNextLease() { + } +} + +func (c *Controller) processNextLease() bool { + key, quit := c.leaseQueue.Get() + if quit { + return false + } + defer c.leaseQueue.Done(key) + + err := c.processDeletedLease(key.(string)) + if err == nil { + c.leaseQueue.Forget(key) + return true + } + + utilruntime.HandleError(fmt.Errorf("lease %v failed with: %v", key, err)) + c.leaseQueue.AddRateLimited(key) + return true +} + +func (c *Controller) runStorageVersionWorker() { + for c.processNextStorageVersion() { + } +} + +func (c *Controller) processNextStorageVersion() bool { + key, quit := c.storageVersionQueue.Get() + if quit { + return false + } + defer c.storageVersionQueue.Done(key) + + err := c.syncStorageVersion(key.(string)) + if err == nil { + c.storageVersionQueue.Forget(key) + return true + } + + utilruntime.HandleError(fmt.Errorf("storage version %v failed with: %v", key, err)) + c.storageVersionQueue.AddRateLimited(key) + return true +} + +func (c *Controller) processDeletedLease(name string) error { + _, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), name, metav1.GetOptions{}) + // the lease isn't deleted, nothing we need to do here + if err == nil { + return nil + } + if !apierrors.IsNotFound(err) { + return err + } + // the frequency of this call won't be too high because we only trigger on identity lease deletions + storageVersionList, err := c.kubeclientset.InternalV1alpha1().StorageVersions().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return err + } + + var errors []error + for _, sv := range storageVersionList.Items { + var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion + hasStaleRecord := false + for _, ssv := range sv.Status.StorageVersions { + if ssv.APIServerID == name { + hasStaleRecord = true + continue + } + serverStorageVersions = append(serverStorageVersions, ssv) + } + if !hasStaleRecord { + continue + } + if err := c.updateOrDeleteStorageVersion(&sv, serverStorageVersions); err != nil { + errors = append(errors, err) + } + } + + return utilerrors.NewAggregate(errors) +} + +func (c *Controller) syncStorageVersion(name string) error { + sv, err := c.kubeclientset.InternalV1alpha1().StorageVersions().Get(context.TODO(), name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + // The problematic storage version that was added/updated recently is gone. + // Nothing we need to do here. + return nil + } + if err != nil { + return err + } + + hasInvalidID := false + var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion + for _, v := range sv.Status.StorageVersions { + lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), v.APIServerID, metav1.GetOptions{}) + if err != nil || lease == nil || lease.Labels == nil || + lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer { + // We cannot find a corresponding identity lease from apiserver as well. + // We need to clean up this storage version. + hasInvalidID = true + continue + } + serverStorageVersions = append(serverStorageVersions, v) + } + if !hasInvalidID { + return nil + } + return c.updateOrDeleteStorageVersion(sv, serverStorageVersions) +} + +func (c *Controller) onAddStorageVersion(obj interface{}) { + castObj := obj.(*apiserverinternalv1alpha1.StorageVersion) + c.enqueueStorageVersion(castObj) +} + +func (c *Controller) onUpdateStorageVersion(oldObj, newObj interface{}) { + castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion) + c.enqueueStorageVersion(castNewObj) +} + +// enqueueStorageVersion enqueues the storage version if it has entry for invalid apiserver +func (c *Controller) enqueueStorageVersion(obj *apiserverinternalv1alpha1.StorageVersion) { + for _, sv := range obj.Status.StorageVersions { + lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID) + if err != nil || lease == nil || lease.Labels == nil || + lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer { + // we cannot find a corresponding identity lease in cache, enqueue the storageversion + c.storageVersionQueue.Add(obj.Name) + return + } + } +} + +func (c *Controller) onDeleteLease(obj interface{}) { + castObj, ok := obj.(*coordinationv1.Lease) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } + castObj, ok = tombstone.Obj.(*coordinationv1.Lease) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Lease %#v", obj)) + return + } + } + + if castObj.Namespace == metav1.NamespaceSystem && + castObj.Labels != nil && + castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer { + klog.V(4).Infof("Observed lease %s deleted", castObj.Name) + c.enqueueLease(castObj) + } +} + +func (c *Controller) enqueueLease(obj *coordinationv1.Lease) { + c.leaseQueue.Add(obj.Name) +} + +func setCommonEncodingVersion(sv *apiserverinternalv1alpha1.StorageVersion) { + if len(sv.Status.StorageVersions) == 0 { + return + } + firstVersion := sv.Status.StorageVersions[0].EncodingVersion + agreed := true + for _, ssv := range sv.Status.StorageVersions { + if ssv.EncodingVersion != firstVersion { + agreed = false + break + } + } + if agreed { + sv.Status.CommonEncodingVersion = &firstVersion + } else { + sv.Status.CommonEncodingVersion = nil + } +} + +func (c *Controller) updateOrDeleteStorageVersion(sv *apiserverinternalv1alpha1.StorageVersion, serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion) error { + if len(serverStorageVersions) == 0 { + return c.kubeclientset.InternalV1alpha1().StorageVersions().Delete( + context.TODO(), sv.Name, metav1.DeleteOptions{}) + } + sv.Status.StorageVersions = serverStorageVersions + setCommonEncodingVersion(sv) + _, err := c.kubeclientset.InternalV1alpha1().StorageVersions().UpdateStatus( + context.TODO(), sv, metav1.UpdateOptions{}) + return err +}