Fix endpoints status out-of-sync when the pod state changes rapidly

When Pod state changes rapidly, the endpoints controller may use an outdated
informer cache to sync a Service. If the outdated Endpoints object happens to
match what the controller expects, the controller skips updating it.

This commit fixes it by checking whether the endpoints informer cache is
outdated when processing a Service. If the cached Endpoints object is stale,
the controller returns an error and retries later.

Signed-off-by: Quan Tian <quan.tian@broadcom.com>
This commit is contained in:
Quan Tian
2024-06-25 01:57:06 +08:00
parent 10ae1dbb52
commit 3bd975862a
5 changed files with 377 additions and 13 deletions

View File

@@ -17,8 +17,10 @@ limitations under the License.
package endpoints
import (
"context"
"errors"
"fmt"
"reflect"
"testing"
"time"
@@ -158,6 +160,147 @@ func TestEndpointUpdates(t *testing.T) {
}
// TestEndpointWithMultiplePodUpdates is a regression test for
// https://issues.k8s.io/125638.
//
// It starts a test API server and the endpoints controller, creates one ready
// pod plus several services associated to it, and then flips the pod's Ready
// condition to False and immediately back to True. Each service's Endpoints
// object must eventually list the pod as a ready address.
func TestEndpointWithMultiplePodUpdates(t *testing.T) {
	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
	defer server.TearDownFn()

	client, err := clientset.NewForConfig(server.ClientConfig)
	if err != nil {
		t.Fatalf("Error creating clientset: %v", err)
	}

	// Named informerFactory so the imported informers package is not shadowed.
	informerFactory := informers.NewSharedInformerFactory(client, 0)

	tCtx := ktesting.Init(t)
	epController := endpoint.NewEndpointController(
		tCtx,
		informerFactory.Core().V1().Pods(),
		informerFactory.Core().V1().Services(),
		informerFactory.Core().V1().Endpoints(),
		client,
		0)

	// Process 10 services in parallel to increase likelihood of outdated informer cache.
	concurrency := 10

	// Start informer and controllers
	informerFactory.Start(tCtx.Done())
	go epController.Run(tCtx, concurrency)

	// Create namespace
	ns := framework.CreateNamespaceOrDie(client, "test-endpoints-updates", t)
	defer framework.DeleteNamespaceOrDie(client, ns, t)

	// The pod name is kept in a constant so failure messages never have to
	// read it back from the value returned by a failed API call.
	const podName = "test-pod"

	// Create a pod with labels
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      podName,
			Namespace: ns.Name,
			Labels:    labelMap(),
		},
		Spec: v1.PodSpec{
			NodeName: "fakenode",
			Containers: []v1.Container{
				{
					Name:  "fake-name",
					Image: "fakeimage",
				},
			},
		},
	}

	pod, err = client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
	if err != nil {
		t.Fatalf("Failed to create pod %s: %v", podName, err)
	}

	// Set pod status
	pod.Status = v1.PodStatus{
		Phase:      v1.PodRunning,
		Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}},
		PodIPs:     []v1.PodIP{{IP: "1.1.1.1"}},
	}
	pod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, pod, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Failed to update status of pod %s: %v", podName, err)
	}

	// Create services associated to the pod
	services := make([]*v1.Service, 0, concurrency)
	for i := 0; i < concurrency; i++ {
		svc := newService(ns.Name, fmt.Sprintf("foo%d", i))
		_, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
		if err != nil {
			t.Fatalf("Failed to create service %s: %v", svc.Name, err)
		}
		services = append(services, svc)
	}

	for _, service := range services {
		// Ensure the new endpoints are created.
		if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) {
			// Use the poll's own context so the request is bounded by the poll timeout.
			if _, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, service.Name, metav1.GetOptions{}); err != nil {
				// Any error is treated as "not there yet"; keep polling.
				return false, nil
			}
			return true, nil
		}); err != nil {
			t.Fatalf("endpoints %s not found: %v", service.Name, err)
		}
	}

	// Update pod's status and revert it immediately. The endpoints should be in-sync with the pod's status eventually.
	pod.Status.Conditions[0].Status = v1.ConditionFalse
	pod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, pod, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Failed to update pod %s to not ready: %v", podName, err)
	}

	pod.Status.Conditions[0].Status = v1.ConditionTrue
	pod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, pod, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Failed to update pod %s to ready: %v", podName, err)
	}

	// Some workers might update endpoints twice (Ready->NotReady->Ready), while others may not update endpoints at all
	// if they receive the 2nd pod update quickly. Consequently, we can't rely on endpoints resource version to
	// determine if the controller has processed the pod updates. Instead, we will wait for 1 second, assuming that this
	// provides enough time for the workers to process endpoints at least once.
	time.Sleep(1 * time.Second)

	// The single ready address every service's Endpoints is expected to carry,
	// built from the pod as last returned by the API server.
	expectedEndpointAddresses := []v1.EndpointAddress{
		{
			IP:       pod.Status.PodIP,
			NodeName: &pod.Spec.NodeName,
			TargetRef: &v1.ObjectReference{
				Kind:      "Pod",
				Namespace: pod.Namespace,
				Name:      pod.Name,
				UID:       pod.UID,
			},
		},
	}

	for _, service := range services {
		// endpoints holds the last successfully fetched object so the failure
		// message below can show what was actually observed.
		var endpoints *v1.Endpoints
		if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) {
			// A closure-local err avoids silently writing to the function-level err.
			got, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, service.Name, metav1.GetOptions{})
			if err != nil {
				t.Logf("Error fetching endpoints: %v", err)
				return false, nil
			}
			endpoints = got
			if len(endpoints.Subsets) == 0 {
				return false, nil
			}
			return reflect.DeepEqual(expectedEndpointAddresses, endpoints.Subsets[0].Addresses), nil
		}); err != nil {
			t.Fatalf("Expected endpoints %v to contain ready endpoint addresses %v: %v", endpoints, expectedEndpointAddresses, err)
		}
	}
}
// TestExternalNameToClusterIPTransition tests that Service of type ExternalName
// does not get endpoints, and after transition to ClusterIP, service gets endpoint,
// without headless label