From dad9c77a74b948ae2ad502f703f0fc8c664a15b4 Mon Sep 17 00:00:00 2001
From: Yang Yang
Date: Tue, 3 Sep 2024 14:54:53 -0700
Subject: [PATCH] bugfix: endpoints controller tracks resource version
 correctly

The endpoints controller stores the resource version of the previous
Endpoints object to avoid acting on stale information from the informer
cache. However, some update operations can succeed without increasing
the resource version, causing the endpoints controller to incorrectly
mark the existing resource version as stale and preventing the
Endpoints object from being updated.

Co-authored-by: Quan Tian
Co-authored-by: Yang Yang
---
 .../endpoint/endpoints_controller.go          |   8 +-
 test/integration/endpoints/endpoints_test.go  | 198 ++++++++++++++++++
 2 files changed, 204 insertions(+), 2 deletions(-)

diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go
index 19596223e52..8a61211f608 100644
--- a/pkg/controller/endpoint/endpoints_controller.go
+++ b/pkg/controller/endpoint/endpoints_controller.go
@@ -532,12 +532,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 	}
 
 	logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
+	var updatedEndpoints *v1.Endpoints
 	if createEndpoints {
 		// No previous endpoints, create them
 		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
 	} else {
 		// Pre-existing
-		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
+		updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
 	}
 	if err != nil {
 		if createEndpoints && errors.IsForbidden(err) {
@@ -564,7 +565,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 	// If the current endpoints is updated we track the old resource version, so
 	// if we obtain this resource version again from the lister we know is outdated
 	// and we need to retry later to wait for the informer cache to be up-to-date.
-	if !createEndpoints {
+	// There are some operations (webhooks, truncated endpoints, ...) that can cause an Endpoints update to become a no-op
+	// and return the same resourceVersion.
+	// Ref: https://issues.k8s.io/127370, https://issues.k8s.io/126578
+	if updatedEndpoints != nil && updatedEndpoints.ResourceVersion != currentEndpoints.ResourceVersion {
 		e.staleEndpointsTracker.Stale(currentEndpoints)
 	}
 	return nil
diff --git a/test/integration/endpoints/endpoints_test.go b/test/integration/endpoints/endpoints_test.go
index 2dc71791d77..516482273c9 100644
--- a/test/integration/endpoints/endpoints_test.go
+++ b/test/integration/endpoints/endpoints_test.go
@@ -28,6 +28,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
@@ -35,6 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/endpoint"
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/kubernetes/test/utils/ktesting"
+	netutils "k8s.io/utils/net"
 )
 
 func TestEndpointUpdates(t *testing.T) {
@@ -605,3 +607,199 @@ func newExternalNameService(namespace, name string) *v1.Service {
 	svc.Spec.ExternalName = "google.com"
 	return svc
 }
+
+func TestEndpointTruncate(t *testing.T) {
+	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
+	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
+	defer server.TearDownFn()
+
+	client, err := clientset.NewForConfig(server.ClientConfig)
+	if err != nil {
+		t.Fatalf("Error creating clientset: %v", err)
+	}
+
+	informers := informers.NewSharedInformerFactory(client, 0)
+
+	tCtx := ktesting.Init(t)
+	epController := endpoint.NewEndpointController(
+		tCtx,
+		informers.Core().V1().Pods(),
+		informers.Core().V1().Services(),
+		informers.Core().V1().Endpoints(),
+		client,
+		0)
+
+	// Start informer and controllers
+	informers.Start(tCtx.Done())
+	go epController.Run(tCtx, 1)
+
+	// Create namespace
+	ns := framework.CreateNamespaceOrDie(client, "test-endpoints-truncate", t)
+	defer framework.DeleteNamespaceOrDie(client, ns, t)
+
+	// Define a base pod with labels; copies of it are created below.
+	basePod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   "test-pod",
+			Labels: labelMap(),
+		},
+		Spec: v1.PodSpec{
+			NodeName: "fake-node",
+			Containers: []v1.Container{
+				{
+					Name:  "fakename",
+					Image: "fakeimage",
+					Ports: []v1.ContainerPort{
+						{
+							Name:          "port-443",
+							ContainerPort: 443,
+						},
+					},
+				},
+			},
+		},
+		Status: v1.PodStatus{
+			Phase: v1.PodRunning,
+			Conditions: []v1.PodCondition{
+				{
+					Type:   v1.PodReady,
+					Status: v1.ConditionTrue,
+				},
+			},
+			PodIP: "10.0.0.1",
+			PodIPs: []v1.PodIP{
+				{
+					IP: "10.0.0.1",
+				},
+			},
+		},
+	}
+
+	// Create 1001 Pods to exceed the Endpoints maximum capacity, which is set to 1000.
+	allPodNames := sets.New[string]()
+	baseIP := netutils.BigForIP(netutils.ParseIPSloppy("10.0.0.1"))
+	for i := 0; i < 1001; i++ {
+		pod := basePod.DeepCopy()
+		pod.Name = fmt.Sprintf("%s-%d", basePod.Name, i)
+		allPodNames.Insert(pod.Name)
+		podIP := netutils.AddIPOffset(baseIP, i).String()
+		pod.Status.PodIP = podIP
+		pod.Status.PodIPs[0] = v1.PodIP{IP: podIP}
+		createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
+		}
+
+		createdPod.Status = pod.Status
+		_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
+		}
+	}
+
+	// Create a Service associated with the Pods
+	svc := &v1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-service",
+			Namespace: ns.Name,
+			Labels: map[string]string{
+				"foo": "bar",
+			},
+		},
+		Spec: v1.ServiceSpec{
+			Selector: map[string]string{
+				"foo": "bar",
+			},
+			Ports: []v1.ServicePort{
+				{Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
+			},
+		},
+	}
+	_, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create service %s: %v", svc.Name, err)
+	}
+
+	var truncatedPodName string
+	// Poll until the Endpoints object associated with the previously created Service exists.
+	if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
+		podNames := sets.New[string]()
+		endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, nil
+		}
+
+		for _, subset := range endpoints.Subsets {
+			for _, address := range subset.Addresses {
+				podNames.Insert(address.TargetRef.Name)
+			}
+		}
+
+		if podNames.Len() != 1000 {
+			return false, nil
+		}
+
+		truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
+		if !ok || truncated != "truncated" {
+			return false, nil
+		}
+		// There is only 1 truncated Pod.
+		truncatedPodName, _ = allPodNames.Difference(podNames).PopAny()
+		return true, nil
+	}); err != nil {
+		t.Fatalf("endpoints not found: %v", err)
+	}
+
+	// Update the truncated Pod several times to make the endpoints controller resync the Service.
+	truncatedPod, err := client.CoreV1().Pods(ns.Name).Get(tCtx, truncatedPodName, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get pod %s: %v", truncatedPodName, err)
+	}
+	for i := 0; i < 10; i++ {
+		truncatedPod.Status.Conditions[0].Status = v1.ConditionFalse
+		truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
+		}
+		truncatedPod.Status.Conditions[0].Status = v1.ConditionTrue
+		truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
+		}
+	}
+
+	// Delete 501 Pods, leaving 500.
+	for i := 500; i < 1001; i++ {
+		podName := fmt.Sprintf("%s-%d", basePod.Name, i)
+		err = client.CoreV1().Pods(ns.Name).Delete(tCtx, podName, metav1.DeleteOptions{})
+		if err != nil {
+			t.Fatalf("error deleting test pod: %v", err)
+		}
+	}
+
+	// Poll until the addresses of the deleted Pods are removed from the Endpoints object.
+	if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
+		endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, nil
+		}
+
+		numEndpoints := 0
+		for _, subset := range endpoints.Subsets {
+			numEndpoints += len(subset.Addresses)
+		}
+
+		if numEndpoints != 500 {
+			return false, nil
+		}
+
+		truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
+		if ok || truncated == "truncated" {
+			return false, nil
+		}
+
+		return true, nil
+	}); err != nil {
+		t.Fatalf("error waiting for Endpoints to drop the deleted Pods and the over-capacity annotation: %v", err)
+	}
+}
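
To illustrate the guard added in syncService above, here is a minimal, self-contained Go sketch (not part of the patch). The types endpointsMeta and staleTracker are hypothetical stand-ins for v1.Endpoints and the controller's staleEndpointsTracker, and the resource-version values are made up; it only shows why Stale() must be skipped when an update returns the resource version the cache already holds.

package main

import "fmt"

// endpointsMeta is a hypothetical stand-in for the fields of v1.Endpoints that matter here.
type endpointsMeta struct {
	name            string
	resourceVersion string
}

// staleTracker mimics the idea behind the controller's staleEndpointsTracker: it records a
// resource version that must no longer be trusted when it is read back from the informer cache.
type staleTracker map[string]string

func (t staleTracker) Stale(e endpointsMeta)        { t[e.name] = e.resourceVersion }
func (t staleTracker) IsStale(e endpointsMeta) bool { return t[e.name] == e.resourceVersion }

func main() {
	tracker := staleTracker{}
	cached := endpointsMeta{name: "test-service", resourceVersion: "42"}

	// A no-op update (for example, the desired addresses were truncated back to the same
	// 1000 entries) succeeds but returns the resource version the cache already holds.
	updated := endpointsMeta{name: "test-service", resourceVersion: "42"}

	// Before the patch the controller always called Stale() after an update, so the cached
	// copy was treated as outdated forever and later Endpoints updates were never applied.
	// With the patch, Stale() is only called when the update actually produced a newer
	// resource version, i.e. when the cached copy is genuinely behind the API server.
	if updated.resourceVersion != cached.resourceVersion {
		tracker.Stale(cached)
	}

	fmt.Println("cached copy considered stale?", tracker.IsStale(cached)) // prints: false
}

The same comparison is what the new "updatedEndpoints != nil && updatedEndpoints.ResourceVersion != currentEndpoints.ResourceVersion" check expresses inside the controller.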