diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go
index 19596223e52..8a61211f608 100644
--- a/pkg/controller/endpoint/endpoints_controller.go
+++ b/pkg/controller/endpoint/endpoints_controller.go
@@ -532,12 +532,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 	}
 
 	logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
+	var updatedEndpoints *v1.Endpoints
 	if createEndpoints {
 		// No previous endpoints, create them
 		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
 	} else {
 		// Pre-existing
-		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
+		updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
 	}
 	if err != nil {
 		if createEndpoints && errors.IsForbidden(err) {
@@ -564,7 +565,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 	// If the current endpoints is updated we track the old resource version, so
 	// if we obtain this resource version again from the lister we know is outdated
 	// and we need to retry later to wait for the informer cache to be up-to-date.
-	if !createEndpoints {
+	// Some operations (webhooks, truncated endpoints, ...) can potentially cause an endpoints update to become
+	// a no-op and return the same resourceVersion.
+	// Ref: https://issues.k8s.io/127370 , https://issues.k8s.io/126578
+	if updatedEndpoints != nil && updatedEndpoints.ResourceVersion != currentEndpoints.ResourceVersion {
 		e.staleEndpointsTracker.Stale(currentEndpoints)
 	}
 	return nil
diff --git a/test/integration/endpoints/endpoints_test.go b/test/integration/endpoints/endpoints_test.go
index 2dc71791d77..516482273c9 100644
--- a/test/integration/endpoints/endpoints_test.go
+++ b/test/integration/endpoints/endpoints_test.go
@@ -28,6 +28,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
@@ -35,6 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/endpoint"
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/kubernetes/test/utils/ktesting"
+	netutils "k8s.io/utils/net"
 )
 
 func TestEndpointUpdates(t *testing.T) {
@@ -605,3 +607,199 @@ func newExternalNameService(namespace, name string) *v1.Service {
 	svc.Spec.ExternalName = "google.com"
 	return svc
 }
+
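+// TestEndpointTruncate verifies that a Service backed by more than 1000 Pods
+// gets an Endpoints object capped at 1000 addresses and annotated with
+// endpoints.kubernetes.io/over-capacity=truncated, and that the controller
+// keeps reconciling the Service once enough Pods are deleted.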
+func TestEndpointTruncate(t *testing.T) {
+	// Disable ServiceAccount admission plugin as we don't have the serviceaccount controller running.
+	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
+	defer server.TearDownFn()
+
+	client, err := clientset.NewForConfig(server.ClientConfig)
+	if err != nil {
+		t.Fatalf("Error creating clientset: %v", err)
+	}
+
+	informers := informers.NewSharedInformerFactory(client, 0)
+
+	tCtx := ktesting.Init(t)
+	epController := endpoint.NewEndpointController(
+		tCtx,
+		informers.Core().V1().Pods(),
+		informers.Core().V1().Services(),
+		informers.Core().V1().Endpoints(),
+		client,
+		0)
+
+	// Start informer and controllers
+	informers.Start(tCtx.Done())
+	go epController.Run(tCtx, 1)
+
+	// Create namespace
+	ns := framework.CreateNamespaceOrDie(client, "test-endpoints-truncate", t)
+	defer framework.DeleteNamespaceOrDie(client, ns, t)
+
+	// Create a pod with labels
+	basePod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   "test-pod",
+			Labels: labelMap(),
+		},
+		Spec: v1.PodSpec{
+			NodeName: "fake-node",
+			Containers: []v1.Container{
+				{
+					Name:  "fakename",
+					Image: "fakeimage",
+					Ports: []v1.ContainerPort{
+						{
+							Name:          "port-443",
+							ContainerPort: 443,
+						},
+					},
+				},
+			},
+		},
+		Status: v1.PodStatus{
+			Phase: v1.PodRunning,
+			Conditions: []v1.PodCondition{
+				{
+					Type:   v1.PodReady,
+					Status: v1.ConditionTrue,
+				},
+			},
+			PodIP: "10.0.0.1",
+			PodIPs: []v1.PodIP{
+				{
+					IP: "10.0.0.1",
+				},
+			},
+		},
+	}
+
+	// Create 1001 Pods to exceed the Endpoints maximum capacity, which is set to 1000.
+	allPodNames := sets.New[string]()
+	baseIP := netutils.BigForIP(netutils.ParseIPSloppy("10.0.0.1"))
+	for i := 0; i < 1001; i++ {
+		pod := basePod.DeepCopy()
+		pod.Name = fmt.Sprintf("%s-%d", basePod.Name, i)
+		allPodNames.Insert(pod.Name)
+		podIP := netutils.AddIPOffset(baseIP, i).String()
+		pod.Status.PodIP = podIP
+		pod.Status.PodIPs[0] = v1.PodIP{IP: podIP}
+		createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
+		}
+
+		createdPod.Status = pod.Status
+		_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
+		}
+	}
+
+	// Create a Service that selects the Pods created above.
+	svc := &v1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-service",
+			Namespace: ns.Name,
+			Labels: map[string]string{
+				"foo": "bar",
+			},
+		},
+		Spec: v1.ServiceSpec{
+			Selector: map[string]string{
+				"foo": "bar",
+			},
+			Ports: []v1.ServicePort{
+				{Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
+			},
+		},
+	}
+	_, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create service %s: %v", svc.Name, err)
+	}
+
+	var truncatedPodName string
+	// Poll until the Endpoints object associated with the previously created Service exists.
+	if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
+		podNames := sets.New[string]()
+		endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, nil
+		}
+
+		for _, subset := range endpoints.Subsets {
+			for _, address := range subset.Addresses {
+				podNames.Insert(address.TargetRef.Name)
+			}
+		}
+
+		if podNames.Len() != 1000 {
+			return false, nil
+		}
+
+		truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
+		if !ok || truncated != "truncated" {
+			return false, nil
+		}
+		// There is only 1 truncated Pod.
+		truncatedPodName, _ = allPodNames.Difference(podNames).PopAny()
+		return true, nil
+	}); err != nil {
+		t.Fatalf("endpoints not found: %v", err)
+	}
+
+	// Update the truncated Pod several times to make the endpoints controller resync the Service.
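+	// Because the truncated Pod is never part of the Endpoints object, these
+	// resyncs can result in no-op writes that return the same resourceVersion;
+	// the controller fix above keeps such no-ops from marking the cached
+	// Endpoints as stale (see https://issues.k8s.io/127370).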
+	truncatedPod, err := client.CoreV1().Pods(ns.Name).Get(tCtx, truncatedPodName, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get pod %s: %v", truncatedPodName, err)
+	}
+	for i := 0; i < 10; i++ {
+		truncatedPod.Status.Conditions[0].Status = v1.ConditionFalse
+		truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
+		}
+		truncatedPod.Status.Conditions[0].Status = v1.ConditionTrue
+		truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
+		}
+	}
+
+	// Delete 501 Pods.
+	for i := 500; i < 1001; i++ {
+		podName := fmt.Sprintf("%s-%d", basePod.Name, i)
+		err = client.CoreV1().Pods(ns.Name).Delete(tCtx, podName, metav1.DeleteOptions{})
+		if err != nil {
+			t.Fatalf("error deleting test pod: %v", err)
+		}
+	}
+
+	// Poll until the addresses of the deleted Pods are removed from the Endpoints object
+	// and it is no longer annotated as truncated.
+	if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
+		endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, nil
+		}
+
+		numEndpoints := 0
+		for _, subset := range endpoints.Subsets {
+			numEndpoints += len(subset.Addresses)
+		}
+
+		if numEndpoints != 500 {
+			return false, nil
+		}
+
+		truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
+		if ok || truncated == "truncated" {
+			return false, nil
+		}
+
+		return true, nil
+	}); err != nil {
+		t.Fatalf("error waiting for endpoints of deleted Pods to be removed: %v", err)
+	}
+}