From 9fb6e2ef55a7e42dfb5749617046dae71435eed9 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sun, 24 May 2020 11:55:09 -0400 Subject: [PATCH 1/3] Fix Endpoint/EndpointSlice pod change detection The endpoint controllers responded to Pod changes by trying to figure out if the generated endpoint resource would change, rather than just checking if the Pod had changed, but since the set of Pod fields that need to be checked depend on the Service and Node as well, the code ended up only checking for a subset of the changes it should have. In particular, EndpointSliceController ended up only looking at IPv4 Pod IPs when processing Pod update events, so when a Pod went from having no IP to having only an IPv6 IP, EndpointSliceController would think it hadn't changed. --- pkg/controller/endpoint/BUILD | 1 - .../endpoint/endpoints_controller.go | 64 +++--- .../endpoint/endpoints_controller_test.go | 182 +----------------- .../endpointslice/endpointslice_controller.go | 2 +- pkg/controller/endpointslice/utils.go | 15 +- pkg/controller/endpointslice/utils_test.go | 63 ------ .../util/endpoint/controller_utils.go | 35 ++-- .../util/endpoint/controller_utils_test.go | 165 ++++++++++++++++ 8 files changed, 217 insertions(+), 310 deletions(-) diff --git a/pkg/controller/endpoint/BUILD b/pkg/controller/endpoint/BUILD index 159f84b1cd6..8767dc22ccd 100644 --- a/pkg/controller/endpoint/BUILD +++ b/pkg/controller/endpoint/BUILD @@ -48,7 +48,6 @@ go_test( "//pkg/api/v1/endpoints:go_default_library", "//pkg/apis/core:go_default_library", "//pkg/controller:go_default_library", - "//pkg/controller/util/endpoint:go_default_library", "//pkg/features:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 769fc773a9d..d68b01fce2b 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -19,7 +19,6 @@ package endpoint import ( "context" "fmt" - "reflect" "strconv" "time" @@ -213,39 +212,33 @@ func (e *EndpointController) addPod(obj interface{}) { } func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) { + var endpointIP string + if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { - return podToEndpointAddress(pod), nil - } + endpointIP = pod.Status.PodIP + } else { + // api-server service controller ensured that the service got the correct IP Family + // according to user setup, here we only need to match EndPoint IPs' family to service + // actual IP family. as in, we don't need to check service.IPFamily - // api-server service controller ensured that the service got the correct IP Family - // according to user setup, here we only need to match EndPoint IPs' family to service - // actual IP family. as in, we don't need to check service.IPFamily - - ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP) - for _, podIP := range pod.Status.PodIPs { - ipv6PodIP := utilnet.IsIPv6String(podIP.IP) - // same family? - // TODO (khenidak) when we remove the max of 2 PodIP limit from pods - // we will have to return multiple endpoint addresses - if ipv6ClusterIP == ipv6PodIP { - return &v1.EndpointAddress{ - IP: podIP.IP, - NodeName: &pod.Spec.NodeName, - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Namespace: pod.ObjectMeta.Namespace, - Name: pod.ObjectMeta.Name, - UID: pod.ObjectMeta.UID, - ResourceVersion: pod.ObjectMeta.ResourceVersion, - }}, nil + ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP) + for _, podIP := range pod.Status.PodIPs { + ipv6PodIP := utilnet.IsIPv6String(podIP.IP) + // same family? + // TODO (khenidak) when we remove the max of 2 PodIP limit from pods + // we will have to return multiple endpoint addresses + if ipv6ClusterIP == ipv6PodIP { + endpointIP = podIP.IP + break + } + } + if endpointIP == "" { + return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name) } } - return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name) -} -func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress { return &v1.EndpointAddress{ - IP: pod.Status.PodIP, + IP: endpointIP, NodeName: &pod.Spec.NodeName, TargetRef: &v1.ObjectReference{ Kind: "Pod", @@ -253,24 +246,15 @@ func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress { Name: pod.ObjectMeta.Name, UID: pod.ObjectMeta.UID, ResourceVersion: pod.ObjectMeta.ResourceVersion, - }} -} - -func endpointChanged(pod1, pod2 *v1.Pod) bool { - endpointAddress1 := podToEndpointAddress(pod1) - endpointAddress2 := podToEndpointAddress(pod2) - - endpointAddress1.TargetRef.ResourceVersion = "" - endpointAddress2.TargetRef.ResourceVersion = "" - - return !reflect.DeepEqual(endpointAddress1, endpointAddress2) + }, + }, nil } // When a pod is updated, figure out what services it used to be a member of // and what services it will be a member of, and enqueue the union of these. // old and cur must be *v1.Pod types. func (e *EndpointController) updatePod(old, cur interface{}) { - services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur, endpointChanged) + services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur) for key := range services { e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod) } diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index ce8ea794c44..3db9982cdde 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -43,7 +43,6 @@ import ( endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controller" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" "k8s.io/kubernetes/pkg/features" utilpointer "k8s.io/utils/pointer" ) @@ -67,7 +66,6 @@ func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack b Containers: []v1.Container{{Ports: []v1.ContainerPort{}}}, }, Status: v1.PodStatus{ - PodIP: fmt.Sprintf("1.2.3.%d", 4+id), Conditions: []v1.PodCondition{ { Type: v1.PodReady, @@ -83,16 +81,11 @@ func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack b p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)}) } + p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: fmt.Sprintf("1.2.3.%d", 4+id)}) if makeDualstack { - p.Status.PodIPs = []v1.PodIP{ - { - IP: p.Status.PodIP, - }, - { - IP: fmt.Sprintf("2000::%d", id), - }, - } + p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: fmt.Sprintf("2000::%d", id)}) } + p.Status.PodIP = p.Status.PodIPs[0].IP return p } @@ -1238,169 +1231,6 @@ func TestPodToEndpointAddressForService(t *testing.T) { } -func TestPodToEndpointAddress(t *testing.T) { - podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) - ns := "test" - addPods(podStore, ns, 1, 1, 0, false) - pods := podStore.List() - if len(pods) != 1 { - t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods)) - return - } - pod := pods[0].(*v1.Pod) - epa := podToEndpointAddress(pod) - if epa.IP != pod.Status.PodIP { - t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP) - } - if *(epa.NodeName) != pod.Spec.NodeName { - t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName)) - } - if epa.TargetRef.Kind != "Pod" { - t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind) - } - if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace { - t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace) - } - if epa.TargetRef.Name != pod.ObjectMeta.Name { - t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name) - } - if epa.TargetRef.UID != pod.ObjectMeta.UID { - t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID) - } - if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion { - t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion) - } -} - -func TestPodChanged(t *testing.T) { - podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) - ns := "test" - addPods(podStore, ns, 1, 1, 0, false) - pods := podStore.List() - if len(pods) != 1 { - t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods)) - return - } - oldPod := pods[0].(*v1.Pod) - newPod := oldPod.DeepCopy() - - if podChangedHelper(oldPod, newPod, endpointChanged) { - t.Errorf("Expected pod to be unchanged for copied pod") - } - - newPod.Spec.NodeName = "changed" - if !podChangedHelper(oldPod, newPod, endpointChanged) { - t.Errorf("Expected pod to be changed for pod with NodeName changed") - } - newPod.Spec.NodeName = oldPod.Spec.NodeName - - newPod.ObjectMeta.ResourceVersion = "changed" - if podChangedHelper(oldPod, newPod, endpointChanged) { - t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed") - } - newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion - - newPod.Status.PodIP = "1.2.3.1" - if !podChangedHelper(oldPod, newPod, endpointChanged) { - t.Errorf("Expected pod to be changed with pod IP address change") - } - newPod.Status.PodIP = oldPod.Status.PodIP - - /* dual stack tests */ - // primary changes, because changing IPs is done by changing sandbox - // case 1: add new secondary IP - newPod.Status.PodIP = "1.1.3.1" - newPod.Status.PodIPs = []v1.PodIP{ - { - IP: "1.1.3.1", - }, - { - IP: "2000::1", - }, - } - if !podChangedHelper(oldPod, newPod, endpointChanged) { - t.Errorf("Expected pod to be changed with adding secondary IP") - } - // reset - newPod.Status.PodIPs = nil - newPod.Status.PodIP = oldPod.Status.PodIP - - // case 2: removing a secondary IP - saved := oldPod.Status.PodIP - oldPod.Status.PodIP = "1.1.3.1" - oldPod.Status.PodIPs = []v1.PodIP{ - { - IP: "1.1.3.1", - }, - { - IP: "2000::1", - }, - } - - newPod.Status.PodIP = "1.2.3.4" - newPod.Status.PodIPs = []v1.PodIP{ - { - IP: "1.2.3.4", - }, - } - - // reset - oldPod.Status.PodIPs = nil - newPod.Status.PodIPs = nil - oldPod.Status.PodIP = saved - newPod.Status.PodIP = saved - // case 3: change secondary - // case 2: removing a secondary IP - saved = oldPod.Status.PodIP - oldPod.Status.PodIP = "1.1.3.1" - oldPod.Status.PodIPs = []v1.PodIP{ - { - IP: "1.1.3.1", - }, - { - IP: "2000::1", - }, - } - - newPod.Status.PodIP = "1.2.3.4" - newPod.Status.PodIPs = []v1.PodIP{ - { - IP: "1.2.3.4", - }, - { - IP: "2000::2", - }, - } - - // reset - oldPod.Status.PodIPs = nil - newPod.Status.PodIPs = nil - oldPod.Status.PodIP = saved - newPod.Status.PodIP = saved - - /* end dual stack testing */ - - newPod.ObjectMeta.Name = "wrong-name" - if !podChangedHelper(oldPod, newPod, endpointChanged) { - t.Errorf("Expected pod to be changed with pod name change") - } - newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name - - saveConditions := oldPod.Status.Conditions - oldPod.Status.Conditions = nil - if !podChangedHelper(oldPod, newPod, endpointChanged) { - t.Errorf("Expected pod to be changed with pod readiness change") - } - oldPod.Status.Conditions = saveConditions - - now := metav1.NewTime(time.Now().UTC()) - newPod.ObjectMeta.DeletionTimestamp = &now - if !podChangedHelper(oldPod, newPod, endpointChanged) { - t.Errorf("Expected pod to be changed with DeletionTimestamp change") - } - newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy() -} - func TestLastTriggerChangeTimeAnnotation(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns) @@ -1680,6 +1510,7 @@ func TestPodUpdatesBatching(t *testing.T) { oldPod := old.(*v1.Pod) newPod := oldPod.DeepCopy() newPod.Status.PodIP = update.podIP + newPod.Status.PodIPs[0].IP = update.podIP newPod.ResourceVersion = strconv.Itoa(resourceVersion) resourceVersion++ @@ -1996,8 +1827,3 @@ func stringVal(str *string) string { } return *str } - -func podChangedHelper(oldPod, newPod *v1.Pod, endpointChanged endpointutil.EndpointsMatch) bool { - podChanged, _ := endpointutil.PodChanged(oldPod, newPod, endpointChanged) - return podChanged -} diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index 63098565a49..e4b120a706a 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -457,7 +457,7 @@ func (c *Controller) addPod(obj interface{}) { } func (c *Controller) updatePod(old, cur interface{}) { - services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur, podEndpointChanged) + services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur) for key := range services { c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod) } diff --git a/pkg/controller/endpointslice/utils.go b/pkg/controller/endpointslice/utils.go index 6d1a1e570ed..a3589e53907 100644 --- a/pkg/controller/endpointslice/utils.go +++ b/pkg/controller/endpointslice/utils.go @@ -18,7 +18,6 @@ package endpointslice import ( "fmt" - "reflect" "time" corev1 "k8s.io/api/core/v1" @@ -36,19 +35,7 @@ import ( utilnet "k8s.io/utils/net" ) -// podEndpointChanged returns true if the results of podToEndpoint are different -// for the pods passed to this function. -func podEndpointChanged(pod1, pod2 *corev1.Pod) bool { - endpoint1 := podToEndpoint(pod1, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}) - endpoint2 := podToEndpoint(pod2, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}) - - endpoint1.TargetRef.ResourceVersion = "" - endpoint2.TargetRef.ResourceVersion = "" - - return !reflect.DeepEqual(endpoint1, endpoint2) -} - -// podToEndpoint returns an Endpoint object generated from a Pod and Node. +// podToEndpoint returns an Endpoint object generated from pod, node, and service. func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service) discovery.Endpoint { // Build out topology information. This is currently limited to hostname, // zone, and region, but this will be expanded in the future. diff --git a/pkg/controller/endpointslice/utils_test.go b/pkg/controller/endpointslice/utils_test.go index f485ff02e20..d65c4d78097 100644 --- a/pkg/controller/endpointslice/utils_test.go +++ b/pkg/controller/endpointslice/utils_test.go @@ -20,7 +20,6 @@ import ( "fmt" "reflect" "testing" - "time" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -32,8 +31,6 @@ import ( "k8s.io/apimachinery/pkg/util/rand" "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" - "k8s.io/client-go/tools/cache" - endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" utilpointer "k8s.io/utils/pointer" ) @@ -255,61 +252,6 @@ func TestPodToEndpoint(t *testing.T) { } } -func TestPodChangedWithPodEndpointChanged(t *testing.T) { - podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) - ns := "test" - podStore.Add(newPod(1, ns, true, 1)) - pods := podStore.List() - if len(pods) != 1 { - t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods)) - return - } - oldPod := pods[0].(*v1.Pod) - newPod := oldPod.DeepCopy() - - if podChangedHelper(oldPod, newPod, podEndpointChanged) { - t.Errorf("Expected pod to be unchanged for copied pod") - } - - newPod.Spec.NodeName = "changed" - if !podChangedHelper(oldPod, newPod, podEndpointChanged) { - t.Errorf("Expected pod to be changed for pod with NodeName changed") - } - newPod.Spec.NodeName = oldPod.Spec.NodeName - - newPod.ObjectMeta.ResourceVersion = "changed" - if podChangedHelper(oldPod, newPod, podEndpointChanged) { - t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed") - } - newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion - - newPod.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.1"}} - if !podChangedHelper(oldPod, newPod, podEndpointChanged) { - t.Errorf("Expected pod to be changed with pod IP address change") - } - newPod.Status.PodIPs = oldPod.Status.PodIPs - - newPod.ObjectMeta.Name = "wrong-name" - if !podChangedHelper(oldPod, newPod, podEndpointChanged) { - t.Errorf("Expected pod to be changed with pod name change") - } - newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name - - saveConditions := oldPod.Status.Conditions - oldPod.Status.Conditions = nil - if !podChangedHelper(oldPod, newPod, podEndpointChanged) { - t.Errorf("Expected pod to be changed with pod readiness change") - } - oldPod.Status.Conditions = saveConditions - - now := metav1.NewTime(time.Now().UTC()) - newPod.ObjectMeta.DeletionTimestamp = &now - if !podChangedHelper(oldPod, newPod, podEndpointChanged) { - t.Errorf("Expected pod to be changed with DeletionTimestamp change") - } - newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy() -} - func TestServiceControllerKey(t *testing.T) { testCases := map[string]struct { endpointSlice *discovery.EndpointSlice @@ -545,8 +487,3 @@ func newEmptyEndpointSlice(n int, namespace string, endpointMeta endpointMeta, s Endpoints: []discovery.Endpoint{}, } } - -func podChangedHelper(oldPod, newPod *v1.Pod, endpointChanged endpointutil.EndpointsMatch) bool { - podChanged, _ := endpointutil.PodChanged(oldPod, newPod, podEndpointChanged) - return podChanged -} diff --git a/pkg/controller/util/endpoint/controller_utils.go b/pkg/controller/util/endpoint/controller_utils.go index e38e65668d7..d1744441bec 100644 --- a/pkg/controller/util/endpoint/controller_utils.go +++ b/pkg/controller/util/endpoint/controller_utils.go @@ -106,9 +106,6 @@ func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers return set, nil } -// EndpointsMatch is a type of function that returns true if pod endpoints match. -type EndpointsMatch func(*v1.Pod, *v1.Pod) bool - // PortMapKey is used to uniquely identify groups of endpoint ports. type PortMapKey string @@ -153,9 +150,10 @@ func ShouldSetHostname(pod *v1.Pod, svc *v1.Service) bool { return len(pod.Spec.Hostname) > 0 && pod.Spec.Subdomain == svc.Name && svc.Namespace == pod.Namespace } -// PodChanged returns two boolean values, the first returns true if the pod. -// has changed, the second value returns true if the pod labels have changed. -func PodChanged(oldPod, newPod *v1.Pod, endpointChanged EndpointsMatch) (bool, bool) { +// podEndpointsChanged returns two boolean values. The first is true if the pod has +// changed in a way that may change existing endpoints. The second value is true if the +// pod has changed in a way that may affect which Services it matches. +func podEndpointsChanged(oldPod, newPod *v1.Pod) (bool, bool) { // Check if the pod labels have changed, indicating a possible // change in the service membership labelsChanged := false @@ -175,16 +173,27 @@ func PodChanged(oldPod, newPod *v1.Pod, endpointChanged EndpointsMatch) (bool, b if podutil.IsPodReady(oldPod) != podutil.IsPodReady(newPod) { return true, labelsChanged } - // Convert the pod to an Endpoint, clear inert fields, - // and see if they are the same. - // TODO: Add a watcher for node changes separate from this - // We don't want to trigger multiple syncs at a pod level when a node changes - return endpointChanged(newPod, oldPod), labelsChanged + + // Check if the pod IPs have changed + if len(oldPod.Status.PodIPs) != len(newPod.Status.PodIPs) { + return true, labelsChanged + } + for i := range oldPod.Status.PodIPs { + if oldPod.Status.PodIPs[i].IP != newPod.Status.PodIPs[i].IP { + return true, labelsChanged + } + } + + // Endpoints may also reference a pod's Name, Namespace, UID, and NodeName, but + // the first three are immutable, and NodeName is immutable once initially set, + // which happens before the pod gets an IP. + + return false, labelsChanged } // GetServicesToUpdateOnPodChange returns a set of Service keys for Services // that have potentially been affected by a change to this pod. -func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}, endpointChanged EndpointsMatch) sets.String { +func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}) sets.String { newPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) if newPod.ResourceVersion == oldPod.ResourceVersion { @@ -193,7 +202,7 @@ func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selec return sets.String{} } - podChanged, labelsChanged := PodChanged(oldPod, newPod, endpointChanged) + podChanged, labelsChanged := podEndpointsChanged(oldPod, newPod) // If both the pod and labels are unchanged, no update is needed if !podChanged && !labelsChanged { diff --git a/pkg/controller/util/endpoint/controller_utils_test.go b/pkg/controller/util/endpoint/controller_utils_test.go index 474842b43da..df118e0a7af 100644 --- a/pkg/controller/util/endpoint/controller_utils_test.go +++ b/pkg/controller/util/endpoint/controller_utils_test.go @@ -499,3 +499,168 @@ func BenchmarkGetPodServiceMemberships(b *testing.B) { } } } + +func Test_podChanged(t *testing.T) { + testCases := []struct { + testName string + modifier func(*v1.Pod, *v1.Pod) + podChanged bool + labelsChanged bool + }{ + { + testName: "no changes", + modifier: func(old, new *v1.Pod) {}, + podChanged: false, + labelsChanged: false, + }, { + testName: "change NodeName", + modifier: func(old, new *v1.Pod) { + new.Spec.NodeName = "changed" + }, + // NodeName can only change before the pod has an IP, and we don't care about the + // pod yet at that point so we ignore this change + podChanged: false, + labelsChanged: false, + }, { + testName: "change ResourceVersion", + modifier: func(old, new *v1.Pod) { + new.ObjectMeta.ResourceVersion = "changed" + }, + // ResourceVersion is intentionally ignored if nothing else changed + podChanged: false, + labelsChanged: false, + }, { + testName: "add primary IPv4", + modifier: func(old, new *v1.Pod) { + new.Status.PodIP = "1.2.3.4" + new.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}} + }, + podChanged: true, + labelsChanged: false, + }, { + testName: "modify primary IPv4", + modifier: func(old, new *v1.Pod) { + old.Status.PodIP = "1.2.3.4" + old.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}} + new.Status.PodIP = "2.3.4.5" + new.Status.PodIPs = []v1.PodIP{{IP: "2.3.4.5"}} + }, + podChanged: true, + labelsChanged: false, + }, { + testName: "add primary IPv6", + modifier: func(old, new *v1.Pod) { + new.Status.PodIP = "fd00:10:96::1" + new.Status.PodIPs = []v1.PodIP{{IP: "fd00:10:96::1"}} + }, + podChanged: true, + labelsChanged: false, + }, { + testName: "modify primary IPv6", + modifier: func(old, new *v1.Pod) { + old.Status.PodIP = "fd00:10:96::1" + old.Status.PodIPs = []v1.PodIP{{IP: "fd00:10:96::1"}} + new.Status.PodIP = "fd00:10:96::2" + new.Status.PodIPs = []v1.PodIP{{IP: "fd00:10:96::2"}} + }, + podChanged: true, + labelsChanged: false, + }, { + testName: "add secondary IP", + modifier: func(old, new *v1.Pod) { + old.Status.PodIP = "1.2.3.4" + old.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}} + new.Status.PodIP = "1.2.3.4" + new.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "fd00:10:96::1"}} + }, + podChanged: true, + labelsChanged: false, + }, { + testName: "modify secondary IP", + modifier: func(old, new *v1.Pod) { + old.Status.PodIP = "1.2.3.4" + old.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "fd00:10:96::1"}} + new.Status.PodIP = "1.2.3.4" + new.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "fd00:10:96::2"}} + }, + podChanged: true, + labelsChanged: false, + }, { + testName: "remove secondary IP", + modifier: func(old, new *v1.Pod) { + old.Status.PodIP = "1.2.3.4" + old.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "fd00:10:96::1"}} + new.Status.PodIP = "1.2.3.4" + new.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}} + }, + podChanged: true, + labelsChanged: false, + }, { + testName: "change readiness", + modifier: func(old, new *v1.Pod) { + new.Status.Conditions[0].Status = v1.ConditionTrue + }, + podChanged: true, + labelsChanged: false, + }, { + testName: "mark for deletion", + modifier: func(old, new *v1.Pod) { + now := metav1.NewTime(time.Now().UTC()) + new.ObjectMeta.DeletionTimestamp = &now + }, + podChanged: true, + labelsChanged: false, + }, { + testName: "add label", + modifier: func(old, new *v1.Pod) { + new.Labels["label"] = "new" + }, + podChanged: false, + labelsChanged: true, + }, { + testName: "modify label", + modifier: func(old, new *v1.Pod) { + old.Labels["label"] = "old" + new.Labels["label"] = "new" + }, + podChanged: false, + labelsChanged: true, + }, { + testName: "remove label", + modifier: func(old, new *v1.Pod) { + old.Labels["label"] = "old" + }, + podChanged: false, + labelsChanged: true, + }, + } + + orig := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "pod", + Labels: map[string]string{"foo": "bar"}, + }, + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionFalse}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + old := orig.DeepCopy() + new := old.DeepCopy() + tc.modifier(old, new) + + podChanged, labelsChanged := podEndpointsChanged(old, new) + if podChanged != tc.podChanged { + t.Errorf("Expected podChanged to be %t, got %t", tc.podChanged, podChanged) + } + if labelsChanged != tc.labelsChanged { + t.Errorf("Expected labelsChanged to be %t, got %t", tc.labelsChanged, labelsChanged) + } + }) + } +} From 9023d19c5748c4e2f1a3911fe59eecf24274c8d3 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sun, 24 May 2020 18:27:20 -0400 Subject: [PATCH 2/3] Improve EndpointController dual-stack testing Rewrite some of the test helpers to better support single-stack IPv4 vs single-stack IPv6 vs dual-stack IPv4 primary vs dual-stack IPv6 primary, and update TestPodToEndpointAddressForService to test some more cases. --- pkg/controller/endpoint/BUILD | 1 + .../endpoint/endpoints_controller_test.go | 251 ++++++++++++++---- 2 files changed, 193 insertions(+), 59 deletions(-) diff --git a/pkg/controller/endpoint/BUILD b/pkg/controller/endpoint/BUILD index 8767dc22ccd..da35826b242 100644 --- a/pkg/controller/endpoint/BUILD +++ b/pkg/controller/endpoint/BUILD @@ -64,6 +64,7 @@ go_test( "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", ], ) diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 3db9982cdde..c82a1d0ec0b 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -44,6 +44,7 @@ import ( api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/features" + utilnet "k8s.io/utils/net" utilpointer "k8s.io/utils/pointer" ) @@ -54,7 +55,12 @@ var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC) var triggerTimeString = triggerTime.Format(time.RFC3339Nano) var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano) -func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack bool) *v1.Pod { +var ipv4only = []v1.IPFamily{v1.IPv4Protocol} +var ipv6only = []v1.IPFamily{v1.IPv6Protocol} +var ipv4ipv6 = []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol} +var ipv6ipv4 = []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol} + +func testPod(namespace string, id int, nPorts int, isReady bool, ipFamilies []v1.IPFamily) *v1.Pod { p := &v1.Pod{ TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{ @@ -81,19 +87,24 @@ func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack b p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)}) } - p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: fmt.Sprintf("1.2.3.%d", 4+id)}) - if makeDualstack { - p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: fmt.Sprintf("2000::%d", id)}) + for _, family := range ipFamilies { + var ip string + if family == v1.IPv4Protocol { + ip = fmt.Sprintf("1.2.3.%d", 4+id) + } else { + ip = fmt.Sprintf("2000::%d", 4+id) + } + p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: ip}) } p.Status.PodIP = p.Status.PodIPs[0].IP return p } -func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, makeDualstack bool) { +func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, ipFamilies []v1.IPFamily) { for i := 0; i < nPods+nNotReady; i++ { isReady := i < nPods - pod := testPod(namespace, i, nPorts, isReady, makeDualstack) + pod := testPod(namespace, i, nPorts, isReady, ipFamilies) store.Add(pod) } } @@ -301,7 +312,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -345,7 +356,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -389,7 +400,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -430,7 +441,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { }, Subsets: []v1.EndpointSubset{}, }) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -470,7 +481,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { }, Subsets: []v1.EndpointSubset{}, }) - addPods(endpoints.podStore, ns, 0, 1, 1, false) + addPods(endpoints.podStore, ns, 0, 1, 1, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -510,7 +521,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { }, Subsets: []v1.EndpointSubset{}, }) - addPods(endpoints.podStore, ns, 1, 1, 1, false) + addPods(endpoints.podStore, ns, 1, 1, 1, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -554,7 +565,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -597,7 +608,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) - addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, false) + addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault}, Spec: v1.ServiceSpec{ @@ -614,8 +625,8 @@ func TestSyncEndpointsItems(t *testing.T) { testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL, 0*time.Second) - addPods(endpoints.podStore, ns, 3, 2, 0, false) - addPods(endpoints.podStore, "blah", 5, 2, 0, false) // make sure these aren't found! + addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only) + addPods(endpoints.podStore, "blah", 5, 2, 0, ipv4only) // make sure these aren't found! endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -659,7 +670,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL, 0*time.Second) - addPods(endpoints.podStore, ns, 3, 2, 0, false) + addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only) serviceLabels := map[string]string{"foo": "bar"} endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -721,7 +732,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) serviceLabels := map[string]string{"baz": "blah"} endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -771,7 +782,7 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) { testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL, 0*time.Second) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -823,7 +834,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, Labels: map[string]string{"a": "b"}}, Spec: v1.ServiceSpec{ @@ -987,7 +998,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) { Ports: nil, }, }) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.syncService(ns + "/foo") endpointsHandler.ValidateRequestCount(t, 1) data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ @@ -1095,89 +1106,211 @@ func TestShouldPodBeInEndpoints(t *testing.T) { } } } + func TestPodToEndpointAddressForService(t *testing.T) { + ipv4 := v1.IPv4Protocol + ipv6 := v1.IPv6Protocol + testCases := []struct { - name string - expectedEndPointIP string - enableDualStack bool - expectError bool - enableDualStackPod bool + name string + + enableDualStack bool + ipFamilies []v1.IPFamily service v1.Service + + expectedEndpointFamily v1.IPFamily + expectError bool }{ { - name: "v4 service, in a single stack cluster", - expectedEndPointIP: "1.2.3.4", + name: "v4 service, in a single stack cluster", - enableDualStack: false, - expectError: false, - enableDualStackPod: false, + enableDualStack: false, + ipFamilies: ipv4only, service: v1.Service{ Spec: v1.ServiceSpec{ ClusterIP: "10.0.0.1", }, }, + + expectedEndpointFamily: ipv4, }, { name: "v4 service, in a dual stack cluster", - expectedEndPointIP: "1.2.3.4", - enableDualStack: true, - expectError: false, - enableDualStackPod: true, + enableDualStack: true, + ipFamilies: ipv4ipv6, service: v1.Service{ Spec: v1.ServiceSpec{ ClusterIP: "10.0.0.1", }, }, + + expectedEndpointFamily: ipv4, }, { - name: "v6 service, in a dual stack cluster. dual stack enabled", - expectedEndPointIP: "2000::0", + name: "v4 service, in a dual stack ipv6-primary cluster", - enableDualStack: true, - expectError: false, - enableDualStackPod: true, + enableDualStack: true, + ipFamilies: ipv6ipv4, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.1", + }, + }, + + expectedEndpointFamily: ipv4, + }, + { + name: "v4 headless service, in a single stack cluster", + + enableDualStack: false, + ipFamilies: ipv4only, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: v1.ClusterIPNone, + }, + }, + + expectedEndpointFamily: ipv4, + }, + { + name: "v4 headless service, in a dual stack cluster", + + enableDualStack: false, + ipFamilies: ipv4ipv6, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: v1.ClusterIPNone, + IPFamily: &ipv4, + }, + }, + + expectedEndpointFamily: ipv4, + }, + { + name: "v4 legacy headless service, in a dual stack cluster", + + enableDualStack: false, + ipFamilies: ipv4ipv6, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: v1.ClusterIPNone, + }, + }, + + expectedEndpointFamily: ipv4, + }, + { + name: "v4 legacy headless service, in a dual stack ipv6-primary cluster", + + enableDualStack: true, + ipFamilies: ipv6ipv4, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: v1.ClusterIPNone, + }, + }, + + expectedEndpointFamily: ipv4, + }, + { + name: "v6 service, in a dual stack cluster", + + enableDualStack: true, + ipFamilies: ipv4ipv6, service: v1.Service{ Spec: v1.ServiceSpec{ ClusterIP: "3000::1", }, }, + + expectedEndpointFamily: ipv6, + }, + { + name: "v6 headless service, in a single stack cluster", + + enableDualStack: false, + ipFamilies: ipv6only, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: v1.ClusterIPNone, + }, + }, + + expectedEndpointFamily: ipv6, + }, + // { + // name: "v6 headless service, in a dual stack cluster", + // + // enableDualStack: true, + // ipFamilies: ipv4ipv6, + // + // service: v1.Service{ + // Spec: v1.ServiceSpec{ + // ClusterIP: v1.ClusterIPNone, + // IPFamily: &ipv6, + // }, + // }, + // + // expectedEndpointFamily: ipv6, + // }, + { + name: "v6 legacy headless service, in a dual stack cluster", + + enableDualStack: false, + ipFamilies: ipv4ipv6, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: v1.ClusterIPNone, + }, + }, + + // This is not the behavior we *want*, but it's the behavior we currently expect. + expectedEndpointFamily: ipv4, }, // in reality this is a misconfigured cluster // i.e user is not using dual stack and have PodIP == v4 and ServiceIP==v6 // we are testing that we will keep producing the expected behavior { - name: "v6 service, in a v4 only cluster. dual stack disabled", - expectedEndPointIP: "1.2.3.4", + name: "v6 service, in a v4 only cluster. dual stack disabled", - enableDualStack: false, - expectError: false, - enableDualStackPod: false, + enableDualStack: false, + ipFamilies: ipv4only, service: v1.Service{ Spec: v1.ServiceSpec{ ClusterIP: "3000::1", }, }, + + expectedEndpointFamily: ipv4, }, + // but this will actually give an error { - name: "v6 service, in a v4 only cluster - dual stack enabled", - expectedEndPointIP: "1.2.3.4", + name: "v6 service, in a v4 only cluster - dual stack enabled", - enableDualStack: true, - expectError: true, - enableDualStackPod: false, + enableDualStack: true, + ipFamilies: ipv4only, service: v1.Service{ Spec: v1.ServiceSpec{ ClusterIP: "3000::1", }, }, + + expectError: true, }, } for _, tc := range testCases { @@ -1185,7 +1318,7 @@ func TestPodToEndpointAddressForService(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)() podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) ns := "test" - addPods(podStore, ns, 1, 1, 0, tc.enableDualStackPod) + addPods(podStore, ns, 1, 1, 0, tc.ipFamilies) pods := podStore.List() if len(pods) != 1 { t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods)) @@ -1205,8 +1338,8 @@ func TestPodToEndpointAddressForService(t *testing.T) { return } - if epa.IP != tc.expectedEndPointIP { - t.Fatalf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP) + if utilnet.IsIPv6String(epa.IP) != (tc.expectedEndpointFamily == ipv6) { + t.Fatalf("IP: expected %s, got: %s", tc.expectedEndpointFamily, epa.IP) } if *(epa.NodeName) != pod.Spec.NodeName { t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName)) @@ -1247,7 +1380,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)}, Spec: v1.ServiceSpec{ @@ -1297,7 +1430,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)}, Spec: v1.ServiceSpec{ @@ -1348,7 +1481,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) { }}, }) // Neither pod nor service has trigger time, this should cause annotation to be cleared. - addPods(endpoints.podStore, ns, 1, 1, 0, false) + addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -1487,7 +1620,7 @@ func TestPodUpdatesBatching(t *testing.T) { go endpoints.Run(1, stopCh) - addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false) + addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -1621,7 +1754,7 @@ func TestPodAddsBatching(t *testing.T) { for i, add := range tc.adds { time.Sleep(add.delay) - p := testPod(ns, i, 1, true, false) + p := testPod(ns, i, 1, true, ipv4only) endpoints.podStore.Add(p) endpoints.addPod(p) } @@ -1732,7 +1865,7 @@ func TestPodDeleteBatching(t *testing.T) { go endpoints.Run(1, stopCh) - addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false) + addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, From e46572ef4b0f0a6b095c7dcdceb5bbca2ec0e9ff Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sun, 24 May 2020 17:48:59 -0400 Subject: [PATCH 3/3] Improve EndpointController's handling of headless services under dual-stack EndpointController was accidentally requiring all headless services to be IPv4-only in clusters with IPv6DualStack enabled. This still leaves "legacy" (ie, IPFamily-less) headless services as always IPv4-only because the controller doesn't currently have easy access to the information that would allow it to fix that. (EndpointSliceController had the same problem already, and still does.) This can be fixed, if needed, by manually setting IPFamily, and the proposed API for 1.20 will handle this situation better. --- .../endpoint/endpoints_controller.go | 12 ++------ .../endpoint/endpoints_controller_test.go | 30 +++++++++---------- pkg/controller/endpointslice/reconciler.go | 2 +- pkg/controller/endpointslice/utils.go | 8 +---- pkg/controller/util/endpoint/BUILD | 2 ++ .../util/endpoint/controller_utils.go | 17 +++++++++++ 6 files changed, 39 insertions(+), 32 deletions(-) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index d68b01fce2b..610ec5b34fa 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -215,19 +215,13 @@ func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointA var endpointIP string if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { + // In a legacy cluster, the pod IP is guaranteed to be usable endpointIP = pod.Status.PodIP } else { - // api-server service controller ensured that the service got the correct IP Family - // according to user setup, here we only need to match EndPoint IPs' family to service - // actual IP family. as in, we don't need to check service.IPFamily - - ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP) + ipv6Service := endpointutil.IsIPv6Service(svc) for _, podIP := range pod.Status.PodIPs { ipv6PodIP := utilnet.IsIPv6String(podIP.IP) - // same family? - // TODO (khenidak) when we remove the max of 2 PodIP limit from pods - // we will have to return multiple endpoint addresses - if ipv6ClusterIP == ipv6PodIP { + if ipv6Service == ipv6PodIP { endpointIP = podIP.IP break } diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index c82a1d0ec0b..e5f6d9d2323 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -1249,21 +1249,21 @@ func TestPodToEndpointAddressForService(t *testing.T) { expectedEndpointFamily: ipv6, }, - // { - // name: "v6 headless service, in a dual stack cluster", - // - // enableDualStack: true, - // ipFamilies: ipv4ipv6, - // - // service: v1.Service{ - // Spec: v1.ServiceSpec{ - // ClusterIP: v1.ClusterIPNone, - // IPFamily: &ipv6, - // }, - // }, - // - // expectedEndpointFamily: ipv6, - // }, + { + name: "v6 headless service, in a dual stack cluster", + + enableDualStack: true, + ipFamilies: ipv4ipv6, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: v1.ClusterIPNone, + IPFamily: &ipv6, + }, + }, + + expectedEndpointFamily: ipv6, + }, { name: "v6 legacy headless service, in a dual stack cluster", diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go index ef44809aea1..1f97e3294d1 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/pkg/controller/endpointslice/reconciler.go @@ -59,7 +59,7 @@ type endpointMeta struct { func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error { addressType := discovery.AddressTypeIPv4 - if isIPv6Service(service) { + if endpointutil.IsIPv6Service(service) { addressType = discovery.AddressTypeIPv6 } diff --git a/pkg/controller/endpointslice/utils.go b/pkg/controller/endpointslice/utils.go index a3589e53907..316c26b856e 100644 --- a/pkg/controller/endpointslice/utils.go +++ b/pkg/controller/endpointslice/utils.go @@ -120,7 +120,7 @@ func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service) [ for _, podIP := range podStatus.PodIPs { isIPv6PodIP := utilnet.IsIPv6String(podIP.IP) - if isIPv6PodIP == isIPv6Service(service) { + if isIPv6PodIP == endpointutil.IsIPv6Service(service) { addresses = append(addresses, podIP.IP) } } @@ -128,12 +128,6 @@ func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service) [ return addresses } -// isIPv6Service returns true if the Service uses IPv6 addresses. -func isIPv6Service(service *corev1.Service) bool { - // IPFamily is not guaranteed to be set, even in an IPv6 only cluster. - return (service.Spec.IPFamily != nil && *service.Spec.IPFamily == corev1.IPv6Protocol) || utilnet.IsIPv6String(service.Spec.ClusterIP) -} - // endpointsEqualBeyondHash returns true if endpoints have equal attributes // but excludes equality checks that would have already been covered with // endpoint hashing (see hashEndpoint func for more info). diff --git a/pkg/controller/util/endpoint/BUILD b/pkg/controller/util/endpoint/BUILD index 3a17c9c71f9..81f4068279b 100644 --- a/pkg/controller/util/endpoint/BUILD +++ b/pkg/controller/util/endpoint/BUILD @@ -10,6 +10,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/api/v1/pod:go_default_library", + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/controller:go_default_library", "//pkg/util/hash:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -19,6 +20,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], ) diff --git a/pkg/controller/util/endpoint/controller_utils.go b/pkg/controller/util/endpoint/controller_utils.go index d1744441bec..9c2ab1eedf9 100644 --- a/pkg/controller/util/endpoint/controller_utils.go +++ b/pkg/controller/util/endpoint/controller_utils.go @@ -32,8 +32,10 @@ import ( v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/hash" + utilnet "k8s.io/utils/net" ) // ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527) @@ -275,3 +277,18 @@ func (sl portsInOrder) Less(i, j int) bool { h2 := DeepHashObjectToString(sl[j]) return h1 < h2 } + +// IsIPv6Service checks if svc should have IPv6 endpoints +func IsIPv6Service(svc *v1.Service) bool { + if helper.IsServiceIPSet(svc) { + return utilnet.IsIPv6String(svc.Spec.ClusterIP) + } else if svc.Spec.IPFamily != nil { + return *svc.Spec.IPFamily == v1.IPv6Protocol + } else { + // FIXME: for legacy headless Services with no IPFamily, the current + // thinking is that we should use the cluster default. Unfortunately + // the endpoint controller doesn't know the cluster default. For now, + // assume it's IPv4. + return false + } +}