endpointslicemirroring handle endpoints with multiple subsets

Endpoints generated by the endpoints controller are in the canonical
form, however, custom endpoints can not be in canonical format
(there was a time they were canonicalized in the apiserver, but this
caused performance issues because the endpoint controller kept
updating them since the created endpoint were different than the
stored one due to the canonicalization)

There are cases where a custom endpoint may generate multiple slices
due to the controller, per example, when the same address is present
in different subsets.

The endpointslice mirroring controller should canonicalize the
endpoints subsets before start processing them to be consistent
on the slices generated, there is no risk of hotlooping because
the endpoint is only used as input.

Change-Id: I2a8cd53c658a640aea559a88ce33e857fa98cc5c
This commit is contained in:
Antonio Ojea 2022-11-26 18:14:12 +00:00
parent 2e6d3393f7
commit ef6d9edea5
4 changed files with 325 additions and 11 deletions

View File

@ -29,6 +29,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
"k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice"
@ -68,7 +69,9 @@ func (r *reconciler) reconcile(endpoints *corev1.Endpoints, existingSlices []*di
numInvalidAddresses := 0
addressesSkipped := 0
for _, subset := range endpoints.Subsets {
// canonicalize the Endpoints subsets before processing them
subsets := endpointsv1.RepackSubsets(endpoints.Subsets)
for _, subset := range subsets {
multiKey := d.initPorts(subset.Ports)
totalAddresses := len(subset.Addresses) + len(subset.NotReadyAddresses)

View File

@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics/testutil"
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
"k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics"
endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice"
"k8s.io/utils/pointer"
@ -90,6 +91,102 @@ func TestReconcile(t *testing.T) {
expectedNumSlices: 1,
expectedClientActions: 1,
expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, numCreated: 1},
}, {
testName: "Endpoints with 2 subset, different port and address",
subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
NodeName: pointer.String("node-1"),
}},
},
{
Ports: []corev1.EndpointPort{{
Name: "https",
Port: 443,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.2",
Hostname: "pod-2",
NodeName: pointer.String("node-1"),
}},
},
},
existingEndpointSlices: []*discovery.EndpointSlice{},
expectedNumSlices: 2,
expectedClientActions: 2,
expectedMetrics: &expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 2, addedPerSync: 2, numCreated: 2},
}, {
testName: "Endpoints with 2 subset, different port and same address",
subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
NodeName: pointer.String("node-1"),
}},
},
{
Ports: []corev1.EndpointPort{{
Name: "https",
Port: 443,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
NodeName: pointer.String("node-1"),
}},
},
},
existingEndpointSlices: []*discovery.EndpointSlice{},
expectedNumSlices: 1,
expectedClientActions: 1,
expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, numCreated: 1},
}, {
testName: "Endpoints with 2 subset, different address and same port",
subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
NodeName: pointer.String("node-1"),
}},
},
{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.2",
Hostname: "pod-2",
NodeName: pointer.String("node-1"),
}},
},
},
existingEndpointSlices: []*discovery.EndpointSlice{},
expectedNumSlices: 1,
expectedClientActions: 1,
expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 2, addedPerSync: 2, numCreated: 1},
}, {
testName: "Endpoints with 1 subset, port, and address, pending deletion",
subsets: []corev1.EndpointSubset{{
@ -1015,7 +1112,10 @@ func expectEndpointSlices(t *testing.T, num, maxEndpointsPerSubset int, endpoint
}
}
for _, epSubset := range endpoints.Subsets {
// canonicalize endpoints to match the expected endpoints, otherwise the test
// that creates more endpoints than allowed fail becaused the list of final
// endpoints doesn't match.
for _, epSubset := range endpointsv1.RepackSubsets(endpoints.Subsets) {
if len(epSubset.Addresses) == 0 && len(epSubset.NotReadyAddresses) == 0 {
continue
}

View File

@ -19,6 +19,7 @@ package network
import (
"context"
"fmt"
"net"
"time"
"github.com/onsi/ginkgo/v2"
@ -28,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/e2e/network/common"
admissionapi "k8s.io/pod-security-admission/api"
)
@ -199,4 +201,135 @@ var _ = common.SIGDescribe("EndpointSliceMirroring", func() {
}
})
})
ginkgo.It("should mirror a custom Endpoint with multiple subsets and same IP address", func() {
ns := f.Namespace.Name
svc := createServiceReportErr(cs, f.Namespace.Name, &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "example-custom-endpoints",
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: "port80",
Port: 80,
Protocol: v1.ProtocolTCP,
},
{
Name: "port81",
Port: 81,
Protocol: v1.ProtocolTCP,
},
},
},
})
// Add a backend pod to the service in the other node
port8080 := []v1.ContainerPort{
{
ContainerPort: 8090,
Protocol: v1.ProtocolTCP,
},
}
port9090 := []v1.ContainerPort{
{
ContainerPort: 9090,
Protocol: v1.ProtocolTCP,
},
}
serverPod := e2epod.NewAgnhostPodFromContainers(
"", "pod-handle-http-request", nil,
e2epod.NewAgnhostContainer("container-handle-8090-request", nil, port8080, "netexec", "--http-port", "8090", "--udp-port", "-1"),
e2epod.NewAgnhostContainer("container-handle-9090-request", nil, port9090, "netexec", "--http-port", "9090", "--udp-port", "-1"),
)
pod := e2epod.NewPodClient(f).CreateSync(serverPod)
if pod.Status.PodIP == "" {
framework.Failf("PodIP not assigned for pod %s", pod.Name)
}
// create custom endpoints
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: svc.Name,
},
Subsets: []v1.EndpointSubset{
{
Ports: []v1.EndpointPort{{
Name: "port80",
Port: 8090,
}},
Addresses: []v1.EndpointAddress{{
IP: pod.Status.PodIP,
}},
},
{
Ports: []v1.EndpointPort{{
Name: "port81",
Port: 9090,
}},
Addresses: []v1.EndpointAddress{{
IP: pod.Status.PodIP,
}},
},
},
}
ginkgo.By("mirroring a new custom Endpoint", func() {
_, err := cs.CoreV1().Endpoints(f.Namespace.Name).Create(context.TODO(), endpoints, metav1.CreateOptions{})
framework.ExpectNoError(err, "Unexpected error creating Endpoints")
if err := wait.PollImmediate(2*time.Second, 12*time.Second, func() (bool, error) {
esList, err := cs.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{
LabelSelector: discoveryv1.LabelServiceName + "=" + svc.Name,
})
if err != nil {
framework.Logf("Error listing EndpointSlices: %v", err)
return false, nil
}
if len(esList.Items) == 0 {
framework.Logf("Waiting for at least 1 EndpointSlice to exist, got %d", len(esList.Items))
return false, nil
}
return true, nil
}); err != nil {
framework.Failf("Did not find matching EndpointSlice for %s/%s: %s", svc.Namespace, svc.Name, err)
}
})
// connect to the service must work
ginkgo.By("Creating a pause pods that will try to connect to the webservers")
pausePod := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
e2epod.NewPodClient(f).CreateSync(pausePod)
dest1 := net.JoinHostPort(svc.Spec.ClusterIP, "80")
dest2 := net.JoinHostPort(svc.Spec.ClusterIP, "81")
execHostnameTest(*pausePod, dest1, pod.Name)
execHostnameTest(*pausePod, dest2, pod.Name)
// delete custom endpoints and wait until the endpoint slices are deleted too
ginkgo.By("mirroring deletion of a custom Endpoint", func() {
err := cs.CoreV1().Endpoints(f.Namespace.Name).Delete(context.TODO(), endpoints.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err, "Unexpected error deleting Endpoints")
// Expect mirrored EndpointSlice resource to be updated.
if err := wait.PollImmediate(2*time.Second, 12*time.Second, func() (bool, error) {
esList, err := cs.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{
LabelSelector: discoveryv1.LabelServiceName + "=" + svc.Name,
})
if err != nil {
return false, err
}
if len(esList.Items) != 0 {
framework.Logf("Waiting for 0 EndpointSlices to exist, got %d", len(esList.Items))
return false, nil
}
return true, nil
}); err != nil {
framework.Failf("Did not find matching EndpointSlice for %s/%s: %s", svc.Namespace, svc.Name, err)
}
})
})
})

