Fix Endpoint/EndpointSlice pod change detection

The endpoint controllers responded to Pod changes by trying to figure
out if the generated endpoint resource would change, rather than just
checking if the Pod had changed, but since the set of Pod fields that
need to be checked depend on the Service and Node as well, the code
ended up only checking for a subset of the changes it should have.

In particular, EndpointSliceController ended up only looking at IPv4
Pod IPs when processing Pod update events, so when a Pod went from
having no IP to having only an IPv6 IP, EndpointSliceController would
think it hadn't changed.
This commit is contained in:
Dan Winship
2020-05-24 11:55:09 -04:00
parent f9ad7db9a6
commit 9fb6e2ef55
8 changed files with 217 additions and 310 deletions

View File

@@ -48,7 +48,6 @@ go_test(
"//pkg/api/v1/endpoints:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/util/endpoint:go_default_library",
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@@ -19,7 +19,6 @@ package endpoint
import (
"context"
"fmt"
"reflect"
"strconv"
"time"
@@ -213,39 +212,33 @@ func (e *EndpointController) addPod(obj interface{}) {
}
func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {
var endpointIP string
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
return podToEndpointAddress(pod), nil
}
endpointIP = pod.Status.PodIP
} else {
// api-server service controller ensured that the service got the correct IP Family
// according to user setup, here we only need to match EndPoint IPs' family to service
// actual IP family. as in, we don't need to check service.IPFamily
// api-server service controller ensured that the service got the correct IP Family
// according to user setup, here we only need to match EndPoint IPs' family to service
// actual IP family. as in, we don't need to check service.IPFamily
ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP)
for _, podIP := range pod.Status.PodIPs {
ipv6PodIP := utilnet.IsIPv6String(podIP.IP)
// same family?
// TODO (khenidak) when we remove the max of 2 PodIP limit from pods
// we will have to return multiple endpoint addresses
if ipv6ClusterIP == ipv6PodIP {
return &v1.EndpointAddress{
IP: podIP.IP,
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}, nil
ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP)
for _, podIP := range pod.Status.PodIPs {
ipv6PodIP := utilnet.IsIPv6String(podIP.IP)
// same family?
// TODO (khenidak) when we remove the max of 2 PodIP limit from pods
// we will have to return multiple endpoint addresses
if ipv6ClusterIP == ipv6PodIP {
endpointIP = podIP.IP
break
}
}
if endpointIP == "" {
return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
}
}
return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
}
func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
return &v1.EndpointAddress{
IP: pod.Status.PodIP,
IP: endpointIP,
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
@@ -253,24 +246,15 @@ func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
}
func endpointChanged(pod1, pod2 *v1.Pod) bool {
endpointAddress1 := podToEndpointAddress(pod1)
endpointAddress2 := podToEndpointAddress(pod2)
endpointAddress1.TargetRef.ResourceVersion = ""
endpointAddress2.TargetRef.ResourceVersion = ""
return !reflect.DeepEqual(endpointAddress1, endpointAddress2)
},
}, nil
}
// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *v1.Pod types.
func (e *EndpointController) updatePod(old, cur interface{}) {
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur, endpointChanged)
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur)
for key := range services {
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}

View File

@@ -43,7 +43,6 @@ import (
endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controller"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
"k8s.io/kubernetes/pkg/features"
utilpointer "k8s.io/utils/pointer"
)
@@ -67,7 +66,6 @@ func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack b
Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
},
Status: v1.PodStatus{
PodIP: fmt.Sprintf("1.2.3.%d", 4+id),
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
@@ -83,16 +81,11 @@ func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack b
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
}
p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: fmt.Sprintf("1.2.3.%d", 4+id)})
if makeDualstack {
p.Status.PodIPs = []v1.PodIP{
{
IP: p.Status.PodIP,
},
{
IP: fmt.Sprintf("2000::%d", id),
},
}
p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: fmt.Sprintf("2000::%d", id)})
}
p.Status.PodIP = p.Status.PodIPs[0].IP
return p
}
@@ -1238,169 +1231,6 @@ func TestPodToEndpointAddressForService(t *testing.T) {
}
func TestPodToEndpointAddress(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0, false)
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
return
}
pod := pods[0].(*v1.Pod)
epa := podToEndpointAddress(pod)
if epa.IP != pod.Status.PodIP {
t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
}
if *(epa.NodeName) != pod.Spec.NodeName {
t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
}
if epa.TargetRef.Kind != "Pod" {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
}
if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
}
if epa.TargetRef.Name != pod.ObjectMeta.Name {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
}
if epa.TargetRef.UID != pod.ObjectMeta.UID {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
}
if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
}
}
func TestPodChanged(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0, false)
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
return
}
oldPod := pods[0].(*v1.Pod)
newPod := oldPod.DeepCopy()
if podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be unchanged for copied pod")
}
newPod.Spec.NodeName = "changed"
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed for pod with NodeName changed")
}
newPod.Spec.NodeName = oldPod.Spec.NodeName
newPod.ObjectMeta.ResourceVersion = "changed"
if podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed")
}
newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
newPod.Status.PodIP = "1.2.3.1"
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed with pod IP address change")
}
newPod.Status.PodIP = oldPod.Status.PodIP
/* dual stack tests */
// primary changes, because changing IPs is done by changing sandbox
// case 1: add new secondary IP
newPod.Status.PodIP = "1.1.3.1"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed with adding secondary IP")
}
// reset
newPod.Status.PodIPs = nil
newPod.Status.PodIP = oldPod.Status.PodIP
// case 2: removing a secondary IP
saved := oldPod.Status.PodIP
oldPod.Status.PodIP = "1.1.3.1"
oldPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
newPod.Status.PodIP = "1.2.3.4"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.2.3.4",
},
}
// reset
oldPod.Status.PodIPs = nil
newPod.Status.PodIPs = nil
oldPod.Status.PodIP = saved
newPod.Status.PodIP = saved
// case 3: change secondary
// case 2: removing a secondary IP
saved = oldPod.Status.PodIP
oldPod.Status.PodIP = "1.1.3.1"
oldPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
newPod.Status.PodIP = "1.2.3.4"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.2.3.4",
},
{
IP: "2000::2",
},
}
// reset
oldPod.Status.PodIPs = nil
newPod.Status.PodIPs = nil
oldPod.Status.PodIP = saved
newPod.Status.PodIP = saved
/* end dual stack testing */
newPod.ObjectMeta.Name = "wrong-name"
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed with pod name change")
}
newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
saveConditions := oldPod.Status.Conditions
oldPod.Status.Conditions = nil
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed with pod readiness change")
}
oldPod.Status.Conditions = saveConditions
now := metav1.NewTime(time.Now().UTC())
newPod.ObjectMeta.DeletionTimestamp = &now
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed with DeletionTimestamp change")
}
newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
}
func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
@@ -1680,6 +1510,7 @@ func TestPodUpdatesBatching(t *testing.T) {
oldPod := old.(*v1.Pod)
newPod := oldPod.DeepCopy()
newPod.Status.PodIP = update.podIP
newPod.Status.PodIPs[0].IP = update.podIP
newPod.ResourceVersion = strconv.Itoa(resourceVersion)
resourceVersion++
@@ -1996,8 +1827,3 @@ func stringVal(str *string) string {
}
return *str
}
func podChangedHelper(oldPod, newPod *v1.Pod, endpointChanged endpointutil.EndpointsMatch) bool {
podChanged, _ := endpointutil.PodChanged(oldPod, newPod, endpointChanged)
return podChanged
}