Merge pull request #91399 from danwinship/endpoint-ipfamily

multiple IPv6/dual-stack endpoint fixes
This commit is contained in:
Kubernetes Prow Robot 2020-07-20 13:31:14 -07:00 committed by GitHub
commit 5a529aa3a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 423 additions and 375 deletions

View File

@ -48,7 +48,6 @@ go_test(
"//pkg/api/v1/endpoints:go_default_library", "//pkg/api/v1/endpoints:go_default_library",
"//pkg/apis/core:go_default_library", "//pkg/apis/core:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/controller/util/endpoint:go_default_library",
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -65,6 +64,7 @@ go_test(
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library",
], ],
) )

View File

@ -19,7 +19,6 @@ package endpoint
import ( import (
"context" "context"
"fmt" "fmt"
"reflect"
"strconv" "strconv"
"time" "time"
@ -213,39 +212,27 @@ func (e *EndpointController) addPod(obj interface{}) {
} }
func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) { func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {
var endpointIP string
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
return podToEndpointAddress(pod), nil // In a legacy cluster, the pod IP is guaranteed to be usable
} endpointIP = pod.Status.PodIP
} else {
// api-server service controller ensured that the service got the correct IP Family ipv6Service := endpointutil.IsIPv6Service(svc)
// according to user setup, here we only need to match EndPoint IPs' family to service for _, podIP := range pod.Status.PodIPs {
// actual IP family. as in, we don't need to check service.IPFamily ipv6PodIP := utilnet.IsIPv6String(podIP.IP)
if ipv6Service == ipv6PodIP {
ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP) endpointIP = podIP.IP
for _, podIP := range pod.Status.PodIPs { break
ipv6PodIP := utilnet.IsIPv6String(podIP.IP) }
// same family? }
// TODO (khenidak) when we remove the max of 2 PodIP limit from pods if endpointIP == "" {
// we will have to return multiple endpoint addresses return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
if ipv6ClusterIP == ipv6PodIP {
return &v1.EndpointAddress{
IP: podIP.IP,
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}, nil
} }
} }
return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
}
func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
return &v1.EndpointAddress{ return &v1.EndpointAddress{
IP: pod.Status.PodIP, IP: endpointIP,
NodeName: &pod.Spec.NodeName, NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{ TargetRef: &v1.ObjectReference{
Kind: "Pod", Kind: "Pod",
@ -253,24 +240,15 @@ func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
Name: pod.ObjectMeta.Name, Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID, UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion, ResourceVersion: pod.ObjectMeta.ResourceVersion,
}} },
} }, nil
func endpointChanged(pod1, pod2 *v1.Pod) bool {
endpointAddress1 := podToEndpointAddress(pod1)
endpointAddress2 := podToEndpointAddress(pod2)
endpointAddress1.TargetRef.ResourceVersion = ""
endpointAddress2.TargetRef.ResourceVersion = ""
return !reflect.DeepEqual(endpointAddress1, endpointAddress2)
} }
// When a pod is updated, figure out what services it used to be a member of // When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these. // and what services it will be a member of, and enqueue the union of these.
// old and cur must be *v1.Pod types. // old and cur must be *v1.Pod types.
func (e *EndpointController) updatePod(old, cur interface{}) { func (e *EndpointController) updatePod(old, cur interface{}) {
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur, endpointChanged) services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur)
for key := range services { for key := range services {
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod) e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
} }

View File

