diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 96e00dbecb0..19596223e52 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -110,6 +110,7 @@ func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInf e.endpointsLister = endpointsInformer.Lister() e.endpointsSynced = endpointsInformer.Informer().HasSynced + e.staleEndpointsTracker = newStaleEndpointsTracker() e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker() e.eventBroadcaster = broadcaster e.eventRecorder = recorder @@ -145,6 +146,8 @@ type Controller struct { // endpointsSynced returns true if the endpoints shared informer has been synced at least once. // Added as a member to the struct to allow injection for testing. endpointsSynced cache.InformerSynced + // staleEndpointsTracker can help determine if a cached Endpoints is out of date. + staleEndpointsTracker *staleEndpointsTracker // Services that need to be updated. A channel is inappropriate here, // because it allows services with lots of pods to be serviced much @@ -384,6 +387,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { return err } e.triggerTimeTracker.DeleteService(namespace, name) + e.staleEndpointsTracker.Delete(namespace, name) return nil } @@ -473,6 +477,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error { Labels: service.Labels, }, } + } else if e.staleEndpointsTracker.IsStale(currentEndpoints) { + return fmt.Errorf("endpoints informer cache is out of date, resource version %s already processed for endpoints %s", currentEndpoints.ResourceVersion, key) } createEndpoints := len(currentEndpoints.ResourceVersion) == 0 @@ -555,6 +561,12 @@ func (e *Controller) syncService(ctx context.Context, key string) error { return err } + // If the current endpoints is updated we track the old resource version, so + // if we obtain this resource version again from the lister we know is outdated + // and we need to retry later to wait for the informer cache to be up-to-date. + if !createEndpoints { + e.staleEndpointsTracker.Stale(currentEndpoints) + } return nil } diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 304488b43cb..d7004245799 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -160,10 +160,10 @@ func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltest return httptest.NewServer(mux), &fakeEndpointsHandler } -// makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All -// block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will -// be sent in the response. -func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server { +// makeBlockingEndpointTestServer will signal the blockNextAction channel on endpoint "POST", "PUT", and "DELETE" +// requests. "POST" and "PUT" requests will wait on a blockUpdate signal if provided, while "DELETE" requests will wait +// on a blockDelete signal if provided. If controller is nil, an error will be sent in the response. 
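+// The handler applies writes to the controller's endpoints informer store directly, optionally after a delay, so
+// tests can simulate an endpoints informer cache that lags behind the API server, which is the condition the
+// staleEndpointsTracker is designed to detect.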
+func makeBlockingEndpointTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockUpdate, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server { handlerFunc := func(res http.ResponseWriter, req *http.Request) { if controller == nil { @@ -172,23 +172,37 @@ func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointCont return } - if req.Method == "POST" { - controller.endpointsStore.Add(endpoint) + if req.Method == "POST" || req.Method == "PUT" { + if blockUpdate != nil { + go func() { + // Delay the update of endpoints to make endpoints cache out of sync + <-blockUpdate + _ = controller.endpointsStore.Add(endpoint) + }() + } else { + _ = controller.endpointsStore.Add(endpoint) + } blockNextAction <- struct{}{} } if req.Method == "DELETE" { - go func() { - // Delay the deletion of endoints to make endpoint cache out of sync - <-blockDelete - controller.endpointsStore.Delete(endpoint) + if blockDelete != nil { + go func() { + // Delay the deletion of endpoints to make endpoints cache out of sync + <-blockDelete + _ = controller.endpointsStore.Delete(endpoint) + controller.onEndpointsDelete(endpoint) + }() + } else { + _ = controller.endpointsStore.Delete(endpoint) controller.onEndpointsDelete(endpoint) - }() + } blockNextAction <- struct{}{} } + res.Header().Set("Content-Type", "application/json") res.WriteHeader(http.StatusOK) - res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{}))) + _, _ = res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpoint))) } mux := http.NewServeMux() @@ -2378,7 +2392,7 @@ func TestMultipleServiceChanges(t *testing.T) { blockDelete := make(chan struct{}) blockNextAction := make(chan struct{}) stopChan := make(chan struct{}) - testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns) + testServer := makeBlockingEndpointTestServer(t, controller, endpoint, nil, blockDelete, blockNextAction, ns) defer testServer.Close() tCtx := ktesting.Init(t) @@ -2423,6 +2437,83 @@ func TestMultipleServiceChanges(t *testing.T) { close(stopChan) } +// TestMultiplePodChanges tests that endpoints that are not updated because of an out of sync endpoints cache are +// eventually resynced after multiple Pod changes. 
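+// The Pod is driven Ready -> NotReady -> Ready while the endpoints informer cache is held stale, so the second sync
+// must be deferred by the staleEndpointsTracker until the cache reflects the first update.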
+func TestMultiplePodChanges(t *testing.T) { + ns := metav1.NamespaceDefault + + readyEndpoints := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"}, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{ + {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, + }, + Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}}, + }}, + } + notReadyEndpoints := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "2"}, + Subsets: []v1.EndpointSubset{{ + NotReadyAddresses: []v1.EndpointAddress{ + {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, + }, + Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}}, + }}, + } + + controller := &endpointController{} + blockUpdate := make(chan struct{}) + blockNextAction := make(chan struct{}) + stopChan := make(chan struct{}) + testServer := makeBlockingEndpointTestServer(t, controller, notReadyEndpoints, blockUpdate, nil, blockNextAction, ns) + defer testServer.Close() + + tCtx := ktesting.Init(t) + *controller = *newController(tCtx, testServer.URL, 0*time.Second) + pod := testPod(ns, 0, 1, true, ipv4only) + _ = controller.podStore.Add(pod) + _ = controller.endpointsStore.Add(readyEndpoints) + _ = controller.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + ClusterIP: "10.0.0.1", + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}}, + }, + }) + + go func() { controller.Run(tCtx, 1) }() + + // Rapidly update the Pod: Ready -> NotReady -> Ready. + pod2 := pod.DeepCopy() + pod2.ResourceVersion = "2" + pod2.Status.Conditions[0].Status = v1.ConditionFalse + _ = controller.podStore.Update(pod2) + controller.updatePod(pod, pod2) + // blockNextAction should eventually unblock once server gets endpoints request. + waitForChanReceive(t, 1*time.Second, blockNextAction, "Pod Update should have caused a request to be sent to the test server") + // The endpoints update hasn't been applied to the cache yet. + pod3 := pod.DeepCopy() + pod3.ResourceVersion = "3" + pod3.Status.Conditions[0].Status = v1.ConditionTrue + _ = controller.podStore.Update(pod3) + controller.updatePod(pod2, pod3) + // It shouldn't get endpoints request as the endpoints in the cache is out-of-date. + timer := time.NewTimer(100 * time.Millisecond) + select { + case <-timer.C: + case <-blockNextAction: + t.Errorf("Pod Update shouldn't have caused a request to be sent to the test server") + } + + // Applying the endpoints update to the cache should cause test server to update endpoints. + close(blockUpdate) + waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoints should have been updated") + + close(blockNextAction) + close(stopChan) +} + func TestSyncServiceAddresses(t *testing.T) { makeService := func(tolerateUnready bool) *v1.Service { return &v1.Service{ diff --git a/pkg/controller/endpoint/endpoints_tracker.go b/pkg/controller/endpoint/endpoints_tracker.go new file mode 100644 index 00000000000..6c1d1aeacfd --- /dev/null +++ b/pkg/controller/endpoint/endpoints_tracker.go @@ -0,0 +1,64 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoint + +import ( + "sync" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +// staleEndpointsTracker tracks Endpoints and their stale resource versions to +// help determine if an Endpoints is stale. +type staleEndpointsTracker struct { + // lock protects staleResourceVersionByEndpoints. + lock sync.RWMutex + // staleResourceVersionByEndpoints tracks the stale resource version of Endpoints. + staleResourceVersionByEndpoints map[types.NamespacedName]string +} + +func newStaleEndpointsTracker() *staleEndpointsTracker { + return &staleEndpointsTracker{ + staleResourceVersionByEndpoints: map[types.NamespacedName]string{}, + } +} + +func (t *staleEndpointsTracker) Stale(endpoints *v1.Endpoints) { + t.lock.Lock() + defer t.lock.Unlock() + nn := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace} + t.staleResourceVersionByEndpoints[nn] = endpoints.ResourceVersion +} + +func (t *staleEndpointsTracker) IsStale(endpoints *v1.Endpoints) bool { + t.lock.RLock() + defer t.lock.RUnlock() + nn := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace} + staleResourceVersion, exists := t.staleResourceVersionByEndpoints[nn] + if exists && staleResourceVersion == endpoints.ResourceVersion { + return true + } + return false +} + +func (t *staleEndpointsTracker) Delete(namespace, name string) { + t.lock.Lock() + defer t.lock.Unlock() + nn := types.NamespacedName{Namespace: namespace, Name: name} + delete(t.staleResourceVersionByEndpoints, nn) +} diff --git a/pkg/controller/endpoint/endpoints_tracker_test.go b/pkg/controller/endpoint/endpoints_tracker_test.go new file mode 100644 index 00000000000..b97c4a00761 --- /dev/null +++ b/pkg/controller/endpoint/endpoints_tracker_test.go @@ -0,0 +1,54 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package endpoint + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestStaleEndpointsTracker(t *testing.T) { + ns := metav1.NamespaceDefault + tracker := newStaleEndpointsTracker() + + endpoints := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000}}, + }}, + } + + assert.False(t, tracker.IsStale(endpoints), "IsStale should return false before the endpoint is staled") + + tracker.Stale(endpoints) + assert.True(t, tracker.IsStale(endpoints), "IsStale should return true after the endpoint is staled") + + endpoints.ResourceVersion = "2" + assert.False(t, tracker.IsStale(endpoints), "IsStale should return false after the endpoint is updated") + + tracker.Delete(endpoints.Namespace, endpoints.Name) + assert.Empty(t, tracker.staleResourceVersionByEndpoints) +} diff --git a/test/integration/endpoints/endpoints_test.go b/test/integration/endpoints/endpoints_test.go index d0605eab035..2dc71791d77 100644 --- a/test/integration/endpoints/endpoints_test.go +++ b/test/integration/endpoints/endpoints_test.go @@ -17,8 +17,10 @@ limitations under the License. package endpoints import ( + "context" "errors" "fmt" + "reflect" "testing" "time" @@ -158,6 +160,147 @@ func TestEndpointUpdates(t *testing.T) { } +// Regression test for https://issues.k8s.io/125638 +func TestEndpointWithMultiplePodUpdates(t *testing.T) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) + defer server.TearDownFn() + + client, err := clientset.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("Error creating clientset: %v", err) + } + + informers := informers.NewSharedInformerFactory(client, 0) + + tCtx := ktesting.Init(t) + epController := endpoint.NewEndpointController( + tCtx, + informers.Core().V1().Pods(), + informers.Core().V1().Services(), + informers.Core().V1().Endpoints(), + client, + 0) + + // Process 10 services in parallel to increase likelihood of outdated informer cache. 
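+	// Multiple workers sync different Services that select the same Pod, so at least one of them is likely to read
+	// an Endpoints object from the informer cache before the write from a previous sync has been observed there.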
+ concurrency := 10 + // Start informer and controllers + informers.Start(tCtx.Done()) + go epController.Run(tCtx, concurrency) + + // Create namespace + ns := framework.CreateNamespaceOrDie(client, "test-endpoints-updates", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + // Create a pod with labels + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: ns.Name, + Labels: labelMap(), + }, + Spec: v1.PodSpec{ + NodeName: "fakenode", + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "fakeimage", + }, + }, + }, + } + + pod, err = client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create pod %s: %v", pod.Name, err) + } + + // Set pod status + pod.Status = v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}, + PodIPs: []v1.PodIP{{IP: "1.1.1.1"}}, + } + pod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, pod, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err) + } + + var services []*v1.Service + // Create services associated to the pod + for i := 0; i < concurrency; i++ { + svc := newService(ns.Name, fmt.Sprintf("foo%d", i)) + _, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create service %s: %v", svc.Name, err) + } + services = append(services, svc) + } + + for _, service := range services { + // Ensure the new endpoints are created. + if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) { + _, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, service.Name, metav1.GetOptions{}) + if err != nil { + return false, nil + } + return true, nil + }); err != nil { + t.Fatalf("endpoints not found: %v", err) + } + } + + // Update pod's status and revert it immediately. The endpoints should be in-sync with the pod's status eventually. + pod.Status.Conditions[0].Status = v1.ConditionFalse + pod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, pod, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update pod %s to not ready: %v", pod.Name, err) + } + + pod.Status.Conditions[0].Status = v1.ConditionTrue + pod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, pod, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update pod %s to ready: %v", pod.Name, err) + } + + // Some workers might update endpoints twice (Ready->NotReady->Ready), while others may not update endpoints at all + // if they receive the 2nd pod update quickly. Consequently, we can't rely on endpoints resource version to + // determine if the controller has processed the pod updates. Instead, we will wait for 1 second, assuming that this + // provides enough time for the workers to process endpoints at least once. 
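+	// After the sleep, the polls below only need to verify that every Endpoints eventually converges back to the
+	// ready address, regardless of how many intermediate updates each worker performed.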
+ time.Sleep(1 * time.Second) + expectedEndpointAddresses := []v1.EndpointAddress{ + { + IP: pod.Status.PodIP, + NodeName: &pod.Spec.NodeName, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Namespace: pod.Namespace, + Name: pod.Name, + UID: pod.UID, + }, + }, + } + for _, service := range services { + var endpoints *v1.Endpoints + if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) { + endpoints, err = client.CoreV1().Endpoints(ns.Name).Get(tCtx, service.Name, metav1.GetOptions{}) + if err != nil { + t.Logf("Error fetching endpoints: %v", err) + return false, nil + } + if len(endpoints.Subsets) == 0 { + return false, nil + } + if !reflect.DeepEqual(expectedEndpointAddresses, endpoints.Subsets[0].Addresses) { + return false, nil + } + return true, nil + }); err != nil { + t.Fatalf("Expected endpoints %v to contain ready endpoint addresses %v", endpoints, expectedEndpointAddresses) + } + } +} + // TestExternalNameToClusterIPTransition tests that Service of type ExternalName // does not get endpoints, and after transition to ClusterIP, service gets endpoint, // without headless label