From a06236dd72c758b3a040f134628848b1faec2cf7 Mon Sep 17 00:00:00 2001 From: Seth Jennings Date: Fri, 18 Aug 2017 15:09:46 -0500 Subject: [PATCH] Factor out endpoint address generation, skip unneeded endpoint updates Also add unit tests for new endpoint helpers related to updatePod --- pkg/controller/endpoint/BUILD | 1 + .../endpoint/endpoints_controller.go | 77 ++++++++-- .../endpoint/endpoints_controller_test.go | 140 ++++++++++++++++++ 3 files changed, 204 insertions(+), 14 deletions(-) diff --git a/pkg/controller/endpoint/BUILD b/pkg/controller/endpoint/BUILD index 3202f12e557..f18fde133cd 100644 --- a/pkg/controller/endpoint/BUILD +++ b/pkg/controller/endpoint/BUILD @@ -50,6 +50,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index f0b0a4e9129..987b44a9511 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -202,6 +202,49 @@ func (e *EndpointController) addPod(obj interface{}) { } } +func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress { + return &v1.EndpointAddress{ + IP: pod.Status.PodIP, + NodeName: &pod.Spec.NodeName, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Namespace: pod.ObjectMeta.Namespace, + Name: pod.ObjectMeta.Name, + UID: pod.ObjectMeta.UID, + ResourceVersion: pod.ObjectMeta.ResourceVersion, + }} +} + +func podAddressChanged(oldPod, newPod *v1.Pod) bool { + // Convert the pod to an EndpointAddress, clear inert fields, + // and see if they are the same. + newEndpointAddress := podToEndpointAddress(newPod) + oldEndpointAddress := podToEndpointAddress(oldPod) + // Ignore the ResourceVersion because it changes + // with every pod update. This allows the comparison to + // show equality if all other relevant fields match. + newEndpointAddress.TargetRef.ResourceVersion = "" + oldEndpointAddress.TargetRef.ResourceVersion = "" + if reflect.DeepEqual(newEndpointAddress, oldEndpointAddress) { + // The pod has not changed in any way that impacts the endpoints + return false + } + return true +} + +func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String { + if podChanged { + // if the labels and pod changed, all services need to be updated + services = services.Union(oldServices) + } else { + // if only the labels changed, services not common to + // both the new and old service set (i.e the disjunctive union) + // need to be updated + services = services.Difference(oldServices).Union(oldServices.Difference(services)) + } + return services +} + // When a pod is updated, figure out what services it used to be a member of // and what services it will be a member of, and enqueue the union of these. // old and cur must be *v1.Pod types. @@ -213,22 +256,37 @@ func (e *EndpointController) updatePod(old, cur interface{}) { // Two different versions of the same pod will always have different RVs. return } + + podChanged := podAddressChanged(oldPod, newPod) + + // Check if the pod labels have changed, indicating a possibe + // change in the service membership + labelsChanged := false + if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) || + !hostNameAndDomainAreEqual(newPod, oldPod) { + labelsChanged = true + } + + // If both the pod and labels are unchanged, no update is needed + if !podChanged && !labelsChanged { + return + } + services, err := e.getPodServiceMemberships(newPod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)) return } - // Only need to get the old services if the labels changed. - if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) || - !hostNameAndDomainAreEqual(newPod, oldPod) { + if labelsChanged { oldServices, err := e.getPodServiceMemberships(oldPod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)) return } - services = services.Union(oldServices) + services = determineNeededServiceUpdates(oldServices, services, podChanged) } + for key := range services { e.queue.Add(key) } @@ -376,16 +434,7 @@ func (e *EndpointController) syncService(key string) error { continue } - epa := v1.EndpointAddress{ - IP: pod.Status.PodIP, - NodeName: &pod.Spec.NodeName, - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Namespace: pod.ObjectMeta.Namespace, - Name: pod.ObjectMeta.Name, - UID: pod.ObjectMeta.UID, - ResourceVersion: pod.ObjectMeta.ResourceVersion, - }} + epa := *podToEndpointAddress(pod) hostname := pod.Spec.Hostname if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace { diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index ad4007ca3c7..cc40c815551 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" @@ -956,3 +957,142 @@ func TestShouldPodBeInEndpoints(t *testing.T) { } } } + +func TestPodToEndpointAddress(t *testing.T) { + podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) + ns := "test" + addPods(podStore, ns, 1, 1, 0) + pods := podStore.List() + if len(pods) != 1 { + t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods)) + return + } + pod := pods[0].(*v1.Pod) + epa := podToEndpointAddress(pod) + if epa.IP != pod.Status.PodIP { + t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP) + } + if *(epa.NodeName) != pod.Spec.NodeName { + t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName)) + } + if epa.TargetRef.Kind != "Pod" { + t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind) + } + if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace { + t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace) + } + if epa.TargetRef.Name != pod.ObjectMeta.Name { + t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name) + } + if epa.TargetRef.UID != pod.ObjectMeta.UID { + t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID) + } + if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion { + t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion) + } +} + +func TestPodAddressChanged(t *testing.T) { + podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) + ns := "test" + addPods(podStore, ns, 1, 1, 0) + pods := podStore.List() + if len(pods) != 1 { + t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods)) + return + } + oldPod := pods[0].(*v1.Pod) + newPod := oldPod.DeepCopy() + + if podAddressChanged(oldPod, newPod) { + t.Errorf("Expected address to be unchanged for copied pod") + } + + newPod.Spec.NodeName = "changed" + if !podAddressChanged(oldPod, newPod) { + t.Errorf("Expected address to be changed for pod with NodeName changed") + } + newPod.Spec.NodeName = oldPod.Spec.NodeName + + newPod.ObjectMeta.ResourceVersion = "changed" + if podAddressChanged(oldPod, newPod) { + t.Errorf("Expected address to be unchanged for pod with only ResourceVersion changed") + } + newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion + + newPod.Status.PodIP = "1.2.3.1" + if !podAddressChanged(oldPod, newPod) { + t.Errorf("Expected address to be changed with pod IP address change") + } + newPod.Status.PodIP = oldPod.Status.PodIP + + newPod.ObjectMeta.Name = "wrong-name" + if !podAddressChanged(oldPod, newPod) { + t.Errorf("Expected address to be changed with pod name change") + } + newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name +} + +func TestDetermineNeededServiceUpdates(t *testing.T) { + testCases := []struct { + name string + a sets.String + b sets.String + union sets.String + xor sets.String + }{ + { + name: "no services changed", + a: sets.NewString("a", "b", "c"), + b: sets.NewString("a", "b", "c"), + xor: sets.NewString(), + union: sets.NewString("a", "b", "c"), + }, + { + name: "all old services removed, new services added", + a: sets.NewString("a", "b", "c"), + b: sets.NewString("d", "e", "f"), + xor: sets.NewString("a", "b", "c", "d", "e", "f"), + union: sets.NewString("a", "b", "c", "d", "e", "f"), + }, + { + name: "all old services removed, no new services added", + a: sets.NewString("a", "b", "c"), + b: sets.NewString(), + xor: sets.NewString("a", "b", "c"), + union: sets.NewString("a", "b", "c"), + }, + { + name: "no old services, but new services added", + a: sets.NewString(), + b: sets.NewString("a", "b", "c"), + xor: sets.NewString("a", "b", "c"), + union: sets.NewString("a", "b", "c"), + }, + { + name: "one service removed, one service added, two unchanged", + a: sets.NewString("a", "b", "c"), + b: sets.NewString("b", "c", "d"), + xor: sets.NewString("a", "d"), + union: sets.NewString("a", "b", "c", "d"), + }, + { + name: "no services", + a: sets.NewString(), + b: sets.NewString(), + xor: sets.NewString(), + union: sets.NewString(), + }, + } + for _, testCase := range testCases { + retval := determineNeededServiceUpdates(testCase.a, testCase.b, false) + if !retval.Equal(testCase.xor) { + t.Errorf("%s (with podChanged=false): expected: %v got: %v", testCase.name, testCase.xor.List(), retval.List()) + } + + retval = determineNeededServiceUpdates(testCase.a, testCase.b, true) + if !retval.Equal(testCase.union) { + t.Errorf("%s (with podChanged=true): expected: %v got: %v", testCase.name, testCase.union.List(), retval.List()) + } + } +}