@ -43,8 +43,8 @@ import (
endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints" endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
utilnet "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer" utilpointer "k8s.io/utils/pointer"
) )
@ -55,7 +55,12 @@ var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
var triggerTimeString = triggerTime.Format(time.RFC3339Nano) var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano) var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack bool) *v1.Pod { var ipv4only = []v1.IPFamily{v1.IPv4Protocol}
var ipv6only = []v1.IPFamily{v1.IPv6Protocol}
var ipv4ipv6 = []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol}
var ipv6ipv4 = []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol}
func testPod(namespace string, id int, nPorts int, isReady bool, ipFamilies []v1.IPFamily) *v1.Pod {
p := &v1.Pod{ p := &v1.Pod{
TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -67,7 +72,6 @@ func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack b
Containers: []v1.Container{{Ports: []v1.ContainerPort{}}}, Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
}, },
Status: v1.PodStatus{ Status: v1.PodStatus{
PodIP: fmt.Sprintf("1.2.3.%d", 4+id),
Conditions: []v1.PodCondition{ Conditions: []v1.PodCondition{
{ {
Type: v1.PodReady, Type: v1.PodReady,
@ -83,24 +87,24 @@ func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack b
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)}) v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
} }
if makeDualstack { for _, family := range ipFamilies {
p.Status.PodIPs = []v1.PodIP{ var ip string
{ if family == v1.IPv4Protocol {
IP: p.Status.PodIP, ip = fmt.Sprintf("1.2.3.%d", 4+id)
}, } else {
{ ip = fmt.Sprintf("2000::%d", 4+id)
IP: fmt.Sprintf("2000::%d", id),
},
} }
p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: ip})
} }
p.Status.PodIP = p.Status.PodIPs[0].IP
return p return p
} }
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, makeDualstack bool) { func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, ipFamilies []v1.IPFamily) {
for i := 0; i < nPods+nNotReady; i++ { for i := 0; i < nPods+nNotReady; i++ {
isReady := i < nPods isReady := i < nPods
pod := testPod(namespace, i, nPorts, isReady, makeDualstack) pod := testPod(namespace, i, nPorts, isReady, ipFamilies)
store.Add(pod) store.Add(pod)
} }
} }
@ -308,7 +312,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}}, }},
}) })
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -352,7 +356,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}}, Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
}}, }},
}) })
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -396,7 +400,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}}, Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
}}, }},
}) })
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -437,7 +441,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
}, },
Subsets: []v1.EndpointSubset{}, Subsets: []v1.EndpointSubset{},
}) })
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -477,7 +481,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
}, },
Subsets: []v1.EndpointSubset{}, Subsets: []v1.EndpointSubset{},
}) })
addPods(endpoints.podStore, ns, 0, 1, 1, false) addPods(endpoints.podStore, ns, 0, 1, 1, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -517,7 +521,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
}, },
Subsets: []v1.EndpointSubset{}, Subsets: []v1.EndpointSubset{},
}) })
addPods(endpoints.podStore, ns, 1, 1, 1, false) addPods(endpoints.podStore, ns, 1, 1, 1, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -561,7 +565,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000}}, Ports: []v1.EndpointPort{{Port: 1000}},
}}, }},
}) })
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -604,7 +608,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}}, }},
}) })
addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, false) addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -621,8 +625,8 @@ func TestSyncEndpointsItems(t *testing.T) {
testServer, endpointsHandler := makeTestServer(t, ns) testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close() defer testServer.Close()
endpoints := newController(testServer.URL, 0*time.Second) endpoints := newController(testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 3, 2, 0, false) addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only)
addPods(endpoints.podStore, "blah", 5, 2, 0, false) // make sure these aren't found! addPods(endpoints.podStore, "blah", 5, 2, 0, ipv4only) // make sure these aren't found!
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@ -666,7 +670,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
testServer, endpointsHandler := makeTestServer(t, ns) testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close() defer testServer.Close()
endpoints := newController(testServer.URL, 0*time.Second) endpoints := newController(testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 3, 2, 0, false) addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only)
serviceLabels := map[string]string{"foo": "bar"} serviceLabels := map[string]string{"foo": "bar"}
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -728,7 +732,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000}}, Ports: []v1.EndpointPort{{Port: 1000}},
}}, }},
}) })
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
serviceLabels := map[string]string{"baz": "blah"} serviceLabels := map[string]string{"baz": "blah"}
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -778,7 +782,7 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
testServer, endpointsHandler := makeTestServer(t, ns) testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close() defer testServer.Close()
endpoints := newController(testServer.URL, 0*time.Second) endpoints := newController(testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
service := &v1.Service{ service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@ -830,7 +834,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}}, }},
}) })
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
service := &v1.Service{ service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, Labels: map[string]string{"a": "b"}}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, Labels: map[string]string{"a": "b"}},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -994,7 +998,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
Ports: nil, Ports: nil,
}, },
}) })
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.syncService(ns + "/foo") endpoints.syncService(ns + "/foo")
endpointsHandler.ValidateRequestCount(t, 1) endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
@ -1102,89 +1106,211 @@ func TestShouldPodBeInEndpoints(t *testing.T) {
} }
} }
} }
func TestPodToEndpointAddressForService(t *testing.T) { func TestPodToEndpointAddressForService(t *testing.T) {
ipv4 := v1.IPv4Protocol
ipv6 := v1.IPv6Protocol
testCases := []struct { testCases := []struct {
name string name string
expectedEndPointIP string
enableDualStack bool enableDualStack bool
expectError bool ipFamilies []v1.IPFamily
enableDualStackPod bool
service v1.Service service v1.Service
expectedEndpointFamily v1.IPFamily
expectError bool
}{ }{
{ {
name: "v4 service, in a single stack cluster", name: "v4 service, in a single stack cluster",
expectedEndPointIP: "1.2.3.4",
enableDualStack: false, enableDualStack: false,
expectError: false, ipFamilies: ipv4only,
enableDualStackPod: false,
service: v1.Service{ service: v1.Service{
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1", ClusterIP: "10.0.0.1",
}, },
}, },
expectedEndpointFamily: ipv4,
}, },
{ {
name: "v4 service, in a dual stack cluster", name: "v4 service, in a dual stack cluster",
expectedEndPointIP: "1.2.3.4", enableDualStack: true,
enableDualStack: true, ipFamilies: ipv4ipv6,
expectError: false,
enableDualStackPod: true,
service: v1.Service{ service: v1.Service{
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1", ClusterIP: "10.0.0.1",
}, },
}, },
expectedEndpointFamily: ipv4,
}, },
{ {
name: "v6 service, in a dual stack cluster. dual stack enabled", name: "v4 service, in a dual stack ipv6-primary cluster",
expectedEndPointIP: "2000::0",
enableDualStack: true, enableDualStack: true,
expectError: false, ipFamilies: ipv6ipv4,
enableDualStackPod: true,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v4 headless service, in a single stack cluster",
enableDualStack: false,
ipFamilies: ipv4only,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v4 headless service, in a dual stack cluster",
enableDualStack: false,
ipFamilies: ipv4ipv6,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
IPFamily: &ipv4,
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v4 legacy headless service, in a dual stack cluster",
enableDualStack: false,
ipFamilies: ipv4ipv6,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v4 legacy headless service, in a dual stack ipv6-primary cluster",
enableDualStack: true,
ipFamilies: ipv6ipv4,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v6 service, in a dual stack cluster",
enableDualStack: true,
ipFamilies: ipv4ipv6,
service: v1.Service{ service: v1.Service{
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
ClusterIP: "3000::1", ClusterIP: "3000::1",
}, },
}, },
expectedEndpointFamily: ipv6,
},
{
name: "v6 headless service, in a single stack cluster",
enableDualStack: false,
ipFamilies: ipv6only,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
},
},
expectedEndpointFamily: ipv6,
},
{
name: "v6 headless service, in a dual stack cluster",
enableDualStack: true,
ipFamilies: ipv4ipv6,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
IPFamily: &ipv6,
},
},
expectedEndpointFamily: ipv6,
},
{
name: "v6 legacy headless service, in a dual stack cluster",
enableDualStack: false,
ipFamilies: ipv4ipv6,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
},
},
// This is not the behavior we *want*, but it's the behavior we currently expect.
expectedEndpointFamily: ipv4,
}, },
// in reality this is a misconfigured cluster // in reality this is a misconfigured cluster
// i.e user is not using dual stack and have PodIP == v4 and ServiceIP==v6 // i.e user is not using dual stack and have PodIP == v4 and ServiceIP==v6
// we are testing that we will keep producing the expected behavior // we are testing that we will keep producing the expected behavior
{ {
name: "v6 service, in a v4 only cluster. dual stack disabled", name: "v6 service, in a v4 only cluster. dual stack disabled",
expectedEndPointIP: "1.2.3.4",
enableDualStack: false, enableDualStack: false,
expectError: false, ipFamilies: ipv4only,
enableDualStackPod: false,
service: v1.Service{ service: v1.Service{
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
ClusterIP: "3000::1", ClusterIP: "3000::1",
}, },
}, },
expectedEndpointFamily: ipv4,
}, },
// but this will actually give an error
{ {
name: "v6 service, in a v4 only cluster - dual stack enabled", name: "v6 service, in a v4 only cluster - dual stack enabled",
expectedEndPointIP: "1.2.3.4",
enableDualStack: true, enableDualStack: true,
expectError: true, ipFamilies: ipv4only,
enableDualStackPod: false,
service: v1.Service{ service: v1.Service{
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
ClusterIP: "3000::1", ClusterIP: "3000::1",
}, },
}, },
expectError: true,
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
@ -1192,7 +1318,7 @@ func TestPodToEndpointAddressForService(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test" ns := "test"
addPods(podStore, ns, 1, 1, 0, tc.enableDualStackPod) addPods(podStore, ns, 1, 1, 0, tc.ipFamilies)
pods := podStore.List() pods := podStore.List()
if len(pods) != 1 { if len(pods) != 1 {
t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods)) t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
@ -1212,8 +1338,8 @@ func TestPodToEndpointAddressForService(t *testing.T) {
return return
} }
if epa.IP != tc.expectedEndPointIP { if utilnet.IsIPv6String(epa.IP) != (tc.expectedEndpointFamily == ipv6) {
t.Fatalf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP) t.Fatalf("IP: expected %s, got: %s", tc.expectedEndpointFamily, epa.IP)
} }
if *(epa.NodeName) != pod.Spec.NodeName { if *(epa.NodeName) != pod.Spec.NodeName {
t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName)) t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
@ -1238,169 +1364,6 @@ func TestPodToEndpointAddressForService(t *testing.T) {
} }
func TestPodToEndpointAddress(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0, false)
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
return
}
pod := pods[0].(*v1.Pod)
epa := podToEndpointAddress(pod)
if epa.IP != pod.Status.PodIP {
t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
}
if *(epa.NodeName) != pod.Spec.NodeName {
t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
}
if epa.TargetRef.Kind != "Pod" {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
}
if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
}
if epa.TargetRef.Name != pod.ObjectMeta.Name {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
}
if epa.TargetRef.UID != pod.ObjectMeta.UID {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
}
if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
}
}
func TestPodChanged(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0, false)
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
return
}
oldPod := pods[0].(*v1.Pod)
newPod := oldPod.DeepCopy()
if podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be unchanged for copied pod")
}
newPod.Spec.NodeName = "changed"
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed for pod with NodeName changed")
}
newPod.Spec.NodeName = oldPod.Spec.NodeName
newPod.ObjectMeta.ResourceVersion = "changed"
if podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed")
}
newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
newPod.Status.PodIP = "1.2.3.1"
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed with pod IP address change")
}
newPod.Status.PodIP = oldPod.Status.PodIP
/* dual stack tests */
// primary changes, because changing IPs is done by changing sandbox
// case 1: add new secondary IP
newPod.Status.PodIP = "1.1.3.1"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed with adding secondary IP")
}
// reset
newPod.Status.PodIPs = nil
newPod.Status.PodIP = oldPod.Status.PodIP
// case 2: removing a secondary IP
saved := oldPod.Status.PodIP
oldPod.Status.PodIP = "1.1.3.1"
oldPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
newPod.Status.PodIP = "1.2.3.4"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.2.3.4",
},
}
// reset
oldPod.Status.PodIPs = nil
newPod.Status.PodIPs = nil
oldPod.Status.PodIP = saved
newPod.Status.PodIP = saved
// case 3: change secondary
// case 2: removing a secondary IP
saved = oldPod.Status.PodIP
oldPod.Status.PodIP = "1.1.3.1"
oldPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
newPod.Status.PodIP = "1.2.3.4"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.2.3.4",
},
{
IP: "2000::2",
},
}
// reset
oldPod.Status.PodIPs = nil
newPod.Status.PodIPs = nil
oldPod.Status.PodIP = saved
newPod.Status.PodIP = saved
/* end dual stack testing */
newPod.ObjectMeta.Name = "wrong-name"
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed with pod name change")
}
newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
saveConditions := oldPod.Status.Conditions
oldPod.Status.Conditions = nil
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed with pod readiness change")
}
oldPod.Status.Conditions = saveConditions
now := metav1.NewTime(time.Now().UTC())
newPod.ObjectMeta.DeletionTimestamp = &now
if !podChangedHelper(oldPod, newPod, endpointChanged) {
t.Errorf("Expected pod to be changed with DeletionTimestamp change")
}
newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
}
func TestLastTriggerChangeTimeAnnotation(t *testing.T) { func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
ns := "other" ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns) testServer, endpointsHandler := makeTestServer(t, ns)
@ -1417,7 +1380,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}}, }},
}) })
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -1467,7 +1430,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}}, }},
}) })
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -1518,7 +1481,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
}}, }},
}) })
// Neither pod nor service has trigger time, this should cause annotation to be cleared. // Neither pod nor service has trigger time, this should cause annotation to be cleared.
addPods(endpoints.podStore, ns, 1, 1, 0, false) addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
@ -1657,7 +1620,7 @@ func TestPodUpdatesBatching(t *testing.T) {
go endpoints.Run(1, stopCh) go endpoints.Run(1, stopCh)
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false) addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@ -1680,6 +1643,7 @@ func TestPodUpdatesBatching(t *testing.T) {
oldPod := old.(*v1.Pod) oldPod := old.(*v1.Pod)
newPod := oldPod.DeepCopy() newPod := oldPod.DeepCopy()
newPod.Status.PodIP = update.podIP newPod.Status.PodIP = update.podIP
newPod.Status.PodIPs[0].IP = update.podIP
newPod.ResourceVersion = strconv.Itoa(resourceVersion) newPod.ResourceVersion = strconv.Itoa(resourceVersion)
resourceVersion++ resourceVersion++
@ -1790,7 +1754,7 @@ func TestPodAddsBatching(t *testing.T) {
for i, add := range tc.adds { for i, add := range tc.adds {
time.Sleep(add.delay) time.Sleep(add.delay)
p := testPod(ns, i, 1, true, false) p := testPod(ns, i, 1, true, ipv4only)
endpoints.podStore.Add(p) endpoints.podStore.Add(p)
endpoints.addPod(p) endpoints.addPod(p)
} }
@ -1901,7 +1865,7 @@ func TestPodDeleteBatching(t *testing.T) {
go endpoints.Run(1, stopCh) go endpoints.Run(1, stopCh)
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false) addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{ endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@ -1996,8 +1960,3 @@ func stringVal(str *string) string {
} }
return *str return *str
} }
func podChangedHelper(oldPod, newPod *v1.Pod, endpointChanged endpointutil.EndpointsMatch) bool {
podChanged, _ := endpointutil.PodChanged(oldPod, newPod, endpointChanged)
return podChanged
}

