From 325205efb72ec4aeb226befe9da2ef8fcb055c6b Mon Sep 17 00:00:00 2001 From: "xin.li" Date: Mon, 20 Mar 2023 21:25:15 +0800 Subject: [PATCH] Migrated pkg/controller/endpoint to contextual logging Signed-off-by: xin.li --- hack/logcheck.conf | 1 - .../endpoint/endpoints_controller.go | 47 ++++++++++--------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/hack/logcheck.conf b/hack/logcheck.conf index 5161fc58878..49ff0e0d44b 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -42,7 +42,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.* -contextual k8s.io/kubernetes/pkg/controller/controller_utils.go -contextual k8s.io/kubernetes/pkg/controller/deployment/.* -contextual k8s.io/kubernetes/pkg/controller/disruption/.* --contextual k8s.io/kubernetes/pkg/controller/endpoint/.* -contextual k8s.io/kubernetes/pkg/controller/endpointslice/.* -contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.* -contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.* diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index cde9028c5c1..f2a6e19947a 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -169,8 +169,9 @@ func (e *Controller) Run(ctx context.Context, workers int) { defer e.queue.ShutDown() - klog.Infof("Starting endpoint controller") - defer klog.Infof("Shutting down endpoint controller") + logger := klog.FromContext(ctx) + logger.Info("Starting endpoint controller") + defer logger.Info("Shutting down endpoint controller") if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) { return @@ -321,13 +322,14 @@ func (e *Controller) processNextWorkItem(ctx context.Context) bool { } defer e.queue.Done(eKey) + logger := klog.FromContext(ctx) err := e.syncService(ctx, eKey.(string)) - e.handleErr(err, eKey) + e.handleErr(logger, err, eKey) return true } -func (e *Controller) handleErr(err error, key interface{}) { +func (e *Controller) handleErr(logger klog.Logger, err error, key interface{}) { if err == nil { e.queue.Forget(key) return @@ -335,30 +337,31 @@ func (e *Controller) handleErr(err error, key interface{}) { ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string)) if keyErr != nil { - klog.ErrorS(err, "Failed to split meta namespace cache key", "key", key) + logger.Error(err, "Failed to split meta namespace cache key", "key", key) } if e.queue.NumRequeues(key) < maxRetries { - klog.V(2).InfoS("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err) + logger.V(2).Info("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err) e.queue.AddRateLimited(key) return } - klog.Warningf("Dropping service %q out of the queue: %v", key, err) + logger.Info("Dropping service out of the queue", "service", klog.KRef(ns, name), "err", err) e.queue.Forget(key) utilruntime.HandleError(err) } func (e *Controller) syncService(ctx context.Context, key string) error { startTime := time.Now() - defer func() { - klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime)) - }() - + logger := klog.FromContext(ctx) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } + defer func() { + logger.V(4).Info("Finished syncing service endpoints", "service", klog.KRef(namespace, name), "startTime", time.Since(startTime)) + }() + service, err := e.serviceLister.Services(namespace).Get(name) if err != nil { if !errors.IsNotFound(err) { @@ -390,7 +393,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { return nil } - klog.V(5).Infof("About to update endpoints for service %q", key) + logger.V(5).Info("About to update endpoints for service", "service", klog.KRef(namespace, name)) pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated()) if err != nil { // Since we're getting stuff from a local cache, it is @@ -410,7 +413,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { for _, pod := range pods { if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) { - klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name) + logger.V(5).Info("Pod is not included on endpoints for Service", "pod", klog.KObj(pod), "service", klog.KObj(service)) continue } @@ -418,7 +421,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { if err != nil { // this will happen, if the cluster runs with some nodes configured as dual stack and some as not // such as the case of an upgrade.. - klog.V(2).Infof("Failed to find endpoint for service:%s with ClusterIP:%s on pod:%s with error:%v", service.Name, service.Spec.ClusterIP, klog.KObj(pod), err) + logger.V(2).Info("Failed to find endpoint for service with ClusterIP on pod with error", "service", klog.KObj(service), "clusterIP", service.Spec.ClusterIP, "pod", klog.KObj(pod), "error", err) continue } @@ -430,7 +433,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { // Allow headless service not to have ports. if len(service.Spec.Ports) == 0 { if service.Spec.ClusterIP == api.ClusterIPNone { - subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses) + subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(logger, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses) // No need to repack subsets for headless service without ports. } } else { @@ -438,13 +441,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error { servicePort := &service.Spec.Ports[i] portNum, err := podutil.FindPort(pod, servicePort) if err != nil { - klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) + logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err) continue } epp := endpointPortFromServicePort(servicePort, portNum) var readyEps, notReadyEps int - subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses) + subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses) totalReadyEps = totalReadyEps + readyEps totalNotReadyEps = totalNotReadyEps + notReadyEps } @@ -483,7 +486,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) && apiequality.Semantic.DeepEqual(compareLabels, service.Labels) && capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) { - klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) + logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service)) return nil } newEndpoints := currentEndpoints.DeepCopy() @@ -516,7 +519,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService) } - klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps) + logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps) if createEndpoints { // No previous endpoints, create them _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{}) @@ -530,7 +533,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { // 1. namespace is terminating, endpoint creation is not allowed by default. // 2. policy is misconfigured, in which case no service would function anywhere. // Given the frequency of 1, we log at a lower level. - klog.V(5).Infof("Forbidden from creating endpoints: %v", err) + logger.V(5).Info("Forbidden from creating endpoints", "error", err) // If the namespace is terminating, creates will continue to fail. Simply drop the item. if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { @@ -583,7 +586,7 @@ func (e *Controller) checkLeftoverEndpoints() { // The addresses are added to the corresponding field, ready or not ready, depending // on the pod status and the Service PublishNotReadyAddresses field value. // The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints. -func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, +func addEndpointSubset(logger klog.Logger, subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) { var readyEps int var notReadyEps int @@ -598,7 +601,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint }) readyEps++ } else { // if it is not a ready address it has to be not ready - klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name) + logger.V(5).Info("Pod is out of service", "pod", klog.KObj(pod)) subsets = append(subsets, v1.EndpointSubset{ NotReadyAddresses: []v1.EndpointAddress{epa}, Ports: ports,