mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #50934 from joelsmith/skip-endpoints-update
Automatic merge from submit-queue (batch tested with PRs 46458, 50934, 50766, 50970, 47698) Skip non-update endpoint updates **What this PR does / why we need it**: On large clusters, a large percentage of endpoint updates are actually non-updates that occur as a result of a change in an associated pod. This results in endpoint updates where the only field that has changed is the `TargetRef.ResourceVersion` in the endpoint address associated with the changed pod. Given enough of these non-updates, the endpoint controller's queue rate limit can be overwhelmed and legitimate updates can be delayed, resulting in (temporarily) broken services. We have clusters where we've seen endpoint updates take 9 minutes. **Which issue this PR fixes** : fixes #50936 **Special notes for your reviewer**: N/A **Release note**: ```release-note Prevent unneeded endpoint updates ```
This commit is contained in:
commit
aa41ff26d0
@ -50,6 +50,7 @@ go_test(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
|
@ -202,6 +202,49 @@ func (e *EndpointController) addPod(obj interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
|
||||||
|
return &v1.EndpointAddress{
|
||||||
|
IP: pod.Status.PodIP,
|
||||||
|
NodeName: &pod.Spec.NodeName,
|
||||||
|
TargetRef: &v1.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Namespace: pod.ObjectMeta.Namespace,
|
||||||
|
Name: pod.ObjectMeta.Name,
|
||||||
|
UID: pod.ObjectMeta.UID,
|
||||||
|
ResourceVersion: pod.ObjectMeta.ResourceVersion,
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
|
func podAddressChanged(oldPod, newPod *v1.Pod) bool {
|
||||||
|
// Convert the pod to an EndpointAddress, clear inert fields,
|
||||||
|
// and see if they are the same.
|
||||||
|
newEndpointAddress := podToEndpointAddress(newPod)
|
||||||
|
oldEndpointAddress := podToEndpointAddress(oldPod)
|
||||||
|
// Ignore the ResourceVersion because it changes
|
||||||
|
// with every pod update. This allows the comparison to
|
||||||
|
// show equality if all other relevant fields match.
|
||||||
|
newEndpointAddress.TargetRef.ResourceVersion = ""
|
||||||
|
oldEndpointAddress.TargetRef.ResourceVersion = ""
|
||||||
|
if reflect.DeepEqual(newEndpointAddress, oldEndpointAddress) {
|
||||||
|
// The pod has not changed in any way that impacts the endpoints
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String {
|
||||||
|
if podChanged {
|
||||||
|
// if the labels and pod changed, all services need to be updated
|
||||||
|
services = services.Union(oldServices)
|
||||||
|
} else {
|
||||||
|
// if only the labels changed, services not common to
|
||||||
|
// both the new and old service set (i.e the disjunctive union)
|
||||||
|
// need to be updated
|
||||||
|
services = services.Difference(oldServices).Union(oldServices.Difference(services))
|
||||||
|
}
|
||||||
|
return services
|
||||||
|
}
|
||||||
|
|
||||||
// When a pod is updated, figure out what services it used to be a member of
|
// When a pod is updated, figure out what services it used to be a member of
|
||||||
// and what services it will be a member of, and enqueue the union of these.
|
// and what services it will be a member of, and enqueue the union of these.
|
||||||
// old and cur must be *v1.Pod types.
|
// old and cur must be *v1.Pod types.
|
||||||
@ -213,22 +256,37 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
|
|||||||
// Two different versions of the same pod will always have different RVs.
|
// Two different versions of the same pod will always have different RVs.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
podChanged := podAddressChanged(oldPod, newPod)
|
||||||
|
|
||||||
|
// Check if the pod labels have changed, indicating a possibe
|
||||||
|
// change in the service membership
|
||||||
|
labelsChanged := false
|
||||||
|
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
|
||||||
|
!hostNameAndDomainAreEqual(newPod, oldPod) {
|
||||||
|
labelsChanged = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// If both the pod and labels are unchanged, no update is needed
|
||||||
|
if !podChanged && !labelsChanged {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
services, err := e.getPodServiceMemberships(newPod)
|
services, err := e.getPodServiceMemberships(newPod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
|
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only need to get the old services if the labels changed.
|
if labelsChanged {
|
||||||
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
|
|
||||||
!hostNameAndDomainAreEqual(newPod, oldPod) {
|
|
||||||
oldServices, err := e.getPodServiceMemberships(oldPod)
|
oldServices, err := e.getPodServiceMemberships(oldPod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
|
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
services = services.Union(oldServices)
|
services = determineNeededServiceUpdates(oldServices, services, podChanged)
|
||||||
}
|
}
|
||||||
|
|
||||||
for key := range services {
|
for key := range services {
|
||||||
e.queue.Add(key)
|
e.queue.Add(key)
|
||||||
}
|
}
|
||||||
@ -376,16 +434,7 @@ func (e *EndpointController) syncService(key string) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
epa := v1.EndpointAddress{
|
epa := *podToEndpointAddress(pod)
|
||||||
IP: pod.Status.PodIP,
|
|
||||||
NodeName: &pod.Spec.NodeName,
|
|
||||||
TargetRef: &v1.ObjectReference{
|
|
||||||
Kind: "Pod",
|
|
||||||
Namespace: pod.ObjectMeta.Namespace,
|
|
||||||
Name: pod.ObjectMeta.Name,
|
|
||||||
UID: pod.ObjectMeta.UID,
|
|
||||||
ResourceVersion: pod.ObjectMeta.ResourceVersion,
|
|
||||||
}}
|
|
||||||
|
|
||||||
hostname := pod.Spec.Hostname
|
hostname := pod.Spec.Hostname
|
||||||
if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace {
|
if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace {
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
@ -956,3 +957,142 @@ func TestShouldPodBeInEndpoints(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPodToEndpointAddress(t *testing.T) {
|
||||||
|
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
|
||||||
|
ns := "test"
|
||||||
|
addPods(podStore, ns, 1, 1, 0)
|
||||||
|
pods := podStore.List()
|
||||||
|
if len(pods) != 1 {
|
||||||
|
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pod := pods[0].(*v1.Pod)
|
||||||
|
epa := podToEndpointAddress(pod)
|
||||||
|
if epa.IP != pod.Status.PodIP {
|
||||||
|
t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
|
||||||
|
}
|
||||||
|
if *(epa.NodeName) != pod.Spec.NodeName {
|
||||||
|
t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
|
||||||
|
}
|
||||||
|
if epa.TargetRef.Kind != "Pod" {
|
||||||
|
t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
|
||||||
|
}
|
||||||
|
if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
|
||||||
|
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
|
||||||
|
}
|
||||||
|
if epa.TargetRef.Name != pod.ObjectMeta.Name {
|
||||||
|
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
|
||||||
|
}
|
||||||
|
if epa.TargetRef.UID != pod.ObjectMeta.UID {
|
||||||
|
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
|
||||||
|
}
|
||||||
|
if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
|
||||||
|
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPodAddressChanged(t *testing.T) {
|
||||||
|
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
|
||||||
|
ns := "test"
|
||||||
|
addPods(podStore, ns, 1, 1, 0)
|
||||||
|
pods := podStore.List()
|
||||||
|
if len(pods) != 1 {
|
||||||
|
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
oldPod := pods[0].(*v1.Pod)
|
||||||
|
newPod := oldPod.DeepCopy()
|
||||||
|
|
||||||
|
if podAddressChanged(oldPod, newPod) {
|
||||||
|
t.Errorf("Expected address to be unchanged for copied pod")
|
||||||
|
}
|
||||||
|
|
||||||
|
newPod.Spec.NodeName = "changed"
|
||||||
|
if !podAddressChanged(oldPod, newPod) {
|
||||||
|
t.Errorf("Expected address to be changed for pod with NodeName changed")
|
||||||
|
}
|
||||||
|
newPod.Spec.NodeName = oldPod.Spec.NodeName
|
||||||
|
|
||||||
|
newPod.ObjectMeta.ResourceVersion = "changed"
|
||||||
|
if podAddressChanged(oldPod, newPod) {
|
||||||
|
t.Errorf("Expected address to be unchanged for pod with only ResourceVersion changed")
|
||||||
|
}
|
||||||
|
newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
|
||||||
|
|
||||||
|
newPod.Status.PodIP = "1.2.3.1"
|
||||||
|
if !podAddressChanged(oldPod, newPod) {
|
||||||
|
t.Errorf("Expected address to be changed with pod IP address change")
|
||||||
|
}
|
||||||
|
newPod.Status.PodIP = oldPod.Status.PodIP
|
||||||
|
|
||||||
|
newPod.ObjectMeta.Name = "wrong-name"
|
||||||
|
if !podAddressChanged(oldPod, newPod) {
|
||||||
|
t.Errorf("Expected address to be changed with pod name change")
|
||||||
|
}
|
||||||
|
newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDetermineNeededServiceUpdates(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
a sets.String
|
||||||
|
b sets.String
|
||||||
|
union sets.String
|
||||||
|
xor sets.String
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no services changed",
|
||||||
|
a: sets.NewString("a", "b", "c"),
|
||||||
|
b: sets.NewString("a", "b", "c"),
|
||||||
|
xor: sets.NewString(),
|
||||||
|
union: sets.NewString("a", "b", "c"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "all old services removed, new services added",
|
||||||
|
a: sets.NewString("a", "b", "c"),
|
||||||
|
b: sets.NewString("d", "e", "f"),
|
||||||
|
xor: sets.NewString("a", "b", "c", "d", "e", "f"),
|
||||||
|
union: sets.NewString("a", "b", "c", "d", "e", "f"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "all old services removed, no new services added",
|
||||||
|
a: sets.NewString("a", "b", "c"),
|
||||||
|
b: sets.NewString(),
|
||||||
|
xor: sets.NewString("a", "b", "c"),
|
||||||
|
union: sets.NewString("a", "b", "c"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no old services, but new services added",
|
||||||
|
a: sets.NewString(),
|
||||||
|
b: sets.NewString("a", "b", "c"),
|
||||||
|
xor: sets.NewString("a", "b", "c"),
|
||||||
|
union: sets.NewString("a", "b", "c"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "one service removed, one service added, two unchanged",
|
||||||
|
a: sets.NewString("a", "b", "c"),
|
||||||
|
b: sets.NewString("b", "c", "d"),
|
||||||
|
xor: sets.NewString("a", "d"),
|
||||||
|
union: sets.NewString("a", "b", "c", "d"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no services",
|
||||||
|
a: sets.NewString(),
|
||||||
|
b: sets.NewString(),
|
||||||
|
xor: sets.NewString(),
|
||||||
|
union: sets.NewString(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
retval := determineNeededServiceUpdates(testCase.a, testCase.b, false)
|
||||||
|
if !retval.Equal(testCase.xor) {
|
||||||
|
t.Errorf("%s (with podChanged=false): expected: %v got: %v", testCase.name, testCase.xor.List(), retval.List())
|
||||||
|
}
|
||||||
|
|
||||||
|
retval = determineNeededServiceUpdates(testCase.a, testCase.b, true)
|
||||||
|
if !retval.Equal(testCase.union) {
|
||||||
|
t.Errorf("%s (with podChanged=true): expected: %v got: %v", testCase.name, testCase.union.List(), retval.List())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user