Factor out endpoint address generation, skip unneeded endpoint updates

Also add unit tests for new endpoint helpers related to updatePod
This commit is contained in:
Seth Jennings 2017-08-18 15:09:46 -05:00 committed by Joel Smith
parent e633a1604f
commit a06236dd72
3 changed files with 204 additions and 14 deletions

View File

@ -50,6 +50,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -202,6 +202,49 @@ func (e *EndpointController) addPod(obj interface{}) {
}
}
func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
return &v1.EndpointAddress{
IP: pod.Status.PodIP,
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
}
func podAddressChanged(oldPod, newPod *v1.Pod) bool {
// Convert the pod to an EndpointAddress, clear inert fields,
// and see if they are the same.
newEndpointAddress := podToEndpointAddress(newPod)
oldEndpointAddress := podToEndpointAddress(oldPod)
// Ignore the ResourceVersion because it changes
// with every pod update. This allows the comparison to
// show equality if all other relevant fields match.
newEndpointAddress.TargetRef.ResourceVersion = ""
oldEndpointAddress.TargetRef.ResourceVersion = ""
if reflect.DeepEqual(newEndpointAddress, oldEndpointAddress) {
// The pod has not changed in any way that impacts the endpoints
return false
}
return true
}
func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String {
if podChanged {
// if the labels and pod changed, all services need to be updated
services = services.Union(oldServices)
} else {
// if only the labels changed, services not common to
// both the new and old service set (i.e the disjunctive union)
// need to be updated
services = services.Difference(oldServices).Union(oldServices.Difference(services))
}
return services
}
// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *v1.Pod types.
@ -213,22 +256,37 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
// Two different versions of the same pod will always have different RVs.
return
}
podChanged := podAddressChanged(oldPod, newPod)
// Check if the pod labels have changed, indicating a possibe
// change in the service membership
labelsChanged := false
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
!hostNameAndDomainAreEqual(newPod, oldPod) {
labelsChanged = true
}
// If both the pod and labels are unchanged, no update is needed
if !podChanged && !labelsChanged {
return
}
services, err := e.getPodServiceMemberships(newPod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
return
}
// Only need to get the old services if the labels changed.
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
!hostNameAndDomainAreEqual(newPod, oldPod) {
if labelsChanged {
oldServices, err := e.getPodServiceMemberships(oldPod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
return
}
services = services.Union(oldServices)
services = determineNeededServiceUpdates(oldServices, services, podChanged)
}
for key := range services {
e.queue.Add(key)
}
@ -376,16 +434,7 @@ func (e *EndpointController) syncService(key string) error {
continue
}
epa := v1.EndpointAddress{
IP: pod.Status.PodIP,
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
epa := *podToEndpointAddress(pod)
hostname := pod.Spec.Hostname
if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace {

View File

@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
@ -956,3 +957,142 @@ func TestShouldPodBeInEndpoints(t *testing.T) {
}
}
}
func TestPodToEndpointAddress(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0)
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
return
}
pod := pods[0].(*v1.Pod)
epa := podToEndpointAddress(pod)
if epa.IP != pod.Status.PodIP {
t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
}
if *(epa.NodeName) != pod.Spec.NodeName {
t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
}
if epa.TargetRef.Kind != "Pod" {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
}
if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
}
if epa.TargetRef.Name != pod.ObjectMeta.Name {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
}
if epa.TargetRef.UID != pod.ObjectMeta.UID {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
}
if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
}
}
func TestPodAddressChanged(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0)
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
return
}
oldPod := pods[0].(*v1.Pod)
newPod := oldPod.DeepCopy()
if podAddressChanged(oldPod, newPod) {
t.Errorf("Expected address to be unchanged for copied pod")
}
newPod.Spec.NodeName = "changed"
if !podAddressChanged(oldPod, newPod) {
t.Errorf("Expected address to be changed for pod with NodeName changed")
}
newPod.Spec.NodeName = oldPod.Spec.NodeName
newPod.ObjectMeta.ResourceVersion = "changed"
if podAddressChanged(oldPod, newPod) {
t.Errorf("Expected address to be unchanged for pod with only ResourceVersion changed")
}
newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
newPod.Status.PodIP = "1.2.3.1"
if !podAddressChanged(oldPod, newPod) {
t.Errorf("Expected address to be changed with pod IP address change")
}
newPod.Status.PodIP = oldPod.Status.PodIP
newPod.ObjectMeta.Name = "wrong-name"
if !podAddressChanged(oldPod, newPod) {
t.Errorf("Expected address to be changed with pod name change")
}
newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
}
func TestDetermineNeededServiceUpdates(t *testing.T) {
testCases := []struct {
name string
a sets.String
b sets.String
union sets.String
xor sets.String
}{
{
name: "no services changed",
a: sets.NewString("a", "b", "c"),
b: sets.NewString("a", "b", "c"),
xor: sets.NewString(),
union: sets.NewString("a", "b", "c"),
},
{
name: "all old services removed, new services added",
a: sets.NewString("a", "b", "c"),
b: sets.NewString("d", "e", "f"),
xor: sets.NewString("a", "b", "c", "d", "e", "f"),
union: sets.NewString("a", "b", "c", "d", "e", "f"),
},
{
name: "all old services removed, no new services added",
a: sets.NewString("a", "b", "c"),
b: sets.NewString(),
xor: sets.NewString("a", "b", "c"),
union: sets.NewString("a", "b", "c"),
},
{
name: "no old services, but new services added",
a: sets.NewString(),
b: sets.NewString("a", "b", "c"),
xor: sets.NewString("a", "b", "c"),
union: sets.NewString("a", "b", "c"),
},
{
name: "one service removed, one service added, two unchanged",
a: sets.NewString("a", "b", "c"),
b: sets.NewString("b", "c", "d"),
xor: sets.NewString("a", "d"),
union: sets.NewString("a", "b", "c", "d"),
},
{
name: "no services",
a: sets.NewString(),
b: sets.NewString(),
xor: sets.NewString(),
union: sets.NewString(),
},
}
for _, testCase := range testCases {
retval := determineNeededServiceUpdates(testCase.a, testCase.b, false)
if !retval.Equal(testCase.xor) {
t.Errorf("%s (with podChanged=false): expected: %v got: %v", testCase.name, testCase.xor.List(), retval.List())
}
retval = determineNeededServiceUpdates(testCase.a, testCase.b, true)
if !retval.Equal(testCase.union) {
t.Errorf("%s (with podChanged=true): expected: %v got: %v", testCase.name, testCase.union.List(), retval.List())
}
}
}