View File

@ -457,7 +457,7 @@ func (c *Controller) addPod(obj interface{}) {
} }
func (c *Controller) updatePod(old, cur interface{}) { func (c *Controller) updatePod(old, cur interface{}) {
services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur, podEndpointChanged) services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur)
for key := range services { for key := range services {
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod) c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
} }

View File

@ -59,7 +59,7 @@ type endpointMeta struct {
func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error { func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
addressType := discovery.AddressTypeIPv4 addressType := discovery.AddressTypeIPv4
if isIPv6Service(service) { if endpointutil.IsIPv6Service(service) {
addressType = discovery.AddressTypeIPv6 addressType = discovery.AddressTypeIPv6
} }

View File

@ -18,7 +18,6 @@ package endpointslice
import ( import (
"fmt" "fmt"
"reflect"
"time" "time"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
@ -36,19 +35,7 @@ import (
utilnet "k8s.io/utils/net" utilnet "k8s.io/utils/net"
) )
// podEndpointChanged returns true if the results of podToEndpoint are different // podToEndpoint returns an Endpoint object generated from pod, node, and service.
// for the pods passed to this function.
func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
endpoint1 := podToEndpoint(pod1, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}})
endpoint2 := podToEndpoint(pod2, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}})
endpoint1.TargetRef.ResourceVersion = ""
endpoint2.TargetRef.ResourceVersion = ""
return !reflect.DeepEqual(endpoint1, endpoint2)
}
// podToEndpoint returns an Endpoint object generated from a Pod and Node.
func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service) discovery.Endpoint { func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service) discovery.Endpoint {
// Build out topology information. This is currently limited to hostname, // Build out topology information. This is currently limited to hostname,
// zone, and region, but this will be expanded in the future. // zone, and region, but this will be expanded in the future.
@ -133,7 +120,7 @@ func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service) [
for _, podIP := range podStatus.PodIPs { for _, podIP := range podStatus.PodIPs {
isIPv6PodIP := utilnet.IsIPv6String(podIP.IP) isIPv6PodIP := utilnet.IsIPv6String(podIP.IP)
if isIPv6PodIP == isIPv6Service(service) { if isIPv6PodIP == endpointutil.IsIPv6Service(service) {
addresses = append(addresses, podIP.IP) addresses = append(addresses, podIP.IP)
} }
} }
@ -141,12 +128,6 @@ func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service) [
return addresses return addresses
} }
// isIPv6Service returns true if the Service uses IPv6 addresses.
func isIPv6Service(service *corev1.Service) bool {
// IPFamily is not guaranteed to be set, even in an IPv6 only cluster.
return (service.Spec.IPFamily != nil && *service.Spec.IPFamily == corev1.IPv6Protocol) || utilnet.IsIPv6String(service.Spec.ClusterIP)
}
// endpointsEqualBeyondHash returns true if endpoints have equal attributes // endpointsEqualBeyondHash returns true if endpoints have equal attributes
// but excludes equality checks that would have already been covered with // but excludes equality checks that would have already been covered with
// endpoint hashing (see hashEndpoint func for more info). // endpoint hashing (see hashEndpoint func for more info).

