Merge pull request #114155 from aojea/mirroring_repack

endpointslicemirroring handle endpoints with multiple subsets
This commit is contained in:
Kubernetes Prow Robot 2022-12-10 07:53:42 -08:00 committed by GitHub
commit 2118bc8aec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 325 additions and 11 deletions

View File

@ -29,6 +29,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
"k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice"
@ -68,7 +69,9 @@ func (r *reconciler) reconcile(endpoints *corev1.Endpoints, existingSlices []*di
numInvalidAddresses := 0
addressesSkipped := 0
for _, subset := range endpoints.Subsets {
// canonicalize the Endpoints subsets before processing them
subsets := endpointsv1.RepackSubsets(endpoints.Subsets)
for _, subset := range subsets {
multiKey := d.initPorts(subset.Ports)
totalAddresses := len(subset.Addresses) + len(subset.NotReadyAddresses)

View File

@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics/testutil"
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
"k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics"
endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice"
"k8s.io/utils/pointer"
@ -90,6 +91,102 @@ func TestReconcile(t *testing.T) {
expectedNumSlices: 1,
expectedClientActions: 1,
expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, numCreated: 1},
}, {
testName: "Endpoints with 2 subset, different port and address",
subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
NodeName: pointer.String("node-1"),
}},
},
{
Ports: []corev1.EndpointPort{{
Name: "https",
Port: 443,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.2",
Hostname: "pod-2",
NodeName: pointer.String("node-1"),
}},
},
},
existingEndpointSlices: []*discovery.EndpointSlice{},
expectedNumSlices: 2,
expectedClientActions: 2,
expectedMetrics: &expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 2, addedPerSync: 2, numCreated: 2},
}, {
testName: "Endpoints with 2 subset, different port and same address",
subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
NodeName: pointer.String("node-1"),
}},
},
{
Ports: []corev1.EndpointPort{{
Name: "https",
Port: 443,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
NodeName: pointer.String("node-1"),
}},
},
},
existingEndpointSlices: []*discovery.EndpointSlice{},
expectedNumSlices: 1,
expectedClientActions: 1,
expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, numCreated: 1},
}, {
testName: "Endpoints with 2 subset, different address and same port",
subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
NodeName: pointer.String("node-1"),
}},
},
{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.2",
Hostname: "pod-2",
NodeName: pointer.String("node-1"),
}},
},
},
existingEndpointSlices: []*discovery.EndpointSlice{},
expectedNumSlices: 1,
expectedClientActions: 1,
expectedMetrics: &expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 2, addedPerSync: 2, numCreated: 1},
}, {
testName: "Endpoints with 1 subset, port, and address, pending deletion",
subsets: []corev1.EndpointSubset{{
@ -1015,7 +1112,10 @@ func expectEndpointSlices(t *testing.T, num, maxEndpointsPerSubset int, endpoint
}
}
for _, epSubset := range endpoints.Subsets {
// canonicalize endpoints to match the expected endpoints, otherwise the test
// that creates more endpoints than allowed fail becaused the list of final
// endpoints doesn't match.
for _, epSubset := range endpointsv1.RepackSubsets(endpoints.Subsets) {
if len(epSubset.Addresses) == 0 && len(epSubset.NotReadyAddresses) == 0 {
continue
}

View File

@ -19,6 +19,7 @@ package network
import (
"context"
"fmt"
"net"
"time"
"github.com/onsi/ginkgo/v2"
@ -28,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/e2e/network/common"
admissionapi "k8s.io/pod-security-admission/api"
)
@ -199,4 +201,135 @@ var _ = common.SIGDescribe("EndpointSliceMirroring", func() {
}
})
})
ginkgo.It("should mirror a custom Endpoint with multiple subsets and same IP address", func() {
ns := f.Namespace.Name
svc := createServiceReportErr(cs, f.Namespace.Name, &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "example-custom-endpoints",
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: "port80",
Port: 80,
Protocol: v1.ProtocolTCP,
},
{
Name: "port81",
Port: 81,
Protocol: v1.ProtocolTCP,
},
},
},
})
// Add a backend pod to the service in the other node
port8080 := []v1.ContainerPort{
{
ContainerPort: 8090,
Protocol: v1.ProtocolTCP,
},
}
port9090 := []v1.ContainerPort{
{
ContainerPort: 9090,
Protocol: v1.ProtocolTCP,
},
}
serverPod := e2epod.NewAgnhostPodFromContainers(
"", "pod-handle-http-request", nil,
e2epod.NewAgnhostContainer("container-handle-8090-request", nil, port8080, "netexec", "--http-port", "8090", "--udp-port", "-1"),
e2epod.NewAgnhostContainer("container-handle-9090-request", nil, port9090, "netexec", "--http-port", "9090", "--udp-port", "-1"),
)
pod := e2epod.NewPodClient(f).CreateSync(serverPod)
if pod.Status.PodIP == "" {
framework.Failf("PodIP not assigned for pod %s", pod.Name)
}
// create custom endpoints
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: svc.Name,
},
Subsets: []v1.EndpointSubset{
{
Ports: []v1.EndpointPort{{
Name: "port80",
Port: 8090,
}},
Addresses: []v1.EndpointAddress{{
IP: pod.Status.PodIP,
}},
},
{
Ports: []v1.EndpointPort{{
Name: "port81",
Port: 9090,
}},
Addresses: []v1.EndpointAddress{{
IP: pod.Status.PodIP,
}},
},
},
}
ginkgo.By("mirroring a new custom Endpoint", func() {
_, err := cs.CoreV1().Endpoints(f.Namespace.Name).Create(context.TODO(), endpoints, metav1.CreateOptions{})
framework.ExpectNoError(err, "Unexpected error creating Endpoints")
if err := wait.PollImmediate(2*time.Second, 12*time.Second, func() (bool, error) {
esList, err := cs.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{
LabelSelector: discoveryv1.LabelServiceName + "=" + svc.Name,
})
if err != nil {
framework.Logf("Error listing EndpointSlices: %v", err)
return false, nil
}
if len(esList.Items) == 0 {
framework.Logf("Waiting for at least 1 EndpointSlice to exist, got %d", len(esList.Items))
return false, nil
}
return true, nil
}); err != nil {
framework.Failf("Did not find matching EndpointSlice for %s/%s: %s", svc.Namespace, svc.Name, err)
}
})
// connect to the service must work
ginkgo.By("Creating a pause pods that will try to connect to the webservers")
pausePod := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
e2epod.NewPodClient(f).CreateSync(pausePod)
dest1 := net.JoinHostPort(svc.Spec.ClusterIP, "80")
dest2 := net.JoinHostPort(svc.Spec.ClusterIP, "81")
execHostnameTest(*pausePod, dest1, pod.Name)
execHostnameTest(*pausePod, dest2, pod.Name)
// delete custom endpoints and wait until the endpoint slices are deleted too
ginkgo.By("mirroring deletion of a custom Endpoint", func() {
err := cs.CoreV1().Endpoints(f.Namespace.Name).Delete(context.TODO(), endpoints.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err, "Unexpected error deleting Endpoints")
// Expect mirrored EndpointSlice resource to be updated.
if err := wait.PollImmediate(2*time.Second, 12*time.Second, func() (bool, error) {
esList, err := cs.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{
LabelSelector: discoveryv1.LabelServiceName + "=" + svc.Name,
})
if err != nil {
return false, err
}
if len(esList.Items) != 0 {
framework.Logf("Waiting for 0 EndpointSlices to exist, got %d", len(esList.Items))
return false, nil
}
return true, nil
}); err != nil {
framework.Failf("Did not find matching EndpointSlice for %s/%s: %s", svc.Namespace, svc.Name, err)
}
})
})
})

