From 794f0cb7f1fdd2ac9b3faad743a12cf7b03e04d6 Mon Sep 17 00:00:00 2001 From: Rob Scott Date: Thu, 28 Oct 2021 23:29:32 -0700 Subject: [PATCH] Fixing how EndpointSlice Mirroring handles Service selector transitions --- .../endpointslicemirroring_controller.go | 11 +- .../endpointslicemirroring_test.go | 145 ++++++++++++++++++ 2 files changed, 150 insertions(+), 6 deletions(-) diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go index fa20c1c856a..11664d98614 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go @@ -304,12 +304,11 @@ func (c *Controller) syncEndpoints(key string) error { return err } - // This means that if a Service transitions away from a nil selector, any - // mirrored EndpointSlices will not be cleaned up. #91072 tracks this issue - // for this controller along with the Endpoints and EndpointSlice - // controllers. + // If a selector is specified, clean up any mirrored slices. if svc.Spec.Selector != nil { - return nil + klog.V(4).Infof("%s/%s Service now has selector, cleaning up any mirrored EndpointSlices", namespace, name) + c.endpointSliceTracker.DeleteService(namespace, name) + return c.deleteMirroredSlices(namespace, name) } endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name) @@ -372,7 +371,7 @@ func (c *Controller) onServiceUpdate(prevObj, obj interface{}) { service := obj.(*v1.Service) prevService := prevObj.(*v1.Service) if service == nil || prevService == nil { - utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Endpoints, got %T, %T", prevObj, obj)) + utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Service, got %T, %T", prevObj, obj)) return } if (service.Spec.Selector == nil) != (prevService.Spec.Selector == nil) { diff --git a/test/integration/endpointslice/endpointslicemirroring_test.go b/test/integration/endpointslice/endpointslicemirroring_test.go index fde3b2a8860..a139a688aac 100644 --- a/test/integration/endpointslice/endpointslicemirroring_test.go +++ b/test/integration/endpointslice/endpointslicemirroring_test.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/kubernetes/pkg/controller/endpoint" @@ -408,6 +409,150 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) { } } +func TestEndpointSliceMirroringSelectorTransition(t *testing.T) { + controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() + _, server, closeFn := framework.RunAnAPIServer(controlPlaneConfig) + defer closeFn() + + config := restclient.Config{Host: server.URL} + client, err := clientset.NewForConfig(&config) + if err != nil { + t.Fatalf("Error creating clientset: %v", err) + } + + resyncPeriod := 12 * time.Hour + informers := informers.NewSharedInformerFactory(client, resyncPeriod) + + epsmController := endpointslicemirroring.NewController( + informers.Core().V1().Endpoints(), + informers.Discovery().V1().EndpointSlices(), + informers.Core().V1().Services(), + int32(100), + client, + 1*time.Second) + + // Start informer and controllers + stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) + go epsmController.Run(1, stopCh) + + testCases := []struct { + testName string + startingSelector map[string]string + startingMirroredSlices int + endingSelector map[string]string + endingMirroredSlices int + }{ + { + testName: "nil -> {foo: bar} selector", + startingSelector: nil, + startingMirroredSlices: 1, + endingSelector: map[string]string{"foo": "bar"}, + endingMirroredSlices: 0, + }, + { + testName: "{foo: bar} -> nil selector", + startingSelector: map[string]string{"foo": "bar"}, + startingMirroredSlices: 0, + endingSelector: nil, + endingMirroredSlices: 1, + }, + { + testName: "{} -> {foo: bar} selector", + startingSelector: map[string]string{}, + startingMirroredSlices: 1, + endingSelector: map[string]string{"foo": "bar"}, + endingMirroredSlices: 0, + }, + { + testName: "{foo: bar} -> {} selector", + startingSelector: map[string]string{"foo": "bar"}, + startingMirroredSlices: 0, + endingSelector: map[string]string{}, + endingMirroredSlices: 1, + }, + } + + for i, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + ns := framework.CreateTestingNamespace(fmt.Sprintf("test-endpointslice-mirroring-%d", i), server, t) + defer framework.DeleteTestingNamespace(ns, server, t) + meta := metav1.ObjectMeta{Name: "test-123", Namespace: ns.Name} + + service := &corev1.Service{ + ObjectMeta: meta, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ + Port: int32(80), + }}, + Selector: tc.startingSelector, + }, + } + + customEndpoints := &corev1.Endpoints{ + ObjectMeta: meta, + Subsets: []corev1.EndpointSubset{{ + Ports: []corev1.EndpointPort{{ + Port: 80, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + }}, + }}, + } + + _, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating service: %v", err) + } + + _, err = client.CoreV1().Endpoints(ns.Name).Create(context.TODO(), customEndpoints, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating endpoints: %v", err) + } + + // verify the expected number of mirrored slices exist + err = waitForMirroredSlices(t, client, ns.Name, service.Name, tc.startingMirroredSlices) + if err != nil { + t.Fatalf("Timed out waiting for initial mirrored slices to match expectations: %v", err) + } + + service.Spec.Selector = tc.endingSelector + _, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), service, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Error updating service: %v", err) + } + + // verify the expected number of mirrored slices exist + err = waitForMirroredSlices(t, client, ns.Name, service.Name, tc.endingMirroredSlices) + if err != nil { + t.Fatalf("Timed out waiting for final mirrored slices to match expectations: %v", err) + } + }) + } +} + +func waitForMirroredSlices(t *testing.T, client *kubernetes.Clientset, nsName, svcName string, num int) error { + t.Helper() + return wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) { + lSelector := discovery.LabelServiceName + "=" + svcName + lSelector += "," + discovery.LabelManagedBy + "=endpointslicemirroring-controller.k8s.io" + esList, err := client.DiscoveryV1().EndpointSlices(nsName).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector}) + if err != nil { + t.Logf("Error listing EndpointSlices: %v", err) + return false, err + } + + if len(esList.Items) != num { + t.Logf("Expected %d slices to be mirrored, got %d", num, len(esList.Items)) + return false, nil + } + + return true, nil + }) +} + // isSubset check if all the elements in a exist in b func isSubset(a, b map[string]string) bool { if len(a) > len(b) {