View File

@ -20,7 +20,6 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -32,8 +31,6 @@ import (
"k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing" k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
utilpointer "k8s.io/utils/pointer" utilpointer "k8s.io/utils/pointer"
) )
@ -255,61 +252,6 @@ func TestPodToEndpoint(t *testing.T) {
} }
} }
func TestPodChangedWithPodEndpointChanged(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
podStore.Add(newPod(1, ns, true, 1))
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
return
}
oldPod := pods[0].(*v1.Pod)
newPod := oldPod.DeepCopy()
if podChangedHelper(oldPod, newPod, podEndpointChanged) {
t.Errorf("Expected pod to be unchanged for copied pod")
}
newPod.Spec.NodeName = "changed"
if !podChangedHelper(oldPod, newPod, podEndpointChanged) {
t.Errorf("Expected pod to be changed for pod with NodeName changed")
}
newPod.Spec.NodeName = oldPod.Spec.NodeName
newPod.ObjectMeta.ResourceVersion = "changed"
if podChangedHelper(oldPod, newPod, podEndpointChanged) {
t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed")
}
newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
newPod.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.1"}}
if !podChangedHelper(oldPod, newPod, podEndpointChanged) {
t.Errorf("Expected pod to be changed with pod IP address change")
}
newPod.Status.PodIPs = oldPod.Status.PodIPs
newPod.ObjectMeta.Name = "wrong-name"
if !podChangedHelper(oldPod, newPod, podEndpointChanged) {
t.Errorf("Expected pod to be changed with pod name change")
}
newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
saveConditions := oldPod.Status.Conditions
oldPod.Status.Conditions = nil
if !podChangedHelper(oldPod, newPod, podEndpointChanged) {
t.Errorf("Expected pod to be changed with pod readiness change")
}
oldPod.Status.Conditions = saveConditions
now := metav1.NewTime(time.Now().UTC())
newPod.ObjectMeta.DeletionTimestamp = &now
if !podChangedHelper(oldPod, newPod, podEndpointChanged) {
t.Errorf("Expected pod to be changed with DeletionTimestamp change")
}
newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
}
func TestServiceControllerKey(t *testing.T) { func TestServiceControllerKey(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
endpointSlice *discovery.EndpointSlice endpointSlice *discovery.EndpointSlice
@ -545,8 +487,3 @@ func newEmptyEndpointSlice(n int, namespace string, endpointMeta endpointMeta, s
Endpoints: []discovery.Endpoint{}, Endpoints: []discovery.Endpoint{},
} }
} }
func podChangedHelper(oldPod, newPod *v1.Pod, endpointChanged endpointutil.EndpointsMatch) bool {
podChanged, _ := endpointutil.PodChanged(oldPod, newPod, podEndpointChanged)
return podChanged
}

