From 1d43fd46940b6182bc8da625aa4cd1b1a1ca5cee Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Tue, 10 Nov 2020 15:12:14 -0800 Subject: [PATCH] add apiserver lease garbage collector --- .../apiserverleasegc/gc_controller.go | 135 ++++++++++++++++++ pkg/controlplane/instance.go | 18 ++- 2 files changed, 152 insertions(+), 1 deletion(-) create mode 100644 pkg/controlplane/controller/apiserverleasegc/gc_controller.go diff --git a/pkg/controlplane/controller/apiserverleasegc/gc_controller.go b/pkg/controlplane/controller/apiserverleasegc/gc_controller.go new file mode 100644 index 00000000000..a8621e3b1ee --- /dev/null +++ b/pkg/controlplane/controller/apiserverleasegc/gc_controller.go @@ -0,0 +1,135 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserverleasegc + +import ( + "context" + "fmt" + "time" + + v1 "k8s.io/api/coordination/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + informers "k8s.io/client-go/informers/coordination/v1" + "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/coordination/v1" + "k8s.io/client-go/tools/cache" + + "k8s.io/klog/v2" +) + +// Controller deletes expired apiserver leases. +type Controller struct { + kubeclientset kubernetes.Interface + + leaseLister listers.LeaseLister + leaseInformer cache.SharedIndexInformer + leasesSynced cache.InformerSynced + + leaseNamespace string + + gcCheckPeriod time.Duration +} + +// NewAPIServerLeaseGC creates a new Controller. +func NewAPIServerLeaseGC(clientset kubernetes.Interface, gcCheckPeriod time.Duration, leaseNamespace, leaseLabelSelector string) *Controller { + // we construct our own informer because we need such a small subset of the information available. + // Just one namespace with label selection. + leaseInformer := informers.NewFilteredLeaseInformer( + clientset, + leaseNamespace, + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(listOptions *metav1.ListOptions) { + listOptions.LabelSelector = leaseLabelSelector + }) + return &Controller{ + kubeclientset: clientset, + leaseLister: listers.NewLeaseLister(leaseInformer.GetIndexer()), + leaseInformer: leaseInformer, + leasesSynced: leaseInformer.HasSynced, + leaseNamespace: leaseNamespace, + gcCheckPeriod: gcCheckPeriod, + } +} + +// Run starts one worker. +func (c *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer klog.Infof("Shutting down apiserver lease garbage collector") + + klog.Infof("Starting apiserver lease garbage collector") + + // we have a personal informer that is narrowly scoped, start it. + go c.leaseInformer.Run(stopCh) + + if !cache.WaitForCacheSync(stopCh, c.leasesSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + + go wait.Until(c.gc, c.gcCheckPeriod, stopCh) + + <-stopCh +} + +func (c *Controller) gc() { + leases, err := c.leaseLister.Leases(c.leaseNamespace).List(labels.Everything()) + if err != nil { + klog.Errorf("Error while listing apiserver leases: %v", err) + return + } + for _, lease := range leases { + // evaluate lease from cache + if !isLeaseExpired(lease) { + continue + } + // double check latest lease from apiserver before deleting + lease, err := c.kubeclientset.CoordinationV1().Leases(c.leaseNamespace).Get(context.TODO(), lease.Name, metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + klog.Errorf("Error getting lease: %v", err) + continue + } + if errors.IsNotFound(err) || lease == nil { + // the lease was deleted by the same GC controller in another apiserver + continue + } + // evaluate lease from apiserver + if !isLeaseExpired(lease) { + continue + } + if err := c.kubeclientset.CoordinationV1().Leases(c.leaseNamespace).Delete( + context.TODO(), lease.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + // If we get a 404, the lease was deleted by the same GC controller + // in another apiserver. Only log error if we get something other than 404. + klog.Errorf("Error deleting lease: %v", err) + } + } +} + +func isLeaseExpired(lease *v1.Lease) bool { + currentTime := time.Now() + // Leases created by the apiserver lease controller should have non-nil renew time + // and lease duration set. Leases without these fields set are invalid and should + // be GC'ed. + return lease.Spec.RenewTime == nil || + lease.Spec.LeaseDurationSeconds == nil || + lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second).Before(currentTime) +} diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 744185f9901..e9ffdb4441f 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -83,6 +83,7 @@ import ( discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1beta1" "k8s.io/component-helpers/lease" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc" "k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust" "k8s.io/kubernetes/pkg/controlplane/reconcilers" "k8s.io/kubernetes/pkg/controlplane/tunneler" @@ -130,6 +131,8 @@ const ( IdentityLeaseComponentLabelKey = "k8s.io/component" // KubeAPIServer defines variable used internally when referring to kube-apiserver component KubeAPIServer = "kube-apiserver" + // KubeAPIServerIdentityLeaseLabelSelector selects kube-apiserver identity leases + KubeAPIServerIdentityLeaseLabelSelector = IdentityLeaseComponentLabelKey + "=" + KubeAPIServer ) // ExtraConfig defines extra configuration for the master @@ -496,7 +499,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) }) if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) { - m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error { + m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error { kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig) if err != nil { return err @@ -513,6 +516,19 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) go controller.Run(wait.NeverStop) return nil }) + m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error { + kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig) + if err != nil { + return err + } + go apiserverleasegc.NewAPIServerLeaseGC( + kubeClient, + time.Duration(c.ExtraConfig.IdentityLeaseDurationSeconds)*time.Second, + metav1.NamespaceSystem, + KubeAPIServerIdentityLeaseLabelSelector, + ).Run(wait.NeverStop) + return nil + }) } return m, nil