View File

@ -86,7 +86,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
testName string
service *corev1.Service
customEndpoints *corev1.Endpoints
expectEndpointSlice bool
expectEndpointSlice int
expectEndpointSliceManagedBy string
}{{
testName: "Service with selector",
@ -103,7 +103,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
},
},
},
expectEndpointSlice: true,
expectEndpointSlice: 1,
expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io",
}, {
testName: "Service without selector",
@ -130,7 +130,85 @@ func TestEndpointSliceMirroring(t *testing.T) {
}},
}},
},
expectEndpointSlice: true,
expectEndpointSlice: 1,
expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
}, {
testName: "Service without selector Endpoint multiple subsets and same address",
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Port: int32(80),
}},
},
},
customEndpoints: &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
},
Subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{{
Name: "port1",
Port: 80,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
}},
},
{
Ports: []corev1.EndpointPort{{
Name: "port2",
Port: 90,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
}},
},
},
},
expectEndpointSlice: 1,
expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
}, {
testName: "Service without selector Endpoint multiple subsets",
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Port: int32(80),
}},
},
},
customEndpoints: &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
},
Subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{{
Name: "port1",
Port: 80,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
}},
},
{
Ports: []corev1.EndpointPort{{
Name: "port2",
Port: 90,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.2",
}},
},
},
},
expectEndpointSlice: 2,
expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
}, {
testName: "Service without Endpoints",
@ -148,7 +226,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
},
},
customEndpoints: nil,
expectEndpointSlice: true,
expectEndpointSlice: 1,
expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io",
}, {
testName: "Endpoints without Service",
@ -166,7 +244,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
}},
}},
},
expectEndpointSlice: false,
expectEndpointSlice: 0,
}}
for i, tc := range testCases {
@ -201,13 +279,13 @@ func TestEndpointSliceMirroring(t *testing.T) {
return false, err
}
if tc.expectEndpointSlice {
if len(esList.Items) == 0 {
if tc.expectEndpointSlice > 0 {
if len(esList.Items) < tc.expectEndpointSlice {
t.Logf("Waiting for EndpointSlice to be created")
return false, nil
}
if len(esList.Items) > 1 {
return false, fmt.Errorf("Only expected 1 EndpointSlice, got %d", len(esList.Items))
if len(esList.Items) != tc.expectEndpointSlice {
return false, fmt.Errorf("Only expected %d EndpointSlice, got %d", tc.expectEndpointSlice, len(esList.Items))
}
endpointSlice := esList.Items[0]
if tc.expectEndpointSliceManagedBy != "" {