View File

@ -86,7 +86,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
testName string
service *corev1.Service
customEndpoints *corev1.Endpoints
expectEndpointSlice bool
expectEndpointSlice int
expectEndpointSliceManagedBy string
}{{
testName: "Service with selector",
@ -103,7 +103,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
},
},
},
expectEndpointSlice: true,
expectEndpointSlice: 1,
expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io",
}, {
testName: "Service without selector",
@ -130,7 +130,85 @@ func TestEndpointSliceMirroring(t *testing.T) {
}},
}},
},
expectEndpointSlice: true,
expectEndpointSlice: 1,
expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
}, {
testName: "Service without selector Endpoint multiple subsets and same address",
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Port: int32(80),
}},
},
},
customEndpoints: &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
},
Subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{{
Name: "port1",
Port: 80,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
}},
},
{
Ports: []corev1.EndpointPort{{
Name: "port2",
Port: 90,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
}},
},
},
},
expectEndpointSlice: 1,
expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
}, {
testName: "Service without selector Endpoint multiple subsets",
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Port: int32(80),
}},
},
},
customEndpoints: &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
},
Subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{{
Name: "port1",
Port: 80,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
}},
},
{
Ports: []corev1.EndpointPort{{
Name: "port2",
Port: 90,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.2",
}},
},
},
},
expectEndpointSlice: 2,
expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
}, {
testName: "Service without Endpoints",
@ -148,7 +226,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
},
},
customEndpoints: nil,
expectEndpointSlice: true,
expectEndpointSlice: 1,
expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io",
}, {
testName: "Endpoints without Service",
@ -166,7 +244,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
}},
}},
},
expectEndpointSlice: false,
expectEndpointSlice: 0,
}}
for i, tc := range testCases {
@ -201,13 +279,13 @@ func TestEndpointSliceMirroring(t *testing.T) {
return false, err
}
if tc.expectEndpointSlice {
if len(esList.Items) == 0 {
if tc.expectEndpointSlice > 0 {
if len(esList.Items) < tc.expectEndpointSlice {
t.Logf("Waiting for EndpointSlice to be created")
return false, nil
}
if len(esList.Items) > 1 {
return false, fmt.Errorf("Only expected 1 EndpointSlice, got %d", len(esList.Items))
if len(esList.Items) != tc.expectEndpointSlice {
return false, fmt.Errorf("Only expected %d EndpointSlice, got %d", tc.expectEndpointSlice, len(esList.Items))
}
endpointSlice := esList.Items[0]
if tc.expectEndpointSliceManagedBy != "" {