From c421e22e2c6aa4d3fbe3abd7feb0035361539d22 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 16 Jan 2021 19:30:48 +0100 Subject: [PATCH] slice mirroring controller mirror annotations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support to the endpoint slice mirroring controller to mirror annotations, in addition to labels, but don“t mirror endpoint triggertime annotation. Also, fix a bug in the endpointslice mirroring controller, that wasn't updating the mirrored slice with the new labels, in case that only the endpoint labels were modified. --- .../endpointslicemirroring/reconciler.go | 10 +- .../endpointslicemirroring/reconciler_test.go | 99 ++++++++- .../endpointslicemirroring/utils.go | 30 +++ .../endpointslicemirroring/utils_test.go | 154 ++++++++++++-- .../endpointslicemirroring_test.go | 194 +++++++++++++++++- 5 files changed, 462 insertions(+), 25 deletions(-) diff --git a/pkg/controller/endpointslicemirroring/reconciler.go b/pkg/controller/endpointslicemirroring/reconciler.go index a0090c94b61..168e2107230 100644 --- a/pkg/controller/endpointslicemirroring/reconciler.go +++ b/pkg/controller/endpointslicemirroring/reconciler.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -196,9 +197,14 @@ func (r *reconciler) reconcileByPortMapping( // if >0 existing slices, mark all but 1 for deletion. slices.toDelete = existingSlices[1:] - // Return early if first slice matches desired endpoints. + // generated slices must mirror all endpoints annotations but EndpointsLastChangeTriggerTime + compareAnnotations := cloneAndRemoveKeys(endpoints.Annotations, corev1.EndpointsLastChangeTriggerTime) + compareLabels := cloneAndRemoveKeys(existingSlices[0].Labels, discovery.LabelManagedBy, discovery.LabelServiceName) + // Return early if first slice matches desired endpoints, labels and annotations totals = totalChanges(existingSlices[0], desiredSet) - if totals.added == 0 && totals.updated == 0 && totals.removed == 0 { + if totals.added == 0 && totals.updated == 0 && totals.removed == 0 && + apiequality.Semantic.DeepEqual(endpoints.Labels, compareLabels) && + apiequality.Semantic.DeepEqual(compareAnnotations, existingSlices[0].Annotations) { return slices, totals } } diff --git a/pkg/controller/endpointslicemirroring/reconciler_test.go b/pkg/controller/endpointslicemirroring/reconciler_test.go index bf1a193eac1..9b8b2d856be 100644 --- a/pkg/controller/endpointslicemirroring/reconciler_test.go +++ b/pkg/controller/endpointslicemirroring/reconciler_test.go @@ -43,6 +43,7 @@ func TestReconcile(t *testing.T) { testCases := []struct { testName string subsets []corev1.EndpointSubset + epLabels map[string]string endpointsDeletionPending bool maxEndpointsPerSubset int32 existingEndpointSlices []*discovery.EndpointSlice @@ -105,6 +106,102 @@ func TestReconcile(t *testing.T) { existingEndpointSlices: []*discovery.EndpointSlice{}, expectedNumSlices: 0, expectedClientActions: 0, + }, { + testName: "Endpoints with 1 subset, port, and address and existing slice with same fields", + subsets: []corev1.EndpointSubset{{ + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + }}, + }}, + existingEndpointSlices: []*discovery.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ep-1", + }, + AddressType: discovery.AddressTypeIPv4, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr("http"), + Port: utilpointer.Int32Ptr(80), + Protocol: &protoTCP, + }}, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.0.0.1"}, + Hostname: utilpointer.StringPtr("pod-1"), + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + }}, + }}, + expectedNumSlices: 1, + expectedClientActions: 0, + }, { + testName: "Endpoints with 1 subset, port, and address and existing slice with an additional annotation", + subsets: []corev1.EndpointSubset{{ + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + }}, + }}, + existingEndpointSlices: []*discovery.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ep-1", + Annotations: map[string]string{"foo": "bar"}, + }, + AddressType: discovery.AddressTypeIPv4, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr("http"), + Port: utilpointer.Int32Ptr(80), + Protocol: &protoTCP, + }}, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.0.0.1"}, + Hostname: utilpointer.StringPtr("pod-1"), + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + }}, + }}, + expectedNumSlices: 1, + expectedClientActions: 1, + }, { + testName: "Endpoints with 1 subset, port, label and address and existing slice with same fields but the label", + subsets: []corev1.EndpointSubset{{ + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + }}, + }}, + epLabels: map[string]string{"foo": "bar"}, + existingEndpointSlices: []*discovery.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ep-1", + Annotations: map[string]string{"foo": "bar"}, + }, + AddressType: discovery.AddressTypeIPv4, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr("http"), + Port: utilpointer.Int32Ptr(80), + Protocol: &protoTCP, + }}, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.0.0.1"}, + Hostname: utilpointer.StringPtr("pod-1"), + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + }}, + }}, + expectedNumSlices: 1, + expectedClientActions: 1, }, { testName: "Endpoints with 1 subset, 2 ports, and 2 addresses", subsets: []corev1.EndpointSubset{{ @@ -641,7 +738,7 @@ func TestReconcile(t *testing.T) { setupMetrics() namespace := "test" endpoints := corev1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: "test-ep", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: "test-ep", Namespace: namespace, Labels: tc.epLabels}, Subsets: tc.subsets, } diff --git a/pkg/controller/endpointslicemirroring/utils.go b/pkg/controller/endpointslicemirroring/utils.go index 9f690d0f27d..9910b314758 100644 --- a/pkg/controller/endpointslicemirroring/utils.go +++ b/pkg/controller/endpointslicemirroring/utils.go @@ -69,6 +69,7 @@ func newEndpointSlice(endpoints *corev1.Endpoints, ports []discovery.EndpointPor epSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{}, + Annotations: map[string]string{}, OwnerReferences: []metav1.OwnerReference{*ownerRef}, Namespace: endpoints.Namespace, }, @@ -77,13 +78,23 @@ func newEndpointSlice(endpoints *corev1.Endpoints, ports []discovery.EndpointPor Endpoints: []discovery.Endpoint{}, } + // clone all labels for label, val := range endpoints.Labels { epSlice.Labels[label] = val } + // overwrite specific labels epSlice.Labels[discovery.LabelServiceName] = endpoints.Name epSlice.Labels[discovery.LabelManagedBy] = controllerName + // clone all annotations but EndpointsLastChangeTriggerTime + for annotation, val := range endpoints.Annotations { + if annotation == corev1.EndpointsLastChangeTriggerTime { + continue + } + epSlice.Annotations[annotation] = val + } + if sliceName == "" { epSlice.GenerateName = getEndpointSlicePrefix(endpoints.Name) } else { @@ -228,3 +239,22 @@ func hasLeaderElection(annotations map[string]string) bool { _, ok := annotations[resourcelock.LeaderElectionRecordAnnotationKey] return ok } + +// cloneAndRemoveKeys is a copy of CloneAndRemoveLabels +// it is used here for annotations and labels +func cloneAndRemoveKeys(a map[string]string, keys ...string) map[string]string { + if len(keys) == 0 { + // Don't need to remove a key. + return a + } + // Clone. + newMap := map[string]string{} + for k, v := range a { + newMap[k] = v + } + // remove keys + for _, key := range keys { + delete(newMap, key) + } + return newMap +} diff --git a/pkg/controller/endpointslicemirroring/utils_test.go b/pkg/controller/endpointslicemirroring/utils_test.go index 45e1308d1ff..0988739fb4e 100644 --- a/pkg/controller/endpointslicemirroring/utils_test.go +++ b/pkg/controller/endpointslicemirroring/utils_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,42 +39,153 @@ func TestNewEndpointSlice(t *testing.T) { ports := []discovery.EndpointPort{{Name: &portName, Protocol: &protocol}} addrType := discovery.AddressTypeIPv4 + gvk := schema.GroupVersionKind{Version: "v1", Kind: "Endpoints"} endpoints := v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: "test", - Labels: map[string]string{"foo": "bar"}, }, Subsets: []v1.EndpointSubset{{ Ports: []v1.EndpointPort{{Port: 80}}, }}, } - - gvk := schema.GroupVersionKind{Version: "v1", Kind: "Endpoints"} ownerRef := metav1.NewControllerRef(&endpoints, gvk) - expectedSlice := discovery.EndpointSlice{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "foo": "bar", - discovery.LabelServiceName: endpoints.Name, - discovery.LabelManagedBy: controllerName, + testCases := []struct { + name string + tweakEndpoint func(ep *corev1.Endpoints) + expectedSlice discovery.EndpointSlice + }{ + { + name: "create slice from endpoints", + tweakEndpoint: func(ep *corev1.Endpoints) { + }, + expectedSlice: discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + discovery.LabelServiceName: endpoints.Name, + discovery.LabelManagedBy: controllerName, + }, + Annotations: map[string]string{}, + GenerateName: fmt.Sprintf("%s-", endpoints.Name), + Namespace: endpoints.Namespace, + OwnerReferences: []metav1.OwnerReference{*ownerRef}, + }, + Ports: ports, + AddressType: addrType, + Endpoints: []discovery.Endpoint{}, + }, + }, + { + name: "create slice from endpoints with annotations", + tweakEndpoint: func(ep *corev1.Endpoints) { + annotations := map[string]string{"foo": "bar"} + ep.Annotations = annotations + }, + expectedSlice: discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + discovery.LabelServiceName: endpoints.Name, + discovery.LabelManagedBy: controllerName, + }, + Annotations: map[string]string{"foo": "bar"}, + GenerateName: fmt.Sprintf("%s-", endpoints.Name), + Namespace: endpoints.Namespace, + OwnerReferences: []metav1.OwnerReference{*ownerRef}, + }, + Ports: ports, + AddressType: addrType, + Endpoints: []discovery.Endpoint{}, + }, + }, + { + name: "create slice from endpoints with labels", + tweakEndpoint: func(ep *corev1.Endpoints) { + labels := map[string]string{"foo": "bar"} + ep.Labels = labels + }, + expectedSlice: discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + discovery.LabelServiceName: endpoints.Name, + discovery.LabelManagedBy: controllerName, + }, + Annotations: map[string]string{}, + GenerateName: fmt.Sprintf("%s-", endpoints.Name), + Namespace: endpoints.Namespace, + OwnerReferences: []metav1.OwnerReference{*ownerRef}, + }, + Ports: ports, + AddressType: addrType, + Endpoints: []discovery.Endpoint{}, + }, + }, + { + name: "create slice from endpoints with labels and annotations", + tweakEndpoint: func(ep *corev1.Endpoints) { + labels := map[string]string{"foo": "bar"} + ep.Labels = labels + annotations := map[string]string{"foo2": "bar2"} + ep.Annotations = annotations + }, + expectedSlice: discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + discovery.LabelServiceName: endpoints.Name, + discovery.LabelManagedBy: controllerName, + }, + Annotations: map[string]string{"foo2": "bar2"}, + GenerateName: fmt.Sprintf("%s-", endpoints.Name), + Namespace: endpoints.Namespace, + OwnerReferences: []metav1.OwnerReference{*ownerRef}, + }, + Ports: ports, + AddressType: addrType, + Endpoints: []discovery.Endpoint{}, + }, + }, + { + name: "create slice from endpoints with labels and annotations triggertime", + tweakEndpoint: func(ep *corev1.Endpoints) { + labels := map[string]string{"foo": "bar"} + ep.Labels = labels + annotations := map[string]string{ + "foo2": "bar2", + corev1.EndpointsLastChangeTriggerTime: "date", + } + ep.Annotations = annotations + }, + expectedSlice: discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + discovery.LabelServiceName: endpoints.Name, + discovery.LabelManagedBy: controllerName, + }, + Annotations: map[string]string{"foo2": "bar2"}, + GenerateName: fmt.Sprintf("%s-", endpoints.Name), + Namespace: endpoints.Namespace, + OwnerReferences: []metav1.OwnerReference{*ownerRef}, + }, + Ports: ports, + AddressType: addrType, + Endpoints: []discovery.Endpoint{}, }, - GenerateName: fmt.Sprintf("%s-", endpoints.Name), - OwnerReferences: []metav1.OwnerReference{*ownerRef}, - Namespace: endpoints.Namespace, }, - Ports: ports, - AddressType: addrType, - Endpoints: []discovery.Endpoint{}, } - generatedSlice := newEndpointSlice(&endpoints, ports, addrType, "") - - assert.EqualValues(t, expectedSlice, *generatedSlice) - - if len(endpoints.Labels) > 1 { - t.Errorf("Expected Endpoints labels to not be modified, got %+v", endpoints.Labels) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ep := endpoints.DeepCopy() + tc.tweakEndpoint(ep) + generatedSlice := newEndpointSlice(ep, ports, addrType, "") + assert.EqualValues(t, tc.expectedSlice, *generatedSlice) + if len(endpoints.Labels) > 1 { + t.Errorf("Expected Endpoints labels to not be modified, got %+v", endpoints.Labels) + } + }) } } diff --git a/test/integration/endpointslice/endpointslicemirroring_test.go b/test/integration/endpointslice/endpointslicemirroring_test.go index 67030ac9b27..fcee245cf25 100644 --- a/test/integration/endpointslice/endpointslicemirroring_test.go +++ b/test/integration/endpointslice/endpointslicemirroring_test.go @@ -19,11 +19,14 @@ package endpointslice import ( "context" "fmt" + "sort" "testing" "time" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" @@ -194,7 +197,7 @@ func TestEndpointSliceMirroring(t *testing.T) { err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) { lSelector := discovery.LabelServiceName + "=" + resourceName - esList, err := client.DiscoveryV1beta1().EndpointSlices(ns.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector}) + esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector}) if err != nil { t.Logf("Error listing EndpointSlices: %v", err) return false, err @@ -228,3 +231,192 @@ func TestEndpointSliceMirroring(t *testing.T) { } } + +func TestEndpointSliceMirroringUpdates(t *testing.T) { + masterConfig := framework.NewIntegrationTestMasterConfig() + _, server, closeFn := framework.RunAMaster(masterConfig) + defer closeFn() + + config := restclient.Config{Host: server.URL} + client, err := clientset.NewForConfig(&config) + if err != nil { + t.Fatalf("Error creating clientset: %v", err) + } + + resyncPeriod := 12 * time.Hour + informers := informers.NewSharedInformerFactory(client, resyncPeriod) + + epsmController := endpointslicemirroring.NewController( + informers.Core().V1().Endpoints(), + informers.Discovery().V1().EndpointSlices(), + informers.Core().V1().Services(), + int32(100), + client, + 1*time.Second) + + // Start informer and controllers + stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) + go epsmController.Run(1, stopCh) + + testCases := []struct { + testName string + tweakEndpoint func(ep *corev1.Endpoints) + }{ + { + testName: "Update labels", + tweakEndpoint: func(ep *corev1.Endpoints) { + ep.Labels["foo"] = "bar" + }, + }, + { + testName: "Update annotations", + tweakEndpoint: func(ep *corev1.Endpoints) { + ep.Annotations["foo2"] = "bar2" + }, + }, + { + testName: "Update annotations but triggertime", + tweakEndpoint: func(ep *corev1.Endpoints) { + ep.Annotations["foo2"] = "bar2" + ep.Annotations[corev1.EndpointsLastChangeTriggerTime] = "date" + }, + }, + { + testName: "Update addresses", + tweakEndpoint: func(ep *corev1.Endpoints) { + ep.Subsets[0].Addresses = []v1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "1.2.3.6"}} + }, + }, + } + + for i, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + ns := framework.CreateTestingNamespace(fmt.Sprintf("test-endpointslice-mirroring-%d", i), server, t) + defer framework.DeleteTestingNamespace(ns, server, t) + + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-123", + Namespace: ns.Name, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ + Port: int32(80), + }}, + }, + } + + customEndpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-123", + Namespace: ns.Name, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Subsets: []corev1.EndpointSubset{{ + Ports: []corev1.EndpointPort{{ + Port: 80, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + }}, + }}, + } + + _, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating service: %v", err) + } + + _, err = client.CoreV1().Endpoints(ns.Name).Create(context.TODO(), customEndpoints, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating endpoints: %v", err) + } + + // update endpoint + tc.tweakEndpoint(customEndpoints) + _, err = client.CoreV1().Endpoints(ns.Name).Update(context.TODO(), customEndpoints, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Error updating endpoints: %v", err) + } + + // verify the endpoint updates were mirrored + err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) { + lSelector := discovery.LabelServiceName + "=" + service.Name + esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector}) + if err != nil { + t.Logf("Error listing EndpointSlices: %v", err) + return false, err + } + + if len(esList.Items) == 0 { + t.Logf("Waiting for EndpointSlice to be created") + return false, nil + } + + for _, endpointSlice := range esList.Items { + if endpointSlice.Labels[discovery.LabelManagedBy] != "endpointslicemirroring-controller.k8s.io" { + return false, fmt.Errorf("Expected EndpointSlice to be managed by endpointslicemirroring-controller.k8s.io, got %s", endpointSlice.Labels[discovery.LabelManagedBy]) + } + + // compare addresses + epAddresses := []string{} + for _, address := range customEndpoints.Subsets[0].Addresses { + epAddresses = append(epAddresses, address.IP) + } + + sliceAddresses := []string{} + for _, sliceEndpoint := range endpointSlice.Endpoints { + sliceAddresses = append(sliceAddresses, sliceEndpoint.Addresses...) + } + + sort.Strings(epAddresses) + sort.Strings(sliceAddresses) + + if !apiequality.Semantic.DeepEqual(epAddresses, sliceAddresses) { + t.Logf("Expected EndpointSlice to have the same IP addresses, expected %v got %v", epAddresses, sliceAddresses) + return false, nil + } + + // check labels were mirrored + if !isSubset(customEndpoints.Labels, endpointSlice.Labels) { + t.Logf("Expected EndpointSlice to mirror labels, expected %v to be in received %v", customEndpoints.Labels, endpointSlice.Labels) + return false, nil + } + + // check annotations but endpoints.kubernetes.io/last-change-trigger-time were mirrored + annotations := map[string]string{} + for k, v := range customEndpoints.Annotations { + if k == corev1.EndpointsLastChangeTriggerTime { + continue + } + annotations[k] = v + } + if !apiequality.Semantic.DeepEqual(annotations, endpointSlice.Annotations) { + t.Logf("Expected EndpointSlice to mirror annotations, expected %v received %v", customEndpoints.Annotations, endpointSlice.Annotations) + return false, nil + } + } + return true, nil + }) + if err != nil { + t.Fatalf("Timed out waiting for conditions: %v", err) + } + }) + } +} + +// isSubset check if all the elements in a exist in b +func isSubset(a, b map[string]string) bool { + if len(a) > len(b) { + return false + } + for k, v1 := range a { + if v2, ok := b[k]; !ok || v1 != v2 { + return false + } + } + return true +}