diff --git a/pkg/controller/endpointslicemirroring/reconciler.go b/pkg/controller/endpointslicemirroring/reconciler.go index 15f991eecbd..2fa83bde2fb 100644 --- a/pkg/controller/endpointslicemirroring/reconciler.go +++ b/pkg/controller/endpointslicemirroring/reconciler.go @@ -29,6 +29,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" "k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" @@ -68,7 +69,9 @@ func (r *reconciler) reconcile(endpoints *corev1.Endpoints, existingSlices []*di numInvalidAddresses := 0 addressesSkipped := 0 - for _, subset := range endpoints.Subsets { + // canonicalize the Endpoints subsets before processing them + subsets := endpointsv1.RepackSubsets(endpoints.Subsets) + for _, subset := range subsets { multiKey := d.initPorts(subset.Ports) totalAddresses := len(subset.Addresses) + len(subset.NotReadyAddresses) diff --git a/pkg/controller/endpointslicemirroring/reconciler_test.go b/pkg/controller/endpointslicemirroring/reconciler_test.go index 36f405307ec..4818863f6fc 100644 --- a/pkg/controller/endpointslicemirroring/reconciler_test.go +++ b/pkg/controller/endpointslicemirroring/reconciler_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics/testutil" + endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" "k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics" endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" "k8s.io/utils/pointer" @@ -90,6 +91,102 @@ func TestReconcile(t *testing.T) { expectedNumSlices: 1, expectedClientActions: 1, expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, numCreated: 1}, + }, { + testName: "Endpoints with 2 subset, different port and address", + subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + NodeName: pointer.String("node-1"), + }}, + }, + { + Ports: []corev1.EndpointPort{{ + Name: "https", + Port: 443, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.2", + Hostname: "pod-2", + NodeName: pointer.String("node-1"), + }}, + }, + }, + existingEndpointSlices: []*discovery.EndpointSlice{}, + expectedNumSlices: 2, + expectedClientActions: 2, + expectedMetrics: &expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 2, addedPerSync: 2, numCreated: 2}, + }, { + testName: "Endpoints with 2 subset, different port and same address", + subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + NodeName: pointer.String("node-1"), + }}, + }, + { + Ports: []corev1.EndpointPort{{ + Name: "https", + Port: 443, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + NodeName: pointer.String("node-1"), + }}, + }, + }, + existingEndpointSlices: []*discovery.EndpointSlice{}, + expectedNumSlices: 1, + expectedClientActions: 1, + expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, numCreated: 1}, + }, { + testName: "Endpoints with 2 subset, different address and same port", + subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + Hostname: "pod-1", + NodeName: pointer.String("node-1"), + }}, + }, + { + Ports: []corev1.EndpointPort{{ + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.2", + Hostname: "pod-2", + NodeName: pointer.String("node-1"), + }}, + }, + }, + existingEndpointSlices: []*discovery.EndpointSlice{}, + expectedNumSlices: 1, + expectedClientActions: 1, + expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 2, addedPerSync: 2, numCreated: 1}, }, { testName: "Endpoints with 1 subset, port, and address, pending deletion", subsets: []corev1.EndpointSubset{{ @@ -1015,7 +1112,10 @@ func expectEndpointSlices(t *testing.T, num, maxEndpointsPerSubset int, endpoint } } - for _, epSubset := range endpoints.Subsets { + // canonicalize endpoints to match the expected endpoints, otherwise the test + // that creates more endpoints than allowed fail becaused the list of final + // endpoints doesn't match. + for _, epSubset := range endpointsv1.RepackSubsets(endpoints.Subsets) { if len(epSubset.Addresses) == 0 && len(epSubset.NotReadyAddresses) == 0 { continue } diff --git a/test/e2e/network/endpointslicemirroring.go b/test/e2e/network/endpointslicemirroring.go index 3c4104b26f6..d74792e0da8 100644 --- a/test/e2e/network/endpointslicemirroring.go +++ b/test/e2e/network/endpointslicemirroring.go @@ -19,6 +19,7 @@ package network import ( "context" "fmt" + "net" "time" "github.com/onsi/ginkgo/v2" @@ -28,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" "k8s.io/kubernetes/test/e2e/network/common" admissionapi "k8s.io/pod-security-admission/api" ) @@ -199,4 +201,135 @@ var _ = common.SIGDescribe("EndpointSliceMirroring", func() { } }) }) + + ginkgo.It("should mirror a custom Endpoint with multiple subsets and same IP address", func() { + ns := f.Namespace.Name + svc := createServiceReportErr(cs, f.Namespace.Name, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "example-custom-endpoints", + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "port80", + Port: 80, + Protocol: v1.ProtocolTCP, + }, + { + Name: "port81", + Port: 81, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }) + + // Add a backend pod to the service in the other node + port8080 := []v1.ContainerPort{ + { + ContainerPort: 8090, + Protocol: v1.ProtocolTCP, + }, + } + port9090 := []v1.ContainerPort{ + { + ContainerPort: 9090, + Protocol: v1.ProtocolTCP, + }, + } + + serverPod := e2epod.NewAgnhostPodFromContainers( + "", "pod-handle-http-request", nil, + e2epod.NewAgnhostContainer("container-handle-8090-request", nil, port8080, "netexec", "--http-port", "8090", "--udp-port", "-1"), + e2epod.NewAgnhostContainer("container-handle-9090-request", nil, port9090, "netexec", "--http-port", "9090", "--udp-port", "-1"), + ) + + pod := e2epod.NewPodClient(f).CreateSync(serverPod) + + if pod.Status.PodIP == "" { + framework.Failf("PodIP not assigned for pod %s", pod.Name) + } + + // create custom endpoints + endpoints := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: svc.Name, + }, + Subsets: []v1.EndpointSubset{ + { + Ports: []v1.EndpointPort{{ + Name: "port80", + Port: 8090, + }}, + Addresses: []v1.EndpointAddress{{ + IP: pod.Status.PodIP, + }}, + }, + { + Ports: []v1.EndpointPort{{ + Name: "port81", + Port: 9090, + }}, + Addresses: []v1.EndpointAddress{{ + IP: pod.Status.PodIP, + }}, + }, + }, + } + + ginkgo.By("mirroring a new custom Endpoint", func() { + _, err := cs.CoreV1().Endpoints(f.Namespace.Name).Create(context.TODO(), endpoints, metav1.CreateOptions{}) + framework.ExpectNoError(err, "Unexpected error creating Endpoints") + + if err := wait.PollImmediate(2*time.Second, 12*time.Second, func() (bool, error) { + esList, err := cs.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{ + LabelSelector: discoveryv1.LabelServiceName + "=" + svc.Name, + }) + if err != nil { + framework.Logf("Error listing EndpointSlices: %v", err) + return false, nil + } + if len(esList.Items) == 0 { + framework.Logf("Waiting for at least 1 EndpointSlice to exist, got %d", len(esList.Items)) + return false, nil + } + return true, nil + }); err != nil { + framework.Failf("Did not find matching EndpointSlice for %s/%s: %s", svc.Namespace, svc.Name, err) + } + }) + + // connect to the service must work + ginkgo.By("Creating a pause pods that will try to connect to the webservers") + pausePod := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil) + e2epod.NewPodClient(f).CreateSync(pausePod) + dest1 := net.JoinHostPort(svc.Spec.ClusterIP, "80") + dest2 := net.JoinHostPort(svc.Spec.ClusterIP, "81") + execHostnameTest(*pausePod, dest1, pod.Name) + execHostnameTest(*pausePod, dest2, pod.Name) + + // delete custom endpoints and wait until the endpoint slices are deleted too + ginkgo.By("mirroring deletion of a custom Endpoint", func() { + err := cs.CoreV1().Endpoints(f.Namespace.Name).Delete(context.TODO(), endpoints.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err, "Unexpected error deleting Endpoints") + + // Expect mirrored EndpointSlice resource to be updated. + if err := wait.PollImmediate(2*time.Second, 12*time.Second, func() (bool, error) { + esList, err := cs.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{ + LabelSelector: discoveryv1.LabelServiceName + "=" + svc.Name, + }) + if err != nil { + return false, err + } + if len(esList.Items) != 0 { + framework.Logf("Waiting for 0 EndpointSlices to exist, got %d", len(esList.Items)) + return false, nil + } + + return true, nil + }); err != nil { + framework.Failf("Did not find matching EndpointSlice for %s/%s: %s", svc.Namespace, svc.Name, err) + } + }) + }) }) diff --git a/test/integration/endpointslice/endpointslicemirroring_test.go b/test/integration/endpointslice/endpointslicemirroring_test.go index be71e60e4fa..89ac76521f4 100644 --- a/test/integration/endpointslice/endpointslicemirroring_test.go +++ b/test/integration/endpointslice/endpointslicemirroring_test.go @@ -86,7 +86,7 @@ func TestEndpointSliceMirroring(t *testing.T) { testName string service *corev1.Service customEndpoints *corev1.Endpoints - expectEndpointSlice bool + expectEndpointSlice int expectEndpointSliceManagedBy string }{{ testName: "Service with selector", @@ -103,7 +103,7 @@ func TestEndpointSliceMirroring(t *testing.T) { }, }, }, - expectEndpointSlice: true, + expectEndpointSlice: 1, expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io", }, { testName: "Service without selector", @@ -130,7 +130,85 @@ func TestEndpointSliceMirroring(t *testing.T) { }}, }}, }, - expectEndpointSlice: true, + expectEndpointSlice: 1, + expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io", + }, { + testName: "Service without selector Endpoint multiple subsets and same address", + service: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-123", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ + Port: int32(80), + }}, + }, + }, + customEndpoints: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-123", + }, + Subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{{ + Name: "port1", + Port: 80, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + }}, + }, + { + Ports: []corev1.EndpointPort{{ + Name: "port2", + Port: 90, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + }}, + }, + }, + }, + expectEndpointSlice: 1, + expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io", + }, { + testName: "Service without selector Endpoint multiple subsets", + service: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-123", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ + Port: int32(80), + }}, + }, + }, + customEndpoints: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-123", + }, + Subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{{ + Name: "port1", + Port: 80, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + }}, + }, + { + Ports: []corev1.EndpointPort{{ + Name: "port2", + Port: 90, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.2", + }}, + }, + }, + }, + expectEndpointSlice: 2, expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io", }, { testName: "Service without Endpoints", @@ -148,7 +226,7 @@ func TestEndpointSliceMirroring(t *testing.T) { }, }, customEndpoints: nil, - expectEndpointSlice: true, + expectEndpointSlice: 1, expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io", }, { testName: "Endpoints without Service", @@ -166,7 +244,7 @@ func TestEndpointSliceMirroring(t *testing.T) { }}, }}, }, - expectEndpointSlice: false, + expectEndpointSlice: 0, }} for i, tc := range testCases { @@ -201,13 +279,13 @@ func TestEndpointSliceMirroring(t *testing.T) { return false, err } - if tc.expectEndpointSlice { - if len(esList.Items) == 0 { + if tc.expectEndpointSlice > 0 { + if len(esList.Items) < tc.expectEndpointSlice { t.Logf("Waiting for EndpointSlice to be created") return false, nil } - if len(esList.Items) > 1 { - return false, fmt.Errorf("Only expected 1 EndpointSlice, got %d", len(esList.Items)) + if len(esList.Items) != tc.expectEndpointSlice { + return false, fmt.Errorf("Only expected %d EndpointSlice, got %d", tc.expectEndpointSlice, len(esList.Items)) } endpointSlice := esList.Items[0] if tc.expectEndpointSliceManagedBy != "" {