Migrated pkg/controller/endpoint to contextual logging

Signed-off-by: xin.li <xin.li@daocloud.io>

commit 325205efb7 (parent fe91bc257b)
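
The pattern behind the whole patch: contextual logging (KEP-3077) swaps global klog format-string calls for a klog.Logger that carries structured key/value pairs and travels through the context. A minimal, self-contained sketch of the conversion style (illustrative only, not part of the patch):

package main

import "k8s.io/klog/v2"

func main() {
	// Before: positional format strings against the global logger.
	klog.Infof("Update endpoints for %v/%v, ready: %d not ready: %d", "default", "web", 3, 1)

	// After: a fixed message plus named key/value pairs, so both the text
	// and JSON backends can render (and tooling can query) the fields.
	logger := klog.Background() // stand-in for klog.FromContext(ctx)
	logger.Info("Update endpoints", "service", klog.KRef("default", "web"),
		"readyEndpoints", 3, "notreadyEndpoints", 1)
}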
hack/logcheck.conf

@@ -42,7 +42,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.*
 -contextual k8s.io/kubernetes/pkg/controller/controller_utils.go
 -contextual k8s.io/kubernetes/pkg/controller/deployment/.*
 -contextual k8s.io/kubernetes/pkg/controller/disruption/.*
--contextual k8s.io/kubernetes/pkg/controller/endpoint/.*
 -contextual k8s.io/kubernetes/pkg/controller/endpointslice/.*
 -contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.*
 -contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.*
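
A hedged note on the hunk above, which edits the logcheck linter configuration (the per-file headers are missing from this view; the patterns match the format of hack/logcheck.conf): a leading - disables the named check for files matching the regexp, so deleting -contextual k8s.io/kubernetes/pkg/controller/endpoint/.* removes the exemption and the linter starts enforcing contextual logging in the migrated package. Schematically:

contextual <regexp>     enforce contextual logging for matching files
-contextual <regexp>    exempt matching files (not yet migrated)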
pkg/controller/endpoint/endpoints_controller.go

@@ -169,8 +169,9 @@ func (e *Controller) Run(ctx context.Context, workers int) {
 
 	defer e.queue.ShutDown()
 
-	klog.Infof("Starting endpoint controller")
-	defer klog.Infof("Shutting down endpoint controller")
+	logger := klog.FromContext(ctx)
+	logger.Info("Starting endpoint controller")
+	defer logger.Info("Shutting down endpoint controller")
 
 	if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) {
 		return
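
Run now logs through whatever logger the caller attached to ctx, rather than the global one. The other half of the pattern, sketched under the assumption that only the k8s.io/klog/v2 API is used (the wiring below is invented for illustration, not taken from kube-controller-manager):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// run mirrors Controller.Run above: it derives its logger from ctx.
func run(ctx context.Context) {
	logger := klog.FromContext(ctx)
	logger.Info("Starting endpoint controller")
	defer logger.Info("Shutting down endpoint controller")
}

func main() {
	// The caller decides what the callee sees: attaching a named logger
	// here tags every message logged inside run() with "endpoint".
	logger := klog.LoggerWithName(klog.Background(), "endpoint")
	run(klog.NewContext(context.Background(), logger))
}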
@@ -321,13 +322,14 @@ func (e *Controller) processNextWorkItem(ctx context.Context) bool {
 	}
 	defer e.queue.Done(eKey)
 
+	logger := klog.FromContext(ctx)
 	err := e.syncService(ctx, eKey.(string))
-	e.handleErr(err, eKey)
+	e.handleErr(logger, err, eKey)
 
 	return true
 }
 
-func (e *Controller) handleErr(err error, key interface{}) {
+func (e *Controller) handleErr(logger klog.Logger, err error, key interface{}) {
 	if err == nil {
 		e.queue.Forget(key)
 		return
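
Note the split in this hunk: processNextWorkItem keeps taking ctx (it calls syncService with it), while handleErr now receives a plain klog.Logger, since logging is all it needs from the context. That matches the contextual-logging guidance as I understand it; a small sketch of the two conventions (names invented):

package main

import (
	"context"
	"errors"

	"k8s.io/klog/v2"
)

// reportRetry only logs, so it takes klog.Logger directly, like handleErr.
func reportRetry(logger klog.Logger, err error, ns, name string) {
	// V(2) output appears only when the process runs with -v=2 or higher.
	logger.V(2).Info("Error syncing endpoints, retrying",
		"service", klog.KRef(ns, name), "err", err)
}

// sync needs ctx anyway (API calls, cancellation) and derives its logger,
// like syncService.
func sync(ctx context.Context, ns, name string) error {
	logger := klog.FromContext(ctx)
	logger.V(5).Info("About to update endpoints for service", "service", klog.KRef(ns, name))
	return errors.New("transient")
}

func main() {
	ctx := klog.NewContext(context.Background(), klog.Background())
	if err := sync(ctx, "default", "web"); err != nil {
		reportRetry(klog.FromContext(ctx), err, "default", "web")
	}
}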
@@ -335,30 +337,31 @@ func (e *Controller) handleErr(err error, key interface{}) {
 
 	ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
 	if keyErr != nil {
-		klog.ErrorS(err, "Failed to split meta namespace cache key", "key", key)
+		logger.Error(err, "Failed to split meta namespace cache key", "key", key)
 	}
 
 	if e.queue.NumRequeues(key) < maxRetries {
-		klog.V(2).InfoS("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err)
+		logger.V(2).Info("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err)
 		e.queue.AddRateLimited(key)
 		return
 	}
 
-	klog.Warningf("Dropping service %q out of the queue: %v", key, err)
+	logger.Info("Dropping service out of the queue", "service", klog.KRef(ns, name), "err", err)
 	e.queue.Forget(key)
 	utilruntime.HandleError(err)
 }
 
 func (e *Controller) syncService(ctx context.Context, key string) error {
 	startTime := time.Now()
-	defer func() {
-		klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
-	}()
+	logger := klog.FromContext(ctx)
 
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		return err
 	}
+	defer func() {
+		logger.V(4).Info("Finished syncing service endpoints", "service", klog.KRef(namespace, name), "startTime", time.Since(startTime))
+	}()
 	service, err := e.serviceLister.Services(namespace).Get(name)
 	if err != nil {
 		if !errors.IsNotFound(err) {
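
Two details of the hunk above: logr-style loggers have no warning severity, so the old klog.Warningf becomes logger.Info with the error attached as an "err" value; and the %s/%s pairs from the format strings become klog.KObj / klog.KRef values, which render as namespace/name in text output and as structured objects in JSON output. A quick demonstration (assuming only klog/v2 and the core API types):

package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web-0"}}
	logger := klog.Background()

	// KObj wraps anything with GetName/GetNamespace and prints as
	// "default/web-0" in the text backend; KRef builds the same reference
	// from a bare namespace/name pair, e.g. after SplitMetaNamespaceKey.
	logger.Info("Pod is not included on endpoints for Service",
		"pod", klog.KObj(pod), "service", klog.KRef("default", "web"))
}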
@@ -390,7 +393,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 		return nil
 	}
 
-	klog.V(5).Infof("About to update endpoints for service %q", key)
+	logger.V(5).Info("About to update endpoints for service", "service", klog.KRef(namespace, name))
 	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
 	if err != nil {
 		// Since we're getting stuff from a local cache, it is

@@ -410,7 +413,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 
 	for _, pod := range pods {
 		if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
-			klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
+			logger.V(5).Info("Pod is not included on endpoints for Service", "pod", klog.KObj(pod), "service", klog.KObj(service))
 			continue
 		}
 

@@ -418,7 +421,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 		if err != nil {
 			// this will happen, if the cluster runs with some nodes configured as dual stack and some as not
 			// such as the case of an upgrade..
-			klog.V(2).Infof("Failed to find endpoint for service:%s with ClusterIP:%s on pod:%s with error:%v", service.Name, service.Spec.ClusterIP, klog.KObj(pod), err)
+			logger.V(2).Info("Failed to find endpoint for service with ClusterIP on pod with error", "service", klog.KObj(service), "clusterIP", service.Spec.ClusterIP, "pod", klog.KObj(pod), "error", err)
 			continue
 		}
 

@@ -430,7 +433,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 		// Allow headless service not to have ports.
 		if len(service.Spec.Ports) == 0 {
 			if service.Spec.ClusterIP == api.ClusterIPNone {
-				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
+				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(logger, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
 				// No need to repack subsets for headless service without ports.
 			}
 		} else {

@@ -438,13 +441,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 			servicePort := &service.Spec.Ports[i]
 			portNum, err := podutil.FindPort(pod, servicePort)
 			if err != nil {
-				klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
+				logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err)
 				continue
 			}
 			epp := endpointPortFromServicePort(servicePort, portNum)
 
 			var readyEps, notReadyEps int
-			subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
+			subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
 			totalReadyEps = totalReadyEps + readyEps
 			totalNotReadyEps = totalNotReadyEps + notReadyEps
 		}

@@ -483,7 +486,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 		endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
 		apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
 		capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
-		klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
+		logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service))
 		return nil
 	}
 	newEndpoints := currentEndpoints.DeepCopy()

@@ -516,7 +519,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 		newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
 	}
 
-	klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
+	logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
 	if createEndpoints {
 		// No previous endpoints, create them
 		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})

@@ -530,7 +533,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 			// 1. namespace is terminating, endpoint creation is not allowed by default.
 			// 2. policy is misconfigured, in which case no service would function anywhere.
 			// Given the frequency of 1, we log at a lower level.
-			klog.V(5).Infof("Forbidden from creating endpoints: %v", err)
+			logger.V(5).Info("Forbidden from creating endpoints", "error", err)
 
 			// If the namespace is terminating, creates will continue to fail. Simply drop the item.
 			if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {

@@ -583,7 +586,7 @@ func (e *Controller) checkLeftoverEndpoints() {
 // The addresses are added to the corresponding field, ready or not ready, depending
 // on the pod status and the Service PublishNotReadyAddresses field value.
 // The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints.
-func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
+func addEndpointSubset(logger klog.Logger, subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
 	epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
 	var readyEps int
 	var notReadyEps int

@@ -598,7 +601,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
 			})
 			readyEps++
 		} else { // if it is not a ready address it has to be not ready
-			klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
+			logger.V(5).Info("Pod is out of service", "pod", klog.KObj(pod))
 			subsets = append(subsets, v1.EndpointSubset{
 				NotReadyAddresses: []v1.EndpointAddress{epa},
 				Ports:             ports,