From 3cf2822bc587f3f63e87e9b38098970d8f2b7ca5 Mon Sep 17 00:00:00 2001 From: "xin.li" Date: Mon, 20 Mar 2023 21:49:41 +0800 Subject: [PATCH 1/3] Migrated pkg/controller/garbagecollector to contextual logging Signed-off-by: xin.li --- pkg/controller/garbagecollector/garbagecollector.go | 13 +++++++------ .../garbagecollector/garbagecollector_test.go | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 1cd2355a7c0..a5163cfcdf7 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -187,7 +187,7 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery. logger := klog.FromContext(ctx) // Get the current resource list from discovery. - newResources := GetDeletableResources(discoveryClient) + newResources := GetDeletableResources(ctx, discoveryClient) // This can occur if there is an internal error in GetDeletableResources. if len(newResources) == 0 { @@ -214,7 +214,7 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery. // On a reattempt, check if available resources have changed if attempt > 1 { - newResources = GetDeletableResources(discoveryClient) + newResources = GetDeletableResources(ctx, discoveryClient) if len(newResources) == 0 { logger.V(2).Info("no resources reported by discovery", "attempt", attempt) metrics.GarbageCollectorResourcesSyncError.Inc() @@ -809,13 +809,14 @@ func (gc *GarbageCollector) GraphHasUID(u types.UID) bool { // All discovery errors are considered temporary. Upon encountering any error, // GetDeletableResources will log and return any discovered resources it was // able to process (which may be none). 
-func GetDeletableResources(discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} { +func GetDeletableResources(ctx context.Context, discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} { + logger := klog.FromContext(ctx) preferredResources, err := discoveryClient.ServerPreferredResources() if err != nil { if discovery.IsGroupDiscoveryFailedError(err) { - klog.Warningf("failed to discover some groups: %v", err.(*discovery.ErrGroupDiscoveryFailed).Groups) + logger.Info("failed to discover some groups", "groups", err.(*discovery.ErrGroupDiscoveryFailed).Groups) } else { - klog.Warningf("failed to discover preferred resources: %v", err) + logger.Info("failed to discover preferred resources", "error", err) } } if preferredResources == nil { @@ -829,7 +830,7 @@ func GetDeletableResources(discoveryClient discovery.ServerResourcesInterface) m for _, rl := range deletableResources { gv, err := schema.ParseGroupVersion(rl.GroupVersion) if err != nil { - klog.Warningf("ignoring invalid discovered resource %q: %v", rl.GroupVersion, err) + logger.Info("ignoring invalid discovered resource", "groupversion", rl.GroupVersion, "error", err) continue } for i := range rl.APIResources { diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 56b1215884c..f88cb61c434 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -798,13 +798,14 @@ func TestGetDeletableResources(t *testing.T) { }, } + _, ctx := ktesting.NewTestContext(t) for name, test := range tests { t.Logf("testing %q", name) client := &fakeServerResources{ PreferredResources: test.serverResources, Error: test.err, } - actual := GetDeletableResources(client) + actual := GetDeletableResources(ctx, client) if !reflect.DeepEqual(test.deletableResources, actual) { t.Errorf("expected resources:\n%v\ngot:\n%v", test.deletableResources, actual) } From 6c0387d004790694be71c555f97e7a65f80ce980 Mon Sep 17 00:00:00 2001 From: "xin.li" Date: Mon, 20 Mar 2023 21:25:15 +0800 Subject: [PATCH 2/3] Migrated pkg/controller/endpoint to contextual logging Signed-off-by: xin.li --- .../endpoint/endpoints_controller.go | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index cde9028c5c1..5b3e4e526a8 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -169,8 +169,9 @@ func (e *Controller) Run(ctx context.Context, workers int) { defer e.queue.ShutDown() - klog.Infof("Starting endpoint controller") - defer klog.Infof("Shutting down endpoint controller") + logger := klog.FromContext(ctx) + logger.Info("Starting endpoint controller") + defer logger.Info("Shutting down endpoint controller") if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) { return @@ -322,37 +323,39 @@ func (e *Controller) processNextWorkItem(ctx context.Context) bool { defer e.queue.Done(eKey) err := e.syncService(ctx, eKey.(string)) - e.handleErr(err, eKey) + e.handleErr(ctx, err, eKey) return true } -func (e *Controller) handleErr(err error, key interface{}) { +func (e *Controller) handleErr(ctx context.Context, err error, key interface{}) { if err == nil { e.queue.Forget(key) return } + logger := 
klog.FromContext(ctx) ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string)) if keyErr != nil { - klog.ErrorS(err, "Failed to split meta namespace cache key", "key", key) + logger.Error(err, "Failed to split meta namespace cache key", "key", key) } if e.queue.NumRequeues(key) < maxRetries { - klog.V(2).InfoS("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err) + logger.V(2).Info("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err) e.queue.AddRateLimited(key) return } - klog.Warningf("Dropping service %q out of the queue: %v", key, err) + logger.Info("Dropping service out of the queue", "service", key, "err", err) e.queue.Forget(key) utilruntime.HandleError(err) } func (e *Controller) syncService(ctx context.Context, key string) error { startTime := time.Now() + logger := klog.FromContext(ctx) defer func() { - klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime)) + logger.V(4).Info("Finished syncing service endpoints", "service", key, "startTime", time.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -390,7 +393,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { return nil } - klog.V(5).Infof("About to update endpoints for service %q", key) + logger.V(5).Info("About to update endpoints for service", "service", key) pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated()) if err != nil { // Since we're getting stuff from a local cache, it is @@ -410,7 +413,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { for _, pod := range pods { if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) { - klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name) + logger.V(5).Info("Pod is not included on endpoints for Service", "pod", klog.KObj(pod), "service", klog.KObj(service)) continue } @@ -418,7 +421,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { if err != nil { // this will happen, if the cluster runs with some nodes configured as dual stack and some as not // such as the case of an upgrade.. - klog.V(2).Infof("Failed to find endpoint for service:%s with ClusterIP:%s on pod:%s with error:%v", service.Name, service.Spec.ClusterIP, klog.KObj(pod), err) + logger.V(2).Info("Failed to find endpoint for service with ClusterIP on pod with error", "service", service.Name, "clusterIP", service.Spec.ClusterIP, "pod", klog.KObj(pod), "error", err) continue } @@ -430,7 +433,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { // Allow headless service not to have ports. if len(service.Spec.Ports) == 0 { if service.Spec.ClusterIP == api.ClusterIPNone { - subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses) + subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(ctx, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses) // No need to repack subsets for headless service without ports.
} } else { @@ -438,13 +441,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error { servicePort := &service.Spec.Ports[i] portNum, err := podutil.FindPort(pod, servicePort) if err != nil { - klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) + logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err) continue } epp := endpointPortFromServicePort(servicePort, portNum) var readyEps, notReadyEps int - subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses) + subsets, readyEps, notReadyEps = addEndpointSubset(ctx, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses) totalReadyEps = totalReadyEps + readyEps totalNotReadyEps = totalNotReadyEps + notReadyEps } @@ -483,7 +486,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) && apiequality.Semantic.DeepEqual(compareLabels, service.Labels) && capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) { - klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) + logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service)) return nil } newEndpoints := currentEndpoints.DeepCopy() @@ -516,7 +519,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService) } - klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps) + logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps) if createEndpoints { // No previous endpoints, create them _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{}) @@ -530,7 +533,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { // 1. namespace is terminating, endpoint creation is not allowed by default. // 2. policy is misconfigured, in which case no service would function anywhere. // Given the frequency of 1, we log at a lower level. - klog.V(5).Infof("Forbidden from creating endpoints: %v", err) + logger.V(5).Info("Forbidden from creating endpoints", "error", err) // If the namespace is terminating, creates will continue to fail. Simply drop the item. if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { @@ -583,11 +586,12 @@ func (e *Controller) checkLeftoverEndpoints() { // The addresses are added to the corresponding field, ready or not ready, depending // on the pod status and the Service PublishNotReadyAddresses field value. // The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints. 
-func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, +func addEndpointSubset(ctx context.Context, subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) { var readyEps int var notReadyEps int ports := []v1.EndpointPort{} + logger := klog.FromContext(ctx) if epp != nil { ports = append(ports, *epp) } @@ -598,7 +602,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint }) readyEps++ } else { // if it is not a ready address it has to be not ready - klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name) + logger.V(5).Info("Pod is out of service", "pod", klog.KObj(pod)) subsets = append(subsets, v1.EndpointSubset{ NotReadyAddresses: []v1.EndpointAddress{epa}, Ports: ports, From dfc1838379f503692d2868895a66828ce2f54442 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Sun, 26 Mar 2023 22:10:19 +0800 Subject: [PATCH 3/3] Migrated pkg/controller/volume|util|replicaset|nodeipam to contextual logging Signed-off-by: Ziqi Zhao --- cmd/kube-controller-manager/app/core.go | 1 + hack/logcheck.conf | 8 -- .../deployment/deployment_controller.go | 4 +- pkg/controller/deployment/rollback.go | 4 +- pkg/controller/deployment/sync.go | 4 +- .../deployment/util/deployment_util.go | 8 +- .../endpoint/endpoints_controller.go | 42 +++---- .../garbagecollector/garbagecollector.go | 7 +- .../garbagecollector/garbagecollector_test.go | 4 +- .../nodeipam/ipam/cloud_cidr_allocator.go | 8 +- .../ipam/controller_legacyprovider.go | 2 +- .../ipam/multi_cidr_range_allocator.go | 19 +-- .../nodeipam/ipam/range_allocator.go | 6 +- .../nodeipam/ipam/sync/sync_test.go | 15 ++- .../node_lifecycle_controller.go | 6 +- pkg/controller/replicaset/replica_set.go | 66 ++++++----- pkg/controller/replicaset/replica_set_test.go | 110 ++++++++++-------- .../replication/replication_controller.go | 5 +- pkg/controller/util/node/controller_utils.go | 10 +- .../attachdetach/attach_detach_controller.go | 1 + .../attachdetach/testing/testvolumespec.go | 8 -- .../volume/expand/expand_controller.go | 1 + .../persistentvolume/pv_controller_test.go | 5 +- .../volume/persistentvolume/volume_host.go | 1 + test/integration/quota/quota_test.go | 9 +- .../replicationcontroller_test.go | 5 +- 26 files changed, 192 insertions(+), 167 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index baad5543262..86f4559e82b 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -377,6 +377,7 @@ func startEndpointController(ctx context.Context, controllerCtx ControllerContex func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go replicationcontroller.NewReplicationManager( + klog.FromContext(ctx), controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().ReplicationControllers(), controllerContext.ClientBuilder.ClientOrDie("replication-controller"), diff --git a/hack/logcheck.conf b/hack/logcheck.conf index 9b0cb1a6546..0737213161c 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -30,7 +30,6 @@ structured k8s.io/apiserver/pkg/server/options/encryptionconfig/.* # A few files involved in startup migrated already to contextual # We can't enable contextual logcheck until all are migrated contextual k8s.io/dynamic-resource-allocation/.* -contextual 
k8s.io/kubernetes/cmd/kube-controller-manager/.* contextual k8s.io/kubernetes/cmd/kube-scheduler/.* contextual k8s.io/kubernetes/pkg/controller/.* contextual k8s.io/kubernetes/test/e2e/dra/.* @@ -39,7 +38,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.* # this point it is easier to list the exceptions. -contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go -contextual k8s.io/kubernetes/pkg/controller/controller_utils.go --contextual k8s.io/kubernetes/pkg/controller/deployment/.* -contextual k8s.io/kubernetes/pkg/controller/disruption/.* -contextual k8s.io/kubernetes/pkg/controller/endpoint/.* -contextual k8s.io/kubernetes/pkg/controller/endpointslice/.* @@ -48,12 +46,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.* -contextual k8s.io/kubernetes/pkg/controller/nodeipam/.* -contextual k8s.io/kubernetes/pkg/controller/podgc/.* -contextual k8s.io/kubernetes/pkg/controller/replicaset/.* --contextual k8s.io/kubernetes/pkg/controller/util/.* --contextual k8s.io/kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go --contextual k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing/testvolumespec.go --contextual k8s.io/kubernetes/pkg/controller/volume/expand/expand_controller.go --contextual k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller_test.go --contextual k8s.io/kubernetes/pkg/controller/volume/persistentvolume/volume_host.go -contextual k8s.io/kubernetes/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go -contextual k8s.io/kubernetes/pkg/controller/volume/pvprotection/pv_protection_controller_test.go diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index c08dd0c1908..fdd24f6c07e 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -27,7 +27,7 @@ import ( "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -582,7 +582,7 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) logger := klog.FromContext(ctx) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key) + logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key) return err } diff --git a/pkg/controller/deployment/rollback.go b/pkg/controller/deployment/rollback.go index af5eedd6a85..11708ceb9b6 100644 --- a/pkg/controller/deployment/rollback.go +++ b/pkg/controller/deployment/rollback.go @@ -22,7 +22,7 @@ import ( "strconv" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" @@ -41,7 +41,7 @@ func (dc *DeploymentController) rollback(ctx context.Context, d *apps.Deployment rollbackTo := getRollbackTo(d) // If rollback revision is 0, rollback to the last revision if rollbackTo.Revision == 0 { - if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 { + if rollbackTo.Revision = deploymentutil.LastRevision(logger, allRSs); rollbackTo.Revision == 0 { // If we still can't find the last revision, gives up rollback dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.") // Gives up rollback diff --git 
a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index c7ede39aa68..e009072aa27 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -24,7 +24,7 @@ import ( "strconv" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" @@ -140,7 +140,7 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList) // Calculate the max revision number among all old RSes - maxOldRevision := deploymentutil.MaxRevision(oldRSs) + maxOldRevision := deploymentutil.MaxRevision(logger, oldRSs) // Calculate revision number for this new replica set newRevision := strconv.FormatInt(maxOldRevision+1, 10) diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index 31ae7a98bc9..d071dbfed09 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -184,12 +184,12 @@ func SetDeploymentRevision(deployment *apps.Deployment, revision string) bool { } // MaxRevision finds the highest revision in the replica sets -func MaxRevision(allRSs []*apps.ReplicaSet) int64 { +func MaxRevision(logger klog.Logger, allRSs []*apps.ReplicaSet) int64 { max := int64(0) for _, rs := range allRSs { if v, err := Revision(rs); err != nil { // Skip the replica sets when it failed to parse their revision information - klog.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err) + logger.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err) } else if v > max { max = v } @@ -198,12 +198,12 @@ func MaxRevision(allRSs []*apps.ReplicaSet) int64 { } // LastRevision finds the second max revision number in all replica sets (the last revision) -func LastRevision(allRSs []*apps.ReplicaSet) int64 { +func LastRevision(logger klog.Logger, allRSs []*apps.ReplicaSet) int64 { max, secMax := int64(0), int64(0) for _, rs := range allRSs { if v, err := Revision(rs); err != nil { // Skip the replica sets when it failed to parse their revision information - klog.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err) + logger.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err) } else if v >= max { secMax = max max = v diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 5b3e4e526a8..cde9028c5c1 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -169,9 +169,8 @@ func (e *Controller) Run(ctx context.Context, workers int) { defer e.queue.ShutDown() - logger := klog.FromContext(ctx) - logger.Info("Starting endpoint controller") - defer logger.Info("Shutting down endpoint controller") + klog.Infof("Starting endpoint controller") + defer klog.Infof("Shutting down endpoint controller") if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) { return @@ -323,39 +322,37 @@ func (e *Controller) processNextWorkItem(ctx 
context.Context) bool { defer e.queue.Done(eKey) err := e.syncService(ctx, eKey.(string)) - e.handleErr(ctx, err, eKey) + e.handleErr(err, eKey) return true } -func (e *Controller) handleErr(ctx context.Context, err error, key interface{}) { +func (e *Controller) handleErr(err error, key interface{}) { if err == nil { e.queue.Forget(key) return } - logger := klog.FromContext(ctx) ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string)) if keyErr != nil { - logger.Error(err, "Failed to split meta namespace cache key", "key", key) + klog.ErrorS(err, "Failed to split meta namespace cache key", "key", key) } if e.queue.NumRequeues(key) < maxRetries { - logger.V(2).Info("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err) + klog.V(2).InfoS("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err) e.queue.AddRateLimited(key) return } - logger.Info("Dropping service out of the queue", "service", key, "err", err) + klog.Warningf("Dropping service %q out of the queue: %v", key, err) e.queue.Forget(key) utilruntime.HandleError(err) } func (e *Controller) syncService(ctx context.Context, key string) error { startTime := time.Now() - logger := klog.FromContext(ctx) defer func() { - logger.V(4).Info("Finished syncing service endpoints", "service", key, "startTime", time.Since(startTime)) + klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -393,7 +390,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { return nil } - logger.V(5).Info("About to update endpoints for service", "service", key) + klog.V(5).Infof("About to update endpoints for service %q", key) pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated()) if err != nil { // Since we're getting stuff from a local cache, it is @@ -413,7 +410,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { for _, pod := range pods { if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) { - logger.V(5).Info("Pod is not included on endpoints for Service", "pod", klog.KObj(pod), "service", klog.KObj(service)) + klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name) continue } @@ -421,7 +418,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { if err != nil { // this will happen, if the cluster runs with some nodes configured as dual stack and some as not // such as the case of an upgrade.. - logger.V(2).Info("Failed to find endpoint for service with ClusterIP on pod with error", "service", service.Name, "clusterIP", service.Spec.ClusterIP, "pod", klog.KObj(pod), "error", err) + klog.V(2).Infof("Failed to find endpoint for service:%s with ClusterIP:%s on pod:%s with error:%v", service.Name, service.Spec.ClusterIP, klog.KObj(pod), err) continue } @@ -433,7 +430,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { // Allow headless service not to have ports.
if len(service.Spec.Ports) == 0 { if service.Spec.ClusterIP == api.ClusterIPNone { - subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(ctx, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses) + subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses) // No need to repack subsets for headless service without ports. } } else { @@ -441,13 +438,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error { servicePort := &service.Spec.Ports[i] portNum, err := podutil.FindPort(pod, servicePort) if err != nil { - logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err) + klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) continue } epp := endpointPortFromServicePort(servicePort, portNum) var readyEps, notReadyEps int - subsets, readyEps, notReadyEps = addEndpointSubset(ctx, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses) + subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses) totalReadyEps = totalReadyEps + readyEps totalNotReadyEps = totalNotReadyEps + notReadyEps } @@ -486,7 +483,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) && apiequality.Semantic.DeepEqual(compareLabels, service.Labels) && capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) { - logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service)) + klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) return nil } newEndpoints := currentEndpoints.DeepCopy() @@ -519,7 +516,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService) } - logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps) + klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps) if createEndpoints { // No previous endpoints, create them _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{}) @@ -533,7 +530,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { // 1. namespace is terminating, endpoint creation is not allowed by default. // 2. policy is misconfigured, in which case no service would function anywhere. // Given the frequency of 1, we log at a lower level. - logger.V(5).Info("Forbidden from creating endpoints", "error", err) + klog.V(5).Infof("Forbidden from creating endpoints: %v", err) // If the namespace is terminating, creates will continue to fail. Simply drop the item. if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { @@ -586,12 +583,11 @@ func (e *Controller) checkLeftoverEndpoints() { // The addresses are added to the corresponding field, ready or not ready, depending // on the pod status and the Service PublishNotReadyAddresses field value. // The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints. 
-func addEndpointSubset(ctx context.Context, subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, +func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) { var readyEps int var notReadyEps int ports := []v1.EndpointPort{} - logger := klog.FromContext(ctx) if epp != nil { ports = append(ports, *epp) } @@ -602,7 +598,7 @@ func addEndpointSubset(ctx context.Context, subsets []v1.EndpointSubset, pod *v1 }) readyEps++ } else { // if it is not a ready address it has to be not ready - logger.V(5).Info("Pod is out of service", "pod", klog.KObj(pod)) + klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name) subsets = append(subsets, v1.EndpointSubset{ NotReadyAddresses: []v1.EndpointAddress{epa}, Ports: ports, diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index a5163cfcdf7..82fd8d208ce 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -187,7 +187,7 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery. logger := klog.FromContext(ctx) // Get the current resource list from discovery. - newResources := GetDeletableResources(ctx, discoveryClient) + newResources := GetDeletableResources(logger, discoveryClient) // This can occur if there is an internal error in GetDeletableResources. if len(newResources) == 0 { @@ -214,7 +214,7 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery. // On a reattempt, check if available resources have changed if attempt > 1 { - newResources = GetDeletableResources(ctx, discoveryClient) + newResources = GetDeletableResources(logger, discoveryClient) if len(newResources) == 0 { logger.V(2).Info("no resources reported by discovery", "attempt", attempt) metrics.GarbageCollectorResourcesSyncError.Inc() @@ -809,8 +809,7 @@ func (gc *GarbageCollector) GraphHasUID(u types.UID) bool { // All discovery errors are considered temporary. Upon encountering any error, // GetDeletableResources will log and return any discovered resources it was // able to process (which may be none). 
-func GetDeletableResources(ctx context.Context, discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} { - logger := klog.FromContext(ctx) +func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} { preferredResources, err := discoveryClient.ServerPreferredResources() if err != nil { if discovery.IsGroupDiscoveryFailedError(err) { diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index f88cb61c434..a11d54b6634 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -798,14 +798,14 @@ func TestGetDeletableResources(t *testing.T) { }, } - _, ctx := ktesting.NewTestContext(t) + logger, _ := ktesting.NewTestContext(t) for name, test := range tests { t.Logf("testing %q", name) client := &fakeServerResources{ PreferredResources: test.serverResources, Error: test.err, } - actual := GetDeletableResources(ctx, client) + actual := GetDeletableResources(logger, client) if !reflect.DeepEqual(test.deletableResources, actual) { t.Errorf("expected resources:\n%v\ngot:\n%v", test.deletableResources, actual) } diff --git a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go index 0102b8b87e0..cf2d644f3ec 100644 --- a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go +++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go @@ -131,7 +131,7 @@ func NewCloudCIDRAllocator(logger klog.Logger, client clientset.Interface, cloud } return nil }), - DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error { + DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error { return ca.ReleaseCIDR(logger, node) }), }) @@ -272,11 +272,11 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName cidrStrings, err := ca.cloud.AliasRangesByProviderID(node.Spec.ProviderID) if err != nil { - controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") + controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to get cidr(s) from provider: %v", err) } if len(cidrStrings) == 0 { - controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") + controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name) } //Can have at most 2 ips (one for v4 and one for v6) @@ -311,7 +311,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName } } if err != nil { - controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed") + controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRAssignmentFailed") logger.Error(err, "Failed to update the node PodCIDR after multiple attempts", "node", klog.KObj(node), "cidrStrings", cidrStrings) return err } diff --git a/pkg/controller/nodeipam/ipam/controller_legacyprovider.go b/pkg/controller/nodeipam/ipam/controller_legacyprovider.go index 9f389661263..bdf318b8d59 100644 --- a/pkg/controller/nodeipam/ipam/controller_legacyprovider.go +++ b/pkg/controller/nodeipam/ipam/controller_legacyprovider.go @@ -150,7 +150,7 @@ func (c *Controller) Start(logger klog.Logger, nodeInformer informers.NodeInform UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode 
*v1.Node) error { return c.onUpdate(logger, newNode) }), - DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error { + DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error { return c.onDelete(logger, node) }), }) diff --git a/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go b/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go index e067929838b..0f3b6a3ef1b 100644 --- a/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go +++ b/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go @@ -286,6 +286,7 @@ func (r *multiCIDRRangeAllocator) runCIDRWorker(ctx context.Context) { // processNextWorkItem will read a single work item off the cidrQueue and // attempt to process it, by calling the syncHandler. func (r *multiCIDRRangeAllocator) processNextCIDRWorkItem(ctx context.Context) bool { + logger := klog.FromContext(ctx) obj, shutdown := r.cidrQueue.Get() if shutdown { return false @@ -325,7 +326,7 @@ func (r *multiCIDRRangeAllocator) processNextCIDRWorkItem(ctx context.Context) b // Finally, if no error occurs we Forget this item so it does not // get cidrQueued again until another change happens. r.cidrQueue.Forget(obj) - klog.Infof("Successfully synced '%s'", key) + logger.Info("Successfully synced", "key", key) return nil }(ctx, obj) @@ -384,7 +385,7 @@ func (r *multiCIDRRangeAllocator) processNextNodeWorkItem(ctx context.Context) b // Finally, if no error occurs we Forget this item so it does not // get nodeQueue again until another change happens. r.nodeQueue.Forget(obj) - klog.Infof("Successfully synced '%s'", key) + logger.Info("Successfully synced", "key", key) return nil }(klog.FromContext(ctx), obj) @@ -399,12 +400,12 @@ func (r *multiCIDRRangeAllocator) processNextNodeWorkItem(ctx context.Context) b func (r *multiCIDRRangeAllocator) syncNode(logger klog.Logger, key string) error { startTime := time.Now() defer func() { - klog.V(4).Infof("Finished syncing Node request %q (%v)", key, time.Since(startTime)) + logger.V(4).Info("Finished syncing Node request", "node", key, "elapsed", time.Since(startTime)) }() node, err := r.nodeLister.Get(key) if apierrors.IsNotFound(err) { - klog.V(3).Infof("node has been deleted: %v", key) + logger.V(3).Info("node has been deleted", "node", key) // TODO: obtain the node object information to call ReleaseCIDR from here // and retry if there is an error. return nil @@ -414,7 +415,7 @@ func (r *multiCIDRRangeAllocator) syncNode(logger klog.Logger, key string) error } // Check the DeletionTimestamp to determine if object is under deletion. 
if !node.DeletionTimestamp.IsZero() { - klog.V(3).Infof("node is being deleted: %v", key) + logger.V(3).Info("node is being deleted", "node", key) return r.ReleaseCIDR(logger, node) } return r.AllocateOrOccupyCIDR(logger, node) @@ -557,12 +558,12 @@ func (r *multiCIDRRangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node cidrs, clusterCIDR, err := r.prioritizedCIDRs(logger, node) if err != nil { - controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") + controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to get cidrs for node %s", node.Name) } if len(cidrs) == 0 { - controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") + controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable") return fmt.Errorf("no cidrSets with matching labels found for node %s", node.Name) } @@ -696,7 +697,7 @@ func (r *multiCIDRRangeAllocator) updateCIDRsAllocation(logger klog.Logger, data } // failed release back to the pool. logger.Error(err, "Failed to update node PodCIDR after attempts", "node", klog.KObj(node), "podCIDR", cidrsString, "retries", cidrUpdateRetries) - controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") + controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed") // We accept the fact that we may leak CIDRs here. This is safer than releasing // them in case when we don't know if request went through. // NodeController restart will return all falsely allocated CIDRs to the pool. @@ -1151,7 +1152,7 @@ func (r *multiCIDRRangeAllocator) reconcileDelete(ctx context.Context, clusterCI if slice.ContainsString(clusterCIDR.GetFinalizers(), clusterCIDRFinalizer, nil) { logger.V(2).Info("Releasing ClusterCIDR", "clusterCIDR", clusterCIDR.Name) if err := r.deleteClusterCIDR(logger, clusterCIDR); err != nil { - klog.V(2).Info("Error while deleting ClusterCIDR", "err", err) + logger.V(2).Info("Error while deleting ClusterCIDR", "err", err) return err } // Remove the finalizer as delete is successful. 
diff --git a/pkg/controller/nodeipam/ipam/range_allocator.go b/pkg/controller/nodeipam/ipam/range_allocator.go index 47bd392e234..b0bab2f1be2 100644 --- a/pkg/controller/nodeipam/ipam/range_allocator.go +++ b/pkg/controller/nodeipam/ipam/range_allocator.go @@ -157,7 +157,7 @@ func NewCIDRRangeAllocator(logger klog.Logger, client clientset.Interface, nodeI } return nil }), - DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error { + DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error { return ra.ReleaseCIDR(logger, node) }), }) @@ -274,7 +274,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) podCIDR, err := r.cidrSets[idx].AllocateNext() if err != nil { r.removeNodeFromProcessing(node.Name) - controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") + controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err) } allocated.allocatedCIDRs[idx] = podCIDR @@ -383,7 +383,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeRese } // failed release back to the pool logger.Error(err, "Failed to update node PodCIDR after multiple attempts", "node", klog.KObj(node), "podCIDRs", cidrsString) - controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") + controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed") // We accept the fact that we may leak CIDRs here. This is safer than releasing // them in case when we don't know if request went through. // NodeController restart will return all falsely allocated CIDRs to the pool. diff --git a/pkg/controller/nodeipam/ipam/sync/sync_test.go b/pkg/controller/nodeipam/ipam/sync/sync_test.go index c355f95ea96..8afc22858cf 100644 --- a/pkg/controller/nodeipam/ipam/sync/sync_test.go +++ b/pkg/controller/nodeipam/ipam/sync/sync_test.go @@ -58,6 +58,8 @@ type fakeAPIs struct { calls []string events []fakeEvent results []error + + logger klog.Logger } func (f *fakeAPIs) Alias(ctx context.Context, node *v1.Node) (*net.IPNet, error) { @@ -90,7 +92,7 @@ func (f *fakeAPIs) EmitNodeWarningEvent(nodeName, reason, fmtStr string, args .. } func (f *fakeAPIs) ReportResult(err error) { - klog.V(2).Infof("ReportResult %v", err) + f.logger.V(2).Info("ReportResult", "err", err) f.results = append(f.results, err) if f.reportChan != nil { f.reportChan <- struct{}{} @@ -106,7 +108,7 @@ func (f *fakeAPIs) ResyncTimeout() time.Duration { func (f *fakeAPIs) dumpTrace() { for i, x := range f.calls { - klog.Infof("trace %v: %v", i, x) + f.logger.Info("trace", "index", i, "call", x) } } @@ -196,12 +198,13 @@ func TestNodeSyncUpdate(t *testing.T) { wantError: false, }, } { + logger, _ := ktesting.NewTestContext(t) cidr, _ := cidrset.NewCIDRSet(clusterCIDRRange, 24) + tc.fake.logger = logger sync := New(&tc.fake, &tc.fake, &tc.fake, tc.mode, "node1", cidr) doneChan := make(chan struct{}) // Do a single step of the loop. 
- logger, _ := ktesting.NewTestContext(t) go sync.Loop(logger, doneChan) sync.Update(tc.node) close(sync.opChan) @@ -224,15 +227,16 @@ func TestNodeSyncUpdate(t *testing.T) { } func TestNodeSyncResync(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) fake := &fakeAPIs{ nodeRet: nodeWithCIDRRange, resyncTimeout: time.Millisecond, reportChan: make(chan struct{}), + logger: logger, } cidr, _ := cidrset.NewCIDRSet(clusterCIDRRange, 24) sync := New(fake, fake, fake, SyncFromCluster, "node1", cidr) doneChan := make(chan struct{}) - logger, _ := ktesting.NewTestContext(t) go sync.Loop(logger, doneChan) <-fake.reportChan close(sync.opChan) @@ -272,12 +276,13 @@ func TestNodeSyncDelete(t *testing.T) { }, }, } { + logger, _ := ktesting.NewTestContext(t) cidr, _ := cidrset.NewCIDRSet(clusterCIDRRange, 24) + tc.fake.logger = logger sync := New(&tc.fake, &tc.fake, &tc.fake, tc.mode, "node1", cidr) doneChan := make(chan struct{}) // Do a single step of the loop. - logger, _ := ktesting.NewTestContext(t) go sync.Loop(logger, doneChan) sync.Delete(tc.node) <-doneChan diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go index 7e9b7b9b696..70e144452e7 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -422,7 +422,7 @@ func NewNodeLifecycleController( nc.taintManager.NodeUpdated(oldNode, newNode) return nil }), - DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error { + DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error { nc.taintManager.NodeUpdated(node, nil) return nil }), @@ -438,7 +438,7 @@ func NewNodeLifecycleController( nc.nodeUpdateQueue.Add(newNode.Name) return nil }), - DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error { + DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error { nc.nodesToRetry.Delete(node.Name) return nil }), @@ -744,7 +744,7 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error { switch { case currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue: // Report node event only once when status changed. 
- controllerutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady") + controllerutil.RecordNodeStatusChange(logger, nc.recorder, node, "NodeNotReady") fallthrough case needsRetry && observedReadyCondition.Status != v1.ConditionTrue: if err = controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, node.Name); err != nil { diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 9a03a04a635..cc129b7d94e 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -121,7 +121,7 @@ func NewReplicaSetController(logger klog.Logger, rsInformer appsinformers.Replic if err := metrics.Register(legacyregistry.Register); err != nil { logger.Error(err, "unable to register metrics") } - return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas, + return NewBaseController(logger, rsInformer, podInformer, kubeClient, burstReplicas, apps.SchemeGroupVersion.WithKind("ReplicaSet"), "replicaset_controller", "replicaset", @@ -135,7 +135,7 @@ func NewReplicaSetController(logger klog.Logger, rsInformer appsinformers.Replic // NewBaseController is the implementation of NewReplicaSetController with additional injected // parameters so that it can also serve as the implementation of NewReplicationController. -func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, +func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController { rsc := &ReplicaSetController{ @@ -149,9 +149,15 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer } rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: rsc.addRS, - UpdateFunc: rsc.updateRS, - DeleteFunc: rsc.deleteRS, + AddFunc: func(obj interface{}) { + rsc.addRS(logger, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + rsc.updateRS(logger, oldObj, newObj) + }, + DeleteFunc: func(obj interface{}) { + rsc.deleteRS(logger, obj) + }, }) rsInformer.Informer().AddIndexers(cache.Indexers{ controllerUIDIndex: func(obj interface{}) ([]string, error) { @@ -171,12 +177,18 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer rsc.rsListerSynced = rsInformer.Informer().HasSynced podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: rsc.addPod, + AddFunc: func(obj interface{}) { + rsc.addPod(logger, obj) + }, // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from // local storage, so it should be ok. 
- UpdateFunc: rsc.updatePod, - DeleteFunc: rsc.deletePod, + UpdateFunc: func(oldObj, newObj interface{}) { + rsc.updatePod(logger, oldObj, newObj) + }, + DeleteFunc: func(obj interface{}) { + rsc.deletePod(logger, obj) + }, }) rsc.podLister = podInformer.Lister() rsc.podListerSynced = podInformer.Informer().HasSynced @@ -293,14 +305,14 @@ func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration ti rsc.queue.AddAfter(key, duration) } -func (rsc *ReplicaSetController) addRS(obj interface{}) { +func (rsc *ReplicaSetController) addRS(logger klog.Logger, obj interface{}) { rs := obj.(*apps.ReplicaSet) - klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name) + logger.V(4).Info("Adding", "replicaSet", klog.KObj(rs)) rsc.enqueueRS(rs) } // callback when RS is updated -func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { +func (rsc *ReplicaSetController) updateRS(logger klog.Logger, old, cur interface{}) { oldRS := old.(*apps.ReplicaSet) curRS := cur.(*apps.ReplicaSet) @@ -311,7 +323,7 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err)) return } - rsc.deleteRS(cache.DeletedFinalStateUnknown{ + rsc.deleteRS(logger, cache.DeletedFinalStateUnknown{ Key: key, Obj: oldRS, }) @@ -330,12 +342,12 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { // that bad as ReplicaSets that haven't met expectations yet won't // sync, and all the listing is done using local stores. if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) { - klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas)) + logger.V(4).Info("replicaSet updated. Desired pod count change.", "replicaSet", klog.KObj(oldRS), "oldReplicas", *(oldRS.Spec.Replicas), "newReplicas", *(curRS.Spec.Replicas)) } rsc.enqueueRS(curRS) } -func (rsc *ReplicaSetController) deleteRS(obj interface{}) { +func (rsc *ReplicaSetController) deleteRS(logger klog.Logger, obj interface{}) { rs, ok := obj.(*apps.ReplicaSet) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) @@ -356,7 +368,7 @@ func (rsc *ReplicaSetController) deleteRS(obj interface{}) { return } - klog.V(4).Infof("Deleting %s %q", rsc.Kind, key) + logger.V(4).Info("Deleting", "replicaSet", klog.KObj(rs)) // Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean rsc.expectations.DeleteExpectations(key) @@ -365,13 +377,13 @@ func (rsc *ReplicaSetController) deleteRS(obj interface{}) { } // When a pod is created, enqueue the replica set that manages it and update its expectations. -func (rsc *ReplicaSetController) addPod(obj interface{}) { +func (rsc *ReplicaSetController) addPod(logger klog.Logger, obj interface{}) { pod := obj.(*v1.Pod) if pod.DeletionTimestamp != nil { // on a restart of the controller manager, it's possible a new pod shows up in a state that // is already pending deletion. Prevent the pod from being a creation observation. 
- rsc.deletePod(pod) + rsc.deletePod(logger, pod) return } @@ -385,7 +397,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) { if err != nil { return } - klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) + logger.V(4).Info("Pod created", "pod", klog.KObj(pod), "detail", pod) rsc.expectations.CreationObserved(rsKey) rsc.queue.Add(rsKey) return @@ -399,7 +411,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) { if len(rss) == 0 { return } - klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod) + logger.V(4).Info("Orphan Pod created", "pod", klog.KObj(pod), "detail", pod) for _, rs := range rss { rsc.enqueueRS(rs) } @@ -408,7 +420,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) { // When a pod is updated, figure out what replica set/s manage it and wake them // up. If the labels of the pod have changed we need to awaken both the old // and new replica set. old and cur must be *v1.Pod types. -func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { +func (rsc *ReplicaSetController) updatePod(logger klog.Logger, old, cur interface{}) { curPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) if curPod.ResourceVersion == oldPod.ResourceVersion { @@ -424,10 +436,10 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { // for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait // until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because // an rs never initiates a phase change, and so is never asleep waiting for the same. - rsc.deletePod(curPod) + rsc.deletePod(logger, curPod) if labelChanged { // we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset. - rsc.deletePod(oldPod) + rsc.deletePod(logger, oldPod) } return } @@ -448,7 +460,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { if rs == nil { return } - klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + logger.V(4).Info("Pod objectMeta updated.", "pod", klog.KObj(oldPod), "oldObjectMeta", oldPod.ObjectMeta, "curObjectMeta", curPod.ObjectMeta) rsc.enqueueRS(rs) // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in // the Pod status which in turn will trigger a requeue of the owning replica set thus @@ -458,7 +470,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { // Note that this still suffers from #29229, we are just moving the problem one level // "closer" to kubelet (from the deployment to the replica set controller). if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 { - klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds) + logger.V(2).Info("pod will be enqueued after a while for availability check", "duration", rs.Spec.MinReadySeconds, "kind", rsc.Kind, "pod", klog.KObj(oldPod)) // Add a second to avoid milliseconds skew in AddAfter. // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. 
rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second) @@ -473,7 +485,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { if len(rss) == 0 { return } - klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + logger.V(4).Info("Orphan Pod objectMeta updated.", "pod", klog.KObj(oldPod), "oldObjectMeta", oldPod.ObjectMeta, "curObjectMeta", curPod.ObjectMeta) for _, rs := range rss { rsc.enqueueRS(rs) } @@ -482,7 +494,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { // When a pod is deleted, enqueue the replica set that manages the pod and update its expectations. // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. -func (rsc *ReplicaSetController) deletePod(obj interface{}) { +func (rsc *ReplicaSetController) deletePod(logger klog.Logger, obj interface{}) { pod, ok := obj.(*v1.Pod) // When a delete is dropped, the relist will notice a pod in the store not @@ -516,7 +528,7 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err)) return } - klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) + logger.V(4).Info("Pod deleted", "delete_by", utilruntime.GetCaller(), "deletion_timestamp", pod.DeletionTimestamp, "pod", klog.KObj(pod)) rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) rsc.queue.Add(rsKey) } diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 40c1f1e50ba..b6fc90c57c4 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -182,7 +182,7 @@ func newPodList(store cache.Store, count int, status v1.PodPhase, labelMap map[s // processSync initiates a sync via processNextWorkItem() to test behavior that // depends on both functions (such as re-queueing on sync error). -func processSync(rsc *ReplicaSetController, key string) error { +func processSync(ctx context.Context, rsc *ReplicaSetController, key string) error { // Save old syncHandler and replace with one that captures the error. 
oldSyncHandler := rsc.syncHandler defer func() { @@ -194,7 +194,7 @@ func processSync(rsc *ReplicaSetController, key string) error { return syncErr } rsc.queue.Add(key) - rsc.processNextWorkItem(context.TODO()) + rsc.processNextWorkItem(ctx) return syncErr } @@ -215,6 +215,7 @@ func validateSyncReplicaSet(fakePodControl *controller.FakePodControl, expectedC } func TestSyncReplicaSetDoesNothing(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) @@ -228,7 +229,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 2, v1.PodRunning, labelMap, rsSpec, "pod") manager.podControl = &fakePodControl - manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) + manager.syncReplicaSet(ctx, GetKey(rsSpec, t)) err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) if err != nil { t.Fatal(err) @@ -236,6 +237,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { } func TestDeleteFinalStateUnknown(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) @@ -255,9 +257,9 @@ func TestDeleteFinalStateUnknown(t *testing.T) { rsSpec := newReplicaSet(1, labelMap) informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) pods := newPodList(nil, 1, v1.PodRunning, labelMap, rsSpec, "pod") - manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]}) + manager.deletePod(logger, cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]}) - go manager.worker(context.TODO()) + go manager.worker(ctx) expected := GetKey(rsSpec, t) select { @@ -273,6 +275,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { // Tell the rs to create 100 replicas, but simulate a limit (like a quota limit) // of 10, and verify that the rs doesn't make 100 create calls per sync pass func TestSyncReplicaSetCreateFailures(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) fakePodControl := controller.FakePodControl{} fakePodControl.CreateLimit = 10 @@ -286,7 +289,7 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) { informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs) manager.podControl = &fakePodControl - manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) + manager.syncReplicaSet(ctx, GetKey(rs, t)) err := validateSyncReplicaSet(&fakePodControl, fakePodControl.CreateLimit, 0, 0) if err != nil { t.Fatal(err) @@ -302,6 +305,7 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) { func TestSyncReplicaSetDormancy(t *testing.T) { // Setup a test server so we can lie about the current state of pods + _, ctx := ktesting.NewTestContext(t) fakeHandler := utiltesting.FakeHandler{ StatusCode: 200, ResponseBody: "{}", @@ -328,7 +332,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { rsSpec.Status.Replicas = 1 rsSpec.Status.ReadyReplicas = 1 rsSpec.Status.AvailableReplicas = 1 - manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) + manager.syncReplicaSet(ctx, GetKey(rsSpec, t)) err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0) if err != nil { t.Fatal(err) @@ -339,7 +343,7 @@ func TestSyncReplicaSetDormancy(t 
*testing.T) { rsSpec.Status.ReadyReplicas = 0 rsSpec.Status.AvailableReplicas = 0 fakePodControl.Clear() - manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) + manager.syncReplicaSet(ctx, GetKey(rsSpec, t)) err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0) if err != nil { t.Fatal(err) @@ -360,7 +364,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { fakePodControl.Clear() fakePodControl.Err = fmt.Errorf("fake Error") - manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) + manager.syncReplicaSet(ctx, GetKey(rsSpec, t)) err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) if err != nil { t.Fatal(err) @@ -369,7 +373,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { // This replica should not need a Lowering of expectations, since the previous create failed fakePodControl.Clear() fakePodControl.Err = nil - manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) + manager.syncReplicaSet(ctx, GetKey(rsSpec, t)) err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) if err != nil { t.Fatal(err) @@ -597,7 +601,7 @@ func TestRelatedPodsLookup(t *testing.T) { } for _, pod := range c.pods { informers.Core().V1().Pods().Informer().GetIndexer().Add(pod) - manager.addPod(pod) + manager.addPod(logger, pod) } actualPods, err := manager.getIndirectlyRelatedPods(logger, c.rs) if err != nil { @@ -616,13 +620,14 @@ func TestRelatedPodsLookup(t *testing.T) { } func TestWatchControllers(t *testing.T) { + fakeWatch := watch.NewFake() client := fake.NewSimpleClientset() client.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fakeWatch, nil)) stopCh := make(chan struct{}) defer close(stopCh) informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - logger, _ := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) manager := NewReplicaSetController( logger, informers.Apps().V1().ReplicaSets(), @@ -653,7 +658,7 @@ func TestWatchControllers(t *testing.T) { } // Start only the ReplicaSet watcher and the workqueue, send a watch event, // and make sure it hits the sync method. - go wait.UntilWithContext(context.TODO(), manager.worker, 10*time.Millisecond) + go wait.UntilWithContext(ctx, manager.worker, 10*time.Millisecond) testRSSpec.Name = "foo" fakeWatch.Add(&testRSSpec) @@ -666,6 +671,7 @@ func TestWatchControllers(t *testing.T) { } func TestWatchPods(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() @@ -703,7 +709,7 @@ func TestWatchPods(t *testing.T) { // Start only the pod watcher and the workqueue, send a watch event, // and make sure it hits the sync method for the right ReplicaSet. 
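Note: the tests above obtain both the logger and the context from ktesting, so log output is attached to the failing test and the context can be handed to code that calls klog.FromContext. A minimal sketch of that setup (TestContextualLogging is an illustrative test, not one of the tests in this patch):

package example

import (
	"testing"

	"k8s.io/klog/v2/ktesting"
)

func TestContextualLogging(t *testing.T) {
	// NewTestContext returns a logger that writes through t.Log plus a context
	// carrying the same logger for code that uses klog.FromContext(ctx).
	logger, ctx := ktesting.NewTestContext(t)

	logger.V(4).Info("Test started")
	_ = ctx // hand ctx (and/or logger) to the code under test
}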
go informers.Core().V1().Pods().Informer().Run(stopCh) - go manager.Run(context.TODO(), 1) + go manager.Run(ctx, 1) pods := newPodList(nil, 1, v1.PodRunning, labelMap, testRSSpec, "pod") testPod := pods.Items[0] @@ -718,6 +724,7 @@ func TestWatchPods(t *testing.T) { } func TestUpdatePods(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) stopCh := make(chan struct{}) defer close(stopCh) manager, informers := testNewReplicaSetControllerFromClient(t, fake.NewSimpleClientset(), stopCh, BurstReplicas) @@ -737,7 +744,7 @@ func TestUpdatePods(t *testing.T) { return nil } - go wait.UntilWithContext(context.TODO(), manager.worker, 10*time.Millisecond) + go wait.UntilWithContext(ctx, manager.worker, 10*time.Millisecond) // Put 2 ReplicaSets and one pod into the informers labelMap1 := map[string]string{"foo": "bar"} @@ -760,7 +767,7 @@ func TestUpdatePods(t *testing.T) { pod2 := pod1 pod2.Labels = labelMap2 pod2.ResourceVersion = "2" - manager.updatePod(&pod1, &pod2) + manager.updatePod(logger, &pod1, &pod2) expected := sets.NewString(testRSSpec1.Name) for _, name := range expected.List() { t.Logf("Expecting update for %+v", name) @@ -782,7 +789,7 @@ func TestUpdatePods(t *testing.T) { pod2 = pod1 pod2.OwnerReferences = nil pod2.ResourceVersion = "2" - manager.updatePod(&pod1, &pod2) + manager.updatePod(logger, &pod1, &pod2) expected = sets.NewString(testRSSpec2.Name) for _, name := range expected.List() { t.Logf("Expecting update for %+v", name) @@ -805,7 +812,7 @@ func TestUpdatePods(t *testing.T) { pod2 = pod1 pod2.OwnerReferences = nil pod2.ResourceVersion = "2" - manager.updatePod(&pod1, &pod2) + manager.updatePod(logger, &pod1, &pod2) expected = sets.NewString(testRSSpec1.Name, testRSSpec2.Name) for _, name := range expected.List() { t.Logf("Expecting update for %+v", name) @@ -827,7 +834,7 @@ func TestUpdatePods(t *testing.T) { pod2 = pod1 pod2.Labels = labelMap2 pod2.ResourceVersion = "2" - manager.updatePod(&pod1, &pod2) + manager.updatePod(logger, &pod1, &pod2) expected = sets.NewString(testRSSpec2.Name) for _, name := range expected.List() { t.Logf("Expecting update for %+v", name) @@ -843,6 +850,7 @@ func TestUpdatePods(t *testing.T) { } func TestControllerUpdateRequeue(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) // This server should force a requeue of the controller because it fails to update status.Replicas. labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(1, labelMap) @@ -868,7 +876,7 @@ func TestControllerUpdateRequeue(t *testing.T) { // Enqueue once. Then process it. Disable rate-limiting for this. manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter()) manager.enqueueRS(rs) - manager.processNextWorkItem(context.TODO()) + manager.processNextWorkItem(ctx) // It should have been requeued. if got, want := manager.queue.Len(), 1; got != want { t.Errorf("queue.Len() = %v, want %v", got, want) @@ -923,6 +931,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) { // TODO: This test is too hairy for a unittest. It should be moved to an E2E suite. 
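Note: replacing context.TODO() with the ktesting context matters most for worker goroutines: wait.UntilWithContext re-runs the worker until the context is cancelled, so cancelling the test context also stops the goroutine. An illustrative sketch (startWorker is a made-up helper, not part of the controller):

package example

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// startWorker runs worker repeatedly until ctx is cancelled; with a ktesting
// context the goroutine stops when the test tears down its context.
func startWorker(ctx context.Context, worker func(context.Context)) {
	go wait.UntilWithContext(ctx, worker, 10*time.Millisecond)
}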
func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { + logger, ctx := ktesting.NewTestContext(t) labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(numReplicas, labelMap) client := fake.NewSimpleClientset(rsSpec) @@ -949,7 +958,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) for i := 0; i < numReplicas; i += burstReplicas { - manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) + manager.syncReplicaSet(ctx, GetKey(rsSpec, t)) // The store accrues active pods. It's also used by the ReplicaSet to determine how many // replicas to create. @@ -972,7 +981,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // None of these should wake the controller because it has expectations==BurstReplicas. for i := int32(0); i < expectedPods-1; i++ { informers.Core().V1().Pods().Informer().GetIndexer().Add(&pods.Items[i]) - manager.addPod(&pods.Items[i]) + manager.addPod(logger, &pods.Items[i]) } podExp, exists, err := manager.expectations.GetExpectations(rsKey) @@ -1015,7 +1024,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // don't double delete. for i := range podsToDelete[1:] { informers.Core().V1().Pods().Informer().GetIndexer().Delete(podsToDelete[i]) - manager.deletePod(podsToDelete[i]) + manager.deletePod(logger, podsToDelete[i]) } podExp, exists, err := manager.expectations.GetExpectations(rsKey) if !exists || err != nil { @@ -1028,7 +1037,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // Check that the ReplicaSet didn't take any action for all the above pods fakePodControl.Clear() - manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) + manager.syncReplicaSet(ctx, GetKey(rsSpec, t)) err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) if err != nil { t.Fatal(err) @@ -1039,7 +1048,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // which will cause it to create/delete the remaining replicas up to burstReplicas. if replicas != 0 { informers.Core().V1().Pods().Informer().GetIndexer().Add(&pods.Items[expectedPods-1]) - manager.addPod(&pods.Items[expectedPods-1]) + manager.addPod(logger, &pods.Items[expectedPods-1]) } else { expectedDel := manager.expectations.GetUIDs(GetKey(rsSpec, t)) if expectedDel.Len() != 1 { @@ -1058,7 +1067,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) }, } informers.Core().V1().Pods().Informer().GetIndexer().Delete(lastPod) - manager.deletePod(lastPod) + manager.deletePod(logger, lastPod) } pods.Items = pods.Items[expectedPods:] } @@ -1093,6 +1102,7 @@ func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool { // TestRSSyncExpectations tests that a pod cannot sneak in between counting active pods // and checking expectations. 
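Note: the tests above call addPod/deletePod with an explicit logger; in production code the informer callbacks have fixed signatures, so the usual pattern is to close over the logger when registering the handlers. A hedged sketch of that wiring (the types and function below are illustrative):

package example

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

type handlers struct{}

func (h *handlers) addPod(logger klog.Logger, obj interface{}) {
	logger.V(4).Info("Pod added")
}

func (h *handlers) deletePod(logger klog.Logger, obj interface{}) {
	logger.V(4).Info("Pod deleted")
}

// registerHandlers closes over the constructor's logger so that informer
// callbacks, whose signatures cannot change, still reach logger-aware handlers.
func registerHandlers(logger klog.Logger, informer cache.SharedIndexInformer, h *handlers) {
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { h.addPod(logger, obj) },
		DeleteFunc: func(obj interface{}) { h.deletePod(logger, obj) },
	})
}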
func TestRSSyncExpectations(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) @@ -1115,7 +1125,7 @@ func TestRSSyncExpectations(t *testing.T) { informers.Core().V1().Pods().Informer().GetIndexer().Add(&postExpectationsPod) }, }) - manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) + manager.syncReplicaSet(ctx, GetKey(rsSpec, t)) err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) if err != nil { t.Fatal(err) @@ -1123,6 +1133,7 @@ func TestRSSyncExpectations(t *testing.T) { } func TestDeleteControllerAndExpectations(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) rs := newReplicaSet(1, map[string]string{"foo": "bar"}) client := fake.NewSimpleClientset(rs) stopCh := make(chan struct{}) @@ -1135,7 +1146,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { manager.podControl = &fakePodControl // This should set expectations for the ReplicaSet - manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) + manager.syncReplicaSet(ctx, GetKey(rs, t)) err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0) if err != nil { t.Fatal(err) @@ -1155,8 +1166,8 @@ func TestDeleteControllerAndExpectations(t *testing.T) { t.Errorf("No expectations found for ReplicaSet") } informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Delete(rs) - manager.deleteRS(rs) - manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) + manager.deleteRS(logger, rs) + manager.syncReplicaSet(ctx, GetKey(rs, t)) _, exists, err = manager.expectations.GetExpectations(rsKey) if err != nil { @@ -1169,7 +1180,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should have no effect, since we've deleted the ReplicaSet. 
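Note: context-aware code paths such as syncReplicaSet do not take a separate logger argument; they recover it from the context. A minimal sketch of that retrieval (syncSomething is illustrative):

package example

import (
	"context"

	"k8s.io/klog/v2"
)

// syncSomething recovers the contextual logger with klog.FromContext instead
// of taking it as an extra parameter.
func syncSomething(ctx context.Context, key string) error {
	logger := klog.FromContext(ctx)
	logger.V(4).Info("Syncing", "key", key)
	return nil
}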
podExp.Add(-1, 0) informers.Core().V1().Pods().Informer().GetIndexer().Replace(make([]interface{}, 0), "0") - manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) + manager.syncReplicaSet(ctx, GetKey(rs, t)) err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0) if err != nil { t.Fatal(err) @@ -1177,6 +1188,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { } func TestExpectationsOnRecreate(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) client := fake.NewSimpleClientset() stopCh := make(chan struct{}) defer close(stopCh) @@ -1200,7 +1212,7 @@ func TestExpectationsOnRecreate(t *testing.T) { } oldRS := newReplicaSet(1, map[string]string{"foo": "bar"}) - oldRS, err := client.AppsV1().ReplicaSets(oldRS.Namespace).Create(context.TODO(), oldRS, metav1.CreateOptions{}) + oldRS, err := client.AppsV1().ReplicaSets(oldRS.Namespace).Create(ctx, oldRS, metav1.CreateOptions{}) if err != nil { t.Fatal(err) } @@ -1213,7 +1225,7 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatalf("initial RS didn't result in new item in the queue: %v", err) } - ok := manager.processNextWorkItem(context.TODO()) + ok := manager.processNextWorkItem(ctx) if !ok { t.Fatal("queue is shutting down") } @@ -1244,7 +1256,7 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatal("Unexpected item in the queue") } - err = client.AppsV1().ReplicaSets(oldRS.Namespace).Delete(context.TODO(), oldRS.Name, metav1.DeleteOptions{}) + err = client.AppsV1().ReplicaSets(oldRS.Namespace).Delete(ctx, oldRS.Name, metav1.DeleteOptions{}) if err != nil { t.Fatal(err) } @@ -1281,7 +1293,7 @@ func TestExpectationsOnRecreate(t *testing.T) { newRS := oldRS.DeepCopy() newRS.UID = uuid.NewUUID() - newRS, err = client.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), newRS, metav1.CreateOptions{}) + newRS, err = client.AppsV1().ReplicaSets(newRS.Namespace).Create(ctx, newRS, metav1.CreateOptions{}) if err != nil { t.Fatal(err) } @@ -1299,7 +1311,7 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatalf("Re-creating RS didn't result in new item in the queue: %v", err) } - ok = manager.processNextWorkItem(context.TODO()) + ok = manager.processNextWorkItem(ctx) if !ok { t.Fatal("Queue is shutting down!") } @@ -1338,6 +1350,7 @@ func shuffle(controllers []*apps.ReplicaSet) []*apps.ReplicaSet { } func TestOverlappingRSs(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) labelMap := map[string]string{"foo": "bar"} @@ -1373,7 +1386,7 @@ func TestOverlappingRSs(t *testing.T) { } rsKey := GetKey(rs, t) - manager.addPod(pod) + manager.addPod(logger, pod) queueRS, _ := manager.queue.Get() if queueRS != rsKey { t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) @@ -1381,6 +1394,7 @@ func TestOverlappingRSs(t *testing.T) { } func TestDeletionTimestamp(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) labelMap := map[string]string{"foo": "bar"} stopCh := make(chan struct{}) @@ -1399,7 +1413,7 @@ func TestDeletionTimestamp(t *testing.T) { manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)}) // A pod added with a deletion timestamp should decrement deletions, not creations. 
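Note: the Create and Delete calls above now receive the test context rather than context.TODO(), so cancellation and the contextual logger both flow into the API request. A small sketch of the same call shape against a generated clientset (deleteReplicaSet is illustrative):

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deleteReplicaSet passes the caller's ctx (here, the test context) into the
// API request instead of a detached context.TODO().
func deleteReplicaSet(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
	return client.AppsV1().ReplicaSets(namespace).Delete(ctx, name, metav1.DeleteOptions{})
}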
- manager.addPod(&pod) + manager.addPod(logger, &pod) queueRS, _ := manager.queue.Get() if queueRS != rsKey { @@ -1417,7 +1431,7 @@ func TestDeletionTimestamp(t *testing.T) { oldPod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0] oldPod.ResourceVersion = "2" manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)}) - manager.updatePod(&oldPod, &pod) + manager.updatePod(logger, &oldPod, &pod) queueRS, _ = manager.queue.Get() if queueRS != rsKey { @@ -1446,7 +1460,7 @@ func TestDeletionTimestamp(t *testing.T) { manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)}) oldPod.DeletionTimestamp = &metav1.Time{Time: time.Now()} oldPod.ResourceVersion = "2" - manager.updatePod(&oldPod, &pod) + manager.updatePod(logger, &oldPod, &pod) podExp, exists, err = manager.expectations.GetExpectations(rsKey) if !exists || err != nil || podExp.Fulfilled() { @@ -1455,14 +1469,14 @@ func TestDeletionTimestamp(t *testing.T) { // A pod with a non-nil deletion timestamp should also be ignored by the // delete handler, because it's already been counted in the update. - manager.deletePod(&pod) + manager.deletePod(logger, &pod) podExp, exists, err = manager.expectations.GetExpectations(rsKey) if !exists || err != nil || podExp.Fulfilled() { t.Fatalf("Wrong expectations %#v", podExp) } // Deleting the second pod should clear expectations. - manager.deletePod(secondPod) + manager.deletePod(logger, secondPod) queueRS, _ = manager.queue.Get() if queueRS != rsKey { @@ -1487,6 +1501,7 @@ func setupManagerWithGCEnabled(t *testing.T, stopCh chan struct{}, objs ...runti } func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) stopCh := make(chan struct{}) @@ -1499,7 +1514,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { pod := newPod("pod", rs, v1.PodRunning, nil, true) pod.OwnerReferences = []metav1.OwnerReference{otherControllerReference} informers.Core().V1().Pods().Informer().GetIndexer().Add(pod) - err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) + err := manager.syncReplicaSet(ctx, GetKey(rs, t)) if err != nil { t.Fatal(err) } @@ -1511,6 +1526,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { } func TestPatchPodFails(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) stopCh := make(chan struct{}) @@ -1525,7 +1541,7 @@ func TestPatchPodFails(t *testing.T) { // control of the pods and requeue to try again. fakePodControl.Err = fmt.Errorf("fake Error") rsKey := GetKey(rs, t) - err := processSync(manager, rsKey) + err := processSync(ctx, manager, rsKey) if err == nil || !strings.Contains(err.Error(), "fake Error") { t.Errorf("expected fake Error, got %+v", err) } @@ -1544,6 +1560,7 @@ func TestPatchPodFails(t *testing.T) { // RS controller shouldn't adopt or create more pods if the rc is about to be // deleted. 
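Note: later hunks in this series (NewReplicationManager, NewBaseController) move the logger into the controller constructors so it is available while event handlers are being registered. A minimal sketch of that constructor shape, assuming an illustrative Manager type:

package example

import (
	"k8s.io/klog/v2"
)

// Manager is illustrative only; it mirrors the constructor shape where the
// caller supplies a logger (typically from klog.FromContext or ktesting).
type Manager struct {
	logger klog.Logger
}

func NewManager(logger klog.Logger) *Manager {
	// Handlers registered here can close over logger (see the earlier sketch).
	return &Manager{logger: logger}
}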
func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) now := metav1.Now() @@ -1556,7 +1573,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1) // no patch, no create - err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) + err := manager.syncReplicaSet(ctx, GetKey(rs, t)) if err != nil { t.Fatal(err) } @@ -1567,6 +1584,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { } func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) labelMap := map[string]string{"foo": "bar"} // Bare client says it IS deleted. rs := newReplicaSet(2, labelMap) @@ -1585,7 +1603,7 @@ func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1) // sync should abort. - err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) + err := manager.syncReplicaSet(ctx, GetKey(rs, t)) if err == nil { t.Error("syncReplicaSet() err = nil, expected non-nil") } diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 3d2d1a46114..0318d6dcd8d 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -31,6 +31,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/replicaset" ) @@ -47,10 +48,10 @@ type ReplicationManager struct { } // NewReplicationManager configures a replication manager with the specified event recorder -func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager { +func NewReplicationManager(logger klog.Logger, podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager { eventBroadcaster := record.NewBroadcaster() return &ReplicationManager{ - *replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas, + *replicaset.NewBaseController(logger, informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas, v1.SchemeGroupVersion.WithKind("ReplicationController"), "replication_controller", "replicationmanager", diff --git a/pkg/controller/util/node/controller_utils.go b/pkg/controller/util/node/controller_utils.go index 0b12b724123..ef7c083e6dd 100644 --- a/pkg/controller/util/node/controller_utils.go +++ b/pkg/controller/util/node/controller_utils.go @@ -175,7 +175,7 @@ func RecordNodeEvent(ctx context.Context, recorder record.EventRecorder, nodeNam } // RecordNodeStatusChange records a event related to a node status change. 
(Common to lifecycle and ipam)
-func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
+func RecordNodeStatusChange(logger klog.Logger, recorder record.EventRecorder, node *v1.Node, newStatus string) {
 	ref := &v1.ObjectReference{
 		APIVersion: "v1",
 		Kind:       "Node",
@@ -183,7 +183,7 @@ func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newSta
 		UID:        node.UID,
 		Namespace:  "",
 	}
-	klog.V(2).InfoS("Recording status change event message for node", "status", newStatus, "node", node.Name)
+	logger.V(2).Info("Recording status change event message for node", "status", newStatus, "node", node.Name)
 	// TODO: This requires a transaction, either both node status is updated
 	// and event is recorded or neither should happen, see issue #6055.
 	recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
@@ -265,7 +265,7 @@ func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldOb
 }
 
 // CreateDeleteNodeHandler creates a delete node handler. (Common to lifecycle and ipam)
-func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
+func CreateDeleteNodeHandler(logger klog.Logger, f func(node *v1.Node) error) func(obj interface{}) {
 	return func(originalObj interface{}) {
 		originalNode, isNode := originalObj.(*v1.Node)
 		// We can get DeletedFinalStateUnknown instead of *v1.Node here and
@@ -273,12 +273,12 @@ func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{})
 		if !isNode {
 			deletedState, ok := originalObj.(cache.DeletedFinalStateUnknown)
 			if !ok {
-				klog.ErrorS(nil, "Received unexpected object", "object", originalObj)
+				logger.Error(nil, "Received unexpected object", "object", originalObj)
 				return
 			}
 			originalNode, ok = deletedState.Obj.(*v1.Node)
 			if !ok {
-				klog.ErrorS(nil, "DeletedFinalStateUnknown contained non-Node object", "object", deletedState.Obj)
+				logger.Error(nil, "DeletedFinalStateUnknown contained non-Node object", "object", deletedState.Obj)
 				return
 			}
 		}
diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go
index fc8e55fb5cc..12d2ea41cb3 100644
--- a/pkg/controller/volume/attachdetach/attach_detach_controller.go
+++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go
@@ -874,6 +874,7 @@ func (adc *attachDetachController) GetServiceAccountTokenFunc() func(_, _ string
 func (adc *attachDetachController) DeleteServiceAccountTokenFunc() func(types.UID) {
 	return func(types.UID) {
+		//nolint:logcheck
 		klog.ErrorS(nil, "DeleteServiceAccountToken unsupported in attachDetachController")
 	}
 }
diff --git a/pkg/controller/volume/attachdetach/testing/testvolumespec.go b/pkg/controller/volume/attachdetach/testing/testvolumespec.go
index 58009f5d6fc..0ce94935b27 100644
--- a/pkg/controller/volume/attachdetach/testing/testvolumespec.go
+++ b/pkg/controller/volume/attachdetach/testing/testvolumespec.go
@@ -29,7 +29,6 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes/fake"
 	core "k8s.io/client-go/testing"
-	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/util"
 )
@@ -376,7 +375,6 @@ func (plugin *TestPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
 	plugin.pluginLock.Lock()
 	defer plugin.pluginLock.Unlock()
 	if spec == nil {
-		klog.ErrorS(nil, "GetVolumeName called with nil volume spec")
 		plugin.ErrorEncountered = true
 		return "", fmt.Errorf("GetVolumeName called with nil volume spec")
 	}
@@ -400,7 +398,6
@@ func (plugin *TestPlugin) CanSupport(spec *volume.Spec) bool { plugin.pluginLock.Lock() defer plugin.pluginLock.Unlock() if spec == nil { - klog.ErrorS(nil, "CanSupport called with nil volume spec") plugin.ErrorEncountered = true } return true @@ -414,7 +411,6 @@ func (plugin *TestPlugin) NewMounter(spec *volume.Spec, podRef *v1.Pod, opts vol plugin.pluginLock.Lock() defer plugin.pluginLock.Unlock() if spec == nil { - klog.ErrorS(nil, "NewMounter called with nil volume spec") plugin.ErrorEncountered = true } return nil, nil @@ -540,7 +536,6 @@ func (attacher *testPluginAttacher) Attach(spec *volume.Spec, nodeName types.Nod defer attacher.pluginLock.Unlock() if spec == nil { *attacher.ErrorEncountered = true - klog.ErrorS(nil, "Attach called with nil volume spec") return "", fmt.Errorf("Attach called with nil volume spec") } attacher.attachedVolumeMap[string(nodeName)] = append(attacher.attachedVolumeMap[string(nodeName)], spec.Name()) @@ -556,7 +551,6 @@ func (attacher *testPluginAttacher) WaitForAttach(spec *volume.Spec, devicePath defer attacher.pluginLock.Unlock() if spec == nil { *attacher.ErrorEncountered = true - klog.ErrorS(nil, "WaitForAttach called with nil volume spec") return "", fmt.Errorf("WaitForAttach called with nil volume spec") } fakePath := fmt.Sprintf("%s/%s", devicePath, spec.Name()) @@ -568,7 +562,6 @@ func (attacher *testPluginAttacher) GetDeviceMountPath(spec *volume.Spec) (strin defer attacher.pluginLock.Unlock() if spec == nil { *attacher.ErrorEncountered = true - klog.ErrorS(nil, "GetDeviceMountPath called with nil volume spec") return "", fmt.Errorf("GetDeviceMountPath called with nil volume spec") } return "", nil @@ -579,7 +572,6 @@ func (attacher *testPluginAttacher) MountDevice(spec *volume.Spec, devicePath st defer attacher.pluginLock.Unlock() if spec == nil { *attacher.ErrorEncountered = true - klog.ErrorS(nil, "MountDevice called with nil volume spec") return fmt.Errorf("MountDevice called with nil volume spec") } return nil diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index 140ce01cbb3..a5b5eaea97a 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -454,6 +454,7 @@ func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ * func (expc *expandController) DeleteServiceAccountTokenFunc() func(types.UID) { return func(types.UID) { + //nolint:logcheck klog.ErrorS(nil, "DeleteServiceAccountToken unsupported in expandController") } } diff --git a/pkg/controller/volume/persistentvolume/pv_controller_test.go b/pkg/controller/volume/persistentvolume/pv_controller_test.go index d2014721517..c75d0b45dd5 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_test.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_test.go @@ -37,7 +37,6 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-helpers/storage/volume" csitrans "k8s.io/csi-translation-lib" - "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" @@ -311,7 +310,7 @@ func TestControllerSync(t *testing.T) { }, }, } - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) doit := func(test controllerTest) { // Initialize the controller client := &fake.Clientset{} @@ -372,7 +371,7 @@ func TestControllerSync(t *testing.T) { if err != nil { t.Errorf("Test %q controller 
sync failed: %v", test.name, err) } - klog.V(4).Infof("controller synced, starting test") + logger.V(4).Info("controller synced, starting test") // Call the tested function err = test.test(ctrl, reactor.VolumeReactor, test) diff --git a/pkg/controller/volume/persistentvolume/volume_host.go b/pkg/controller/volume/persistentvolume/volume_host.go index 16eeb9c951f..dc478148d32 100644 --- a/pkg/controller/volume/persistentvolume/volume_host.go +++ b/pkg/controller/volume/persistentvolume/volume_host.go @@ -118,6 +118,7 @@ func (ctrl *PersistentVolumeController) GetServiceAccountTokenFunc() func(_, _ s func (ctrl *PersistentVolumeController) DeleteServiceAccountTokenFunc() func(types.UID) { return func(types.UID) { + //nolint:logcheck klog.ErrorS(nil, "DeleteServiceAccountToken unsupported in PersistentVolumeController") } } diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index c5e7c55b06a..4393f3fd059 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -60,7 +60,7 @@ const ( // quota_test.go:100: Took 4.196205966s to scale up without quota // quota_test.go:115: Took 12.021640372s to scale up with quota func TestQuota(t *testing.T) { - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -82,6 +82,7 @@ func TestQuota(t *testing.T) { informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) rm := replicationcontroller.NewReplicationManager( + logger, informers.Core().V1().Pods(), informers.Core().V1().ReplicationControllers(), clientset, @@ -290,7 +291,7 @@ plugins: t.Fatal(err) } - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -312,6 +313,7 @@ plugins: informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) rm := replicationcontroller.NewReplicationManager( + logger, informers.Core().V1().Pods(), informers.Core().V1().ReplicationControllers(), clientset, @@ -417,7 +419,7 @@ plugins: t.Fatal(err) } - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -439,6 +441,7 @@ plugins: informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) rm := replicationcontroller.NewReplicationManager( + logger, informers.Core().V1().Pods(), informers.Core().V1().ReplicationControllers(), clientset, diff --git a/test/integration/replicationcontroller/replicationcontroller_test.go b/test/integration/replicationcontroller/replicationcontroller_test.go index aa3893889a0..5258451cb5e 100644 --- a/test/integration/replicationcontroller/replicationcontroller_test.go +++ b/test/integration/replicationcontroller/replicationcontroller_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +38,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/retry" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2/ktesting" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/replication" @@ -122,7 +123,9 @@ func rmSetup(t *testing.T) 
(kubeapiservertesting.TearDownFunc, *replication.Repl resyncPeriod := 12 * time.Hour informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "rc-informers")), resyncPeriod) + logger, _ := ktesting.NewTestContext(t) rm := replication.NewReplicationManager( + logger, informers.Core().V1().Pods(), informers.Core().V1().ReplicationControllers(), clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "replication-controller")),