Merge pull request #108078 from tnqn/endpoints-resync

Skip updating Endpoints if no relevant fields change
This commit is contained in:
Kubernetes Prow Robot 2022-03-01 23:23:13 -08:00 committed by GitHub
commit 0dc83fe696
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 248 additions and 16 deletions

View File

@ -490,8 +490,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
}
// When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
// updates caused by Pod updates that we don't care, e.g. annotation update.
if !createEndpoints &&
apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)

View File

@ -319,6 +319,52 @@ func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncEndpointsWithPodResourceVersionUpdateOnly(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
pod0 := testPod(ns, 0, 1, true, ipv4only)
pod1 := testPod(ns, 1, 1, false, ipv4only)
endpoints := newController(testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{
IP: pod0.Status.PodIPs[0].IP,
NodeName: &emptyNodeName,
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod0.Name, Namespace: ns, ResourceVersion: "1"},
},
},
NotReadyAddresses: []v1.EndpointAddress{
{
IP: pod1.Status.PodIPs[0].IP,
NodeName: &emptyNodeName,
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod1.Name, Namespace: ns, ResourceVersion: "2"},
},
},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
},
})
pod0.ResourceVersion = "3"
pod1.ResourceVersion = "4"
endpoints.podStore.Add(pod0)
endpoints.podStore.Add(pod1)
endpoints.syncService(context.TODO(), ns+"/foo")
endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncEndpointsNewNoSubsets(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)

View File

