From 9844402a90618053de8cb6e6cac795f5f00228d7 Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Tue, 4 Jun 2024 20:55:42 +0000
Subject: [PATCH 1/2] define some default kube-apiserver flags for tests,
 disabling unneeded controllers

---
 test/integration/framework/controlplane_utils.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/test/integration/framework/controlplane_utils.go b/test/integration/framework/controlplane_utils.go
index 2af4f1a4f1b..30ef35663bf 100644
--- a/test/integration/framework/controlplane_utils.go
+++ b/test/integration/framework/controlplane_utils.go
@@ -98,3 +98,11 @@ func SharedEtcd() *storagebackend.Config {
 	cfg.Transport.ServerList = []string{GetEtcdURL()}
 	return cfg
 }
+
+// DefaultTestServerFlags returns the default flags used to run kube-apiserver in tests.
+func DefaultTestServerFlags() []string {
+	return []string{
+		"--endpoint-reconciler-type=none",           // Disable the Endpoints reconciler so it does not keep failing while trying to use 127.0.0.1 as a valid endpoint.
+		"--disable-admission-plugins=ServiceAccount", // Disable the ServiceAccount admission plugin as we don't have the serviceaccount controller running.
+	}
+}

From 47c8669c0c94490ae7b949a81896411ebf77e19b Mon Sep 17 00:00:00 2001
From: Yang Yang
Date: Tue, 3 Sep 2024 14:54:53 -0700
Subject: [PATCH 2/2] bugfix: endpoints controller tracks resource version
 correctly

The endpoints controller stores the resource version of the previous
Endpoints objects to avoid issues related to stale information in the
cache. However, there can be update operations that succeed without
increasing the resource version, causing the endpoints controller to
declare the existing resource version stale and to stop updating the
Endpoints object.

Co-authored-by: Quan Tian
Co-authored-by: Yang Yang
---
 .../endpoint/endpoints_controller.go         |   8 +-
 test/integration/endpoints/endpoints_test.go | 198 ++++++++++++++++++
 2 files changed, 204 insertions(+), 2 deletions(-)

diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go
index aebc3ce39a7..7769a87f43a 100644
--- a/pkg/controller/endpoint/endpoints_controller.go
+++ b/pkg/controller/endpoint/endpoints_controller.go
@@ -527,12 +527,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 	}
 
 	logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
+	var updatedEndpoints *v1.Endpoints
 	if createEndpoints {
 		// No previous endpoints, create them
 		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
 	} else {
 		// Pre-existing
-		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
+		updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
 	}
 	if err != nil {
 		if createEndpoints && errors.IsForbidden(err) {
@@ -559,7 +560,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 	// If the current endpoints is updated we track the old resource version, so
 	// if we obtain this resource version again from the lister we know is outdated
 	// and we need to retry later to wait for the informer cache to be up-to-date.
-	if !createEndpoints {
+	// There are some operations (webhooks, truncated endpoints, ...) that can cause an Endpoints update to
+	// become a no-op and return the same resourceVersion.
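+	// In that case the cached object is still current, so we must not mark it as stale; otherwise every
+	// future sync would consider the lister copy outdated and the Endpoints object would never be updated again.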
+	// Ref: https://issues.k8s.io/127370, https://issues.k8s.io/126578
+	if updatedEndpoints != nil && updatedEndpoints.ResourceVersion != currentEndpoints.ResourceVersion {
 		e.staleEndpointsTracker.Stale(currentEndpoints)
 	}
 	return nil
diff --git a/test/integration/endpoints/endpoints_test.go b/test/integration/endpoints/endpoints_test.go
index 7f9f365bd1f..c25f34263aa 100644
--- a/test/integration/endpoints/endpoints_test.go
+++ b/test/integration/endpoints/endpoints_test.go
@@ -28,6 +28,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
@@ -35,6 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/endpoint"
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/kubernetes/test/utils/ktesting"
+	netutils "k8s.io/utils/net"
 )
 
 func TestEndpointUpdates(t *testing.T) {
@@ -605,3 +607,199 @@ func newExternalNameService(namespace, name string) *v1.Service {
 	svc.Spec.ExternalName = "google.com"
 	return svc
 }
+
+func TestEndpointTruncate(t *testing.T) {
+	// Disable ServiceAccount admission plugin as we don't have the serviceaccount controller running.
+	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
+	defer server.TearDownFn()
+
+	client, err := clientset.NewForConfig(server.ClientConfig)
+	if err != nil {
+		t.Fatalf("Error creating clientset: %v", err)
+	}
+
+	informers := informers.NewSharedInformerFactory(client, 0)
+
+	tCtx := ktesting.Init(t)
+	epController := endpoint.NewEndpointController(
+		tCtx,
+		informers.Core().V1().Pods(),
+		informers.Core().V1().Services(),
+		informers.Core().V1().Endpoints(),
+		client,
+		0)
+
+	// Start the informers and the controller
+	informers.Start(tCtx.Done())
+	go epController.Run(tCtx, 1)
+
+	// Create namespace
+	ns := framework.CreateNamespaceOrDie(client, "test-endpoints-truncate", t)
+	defer framework.DeleteNamespaceOrDie(client, ns, t)
+
+	// Base pod with labels matching the Service selector, used as a template for the pods created below
+	basePod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   "test-pod",
+			Labels: labelMap(),
+		},
+		Spec: v1.PodSpec{
+			NodeName: "fake-node",
+			Containers: []v1.Container{
+				{
+					Name:  "fakename",
+					Image: "fakeimage",
+					Ports: []v1.ContainerPort{
+						{
+							Name:          "port-443",
+							ContainerPort: 443,
+						},
+					},
+				},
+			},
+		},
+		Status: v1.PodStatus{
+			Phase: v1.PodRunning,
+			Conditions: []v1.PodCondition{
+				{
+					Type:   v1.PodReady,
+					Status: v1.ConditionTrue,
+				},
+			},
+			PodIP: "10.0.0.1",
+			PodIPs: []v1.PodIP{
+				{
+					IP: "10.0.0.1",
+				},
+			},
+		},
+	}
+
+	// Create 1001 Pods to exceed the Endpoints maximum capacity, which is set to 1000.
+	allPodNames := sets.New[string]()
+	baseIP := netutils.BigForIP(netutils.ParseIPSloppy("10.0.0.1"))
+	for i := 0; i < 1001; i++ {
+		pod := basePod.DeepCopy()
+		pod.Name = fmt.Sprintf("%s-%d", basePod.Name, i)
+		allPodNames.Insert(pod.Name)
+		podIP := netutils.AddIPOffset(baseIP, i).String()
+		pod.Status.PodIP = podIP
+		pod.Status.PodIPs[0] = v1.PodIP{IP: podIP}
+		createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
+		}
+
+		createdPod.Status = pod.Status
+		_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
+		}
+	}
+
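+	// All 1001 pods match the Service selector below, so the controller builds an Endpoints object that
+	// exceeds the 1000-address limit and truncates it, setting the over-capacity annotation to "truncated".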
+	// Create a Service associated with the pods
+	svc := &v1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-service",
+			Namespace: ns.Name,
+			Labels: map[string]string{
+				"foo": "bar",
+			},
+		},
+		Spec: v1.ServiceSpec{
+			Selector: map[string]string{
+				"foo": "bar",
+			},
+			Ports: []v1.ServicePort{
+				{Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
+			},
+		},
+	}
+	_, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create service %s: %v", svc.Name, err)
+	}
+
+	var truncatedPodName string
+	// Poll until the Endpoints object associated with the previously created Service exists and has been truncated.
+	if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
+		podNames := sets.New[string]()
+		endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, nil
+		}
+
+		for _, subset := range endpoints.Subsets {
+			for _, address := range subset.Addresses {
+				podNames.Insert(address.TargetRef.Name)
+			}
+		}
+
+		if podNames.Len() != 1000 {
+			return false, nil
+		}
+
+		truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
+		if !ok || truncated != "truncated" {
+			return false, nil
+		}
+		// Exactly one Pod was truncated from the Endpoints; remember its name.
+		truncatedPodName, _ = allPodNames.Difference(podNames).PopAny()
+		return true, nil
+	}); err != nil {
+		t.Fatalf("truncated endpoints not found: %v", err)
+	}
+
+	// Update the truncated Pod's status several times to make the endpoints controller resync the Service;
+	// these syncs leave the Endpoints object unchanged, so the Update call returns the same resourceVersion.
+	truncatedPod, err := client.CoreV1().Pods(ns.Name).Get(tCtx, truncatedPodName, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get pod %s: %v", truncatedPodName, err)
+	}
+	for i := 0; i < 10; i++ {
+		truncatedPod.Status.Conditions[0].Status = v1.ConditionFalse
+		truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
+		}
+		truncatedPod.Status.Conditions[0].Status = v1.ConditionTrue
+		truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
+		}
+	}
+
+	// Delete 501 Pods so that only 500 remain.
+	for i := 500; i < 1001; i++ {
+		podName := fmt.Sprintf("%s-%d", basePod.Name, i)
+		err = client.CoreV1().Pods(ns.Name).Delete(tCtx, podName, metav1.DeleteOptions{})
+		if err != nil {
+			t.Fatalf("error deleting test pod: %v", err)
+		}
+	}
+
+	// Poll until the addresses of the deleted Pods are removed from the Endpoints object.
+	if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
+		endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, nil
+		}
+
+		numEndpoints := 0
+		for _, subset := range endpoints.Subsets {
+			numEndpoints += len(subset.Addresses)
+		}
+
+		if numEndpoints != 500 {
+			return false, nil
+		}
+
+		truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
+		if ok || truncated == "truncated" {
+			return false, nil
+		}
+
+		return true, nil
+	}); err != nil {
+		t.Fatalf("error waiting for Endpoints to be updated after deleting pods: %v", err)
+	}
+}