diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index cdcbd854b67..afc1dc76d7a 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -681,7 +681,9 @@ func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, cl } func startStorageVersionGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "storageVersionGC")) go storageversiongc.NewStorageVersionGC( + ctx, controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"), controllerContext.InformerFactory.Coordination().V1().Leases(), controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(), diff --git a/pkg/controller/storageversiongc/gc_controller.go b/pkg/controller/storageversiongc/gc_controller.go index 42b566c5f59..b7f4b936afc 100644 --- a/pkg/controller/storageversiongc/gc_controller.go +++ b/pkg/controller/storageversiongc/gc_controller.go @@ -55,7 +55,7 @@ type Controller struct { } // NewStorageVersionGC creates a new Controller. -func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller { +func NewStorageVersionGC(ctx context.Context, clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller { c := &Controller{ kubeclientset: clientset, leaseLister: leaseInformer.Lister(), @@ -64,14 +64,20 @@ func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinfo leaseQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"), storageVersionQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"), } - + logger := klog.FromContext(ctx) leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - DeleteFunc: c.onDeleteLease, + DeleteFunc: func(obj interface{}) { + c.onDeleteLease(logger, obj) + }, }) // use the default resync period from the informer storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.onAddStorageVersion, - UpdateFunc: c.onUpdateStorageVersion, + AddFunc: func(obj interface{}) { + c.onAddStorageVersion(logger, obj) + }, + UpdateFunc: func(old, newObj interface{}) { + c.onUpdateStorageVersion(logger, old, newObj) + }, }) return c @@ -79,12 +85,13 @@ func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinfo // Run starts one worker. func (c *Controller) Run(ctx context.Context) { + logger := klog.FromContext(ctx) defer utilruntime.HandleCrash() defer c.leaseQueue.ShutDown() defer c.storageVersionQueue.ShutDown() - defer klog.Infof("Shutting down storage version garbage collector") + defer logger.Info("Shutting down storage version garbage collector") - klog.Infof("Starting storage version garbage collector") + logger.Info("Starting storage version garbage collector") if !cache.WaitForCacheSync(ctx.Done(), c.leasesSynced, c.storageVersionSynced) { utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) @@ -215,31 +222,31 @@ func (c *Controller) syncStorageVersion(ctx context.Context, name string) error return c.updateOrDeleteStorageVersion(ctx, sv, serverStorageVersions) } -func (c *Controller) onAddStorageVersion(obj interface{}) { +func (c *Controller) onAddStorageVersion(logger klog.Logger, obj interface{}) { castObj := obj.(*apiserverinternalv1alpha1.StorageVersion) - c.enqueueStorageVersion(castObj) + c.enqueueStorageVersion(logger, castObj) } -func (c *Controller) onUpdateStorageVersion(oldObj, newObj interface{}) { +func (c *Controller) onUpdateStorageVersion(logger klog.Logger, oldObj, newObj interface{}) { castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion) - c.enqueueStorageVersion(castNewObj) + c.enqueueStorageVersion(logger, castNewObj) } // enqueueStorageVersion enqueues the storage version if it has entry for invalid apiserver -func (c *Controller) enqueueStorageVersion(obj *apiserverinternalv1alpha1.StorageVersion) { +func (c *Controller) enqueueStorageVersion(logger klog.Logger, obj *apiserverinternalv1alpha1.StorageVersion) { for _, sv := range obj.Status.StorageVersions { lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID) if err != nil || lease == nil || lease.Labels == nil || lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer { // we cannot find a corresponding identity lease in cache, enqueue the storageversion - klog.V(4).Infof("Observed storage version %s with invalid apiserver entry", obj.Name) + logger.V(4).Info("Observed storage version with invalid apiserver entry", "objName", obj.Name) c.storageVersionQueue.Add(obj.Name) return } } } -func (c *Controller) onDeleteLease(obj interface{}) { +func (c *Controller) onDeleteLease(logger klog.Logger, obj interface{}) { castObj, ok := obj.(*coordinationv1.Lease) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) @@ -257,7 +264,7 @@ func (c *Controller) onDeleteLease(obj interface{}) { if castObj.Namespace == metav1.NamespaceSystem && castObj.Labels != nil && castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer { - klog.V(4).Infof("Observed lease %s deleted", castObj.Name) + logger.V(4).Info("Observed lease deleted", "castObjName", castObj.Name) c.enqueueLease(castObj) } } diff --git a/pkg/controller/storageversiongc/gc_controller_test.go b/pkg/controller/storageversiongc/gc_controller_test.go index 9e1a039706d..4349c879a8a 100644 --- a/pkg/controller/storageversiongc/gc_controller_test.go +++ b/pkg/controller/storageversiongc/gc_controller_test.go @@ -29,15 +29,16 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2/ktesting" utilpointer "k8s.io/utils/pointer" ) -func setupController(clientset kubernetes.Interface) { +func setupController(ctx context.Context, clientset kubernetes.Interface) { informerFactory := informers.NewSharedInformerFactory(clientset, 100*time.Millisecond) leaseInformer := informerFactory.Coordination().V1().Leases() storageVersionInformer := informerFactory.Internal().V1alpha1().StorageVersions() - controller := NewStorageVersionGC(clientset, leaseInformer, storageVersionInformer) + controller := NewStorageVersionGC(ctx, clientset, leaseInformer, storageVersionInformer) go controller.Run(context.Background()) informerFactory.Start(nil) } @@ -93,7 +94,8 @@ func Test_StorageVersionUpdatedWithAllEncodingVersionsEqualOnLeaseDeletion(t *te } clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion) - setupController(clientset) + _, ctx := ktesting.NewTestContext(t) + setupController(ctx, clientset) // Delete the lease object and verify that storage version status is updated if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-1", metav1.DeleteOptions{}); err != nil { @@ -177,7 +179,8 @@ func Test_StorageVersionUpdatedWithDifferentEncodingVersionsOnLeaseDeletion(t *t } clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion) - setupController(clientset) + _, ctx := ktesting.NewTestContext(t) + setupController(ctx, clientset) // Delete the lease object and verify that storage version status is updated if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-2", metav1.DeleteOptions{}); err != nil { @@ -257,7 +260,8 @@ func Test_StorageVersionContainsInvalidLeaseID(t *testing.T) { } clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion) - setupController(clientset) + _, ctx := ktesting.NewTestContext(t) + setupController(ctx, clientset) // add a delay to ensure controller had a chance to reconcile time.Sleep(2 * time.Second) @@ -325,7 +329,8 @@ func Test_StorageVersionDeletedOnLeaseDeletion(t *testing.T) { } clientset := fake.NewSimpleClientset(lease1, storageVersion) - setupController(clientset) + _, ctx := ktesting.NewTestContext(t) + setupController(ctx, clientset) // Delete the lease object and verify that storage version status is updated if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-1", metav1.DeleteOptions{}); err != nil { diff --git a/test/integration/storageversion/gc_test.go b/test/integration/storageversion/gc_test.go index 212b6e4d5dc..13f51709472 100644 --- a/test/integration/storageversion/gc_test.go +++ b/test/integration/storageversion/gc_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2/ktesting" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/controller/storageversiongc" "k8s.io/kubernetes/pkg/controlplane" @@ -62,7 +63,8 @@ func TestStorageVersionGarbageCollection(t *testing.T) { leaseInformer := informers.Coordination().V1().Leases() storageVersionInformer := informers.Internal().V1alpha1().StorageVersions() - controller := storageversiongc.NewStorageVersionGC(kubeclient, leaseInformer, storageVersionInformer) + _, ctx := ktesting.NewTestContext(t) + controller := storageversiongc.NewStorageVersionGC(ctx, kubeclient, leaseInformer, storageVersionInformer) ctx, cancel := context.WithCancel(context.Background()) defer cancel()