From ef6d9edea5490e0e3e73a0a8ab0105926f843b3a Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 26 Nov 2022 18:14:12 +0000 Subject: [PATCH] endpointslicemirroring handle endpoints with multiple subsets Endpoints generated by the endpoints controller are in the canonical form, however, custom endpoints can not be in canonical format (there was a time they were canonicalized in the apiserver, but this caused performance issues because the endpoint controller kept updating them since the created endpoint were different than the stored one due to the canonicalization) There are cases where a custom endpoint may generate multiple slices due to the controller, per example, when the same address is present in different subsets. The endpointslice mirroring controller should canonicalize the endpoints subsets before start processing them to be consistent on the slices generated, there is no risk of hotlooping because the endpoint is only used as input. Change-Id: I2a8cd53c658a640aea559a88ce33e857fa98cc5c --- .../endpointslicemirroring/reconciler.go | 5 +- .../endpointslicemirroring/reconciler_test.go | 102 +++++++++++++- test/e2e/network/endpointslicemirroring.go | 133 ++++++++++++++++++ .../endpointslicemirroring_test.go | 96 +++++++++++-- 4 files changed, 325 insertions(+), 11 deletions(-) diff --git a/pkg/controller/endpointslicemirroring/reconciler.go b/pkg/controller/endpointslicemirroring/reconciler.go index 15f991eecbd..2fa83bde2fb 100644 --- a/pkg/controller/endpointslicemirroring/reconciler.go +++ b/pkg/controller/endpointslicemirroring/reconciler.go @@ -29,6 +29,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" "k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" @@ -68,7 +69,9 @@ func (r *reconciler) reconcile(endpoints *corev1.Endpoints, existingSlices []*di numInvalidAddresses := 0 addressesSkipped := 0 - for _, subset := range endpoints.Subsets { + // canonicalize the Endpoints subsets before processing them + subsets := endpointsv1.RepackSubsets(endpoints.Subsets) + for _, subset := range subsets { multiKey := d.initPorts(subset.Ports) totalAddresses := len(subset.Addresses) + len(subset.NotReadyAddresses) diff --git a/pkg/controller/endpointslicemirroring/reconciler_test.go b/pkg/controller/endpointslicemirroring/reconciler_test.go index 36f405307ec..4818863f6fc 100644 --- a/pkg/controller/endpointslicemirroring/reconciler_test.go +++ b/pkg/controller/endpointslicemirroring/reconciler_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics/testutil" + endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" "k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics" endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" "k8s.io/utils/pointer" @@ -90,6 +91,102 @@ func TestReconcile(t *testing.T) { expectedNumSlices: 1, expectedClientActions: 1, expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, numCreated: 1}, + }, { + testName: "Endpoints with 2 subset, different port and address", + subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + NodeName: pointer.String("node-1"), + }}, + }, + { + Ports: []corev1.EndpointPort{{ + Name: "https", + Port: 443, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.2", + Hostname: "pod-2", + NodeName: pointer.String("node-1"), + }}, + }, + }, + existingEndpointSlices: []*discovery.EndpointSlice{}, + expectedNumSlices: 2, + expectedClientActions: 2, + expectedMetrics: &expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 2, addedPerSync: 2, numCreated: 2}, + }, { + testName: "Endpoints with 2 subset, different port and same address", + subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + NodeName: pointer.String("node-1"), + }}, + }, + { + Ports: []corev1.EndpointPort{{ + Name: "https", + Port: 443, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + NodeName: pointer.String("node-1"), + }}, + }, + }, + existingEndpointSlices: []*discovery.EndpointSlice{}, + expectedNumSlices: 1, + expectedClientActions: 1, + expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, numCreated: 1}, + }, { + testName: "Endpoints with 2 subset, different address and same port", + subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + NodeName: pointer.String("node-1"), + }}, + }, + { + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.2", + Hostname: "pod-2", + NodeName: pointer.String("node-1"), + }}, + }, + }, + existingEndpointSlices: []*discovery.EndpointSlice{}, + expectedNumSlices: 1, + expectedClientActions: 1, + expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 2, addedPerSync: 2, numCreated: 1}, }, { testName: "Endpoints with 1 subset, port, and address, pending deletion", subsets: []corev1.EndpointSubset{{ @@ -1015,7 +1112,10 @@ func expectEndpointSlices(t *testing.T, num, maxEndpointsPerSubset int, endpoint } } - for _, epSubset := range endpoints.Subsets { + // canonicalize endpoints to match the expected endpoints, otherwise the test + // that creates more endpoints than allowed fail becaused the list of final + // endpoints doesn't match. + for _, epSubset := range endpointsv1.RepackSubsets(endpoints.Subsets) { if len(epSubset.Addresses) == 0 && len(epSubset.NotReadyAddresses) == 0 { continue } diff --git a/test/e2e/network/endpointslicemirroring.go b/test/e2e/network/endpointslicemirroring.go index 3c4104b26f6..d74792e0da8 100644 --- a/test/e2e/network/endpointslicemirroring.go +++ b/test/e2e/network/endpointslicemirroring.go @@ -19,6 +19,7 @@ package network import ( "context" "fmt" + "net" "time" "github.com/onsi/ginkgo/v2" @@ -28,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" "k8s.io/kubernetes/test/e2e/network/common" admissionapi "k8s.io/pod-security-admission/api" ) @@ -199,4 +201,135 @@ var _ = common.SIGDescribe("EndpointSliceMirroring", func() { } }) }) + + ginkgo.It("should mirror a custom Endpoint with multiple subsets and same IP address", func() { + ns := f.Namespace.Name + svc := createServiceReportErr(cs, f.Namespace.Name, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "example-custom-endpoints", + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "port80", + Port: 80, + Protocol: v1.ProtocolTCP, + }, + { + Name: "port81", + Port: 81, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }) + + // Add a backend pod to the service in the other node + port8080 := []v1.ContainerPort{ + { + ContainerPort: 8090, + Protocol: v1.ProtocolTCP, + }, + } + port9090 := []v1.ContainerPort{ + { + ContainerPort: 9090, + Protocol: v1.ProtocolTCP, + }, + } + + serverPod := e2epod.NewAgnhostPodFromContainers( + "", "pod-handle-http-request", nil, + e2epod.NewAgnhostContainer("container-handle-8090-request", nil, port8080, "netexec", "--http-port", "8090", "--udp-port", "-1"), + e2epod.NewAgnhostContainer("container-handle-9090-request", nil, port9090, "netexec", "--http-port", "9090", "--udp-port", "-1"), + ) + + pod := e2epod.NewPodClient(f).CreateSync(serverPod) + + if pod.Status.PodIP == "" { + framework.Failf("PodIP not assigned for pod %s", pod.Name) + } + + // create custom endpoints + endpoints := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: svc.Name, + }, + Subsets: []v1.EndpointSubset{ + { + Ports: []v1.EndpointPort{{ + Name: "port80", + Port: 8090, + }}, + Addresses: []v1.EndpointAddress{{ + IP: pod.Status.PodIP, + }}, + }, + { + Ports: []v1.EndpointPort{{ + Name: "port81", + Port: 9090, + }}, + Addresses: []v1.EndpointAddress{{ + IP: pod.Status.PodIP, + }}, + }, + }, + } + + ginkgo.By("mirroring a new custom Endpoint", func() { + _, err := cs.CoreV1().Endpoints(f.Namespace.Name).Create(context.TODO(), endpoints, metav1.CreateOptions{}) + framework.ExpectNoError(err, "Unexpected error creating Endpoints") + + if err := wait.PollImmediate(2*time.Second, 12*time.Second, func() (bool, error) { + esList, err := cs.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{ + LabelSelector: discoveryv1.LabelServiceName + "=" + svc.Name, + }) + if err != nil { + framework.Logf("Error listing EndpointSlices: %v", err) + return false, nil + } + if len(esList.Items) == 0 { + framework.Logf("Waiting for at least 1 EndpointSlice to exist, got %d", len(esList.Items)) + return false, nil + } + return true, nil + }); err != nil { + framework.Failf("Did not find matching EndpointSlice for %s/%s: %s", svc.Namespace, svc.Name, err) + } + }) + + // connect to the service must work + ginkgo.By("Creating a pause pods that will try to connect to the webservers") + pausePod := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil) + e2epod.NewPodClient(f).CreateSync(pausePod) + dest1 := net.JoinHostPort(svc.Spec.ClusterIP, "80") + dest2 := net.JoinHostPort(svc.Spec.ClusterIP, "81") + execHostnameTest(*pausePod, dest1, pod.Name) + execHostnameTest(*pausePod, dest2, pod.Name) + + // delete custom endpoints and wait until the endpoint slices are deleted too + ginkgo.By("mirroring deletion of a custom Endpoint", func() { + err := cs.CoreV1().Endpoints(f.Namespace.Name).Delete(context.TODO(), endpoints.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err, "Unexpected error deleting Endpoints") + + // Expect mirrored EndpointSlice resource to be updated. + if err := wait.PollImmediate(2*time.Second, 12*time.Second, func() (bool, error) { + esList, err := cs.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{ + LabelSelector: discoveryv1.LabelServiceName + "=" + svc.Name, + }) + if err != nil { + return false, err + } + if len(esList.Items) != 0 { + framework.Logf("Waiting for 0 EndpointSlices to exist, got %d", len(esList.Items)) + return false, nil + } + + return true, nil + }); err != nil { + framework.Failf("Did not find matching EndpointSlice for %s/%s: %s", svc.Namespace, svc.Name, err) + } + }) + }) }) diff --git a/test/integration/endpointslice/endpointslicemirroring_test.go b/test/integration/endpointslice/endpointslicemirroring_test.go index be71e60e4fa..89ac76521f4 100644 --- a/test/integration/endpointslice/endpointslicemirroring_test.go +++ b/test/integration/endpointslice/endpointslicemirroring_test.go @@ -86,7 +86,7 @@ func TestEndpointSliceMirroring(t *testing.T) { testName string service *corev1.Service customEndpoints *corev1.Endpoints - expectEndpointSlice bool + expectEndpointSlice int expectEndpointSliceManagedBy string }{{ testName: "Service with selector", @@ -103,7 +103,7 @@ func TestEndpointSliceMirroring(t *testing.T) { }, }, }, - expectEndpointSlice: true, + expectEndpointSlice: 1, expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io", }, { testName: "Service without selector", @@ -130,7 +130,85 @@ func TestEndpointSliceMirroring(t *testing.T) { }}, }}, }, - expectEndpointSlice: true, + expectEndpointSlice: 1, + expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io", + }, { + testName: "Service without selector Endpoint multiple subsets and same address", + service: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-123", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ + Port: int32(80), + }}, + }, + }, + customEndpoints: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-123", + }, + Subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{{ + Name: "port1", + Port: 80, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + }}, + }, + { + Ports: []corev1.EndpointPort{{ + Name: "port2", + Port: 90, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + }}, + }, + }, + }, + expectEndpointSlice: 1, + expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io", + }, { + testName: "Service without selector Endpoint multiple subsets", + service: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-123", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ + Port: int32(80), + }}, + }, + }, + customEndpoints: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-123", + }, + Subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{{ + Name: "port1", + Port: 80, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + }}, + }, + { + Ports: []corev1.EndpointPort{{ + Name: "port2", + Port: 90, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.2", + }}, + }, + }, + }, + expectEndpointSlice: 2, expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io", }, { testName: "Service without Endpoints", @@ -148,7 +226,7 @@ func TestEndpointSliceMirroring(t *testing.T) { }, }, customEndpoints: nil, - expectEndpointSlice: true, + expectEndpointSlice: 1, expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io", }, { testName: "Endpoints without Service", @@ -166,7 +244,7 @@ func TestEndpointSliceMirroring(t *testing.T) { }}, }}, }, - expectEndpointSlice: false, + expectEndpointSlice: 0, }} for i, tc := range testCases { @@ -201,13 +279,13 @@ func TestEndpointSliceMirroring(t *testing.T) { return false, err } - if tc.expectEndpointSlice { - if len(esList.Items) == 0 { + if tc.expectEndpointSlice > 0 { + if len(esList.Items) < tc.expectEndpointSlice { t.Logf("Waiting for EndpointSlice to be created") return false, nil } - if len(esList.Items) > 1 { - return false, fmt.Errorf("Only expected 1 EndpointSlice, got %d", len(esList.Items)) + if len(esList.Items) != tc.expectEndpointSlice { + return false, fmt.Errorf("Only expected %d EndpointSlice, got %d", tc.expectEndpointSlice, len(esList.Items)) } endpointSlice := esList.Items[0] if tc.expectEndpointSliceManagedBy != "" {