bugfix: endpoints controller tracks resource version correctly

The endpoints controller stores the resource version of the previous
Endpoints object to avoid acting on stale information from the informer
cache.

However, some update operations succeed without increasing the resource
version, causing the endpoints controller to declare the existing
resource version stale and preventing the Endpoints object from being updated.

Co-authored-by: Quan Tian <quan.tian@broadcom.com>
Co-authored-by: Yang Yang <yyyng@amazon.com>
Authored by Yang Yang on 2024-09-03 14:54:53 -07:00; committed by Swetha Repakula
parent aae6745a67
commit dad9c77a74
2 changed files with 204 additions and 2 deletions
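
The fix in the controller amounts to remembering the result of the Update call and only marking the cached copy stale when the write actually advanced the resource version. Below is a minimal, self-contained sketch of that bookkeeping; the endpointsLike type, staleTracker map, and markStaleIfChanged function are illustrative stand-ins invented for this note, not the controller's real types (those are *v1.Endpoints and staleEndpointsTracker).

package main

import "fmt"

// endpointsLike stands in for *v1.Endpoints; only the fields relevant to the
// bookkeeping are kept.
type endpointsLike struct {
    Name            string
    ResourceVersion string
}

// staleTracker is an illustrative stand-in for the controller's
// staleEndpointsTracker: it records resource versions that must no longer be
// trusted when read back from the informer cache.
type staleTracker map[string]string

func (t staleTracker) Stale(ep endpointsLike) { t[ep.Name] = ep.ResourceVersion }

// markStaleIfChanged mirrors the patched logic: the cached object is marked
// stale only when the write actually advanced the resource version. A no-op
// update (for example one rewritten by a webhook or truncated to identical
// content) comes back with the same resource version; marking that version
// stale would make the controller keep rejecting its own cache and stop
// updating the Endpoints object.
func markStaleIfChanged(tracker staleTracker, current endpointsLike, updated *endpointsLike) {
    if updated != nil && updated.ResourceVersion != current.ResourceVersion {
        tracker.Stale(current)
    }
}

func main() {
    tracker := staleTracker{}
    current := endpointsLike{Name: "test-service", ResourceVersion: "41"}

    // No-op update: the same resource version comes back, nothing is marked stale.
    markStaleIfChanged(tracker, current, &endpointsLike{Name: "test-service", ResourceVersion: "41"})
    fmt.Println(tracker) // map[]

    // Real update: the resource version advanced, so the old cached copy is tracked as stale.
    markStaleIfChanged(tracker, current, &endpointsLike{Name: "test-service", ResourceVersion: "42"})
    fmt.Println(tracker) // map[test-service:41]
}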


@@ -532,12 +532,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
    }
    logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
+   var updatedEndpoints *v1.Endpoints
    if createEndpoints {
        // No previous endpoints, create them
        _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
    } else {
        // Pre-existing
-       _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
+       updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
    }
    if err != nil {
        if createEndpoints && errors.IsForbidden(err) {
@@ -564,7 +565,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
    // If the current endpoints is updated we track the old resource version, so
    // if we obtain this resource version again from the lister we know it is outdated
    // and we need to retry later to wait for the informer cache to be up-to-date.
-   if !createEndpoints {
+   // There are some operations (webhooks, truncated endpoints, ...) that can cause an endpoints
+   // update to become a no-op and return the same resourceVersion.
+   // Ref: https://issues.k8s.io/127370 , https://issues.k8s.io/126578
+   if updatedEndpoints != nil && updatedEndpoints.ResourceVersion != currentEndpoints.ResourceVersion {
        e.staleEndpointsTracker.Stale(currentEndpoints)
    }
    return nil
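
The new comment refers to no-op updates: when an Update request carries exactly the content that is already stored, the API server does not bump the resource version, which is the behaviour the commit message describes. The sketch below shows how that could be observed with client-go; the package name endpointsrvcheck and the helper isNoOpUpdate are hypothetical, only the client-go calls are real API.

// Sketch only, not part of the patch.
package endpointsrvcheck

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// isNoOpUpdate re-submits an Endpoints object unmodified and reports whether
// the resource version stayed the same, i.e. whether the write was a no-op.
func isNoOpUpdate(ctx context.Context, client kubernetes.Interface, namespace, name string) (bool, error) {
    current, err := client.CoreV1().Endpoints(namespace).Get(ctx, name, metav1.GetOptions{})
    if err != nil {
        return false, err
    }
    // Re-submit the object without changes; if the API server treats it as a
    // no-op, the returned ResourceVersion is unchanged.
    updated, err := client.CoreV1().Endpoints(namespace).Update(ctx, current, metav1.UpdateOptions{})
    if err != nil {
        return false, err
    }
    return updated.ResourceVersion == current.ResourceVersion, nil
}

Because such writes leave the resource version unchanged, marking the cached object stale after them would never resolve, which is exactly what the guard above avoids.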


@@ -28,6 +28,7 @@ import (
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
+   "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    clientset "k8s.io/client-go/kubernetes"
@@ -35,6 +36,7 @@ import (
    "k8s.io/kubernetes/pkg/controller/endpoint"
    "k8s.io/kubernetes/test/integration/framework"
    "k8s.io/kubernetes/test/utils/ktesting"
+   netutils "k8s.io/utils/net"
)

func TestEndpointUpdates(t *testing.T) {
@@ -605,3 +607,199 @@ func newExternalNameService(namespace, name string) *v1.Service {
    svc.Spec.ExternalName = "google.com"
    return svc
}

(Per the hunk header, everything below this point, i.e. the entire TestEndpointTruncate function, is newly added by this commit.)

func TestEndpointTruncate(t *testing.T) {
    // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
    server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
    defer server.TearDownFn()

    client, err := clientset.NewForConfig(server.ClientConfig)
    if err != nil {
        t.Fatalf("Error creating clientset: %v", err)
    }

    informers := informers.NewSharedInformerFactory(client, 0)

    tCtx := ktesting.Init(t)
    epController := endpoint.NewEndpointController(
        tCtx,
        informers.Core().V1().Pods(),
        informers.Core().V1().Services(),
        informers.Core().V1().Endpoints(),
        client,
        0)

    // Start informer and controllers
    informers.Start(tCtx.Done())
    go epController.Run(tCtx, 1)

    // Create namespace
    ns := framework.CreateNamespaceOrDie(client, "test-endpoints-truncate", t)
    defer framework.DeleteNamespaceOrDie(client, ns, t)
    // Create a pod with labels
    basePod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:   "test-pod",
            Labels: labelMap(),
        },
        Spec: v1.PodSpec{
            NodeName: "fake-node",
            Containers: []v1.Container{
                {
                    Name:  "fakename",
                    Image: "fakeimage",
                    Ports: []v1.ContainerPort{
                        {
                            Name:          "port-443",
                            ContainerPort: 443,
                        },
                    },
                },
            },
        },
        Status: v1.PodStatus{
            Phase: v1.PodRunning,
            Conditions: []v1.PodCondition{
                {
                    Type:   v1.PodReady,
                    Status: v1.ConditionTrue,
                },
            },
            PodIP: "10.0.0.1",
            PodIPs: []v1.PodIP{
                {
                    IP: "10.0.0.1",
                },
            },
        },
    }
    // Create 1001 Pods to exceed the Endpoints max capacity, which is set to 1000 addresses.
    allPodNames := sets.New[string]()
    baseIP := netutils.BigForIP(netutils.ParseIPSloppy("10.0.0.1"))
    for i := 0; i < 1001; i++ {
        pod := basePod.DeepCopy()
        pod.Name = fmt.Sprintf("%s-%d", basePod.Name, i)
        allPodNames.Insert(pod.Name)
        podIP := netutils.AddIPOffset(baseIP, i).String()
        pod.Status.PodIP = podIP
        pod.Status.PodIPs[0] = v1.PodIP{IP: podIP}

        createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
        if err != nil {
            t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
        }

        createdPod.Status = pod.Status
        _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
        if err != nil {
            t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
        }
    }
    // Create a service associated with the pods
    svc := &v1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "test-service",
            Namespace: ns.Name,
            Labels: map[string]string{
                "foo": "bar",
            },
        },
        Spec: v1.ServiceSpec{
            Selector: map[string]string{
                "foo": "bar",
            },
            Ports: []v1.ServicePort{
                {Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
            },
        },
    }
    _, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
    if err != nil {
        t.Fatalf("Failed to create service %s: %v", svc.Name, err)
    }
    var truncatedPodName string
    // Poll until the Endpoints object associated with the previously created Service exists,
    // holds exactly 1000 addresses, and carries the over-capacity annotation.
    if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
        podNames := sets.New[string]()
        endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
        if err != nil {
            return false, nil
        }
        for _, subset := range endpoints.Subsets {
            for _, address := range subset.Addresses {
                podNames.Insert(address.TargetRef.Name)
            }
        }
        if podNames.Len() != 1000 {
            return false, nil
        }
        truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
        if !ok || truncated != "truncated" {
            return false, nil
        }
        // There is only 1 truncated Pod.
        truncatedPodName, _ = allPodNames.Difference(podNames).PopAny()
        return true, nil
    }); err != nil {
        t.Fatalf("endpoints not found: %v", err)
    }
    // Update the truncated Pod several times to make the endpoints controller resync the service.
    truncatedPod, err := client.CoreV1().Pods(ns.Name).Get(tCtx, truncatedPodName, metav1.GetOptions{})
    if err != nil {
        t.Fatalf("Failed to get pod %s: %v", truncatedPodName, err)
    }
    for i := 0; i < 10; i++ {
        truncatedPod.Status.Conditions[0].Status = v1.ConditionFalse
        truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
        if err != nil {
            t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
        }
        truncatedPod.Status.Conditions[0].Status = v1.ConditionTrue
        truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
        if err != nil {
            t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
        }
    }
    // Delete 501 Pods.
    for i := 500; i < 1001; i++ {
        podName := fmt.Sprintf("%s-%d", basePod.Name, i)
        err = client.CoreV1().Pods(ns.Name).Delete(tCtx, podName, metav1.DeleteOptions{})
        if err != nil {
            t.Fatalf("error deleting test pod: %v", err)
        }
    }

    // Poll until the addresses of the deleted Pods are no longer in the Endpoints object
    // and the over-capacity annotation has been removed.
    if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
        endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
        if err != nil {
            return false, nil
        }
        numEndpoints := 0
        for _, subset := range endpoints.Subsets {
            numEndpoints += len(subset.Addresses)
        }
        if numEndpoints != 500 {
            return false, nil
        }
        truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
        if ok || truncated == "truncated" {
            return false, nil
        }
        return true, nil
    }); err != nil {
        t.Fatalf("endpoints not updated after deleting pods: %v", err)
    }
}