View File

@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/util/hash:go_default_library", "//pkg/util/hash:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
@ -19,6 +20,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",
], ],
) )

View File

@ -32,8 +32,10 @@ import (
v1listers "k8s.io/client-go/listers/core/v1" v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/hash" "k8s.io/kubernetes/pkg/util/hash"
utilnet "k8s.io/utils/net"
) )
// ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527) // ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527)
@ -106,9 +108,6 @@ func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers
return set, nil return set, nil
} }
// EndpointsMatch is a type of function that returns true if pod endpoints match.
type EndpointsMatch func(*v1.Pod, *v1.Pod) bool
// PortMapKey is used to uniquely identify groups of endpoint ports. // PortMapKey is used to uniquely identify groups of endpoint ports.
type PortMapKey string type PortMapKey string
@ -153,9 +152,10 @@ func ShouldSetHostname(pod *v1.Pod, svc *v1.Service) bool {
return len(pod.Spec.Hostname) > 0 && pod.Spec.Subdomain == svc.Name && svc.Namespace == pod.Namespace return len(pod.Spec.Hostname) > 0 && pod.Spec.Subdomain == svc.Name && svc.Namespace == pod.Namespace
} }
// PodChanged returns two boolean values, the first returns true if the pod. // podEndpointsChanged returns two boolean values. The first is true if the pod has
// has changed, the second value returns true if the pod labels have changed. // changed in a way that may change existing endpoints. The second value is true if the
func PodChanged(oldPod, newPod *v1.Pod, endpointChanged EndpointsMatch) (bool, bool) { // pod has changed in a way that may affect which Services it matches.
func podEndpointsChanged(oldPod, newPod *v1.Pod) (bool, bool) {
// Check if the pod labels have changed, indicating a possible // Check if the pod labels have changed, indicating a possible
// change in the service membership // change in the service membership
labelsChanged := false labelsChanged := false
@ -175,16 +175,27 @@ func PodChanged(oldPod, newPod *v1.Pod, endpointChanged EndpointsMatch) (bool, b
if podutil.IsPodReady(oldPod) != podutil.IsPodReady(newPod) { if podutil.IsPodReady(oldPod) != podutil.IsPodReady(newPod) {
return true, labelsChanged return true, labelsChanged
} }
// Convert the pod to an Endpoint, clear inert fields,
// and see if they are the same. // Check if the pod IPs have changed
// TODO: Add a watcher for node changes separate from this if len(oldPod.Status.PodIPs) != len(newPod.Status.PodIPs) {
// We don't want to trigger multiple syncs at a pod level when a node changes return true, labelsChanged
return endpointChanged(newPod, oldPod), labelsChanged }
for i := range oldPod.Status.PodIPs {
if oldPod.Status.PodIPs[i].IP != newPod.Status.PodIPs[i].IP {
return true, labelsChanged
}
}
// Endpoints may also reference a pod's Name, Namespace, UID, and NodeName, but
// the first three are immutable, and NodeName is immutable once initially set,
// which happens before the pod gets an IP.
return false, labelsChanged
} }
// GetServicesToUpdateOnPodChange returns a set of Service keys for Services // GetServicesToUpdateOnPodChange returns a set of Service keys for Services
// that have potentially been affected by a change to this pod. // that have potentially been affected by a change to this pod.
func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}, endpointChanged EndpointsMatch) sets.String { func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}) sets.String {
newPod := cur.(*v1.Pod) newPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod) oldPod := old.(*v1.Pod)
if newPod.ResourceVersion == oldPod.ResourceVersion { if newPod.ResourceVersion == oldPod.ResourceVersion {
@ -193,7 +204,7 @@ func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selec
return sets.String{} return sets.String{}
} }
podChanged, labelsChanged := PodChanged(oldPod, newPod, endpointChanged) podChanged, labelsChanged := podEndpointsChanged(oldPod, newPod)
// If both the pod and labels are unchanged, no update is needed // If both the pod and labels are unchanged, no update is needed
if !podChanged && !labelsChanged { if !podChanged && !labelsChanged {
@ -266,3 +277,18 @@ func (sl portsInOrder) Less(i, j int) bool {
h2 := DeepHashObjectToString(sl[j]) h2 := DeepHashObjectToString(sl[j])
return h1 < h2 return h1 < h2
} }
// IsIPv6Service checks if svc should have IPv6 endpoints
func IsIPv6Service(svc *v1.Service) bool {
if helper.IsServiceIPSet(svc) {
return utilnet.IsIPv6String(svc.Spec.ClusterIP)
} else if svc.Spec.IPFamily != nil {
return *svc.Spec.IPFamily == v1.IPv6Protocol
} else {
// FIXME: for legacy headless Services with no IPFamily, the current
// thinking is that we should use the cluster default. Unfortunately
// the endpoint controller doesn't know the cluster default. For now,
// assume it's IPv4.
return false
}
}

View File

@ -499,3 +499,168 @@ func BenchmarkGetPodServiceMemberships(b *testing.B) {
} }
} }
} }
func Test_podChanged(t *testing.T) {
testCases := []struct {
testName string
modifier func(*v1.Pod, *v1.Pod)
podChanged bool
labelsChanged bool
}{
{
testName: "no changes",
modifier: func(old, new *v1.Pod) {},
podChanged: false,
labelsChanged: false,
}, {
testName: "change NodeName",
modifier: func(old, new *v1.Pod) {
new.Spec.NodeName = "changed"
},
// NodeName can only change before the pod has an IP, and we don't care about the
// pod yet at that point so we ignore this change
podChanged: false,
labelsChanged: false,
}, {
testName: "change ResourceVersion",
modifier: func(old, new *v1.Pod) {
new.ObjectMeta.ResourceVersion = "changed"
},
// ResourceVersion is intentionally ignored if nothing else changed
podChanged: false,
labelsChanged: false,
}, {
testName: "add primary IPv4",
modifier: func(old, new *v1.Pod) {
new.Status.PodIP = "1.2.3.4"
new.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}}
},
podChanged: true,
labelsChanged: false,
}, {
testName: "modify primary IPv4",
modifier: func(old, new *v1.Pod) {
old.Status.PodIP = "1.2.3.4"
old.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}}
new.Status.PodIP = "2.3.4.5"
new.Status.PodIPs = []v1.PodIP{{IP: "2.3.4.5"}}
},
podChanged: true,
labelsChanged: false,
}, {
testName: "add primary IPv6",
modifier: func(old, new *v1.Pod) {
new.Status.PodIP = "fd00:10:96::1"
new.Status.PodIPs = []v1.PodIP{{IP: "fd00:10:96::1"}}
},
podChanged: true,
labelsChanged: false,
}, {
testName: "modify primary IPv6",
modifier: func(old, new *v1.Pod) {
old.Status.PodIP = "fd00:10:96::1"
old.Status.PodIPs = []v1.PodIP{{IP: "fd00:10:96::1"}}
new.Status.PodIP = "fd00:10:96::2"
new.Status.PodIPs = []v1.PodIP{{IP: "fd00:10:96::2"}}
},
podChanged: true,
labelsChanged: false,
}, {
testName: "add secondary IP",
modifier: func(old, new *v1.Pod) {
old.Status.PodIP = "1.2.3.4"
old.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}}
new.Status.PodIP = "1.2.3.4"
new.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "fd00:10:96::1"}}
},
podChanged: true,
labelsChanged: false,
}, {
testName: "modify secondary IP",
modifier: func(old, new *v1.Pod) {
old.Status.PodIP = "1.2.3.4"
old.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "fd00:10:96::1"}}
new.Status.PodIP = "1.2.3.4"
new.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "fd00:10:96::2"}}
},
podChanged: true,
labelsChanged: false,
}, {
testName: "remove secondary IP",
modifier: func(old, new *v1.Pod) {
old.Status.PodIP = "1.2.3.4"
old.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "fd00:10:96::1"}}
new.Status.PodIP = "1.2.3.4"
new.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}}
},
podChanged: true,
labelsChanged: false,
}, {
testName: "change readiness",
modifier: func(old, new *v1.Pod) {
new.Status.Conditions[0].Status = v1.ConditionTrue
},
podChanged: true,
labelsChanged: false,
}, {
testName: "mark for deletion",
modifier: func(old, new *v1.Pod) {
now := metav1.NewTime(time.Now().UTC())
new.ObjectMeta.DeletionTimestamp = &now
},
podChanged: true,
labelsChanged: false,
}, {
testName: "add label",
modifier: func(old, new *v1.Pod) {
new.Labels["label"] = "new"
},
podChanged: false,
labelsChanged: true,
}, {
testName: "modify label",
modifier: func(old, new *v1.Pod) {
old.Labels["label"] = "old"
new.Labels["label"] = "new"
},
podChanged: false,
labelsChanged: true,
}, {
testName: "remove label",
modifier: func(old, new *v1.Pod) {
old.Labels["label"] = "old"
},
podChanged: false,
labelsChanged: true,
},
}
orig := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "pod",
Labels: map[string]string{"foo": "bar"},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{Type: v1.PodReady, Status: v1.ConditionFalse},
},
},
}
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
old := orig.DeepCopy()
new := old.DeepCopy()
tc.modifier(old, new)
podChanged, labelsChanged := podEndpointsChanged(old, new)
if podChanged != tc.podChanged {
t.Errorf("Expected podChanged to be %t, got %t", tc.podChanged, podChanged)
}
if labelsChanged != tc.labelsChanged {
t.Errorf("Expected labelsChanged to be %t, got %t", tc.labelsChanged, labelsChanged)
}
})
}
}