mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-29 21:29:24 +00:00
Merge pull request #125675 from tnqn/fix-rapid-endpoints-update
Fix endpoints status out-of-sync when the pod state changes rapidly
This commit is contained in:
@@ -110,6 +110,7 @@ func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInf
|
||||
e.endpointsLister = endpointsInformer.Lister()
|
||||
e.endpointsSynced = endpointsInformer.Informer().HasSynced
|
||||
|
||||
e.staleEndpointsTracker = newStaleEndpointsTracker()
|
||||
e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
|
||||
e.eventBroadcaster = broadcaster
|
||||
e.eventRecorder = recorder
|
||||
@@ -145,6 +146,8 @@ type Controller struct {
|
||||
// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
|
||||
// Added as a member to the struct to allow injection for testing.
|
||||
endpointsSynced cache.InformerSynced
|
||||
// staleEndpointsTracker can help determine if a cached Endpoints is out of date.
|
||||
staleEndpointsTracker *staleEndpointsTracker
|
||||
|
||||
// Services that need to be updated. A channel is inappropriate here,
|
||||
// because it allows services with lots of pods to be serviced much
|
||||
@@ -384,6 +387,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
|
||||
return err
|
||||
}
|
||||
e.triggerTimeTracker.DeleteService(namespace, name)
|
||||
e.staleEndpointsTracker.Delete(namespace, name)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -473,6 +477,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
|
||||
Labels: service.Labels,
|
||||
},
|
||||
}
|
||||
} else if e.staleEndpointsTracker.IsStale(currentEndpoints) {
|
||||
return fmt.Errorf("endpoints informer cache is out of date, resource version %s already processed for endpoints %s", currentEndpoints.ResourceVersion, key)
|
||||
}
|
||||
|
||||
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
|
||||
@@ -555,6 +561,12 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
|
||||
|
||||
return err
|
||||
}
|
||||
// If the current endpoints is updated we track the old resource version, so
|
||||
// if we obtain this resource version again from the lister we know is outdated
|
||||
// and we need to retry later to wait for the informer cache to be up-to-date.
|
||||
if !createEndpoints {
|
||||
e.staleEndpointsTracker.Stale(currentEndpoints)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -160,10 +160,10 @@ func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltest
|
||||
return httptest.NewServer(mux), &fakeEndpointsHandler
|
||||
}
|
||||
|
||||
// makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All
|
||||
// block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will
|
||||
// be sent in the response.
|
||||
func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
|
||||
// makeBlockingEndpointTestServer will signal the blockNextAction channel on endpoint "POST", "PUT", and "DELETE"
|
||||
// requests. "POST" and "PUT" requests will wait on a blockUpdate signal if provided, while "DELETE" requests will wait
|
||||
// on a blockDelete signal if provided. If controller is nil, an error will be sent in the response.
|
||||
func makeBlockingEndpointTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockUpdate, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
|
||||
|
||||
handlerFunc := func(res http.ResponseWriter, req *http.Request) {
|
||||
if controller == nil {
|
||||
@@ -172,23 +172,37 @@ func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointCont
|
||||
return
|
||||
}
|
||||
|
||||
if req.Method == "POST" {
|
||||
controller.endpointsStore.Add(endpoint)
|
||||
if req.Method == "POST" || req.Method == "PUT" {
|
||||
if blockUpdate != nil {
|
||||
go func() {
|
||||
// Delay the update of endpoints to make endpoints cache out of sync
|
||||
<-blockUpdate
|
||||
_ = controller.endpointsStore.Add(endpoint)
|
||||
}()
|
||||
} else {
|
||||
_ = controller.endpointsStore.Add(endpoint)
|
||||
}
|
||||
blockNextAction <- struct{}{}
|
||||
}
|
||||
|
||||
if req.Method == "DELETE" {
|
||||
go func() {
|
||||
// Delay the deletion of endoints to make endpoint cache out of sync
|
||||
<-blockDelete
|
||||
controller.endpointsStore.Delete(endpoint)
|
||||
if blockDelete != nil {
|
||||
go func() {
|
||||
// Delay the deletion of endpoints to make endpoints cache out of sync
|
||||
<-blockDelete
|
||||
_ = controller.endpointsStore.Delete(endpoint)
|
||||
controller.onEndpointsDelete(endpoint)
|
||||
}()
|
||||
} else {
|
||||
_ = controller.endpointsStore.Delete(endpoint)
|
||||
controller.onEndpointsDelete(endpoint)
|
||||
}()
|
||||
}
|
||||
blockNextAction <- struct{}{}
|
||||
}
|
||||
|
||||
res.Header().Set("Content-Type", "application/json")
|
||||
res.WriteHeader(http.StatusOK)
|
||||
res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{})))
|
||||
_, _ = res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpoint)))
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@@ -2378,7 +2392,7 @@ func TestMultipleServiceChanges(t *testing.T) {
|
||||
blockDelete := make(chan struct{})
|
||||
blockNextAction := make(chan struct{})
|
||||
stopChan := make(chan struct{})
|
||||
testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns)
|
||||
testServer := makeBlockingEndpointTestServer(t, controller, endpoint, nil, blockDelete, blockNextAction, ns)
|
||||
defer testServer.Close()
|
||||
|
||||
tCtx := ktesting.Init(t)
|
||||
@@ -2423,6 +2437,83 @@ func TestMultipleServiceChanges(t *testing.T) {
|
||||
close(stopChan)
|
||||
}
|
||||
|
||||
// TestMultiplePodChanges tests that endpoints that are not updated because of an out of sync endpoints cache are
|
||||
// eventually resynced after multiple Pod changes.
|
||||
func TestMultiplePodChanges(t *testing.T) {
|
||||
ns := metav1.NamespaceDefault
|
||||
|
||||
readyEndpoints := &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
|
||||
Subsets: []v1.EndpointSubset{{
|
||||
Addresses: []v1.EndpointAddress{
|
||||
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
|
||||
},
|
||||
Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}},
|
||||
}},
|
||||
}
|
||||
notReadyEndpoints := &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "2"},
|
||||
Subsets: []v1.EndpointSubset{{
|
||||
NotReadyAddresses: []v1.EndpointAddress{
|
||||
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
|
||||
},
|
||||
Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}},
|
||||
}},
|
||||
}
|
||||
|
||||
controller := &endpointController{}
|
||||
blockUpdate := make(chan struct{})
|
||||
blockNextAction := make(chan struct{})
|
||||
stopChan := make(chan struct{})
|
||||
testServer := makeBlockingEndpointTestServer(t, controller, notReadyEndpoints, blockUpdate, nil, blockNextAction, ns)
|
||||
defer testServer.Close()
|
||||
|
||||
tCtx := ktesting.Init(t)
|
||||
*controller = *newController(tCtx, testServer.URL, 0*time.Second)
|
||||
pod := testPod(ns, 0, 1, true, ipv4only)
|
||||
_ = controller.podStore.Add(pod)
|
||||
_ = controller.endpointsStore.Add(readyEndpoints)
|
||||
_ = controller.serviceStore.Add(&v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: v1.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
ClusterIP: "10.0.0.1",
|
||||
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
|
||||
},
|
||||
})
|
||||
|
||||
go func() { controller.Run(tCtx, 1) }()
|
||||
|
||||
// Rapidly update the Pod: Ready -> NotReady -> Ready.
|
||||
pod2 := pod.DeepCopy()
|
||||
pod2.ResourceVersion = "2"
|
||||
pod2.Status.Conditions[0].Status = v1.ConditionFalse
|
||||
_ = controller.podStore.Update(pod2)
|
||||
controller.updatePod(pod, pod2)
|
||||
// blockNextAction should eventually unblock once server gets endpoints request.
|
||||
waitForChanReceive(t, 1*time.Second, blockNextAction, "Pod Update should have caused a request to be sent to the test server")
|
||||
// The endpoints update hasn't been applied to the cache yet.
|
||||
pod3 := pod.DeepCopy()
|
||||
pod3.ResourceVersion = "3"
|
||||
pod3.Status.Conditions[0].Status = v1.ConditionTrue
|
||||
_ = controller.podStore.Update(pod3)
|
||||
controller.updatePod(pod2, pod3)
|
||||
// It shouldn't get endpoints request as the endpoints in the cache is out-of-date.
|
||||
timer := time.NewTimer(100 * time.Millisecond)
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-blockNextAction:
|
||||
t.Errorf("Pod Update shouldn't have caused a request to be sent to the test server")
|
||||
}
|
||||
|
||||
// Applying the endpoints update to the cache should cause test server to update endpoints.
|
||||
close(blockUpdate)
|
||||
waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoints should have been updated")
|
||||
|
||||
close(blockNextAction)
|
||||
close(stopChan)
|
||||
}
|
||||
|
||||
func TestSyncServiceAddresses(t *testing.T) {
|
||||
makeService := func(tolerateUnready bool) *v1.Service {
|
||||
return &v1.Service{
|
||||
|
||||
64
pkg/controller/endpoint/endpoints_tracker.go
Normal file
64
pkg/controller/endpoint/endpoints_tracker.go
Normal file
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package endpoint
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
// staleEndpointsTracker tracks Endpoints and their stale resource versions to
|
||||
// help determine if an Endpoints is stale.
|
||||
type staleEndpointsTracker struct {
|
||||
// lock protects staleResourceVersionByEndpoints.
|
||||
lock sync.RWMutex
|
||||
// staleResourceVersionByEndpoints tracks the stale resource version of Endpoints.
|
||||
staleResourceVersionByEndpoints map[types.NamespacedName]string
|
||||
}
|
||||
|
||||
func newStaleEndpointsTracker() *staleEndpointsTracker {
|
||||
return &staleEndpointsTracker{
|
||||
staleResourceVersionByEndpoints: map[types.NamespacedName]string{},
|
||||
}
|
||||
}
|
||||
|
||||
func (t *staleEndpointsTracker) Stale(endpoints *v1.Endpoints) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
nn := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}
|
||||
t.staleResourceVersionByEndpoints[nn] = endpoints.ResourceVersion
|
||||
}
|
||||
|
||||
func (t *staleEndpointsTracker) IsStale(endpoints *v1.Endpoints) bool {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
nn := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}
|
||||
staleResourceVersion, exists := t.staleResourceVersionByEndpoints[nn]
|
||||
if exists && staleResourceVersion == endpoints.ResourceVersion {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *staleEndpointsTracker) Delete(namespace, name string) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
nn := types.NamespacedName{Namespace: namespace, Name: name}
|
||||
delete(t.staleResourceVersionByEndpoints, nn)
|
||||
}
|
||||
54
pkg/controller/endpoint/endpoints_tracker_test.go
Normal file
54
pkg/controller/endpoint/endpoints_tracker_test.go
Normal file
@@ -0,0 +1,54 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package endpoint
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func TestStaleEndpointsTracker(t *testing.T) {
|
||||
ns := metav1.NamespaceDefault
|
||||
tracker := newStaleEndpointsTracker()
|
||||
|
||||
endpoints := &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Subsets: []v1.EndpointSubset{{
|
||||
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
|
||||
Ports: []v1.EndpointPort{{Port: 1000}},
|
||||
}},
|
||||
}
|
||||
|
||||
assert.False(t, tracker.IsStale(endpoints), "IsStale should return false before the endpoint is staled")
|
||||
|
||||
tracker.Stale(endpoints)
|
||||
assert.True(t, tracker.IsStale(endpoints), "IsStale should return true after the endpoint is staled")
|
||||
|
||||
endpoints.ResourceVersion = "2"
|
||||
assert.False(t, tracker.IsStale(endpoints), "IsStale should return false after the endpoint is updated")
|
||||
|
||||
tracker.Delete(endpoints.Namespace, endpoints.Name)
|
||||
assert.Empty(t, tracker.staleResourceVersionByEndpoints)
|
||||
}
|
||||
Reference in New Issue
Block a user