@ -26,7 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@ -37,6 +37,18 @@ import (
"k8s.io/kubernetes/pkg/util/hash"
)
// semanticIgnoreResourceVersion does semantic deep equality checks for objects
// but excludes ResourceVersion of ObjectReference. They are used when comparing
// endpoints in Endpoints and EndpointSlice objects to avoid unnecessary updates
// caused by Pod resourceVersion change.
var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie(
func(a, b v1.ObjectReference) bool {
a.ResourceVersion = ""
b.ResourceVersion = ""
return a == b
},
)
// ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527)
type ServiceSelectorCache struct {
lock sync.RWMutex
@ -279,7 +291,8 @@ func (sl portsInOrder) Less(i, j int) bool {
// EndpointsEqualBeyondHash returns true if endpoints have equal attributes
// but excludes equality checks that would have already been covered with
// endpoint hashing (see hashEndpoint func for more info).
// endpoint hashing (see hashEndpoint func for more info) and ignores difference
// in ResourceVersion of TargetRef.
func EndpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool {
if stringPtrChanged(ep1.NodeName, ep2.NodeName) {
return false
@ -293,7 +306,20 @@ func EndpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool {
return false
}
if objectRefPtrChanged(ep1.TargetRef, ep2.TargetRef) {
// Serving and Terminating will only be set when the EndpointSliceTerminatingCondition feature is on.
// Ignore their difference if the expected or actual value is nil, which means the feature enablement is changed.
// Otherwise all EndpointSlices in the system would be updated on the first controller-manager restart even without
// actual changes, leading to delay in processing legitimate updates.
// Its value will be set to the expected one when there is an actual change triggering update of this EndpointSlice.
if ep1.Conditions.Serving != nil && ep2.Conditions.Serving != nil && *ep1.Conditions.Serving != *ep2.Conditions.Serving {
return false
}
if ep1.Conditions.Terminating != nil && ep2.Conditions.Terminating != nil && *ep1.Conditions.Terminating != *ep2.Conditions.Terminating {
return false
}
if !semanticIgnoreResourceVersion.DeepEqual(ep1.TargetRef, ep2.TargetRef) {
return false
}
@ -311,18 +337,6 @@ func boolPtrChanged(ptr1, ptr2 *bool) bool {
return false
}
// objectRefPtrChanged returns true if a set of object ref pointers have
// different values.
func objectRefPtrChanged(ref1, ref2 *v1.ObjectReference) bool {
if (ref1 == nil) != (ref2 == nil) {
return true
}
if ref1 != nil && ref2 != nil && !apiequality.Semantic.DeepEqual(*ref1, *ref2) {
return true
}
return false
}
// stringPtrChanged returns true if a set of string pointers have different values.
func stringPtrChanged(ptr1, ptr2 *string) bool {
if (ptr1 == nil) != (ptr2 == nil) {
@ -333,3 +347,9 @@ func stringPtrChanged(ptr1, ptr2 *string) bool {
}
return false
}
// EndpointSubsetsEqualIgnoreResourceVersion returns true if EndpointSubsets
// have equal attributes but excludes ResourceVersion of Pod.
func EndpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool {
return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2)
}

View File

@ -755,6 +755,58 @@ func TestEndpointsEqualBeyondHash(t *testing.T) {
},
expected: false,
},
{
name: "Serving condition changed from nil to true",
ep1: &discovery.Endpoint{
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: nil,
Terminating: nil,
},
Addresses: []string{"10.0.0.1"},
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
Zone: utilpointer.StringPtr("zone-1"),
NodeName: utilpointer.StringPtr("node-1"),
},
ep2: &discovery.Endpoint{
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Addresses: []string{"10.0.0.1"},
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
Zone: utilpointer.StringPtr("zone-1"),
NodeName: utilpointer.StringPtr("node-1"),
},
expected: true,
},
{
name: "Serving condition changed from false to true",
ep1: &discovery.Endpoint{
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(false),
Terminating: utilpointer.BoolPtr(false),
},
Addresses: []string{"10.0.0.1"},
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
Zone: utilpointer.StringPtr("zone-1"),
NodeName: utilpointer.StringPtr("node-1"),
},
ep2: &discovery.Endpoint{
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
Serving: utilpointer.BoolPtr(true),
Terminating: utilpointer.BoolPtr(false),
},
Addresses: []string{"10.0.0.1"},
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
Zone: utilpointer.StringPtr("zone-1"),
NodeName: utilpointer.StringPtr("node-1"),
},
expected: false,
},
{
name: "Pod name changed",
ep1: &discovery.Endpoint{
@ -777,6 +829,28 @@ func TestEndpointsEqualBeyondHash(t *testing.T) {
},
expected: false,
},
{
name: "Pod resourceVersion changed",
ep1: &discovery.Endpoint{
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
},
Addresses: []string{"10.0.0.1"},
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0", ResourceVersion: "1"},
Zone: utilpointer.StringPtr("zone-1"),
NodeName: utilpointer.StringPtr("node-1"),
},
ep2: &discovery.Endpoint{
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
},
Addresses: []string{"10.0.0.1"},
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0", ResourceVersion: "2"},
Zone: utilpointer.StringPtr("zone-1"),
NodeName: utilpointer.StringPtr("node-1"),
},
expected: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -786,3 +860,93 @@ func TestEndpointsEqualBeyondHash(t *testing.T) {
})
}
}
func TestEndpointSubsetsEqualIgnoreResourceVersion(t *testing.T) {
copyAndMutateEndpointSubset := func(orig *v1.EndpointSubset, mutator func(*v1.EndpointSubset)) *v1.EndpointSubset {
newSubSet := orig.DeepCopy()
mutator(newSubSet)
return newSubSet
}
es1 := &v1.EndpointSubset{
Addresses: []v1.EndpointAddress{
{
IP: "1.1.1.1",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-1", Namespace: "ns", ResourceVersion: "1"},
},
},
NotReadyAddresses: []v1.EndpointAddress{
{
IP: "1.1.1.2",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-2", Namespace: "ns2", ResourceVersion: "2"},
},
},
Ports: []v1.EndpointPort{{Port: 8081, Protocol: "TCP"}},
}
es2 := &v1.EndpointSubset{
Addresses: []v1.EndpointAddress{
{
IP: "2.2.2.1",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-1", Namespace: "ns", ResourceVersion: "3"},
},
},
NotReadyAddresses: []v1.EndpointAddress{
{
IP: "2.2.2.2",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-2", Namespace: "ns2", ResourceVersion: "4"},
},
},
Ports: []v1.EndpointPort{{Port: 8082, Protocol: "TCP"}},
}
tests := []struct {
name string
subsets1 []v1.EndpointSubset
subsets2 []v1.EndpointSubset
expected bool
}{
{
name: "Subsets removed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1},
expected: false,
},
{
name: "Ready Pod IP changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
es.Addresses[0].IP = "1.1.1.10"
}), *es2},
expected: false,
},
{
name: "NotReady Pod IP changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
es.NotReadyAddresses[0].IP = "2.2.2.10"
})},
expected: false,
},
{
name: "Pod ResourceVersion changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
es.Addresses[0].TargetRef.ResourceVersion = "100"
})},
expected: true,
},
{
name: "Ports changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
es.Ports[0].Port = 8082
})},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := EndpointSubsetsEqualIgnoreResourceVersion(tt.subsets1, tt.subsets2); got != tt.expected {
t.Errorf("semanticIgnoreResourceVersion.DeepEqual() = %v, expected %v", got, tt.expected)
}
})
}
}