dual stack services (#91824)

* api: structure change

* api: defaulting, conversion, and validation

* [FIX] validation: auto remove second ip/family when service changes to SingleStack

* [FIX] api: defaulting, conversion, and validation

* api-server: clusterIPs alloc, printers, storage and strategy

* [FIX] clusterIPs default on read

* alloc: auto remove second ip/family when service changes to SingleStack

* api-server: repair loop handling for clusterIPs

* api-server: force kubernetes default service into single stack

* api-server: tie dualstack feature flag with endpoint feature flag

* controller-manager: feature flag, endpoint, and endpointSlice controllers handling multi family service

* [FIX] controller-manager: feature flag, endpoint, and endpointSlicecontrollers handling multi family service

* kube-proxy: feature-flag, utils, proxier, and meta proxier

* [FIX] kubeproxy: call both proxier at the same time

* kubenet: remove forced pod IP sorting

* kubectl: modify describe to include ClusterIPs, IPFamilies, and IPFamilyPolicy

* e2e: fix tests that depends on IPFamily field AND add dual stack tests

* e2e: fix expected error message for ClusterIP immutability

* add integration tests for dualstack

the third phase of dual stack is a very complex change in the API,
basically it introduces Dual Stack services. Main changes are:

- It pluralizes the Service IPFamily field to IPFamilies,
and removes the singular field.
- It introduces a new field IPFamilyPolicyType that can take
3 values to express the "dual-stack(mad)ness" of the cluster:
SingleStack, PreferDualStack and RequireDualStack
- It pluralizes ClusterIP to ClusterIPs.

The goal is to add coverage to the services API operations,
taking into account the 6 different modes a cluster can have:

- single stack: IP4 or IPv6 (as of today)
- dual stack: IPv4 only, IPv6 only, IPv4 - IPv6, IPv6 - IPv4

* [FIX] add integration tests for dualstack

* generated data

* generated files

Co-authored-by: Antonio Ojea <aojea@redhat.com>
This commit is contained in:
Khaled Henidak (Kal)
2020-10-26 13:15:59 -07:00
committed by GitHub
parent d0e06cf3e0
commit 6675eba3ef
84 changed files with 11170 additions and 3514 deletions

View File

@@ -27,6 +27,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@@ -340,7 +340,6 @@ func TestSyncServiceFull(t *testing.T) {
client, esController := newController([]string{"node-1"}, time.Duration(0))
namespace := metav1.NamespaceDefault
serviceName := "all-the-protocols"
ipv6Family := v1.IPv6Protocol
pod1 := newPod(1, namespace, true, 0)
pod1.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}}
@@ -364,8 +363,8 @@ func TestSyncServiceFull(t *testing.T) {
{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
},
Selector: map[string]string{"foo": "bar"},
IPFamily: &ipv6Family,
Selector: map[string]string{"foo": "bar"},
IPFamilies: []v1.IPFamily{v1.IPv6Protocol},
},
}
esController.serviceStore.Add(service)
@@ -491,8 +490,9 @@ func TestPodAddsBatching(t *testing.T) {
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
Selector: map[string]string{"foo": "bar"},
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
Ports: []v1.ServicePort{{Port: 80}},
},
})
@@ -624,8 +624,9 @@ func TestPodUpdatesBatching(t *testing.T) {
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
Selector: map[string]string{"foo": "bar"},
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
Ports: []v1.ServicePort{{Port: 80}},
},
})
@@ -758,8 +759,9 @@ func TestPodDeleteBatching(t *testing.T) {
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
Selector: map[string]string{"foo": "bar"},
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
Ports: []v1.ServicePort{{Port: 80}},
},
})
@@ -810,8 +812,9 @@ func createService(t *testing.T, esController *endpointSliceController, namespac
UID: types.UID(namespace + "-" + serviceName),
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
Selector: map[string]string{"foo": "bar"},
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
},
}
esController.serviceStore.Add(service)

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
@@ -56,12 +57,62 @@ type endpointMeta struct {
// slices for the given service. It creates, updates, or deletes endpoint slices
// to ensure the desired set of pods are represented by endpoint slices.
func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
addressType := discovery.AddressTypeIPv4
slicesToDelete := []*discovery.EndpointSlice{} // slices that are no longer matching any address the service has
errs := []error{} // all errors generated in the process of reconciling
slicesByAddressType := make(map[discovery.AddressType][]*discovery.EndpointSlice) // slices by address type
if endpointutil.IsIPv6Service(service) {
addressType = discovery.AddressTypeIPv6
// addresses that this service supports [o(1) find]
serviceSupportedAddressesTypes := getAddressTypesForService(service)
// loop through slices identifying their address type.
// slices that no longer match address type supported by services
// go to delete, other slices goes to the reconciler machinery
// for further adjustment
for _, existingSlice := range existingSlices {
// service no longer supports that address type, add it to deleted slices
if _, ok := serviceSupportedAddressesTypes[existingSlice.AddressType]; !ok {
slicesToDelete = append(slicesToDelete, existingSlice)
continue
}
// add list if it is not on our map
if _, ok := slicesByAddressType[existingSlice.AddressType]; !ok {
slicesByAddressType[existingSlice.AddressType] = make([]*discovery.EndpointSlice, 0, 1)
}
slicesByAddressType[existingSlice.AddressType] = append(slicesByAddressType[existingSlice.AddressType], existingSlice)
}
// reconcile for existing.
for addressType := range serviceSupportedAddressesTypes {
existingSlices := slicesByAddressType[addressType]
err := r.reconcileByAddressType(service, pods, existingSlices, triggerTime, addressType)
if err != nil {
errs = append(errs, err)
}
}
// delete those which are of addressType that is no longer supported
// by the service
for _, sliceToDelete := range slicesToDelete {
err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Delete(context.TODO(), sliceToDelete.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", sliceToDelete.Name, service.Namespace, service.Name, err))
} else {
r.endpointSliceTracker.Delete(sliceToDelete)
metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
}
}
return utilerrors.NewAggregate(errs)
}
// reconcileByAddressType takes a set of pods currently matching a service selector and
// compares them with the endpoints already present in any existing endpoint
// slices (by address type) for the given service. It creates, updates, or deletes endpoint slices
// to ensure the desired set of pods are represented by endpoint slices.
func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
slicesToCreate := []*discovery.EndpointSlice{}
slicesToUpdate := []*discovery.EndpointSlice{}
slicesToDelete := []*discovery.EndpointSlice{}
@@ -70,7 +121,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
numExistingEndpoints := 0
for _, existingSlice := range existingSlices {
if existingSlice.AddressType == addressType && ownedBy(existingSlice, service) {
if ownedBy(existingSlice, service) {
epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
numExistingEndpoints += len(existingSlice.Endpoints)
@@ -106,7 +157,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
if err != nil {
return err
}
endpoint := podToEndpoint(pod, node, service)
endpoint := podToEndpoint(pod, node, service, addressType)
if len(endpoint.Addresses) > 0 {
desiredEndpointsByPortMap[epHash].Insert(&endpoint)
numDesiredEndpoints++

View File

@@ -70,7 +70,10 @@ func TestReconcileEmpty(t *testing.T) {
// a slice should be created
func TestReconcile1Pod(t *testing.T) {
namespace := "test"
ipv6Family := corev1.IPv6Protocol
noFamilyService, _ := newServiceAndEndpointMeta("foo", namespace)
noFamilyService.Spec.ClusterIP = "10.0.0.10"
noFamilyService.Spec.IPFamilies = nil
svcv4, _ := newServiceAndEndpointMeta("foo", namespace)
svcv4ClusterIP, _ := newServiceAndEndpointMeta("foo", namespace)
svcv4ClusterIP.Spec.ClusterIP = "1.1.1.1"
@@ -80,9 +83,17 @@ func TestReconcile1Pod(t *testing.T) {
svcv4BadLabels.Labels = map[string]string{discovery.LabelServiceName: "bad",
discovery.LabelManagedBy: "actor", corev1.IsHeadlessService: "invalid"}
svcv6, _ := newServiceAndEndpointMeta("foo", namespace)
svcv6.Spec.IPFamily = &ipv6Family
svcv6.Spec.IPFamilies = []corev1.IPFamily{corev1.IPv6Protocol}
svcv6ClusterIP, _ := newServiceAndEndpointMeta("foo", namespace)
svcv6ClusterIP.Spec.ClusterIP = "1234::5678:0000:0000:9abc:def1"
// newServiceAndEndpointMeta generates v4 single stack
svcv6ClusterIP.Spec.IPFamilies = []corev1.IPFamily{corev1.IPv6Protocol}
// dual stack
dualStackSvc, _ := newServiceAndEndpointMeta("foo", namespace)
dualStackSvc.Spec.IPFamilies = []corev1.IPFamily{corev1.IPv4Protocol, corev1.IPv6Protocol}
dualStackSvc.Spec.ClusterIP = "10.0.0.10"
dualStackSvc.Spec.ClusterIPs = []string{"10.0.0.10", "2000::1"}
pod1 := newPod(1, namespace, true, 1)
pod1.Status.PodIPs = []corev1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}}
@@ -98,26 +109,56 @@ func TestReconcile1Pod(t *testing.T) {
}
testCases := map[string]struct {
service corev1.Service
expectedAddressType discovery.AddressType
expectedEndpoint discovery.Endpoint
expectedLabels map[string]string
service corev1.Service
expectedAddressType discovery.AddressType
expectedEndpoint discovery.Endpoint
expectedLabels map[string]string
expectedEndpointPerSlice map[discovery.AddressType][]discovery.Endpoint
}{
"ipv4": {
service: svcv4,
expectedAddressType: discovery.AddressTypeIPv4,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
"no-family-service": {
service: noFamilyService,
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
discovery.AddressTypeIPv4: {
{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
expectedLabels: map[string]string{
discovery.LabelManagedBy: controllerName,
discovery.LabelServiceName: "foo",
},
},
"ipv4": {
service: svcv4,
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
discovery.AddressTypeIPv4: {
{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
},
},
expectedLabels: map[string]string{
@@ -127,7 +168,25 @@ func TestReconcile1Pod(t *testing.T) {
},
},
"ipv4-clusterip": {
service: svcv4ClusterIP,
service: svcv4ClusterIP,
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
discovery.AddressTypeIPv4: {
{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
},
},
expectedAddressType: discovery.AddressTypeIPv4,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.4"},
@@ -149,7 +208,25 @@ func TestReconcile1Pod(t *testing.T) {
},
},
"ipv4-labels": {
service: svcv4Labels,
service: svcv4Labels,
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
discovery.AddressTypeIPv4: {
{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
},
},
expectedAddressType: discovery.AddressTypeIPv4,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.4"},
@@ -173,7 +250,25 @@ func TestReconcile1Pod(t *testing.T) {
},
},
"ipv4-bad-labels": {
service: svcv4BadLabels,
service: svcv4BadLabels,
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
discovery.AddressTypeIPv4: {
{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
},
},
expectedAddressType: discovery.AddressTypeIPv4,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.4"},
@@ -195,21 +290,25 @@ func TestReconcile1Pod(t *testing.T) {
corev1.IsHeadlessService: "",
},
},
"ipv6": {
service: svcv6,
expectedAddressType: discovery.AddressTypeIPv6,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1234::5678:0000:0000:9abc:def0"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
service: svcv6,
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
discovery.AddressTypeIPv6: {
{
Addresses: []string{"1234::5678:0000:0000:9abc:def0"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
},
},
expectedLabels: map[string]string{
@@ -218,21 +317,67 @@ func TestReconcile1Pod(t *testing.T) {
corev1.IsHeadlessService: "",
},
},
"ipv6-clusterip": {
service: svcv6ClusterIP,
expectedAddressType: discovery.AddressTypeIPv6,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1234::5678:0000:0000:9abc:def0"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
service: svcv6ClusterIP,
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
discovery.AddressTypeIPv6: {
{
Addresses: []string{"1234::5678:0000:0000:9abc:def0"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
expectedLabels: map[string]string{
discovery.LabelManagedBy: controllerName,
discovery.LabelServiceName: "foo",
},
},
"dualstack-service": {
service: dualStackSvc,
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
discovery.AddressTypeIPv6: {
{
Addresses: []string{"1234::5678:0000:0000:9abc:def0"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
},
discovery.AddressTypeIPv4: {
{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{
"kubernetes.io/hostname": "node-1",
"topology.kubernetes.io/zone": "us-central1-a",
"topology.kubernetes.io/region": "us-central1",
},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: namespace,
Name: "pod1",
},
},
},
},
expectedLabels: map[string]string{
@@ -250,41 +395,61 @@ func TestReconcile1Pod(t *testing.T) {
r := newReconciler(client, []*corev1.Node{node1}, defaultMaxEndpointsPerSlice)
reconcileHelper(t, r, &testCase.service, []*corev1.Pod{pod1}, []*discovery.EndpointSlice{}, triggerTime)
if len(client.Actions()) != 1 {
t.Errorf("Expected 1 clientset action, got %d", len(client.Actions()))
if len(client.Actions()) != len(testCase.expectedEndpointPerSlice) {
t.Errorf("Expected %v clientset action, got %d", len(testCase.expectedEndpointPerSlice), len(client.Actions()))
}
slices := fetchEndpointSlices(t, client, namespace)
if len(slices) != 1 {
t.Fatalf("Expected 1 EndpointSlice, got %d", len(slices))
if len(slices) != len(testCase.expectedEndpointPerSlice) {
t.Fatalf("Expected %v EndpointSlice, got %d", len(testCase.expectedEndpointPerSlice), len(slices))
}
slice := slices[0]
if !strings.HasPrefix(slice.Name, testCase.service.Name) {
t.Errorf("Expected EndpointSlice name to start with %s, got %s", testCase.service.Name, slice.Name)
for _, slice := range slices {
if !strings.HasPrefix(slice.Name, testCase.service.Name) {
t.Fatalf("Expected EndpointSlice name to start with %s, got %s", testCase.service.Name, slice.Name)
}
if !reflect.DeepEqual(testCase.expectedLabels, slice.Labels) {
t.Errorf("Expected EndpointSlice to have labels: %v , got %v", testCase.expectedLabels, slice.Labels)
}
if slice.Labels[discovery.LabelServiceName] != testCase.service.Name {
t.Fatalf("Expected EndpointSlice to have label set with %s value, got %s", testCase.service.Name, slice.Labels[discovery.LabelServiceName])
}
if slice.Annotations[corev1.EndpointsLastChangeTriggerTime] != triggerTime.Format(time.RFC3339Nano) {
t.Fatalf("Expected EndpointSlice trigger time annotation to be %s, got %s", triggerTime.Format(time.RFC3339Nano), slice.Annotations[corev1.EndpointsLastChangeTriggerTime])
}
// validate that this slice has address type matching expected
expectedEndPointList := testCase.expectedEndpointPerSlice[slice.AddressType]
if expectedEndPointList == nil {
t.Fatalf("address type %v is not expected", slice.AddressType)
}
if len(slice.Endpoints) != len(expectedEndPointList) {
t.Fatalf("Expected %v Endpoint, got %d", len(expectedEndPointList), len(slice.Endpoints))
}
// test is limited to *ONE* endpoint
endpoint := slice.Endpoints[0]
if !reflect.DeepEqual(endpoint, expectedEndPointList[0]) {
t.Fatalf("Expected endpoint: %+v, got: %+v", expectedEndPointList[0], endpoint)
}
expectTrackedResourceVersion(t, r.endpointSliceTracker, &slice, "100")
expectMetrics(t,
expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 1,
addedPerSync: len(testCase.expectedEndpointPerSlice),
removedPerSync: 0,
numCreated: len(testCase.expectedEndpointPerSlice),
numUpdated: 0,
numDeleted: 0})
}
if !reflect.DeepEqual(testCase.expectedLabels, slice.Labels) {
t.Errorf("Expected EndpointSlice to have labels: %v , got %v", testCase.expectedLabels, slice.Labels)
}
if slice.Annotations[corev1.EndpointsLastChangeTriggerTime] != triggerTime.Format(time.RFC3339Nano) {
t.Errorf("Expected EndpointSlice trigger time annotation to be %s, got %s", triggerTime.Format(time.RFC3339Nano), slice.Annotations[corev1.EndpointsLastChangeTriggerTime])
}
if len(slice.Endpoints) != 1 {
t.Fatalf("Expected 1 Endpoint, got %d", len(slice.Endpoints))
}
endpoint := slice.Endpoints[0]
if !reflect.DeepEqual(endpoint, testCase.expectedEndpoint) {
t.Errorf("Expected endpoint: %+v, got: %+v", testCase.expectedEndpoint, endpoint)
}
expectTrackedResourceVersion(t, r.endpointSliceTracker, &slice, "100")
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
})
}
}
@@ -404,13 +569,13 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
// have approximately 1/4 in first slice
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 1; i < len(pods)-4; i += 4 {
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc, discovery.AddressTypeIPv4))
}
// have approximately 1/4 in second slice
endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 3; i < len(pods)-4; i += 4 {
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc, discovery.AddressTypeIPv4))
}
existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
@@ -460,13 +625,13 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
// have approximately 1/4 in first slice
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 1; i < len(pods)-4; i += 4 {
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc, discovery.AddressTypeIPv4))
}
// have approximately 1/4 in second slice
endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 3; i < len(pods)-4; i += 4 {
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc, discovery.AddressTypeIPv4))
}
existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
@@ -621,7 +786,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
if i%30 == 0 {
existingSlices = append(existingSlices, newEmptyEndpointSlice(sliceNum, namespace, endpointMeta, svc))
}
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc, discovery.AddressTypeIPv4))
}
cmc := newCacheMutationCheck(existingSlices)
@@ -663,7 +828,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 0; i < 80; i++ {
pod := newPod(i, namespace, true, 1)
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc, discovery.AddressTypeIPv4))
pods = append(pods, pod)
}
existingSlices = append(existingSlices, slice1)
@@ -671,7 +836,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 100; i < 120; i++ {
pod := newPod(i, namespace, true, 1)
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc, discovery.AddressTypeIPv4))
pods = append(pods, pod)
}
existingSlices = append(existingSlices, slice2)
@@ -724,7 +889,7 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 0; i < 80; i++ {
pod := newPod(i, namespace, true, 1)
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}))
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}, discovery.AddressTypeIPv4))
pods = append(pods, pod)
}
existingSlices = append(existingSlices, slice1)
@@ -732,7 +897,7 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 100; i < 150; i++ {
pod := newPod(i, namespace, true, 1)
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}))
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}, discovery.AddressTypeIPv4))
pods = append(pods, pod)
}
existingSlices = append(existingSlices, slice2)
@@ -791,7 +956,7 @@ func TestReconcileEndpointSlicesRecreation(t *testing.T) {
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
pod := newPod(1, namespace, true, 1)
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}))
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}, discovery.AddressTypeIPv4))
if !tc.ownedByService {
slice.OwnerReferences[0].UID = "different"
@@ -848,7 +1013,8 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
TargetPort: portNameIntStr,
Protocol: corev1.ProtocolTCP,
}},
Selector: map[string]string{"foo": "bar"},
Selector: map[string]string{"foo": "bar"},
IPFamilies: []corev1.IPFamily{corev1.IPv4Protocol},
},
}
@@ -1221,6 +1387,11 @@ func expectUnorderedSlicesWithTopLevelAttrs(t *testing.T, endpointSlices []disco
func expectActions(t *testing.T, actions []k8stesting.Action, num int, verb, resource string) {
t.Helper()
// if actions are less the below logic will panic
if num > len(actions) {
t.Fatalf("len of actions %v is unexpected. Expected to be at least %v", len(actions), num+1)
}
for i := 0; i < num; i++ {
relativePos := len(actions) - i - 1
assert.Equal(t, verb, actions[relativePos].GetVerb(), "Expected action -%d verb to be %s", i, verb)

View File

@@ -37,8 +37,8 @@ import (
utilnet "k8s.io/utils/net"
)
// podToEndpoint returns an Endpoint object generated from pod, node, and service.
func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service) discovery.Endpoint {
// podToEndpoint returns an Endpoint object generated from a Pod, a Node, and a Service for a particular addressType.
func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service, addressType discovery.AddressType) discovery.Endpoint {
// Build out topology information. This is currently limited to hostname,
// zone, and region, but this will be expanded in the future.
topology := map[string]string{}
@@ -62,7 +62,7 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service)
ready := service.Spec.PublishNotReadyAddresses || podutil.IsPodReady(pod)
ep := discovery.Endpoint{
Addresses: getEndpointAddresses(pod.Status, service),
Addresses: getEndpointAddresses(pod.Status, service, addressType),
Conditions: discovery.EndpointConditions{
Ready: &ready,
},
@@ -117,12 +117,16 @@ func getEndpointPorts(service *corev1.Service, pod *corev1.Pod) []discovery.Endp
}
// getEndpointAddresses returns a list of addresses generated from a pod status.
func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service) []string {
func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service, addressType discovery.AddressType) []string {
addresses := []string{}
for _, podIP := range podStatus.PodIPs {
isIPv6PodIP := utilnet.IsIPv6String(podIP.IP)
if isIPv6PodIP == endpointutil.IsIPv6Service(service) {
if isIPv6PodIP && addressType == discovery.AddressTypeIPv6 {
addresses = append(addresses, podIP.IP)
}
if !isIPv6PodIP && addressType == discovery.AddressTypeIPv4 {
addresses = append(addresses, podIP.IP)
}
}
@@ -346,3 +350,62 @@ func (sl endpointSliceEndpointLen) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i]
func (sl endpointSliceEndpointLen) Less(i, j int) bool {
return len(sl[i].Endpoints) > len(sl[j].Endpoints)
}
// returns a map of address types used by a service
func getAddressTypesForService(service *corev1.Service) map[discovery.AddressType]struct{} {
serviceSupportedAddresses := make(map[discovery.AddressType]struct{})
// TODO: (khenidak) when address types are removed in favor of
// v1.IPFamily this will need to be removed, and work directly with
// v1.IPFamily types
// IMPORTANT: we assume that IP of (discovery.AddressType enum) is never in use
// as it gets deprecated
for _, family := range service.Spec.IPFamilies {
if family == corev1.IPv4Protocol {
serviceSupportedAddresses[discovery.AddressTypeIPv4] = struct{}{}
}
if family == corev1.IPv6Protocol {
serviceSupportedAddresses[discovery.AddressTypeIPv6] = struct{}{}
}
}
if len(serviceSupportedAddresses) > 0 {
return serviceSupportedAddresses // we have found families for this service
}
// TODO (khenidak) remove when (1) dual stack becomes
// enabled by default (2) v1.19 falls off supported versions
// Why do we need this:
// a cluster being upgraded to the new apis
// will have service.spec.IPFamilies: nil
// if the controller manager connected to old api
// server. This will have the nasty side effect of
// removing all slices already created for this service.
// this will disable all routing to service vip (ClusterIP)
// this ensures that this does not happen. Same for headless services
// we assume it is dual stack, until they get defaulted by *new* api-server
// this ensures that traffic is not disrupted until then. But *may*
// include undesired families for headless services until then.
if len(service.Spec.ClusterIP) > 0 && service.Spec.ClusterIP != corev1.ClusterIPNone { // headfull
addrType := discovery.AddressTypeIPv4
if utilnet.IsIPv6String(service.Spec.ClusterIP) {
addrType = discovery.AddressTypeIPv6
}
serviceSupportedAddresses[addrType] = struct{}{}
klog.V(2).Infof("couldn't find ipfamilies for headless service: %v/%v. This could happen if controller manager is connected to an old apiserver that does not support ip families yet. EndpointSlices for this Service will use %s as the IP Family based on familyOf(ClusterIP:%v).", service.Namespace, service.Name, addrType, service.Spec.ClusterIP)
return serviceSupportedAddresses
}
// headless
// for now we assume two families. This should have minimal side effect
// if the service is headless with no selector, then this will remain the case
// if the service is headless with selector then chances are pods are still using single family
// since kubelet will need to restart in order to start patching pod status with multiple ips
serviceSupportedAddresses[discovery.AddressTypeIPv4] = struct{}{}
serviceSupportedAddresses[discovery.AddressTypeIPv6] = struct{}{}
klog.V(2).Infof("couldn't find ipfamilies for headless service: %v/%v likely because controller manager is likely connected to an old apiserver that does not support ip families yet. The service endpoint slice will use dual stack families until api-server default it correctly", service.Namespace, service.Name)
return serviceSupportedAddresses
}

View File

@@ -385,7 +385,7 @@ func TestPodToEndpoint(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
endpoint := podToEndpoint(testCase.pod, testCase.node, testCase.svc)
endpoint := podToEndpoint(testCase.pod, testCase.node, testCase.svc, discovery.AddressTypeIPv4)
if !reflect.DeepEqual(testCase.expectedEndpoint, endpoint) {
t.Errorf("Expected endpoint: %v, got: %v", testCase.expectedEndpoint, endpoint)
}
@@ -889,7 +889,8 @@ func newServiceAndEndpointMeta(name, namespace string) (v1.Service, endpointMeta
Protocol: v1.ProtocolTCP,
Name: name,
}},
Selector: map[string]string{"foo": "bar"},
Selector: map[string]string{"foo": "bar"},
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
},
}
@@ -918,3 +919,135 @@ func newEmptyEndpointSlice(n int, namespace string, endpointMeta endpointMeta, s
Endpoints: []discovery.Endpoint{},
}
}
func TestSupportedServiceAddressType(t *testing.T) {
testCases := []struct {
name string
service v1.Service
expectedAddressTypes []discovery.AddressType
}{
{
name: "v4 service with no ip families (cluster upgrade)",
expectedAddressTypes: []discovery.AddressType{discovery.AddressTypeIPv4},
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.10",
IPFamilies: nil,
},
},
},
{
name: "v6 service with no ip families (cluster upgrade)",
expectedAddressTypes: []discovery.AddressType{discovery.AddressTypeIPv6},
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "2000::1",
IPFamilies: nil,
},
},
},
{
name: "v4 service",
expectedAddressTypes: []discovery.AddressType{discovery.AddressTypeIPv4},
service: v1.Service{
Spec: v1.ServiceSpec{
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
},
},
},
{
name: "v6 services",
expectedAddressTypes: []discovery.AddressType{discovery.AddressTypeIPv6},
service: v1.Service{
Spec: v1.ServiceSpec{
IPFamilies: []v1.IPFamily{v1.IPv6Protocol},
},
},
},
{
name: "v4,v6 service",
expectedAddressTypes: []discovery.AddressType{discovery.AddressTypeIPv4, discovery.AddressTypeIPv6},
service: v1.Service{
Spec: v1.ServiceSpec{
IPFamilies: []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol},
},
},
},
{
name: "v6,v4 service",
expectedAddressTypes: []discovery.AddressType{discovery.AddressTypeIPv6, discovery.AddressTypeIPv4},
service: v1.Service{
Spec: v1.ServiceSpec{
IPFamilies: []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol},
},
},
},
{
name: "headless with no selector and no families (old api-server)",
expectedAddressTypes: []discovery.AddressType{discovery.AddressTypeIPv6, discovery.AddressTypeIPv4},
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
IPFamilies: nil,
},
},
},
{
name: "headless with selector and no families (old api-server)",
expectedAddressTypes: []discovery.AddressType{discovery.AddressTypeIPv6, discovery.AddressTypeIPv4},
service: v1.Service{
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
ClusterIP: v1.ClusterIPNone,
IPFamilies: nil,
},
},
},
{
name: "headless with no selector with families",
expectedAddressTypes: []discovery.AddressType{discovery.AddressTypeIPv4, discovery.AddressTypeIPv6},
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
IPFamilies: []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol},
},
},
},
{
name: "headless with selector with families",
expectedAddressTypes: []discovery.AddressType{discovery.AddressTypeIPv4, discovery.AddressTypeIPv6},
service: v1.Service{
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
ClusterIP: v1.ClusterIPNone,
IPFamilies: []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol},
},
},
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
addressTypes := getAddressTypesForService(&testCase.service)
if len(addressTypes) != len(testCase.expectedAddressTypes) {
t.Fatalf("expected count address types %v got %v", len(testCase.expectedAddressTypes), len(addressTypes))
}
// compare
for _, expectedAddressType := range testCase.expectedAddressTypes {
found := false
for key := range addressTypes {
if key == expectedAddressType {
found = true
break
}
}
if !found {
t.Fatalf("expected address type %v was not found in the result", expectedAddressType)
}
}
})
}
}