From 3c804502d7cfcbb0cd9adb6e2f6fc0256f566f3a Mon Sep 17 00:00:00 2001 From: Rob Scott Date: Fri, 21 Aug 2020 16:31:57 -0700 Subject: [PATCH] Updating EndpointSliceMirroring controller to listen for Service changes This fixes a bug that could occur if a custom Endpoints resource was created before a Service was created. --- .../endpointslicemirroring_controller.go | 113 +++++++++++++---- .../endpointslicemirroring_controller_test.go | 119 ++++++++---------- .../endpointslicemirroring/utils.go | 29 ++++- .../endpointslicemirroring_test.go | 8 ++ 4 files changed, 169 insertions(+), 100 deletions(-) diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go index 4d9221e8bb1..61f912ceead 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go @@ -25,7 +25,6 @@ import ( v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -121,6 +120,11 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer, c.serviceLister = serviceInformer.Lister() c.servicesSynced = serviceInformer.Informer().HasSynced + serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.onServiceAdd, + UpdateFunc: c.onServiceUpdate, + DeleteFunc: c.onServiceDelete, + }) c.maxEndpointsPerSubset = maxEndpointsPerSubset @@ -273,28 +277,47 @@ func (c *Controller) syncEndpoints(key string) error { return err } - endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name) - + endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name) if err != nil { - ep := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}} - c.eventRecorder.Eventf(ep, FailedToListEndpointSlices, - "Error listing EndpointSlices for Endpoints %s/%s: %v", ep.Namespace, ep.Name, err) + if apierrors.IsNotFound(err) { + klog.V(4).Infof("%s/%s Endpoints not found, cleaning up any mirrored EndpointSlices", namespace, name) + c.endpointSliceTracker.DeleteService(namespace, name) + return c.deleteMirroredSlices(namespace, name) + } return err } - endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name) - if err != nil || !c.shouldMirror(endpoints) { - if apierrors.IsNotFound(err) || !c.shouldMirror(endpoints) { + if !c.shouldMirror(endpoints) { + klog.V(4).Infof("%s/%s Endpoints should not be mirrored, cleaning up any mirrored EndpointSlices", namespace, name) + c.endpointSliceTracker.DeleteService(namespace, name) + return c.deleteMirroredSlices(namespace, name) + } + + svc, err := c.serviceLister.Services(namespace).Get(name) + if err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).Infof("%s/%s Service not found, cleaning up any mirrored EndpointSlices", namespace, name) c.endpointSliceTracker.DeleteService(namespace, name) - return c.reconciler.deleteEndpoints(namespace, name, endpointSlices) + return c.deleteMirroredSlices(namespace, name) } return err } + // This means that if a Service transitions away from a nil selector, any + // mirrored EndpointSlices will not be cleaned up. #91072 tracks this issue + // for this controller along with the Endpoints and EndpointSlice + // controllers. + if svc.Spec.Selector != nil { + return nil + } + + endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name) + if err != nil { + return err + } + err = c.reconciler.reconcile(endpoints, endpointSlices) if err != nil { - c.eventRecorder.Eventf(endpoints, v1.EventTypeWarning, FailedToUpdateEndpointSlices, - "Error updating EndpointSlices for Endpoints %s/%s: %v", endpoints.Namespace, endpoints.Name, err) return err } @@ -314,30 +337,56 @@ func (c *Controller) queueEndpoints(obj interface{}) { // shouldMirror returns true if an Endpoints resource should be mirrored by this // controller. This will be false if: +// - the Endpoints resource is nil. // - the Endpoints resource has a skip-mirror label. // - the Endpoints resource has a leader election annotation. -// - the corresponding Service resource does not exist. -// - the corresponding Service resource has a non-nil selector. +// This does not ensure that a corresponding Service exists with a nil selector. +// That check should be performed separately. func (c *Controller) shouldMirror(endpoints *v1.Endpoints) bool { if endpoints == nil || skipMirror(endpoints.Labels) || hasLeaderElection(endpoints.Annotations) { return false } - svc, err := c.serviceLister.Services(endpoints.Namespace).Get(endpoints.Name) - if err != nil { - if !apierrors.IsNotFound(err) { - klog.Errorf("Error fetching %s/%s Service: %v", endpoints.Namespace, endpoints.Name, err) - } - return false - } - - if svc.Spec.Selector != nil { - return false - } - return true } +// onServiceAdd queues a sync for the relevant Endpoints resource. +func (c *Controller) onServiceAdd(obj interface{}) { + service := obj.(*v1.Service) + if service == nil { + utilruntime.HandleError(fmt.Errorf("onServiceAdd() expected type v1.Service, got %T", obj)) + return + } + if service.Spec.Selector == nil { + c.queueEndpoints(obj) + } +} + +// onServiceUpdate queues a sync for the relevant Endpoints resource. +func (c *Controller) onServiceUpdate(prevObj, obj interface{}) { + service := obj.(*v1.Service) + prevService := prevObj.(*v1.Service) + if service == nil || prevService == nil { + utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Endpoints, got %T, %T", prevObj, obj)) + return + } + if (service.Spec.Selector == nil) != (prevService.Spec.Selector == nil) { + c.queueEndpoints(obj) + } +} + +// onServiceDelete queues a sync for the relevant Endpoints resource. +func (c *Controller) onServiceDelete(obj interface{}) { + service := getServiceFromDeleteAction(obj) + if service == nil { + utilruntime.HandleError(fmt.Errorf("onServiceDelete() expected type v1.Service, got %T", obj)) + return + } + if service.Spec.Selector == nil { + c.queueEndpoints(obj) + } +} + // onEndpointsAdd queues a sync for the relevant Endpoints resource. func (c *Controller) onEndpointsAdd(obj interface{}) { endpoints := obj.(*v1.Endpoints) @@ -437,6 +486,18 @@ func (c *Controller) queueEndpointsForEndpointSlice(endpointSlice *discovery.End c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod) } +// deleteMirroredSlices will delete and EndpointSlices that have been mirrored +// for Endpoints with this namespace and name. +func (c *Controller) deleteMirroredSlices(namespace, name string) error { + endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name) + if err != nil { + return err + } + + c.endpointSliceTracker.DeleteService(namespace, name) + return c.reconciler.deleteEndpoints(namespace, name, endpointSlices) +} + // endpointSlicesMirroredForService returns the EndpointSlices that have been // mirrored for a Service by this controller. func endpointSlicesMirroredForService(endpointSliceLister discoverylisters.EndpointSliceLister, namespace, name string) ([]*discovery.EndpointSlice, error) { diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go index 83b0d8faa1c..925549dbe9b 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go @@ -74,12 +74,14 @@ func TestSyncEndpoints(t *testing.T) { testCases := []struct { testName string + service *v1.Service endpoints *v1.Endpoints endpointSlices []*discovery.EndpointSlice expectedNumActions int expectedNumSlices int }{{ testName: "Endpoints with no addresses", + service: &v1.Service{}, endpoints: &v1.Endpoints{ Subsets: []v1.EndpointSubset{{ Ports: []v1.EndpointPort{{Port: 80}}, @@ -90,6 +92,7 @@ func TestSyncEndpoints(t *testing.T) { expectedNumSlices: 0, }, { testName: "Endpoints with skip label true", + service: &v1.Service{}, endpoints: &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{discovery.LabelSkipMirror: "true"}, @@ -104,6 +107,7 @@ func TestSyncEndpoints(t *testing.T) { expectedNumSlices: 0, }, { testName: "Endpoints with skip label false", + service: &v1.Service{}, endpoints: &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{discovery.LabelSkipMirror: "false"}, @@ -116,8 +120,37 @@ func TestSyncEndpoints(t *testing.T) { endpointSlices: []*discovery.EndpointSlice{}, expectedNumActions: 1, expectedNumSlices: 1, + }, { + testName: "Endpoints with missing Service", + service: nil, + endpoints: &v1.Endpoints{ + Subsets: []v1.EndpointSubset{{ + Ports: []v1.EndpointPort{{Port: 80}}, + Addresses: []v1.EndpointAddress{{IP: "10.0.0.1"}}, + }}, + }, + endpointSlices: []*discovery.EndpointSlice{}, + expectedNumActions: 0, + expectedNumSlices: 0, + }, { + testName: "Endpoints with Service with selector specified", + service: &v1.Service{ + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + }, + }, + endpoints: &v1.Endpoints{ + Subsets: []v1.EndpointSubset{{ + Ports: []v1.EndpointPort{{Port: 80}}, + Addresses: []v1.EndpointAddress{{IP: "10.0.0.1"}}, + }}, + }, + endpointSlices: []*discovery.EndpointSlice{}, + expectedNumActions: 0, + expectedNumSlices: 0, }, { testName: "Existing EndpointSlices that need to be cleaned up", + service: &v1.Service{}, endpoints: &v1.Endpoints{ Subsets: []v1.EndpointSubset{{ Ports: []v1.EndpointPort{{Port: 80}}, @@ -136,6 +169,7 @@ func TestSyncEndpoints(t *testing.T) { expectedNumSlices: 0, }, { testName: "Existing EndpointSlices managed by a different controller, no addresses to sync", + service: &v1.Service{}, endpoints: &v1.Endpoints{ Subsets: []v1.EndpointSubset{{ Ports: []v1.EndpointPort{{Port: 80}}, @@ -154,6 +188,7 @@ func TestSyncEndpoints(t *testing.T) { expectedNumSlices: 0, }, { testName: "Endpoints with 1000 addresses", + service: &v1.Service{}, endpoints: &v1.Endpoints{ Subsets: []v1.EndpointSubset{{ Ports: []v1.EndpointPort{{Port: 80}}, @@ -165,6 +200,7 @@ func TestSyncEndpoints(t *testing.T) { expectedNumSlices: 1, }, { testName: "Endpoints with 1001 addresses - 1 should not be mirrored", + service: &v1.Service{}, endpoints: &v1.Endpoints{ Subsets: []v1.EndpointSubset{{ Ports: []v1.EndpointPort{{Port: 80}}, @@ -182,10 +218,11 @@ func TestSyncEndpoints(t *testing.T) { tc.endpoints.Name = endpointsName tc.endpoints.Namespace = namespace esController.endpointsStore.Add(tc.endpoints) - esController.serviceStore.Add(&v1.Service{ObjectMeta: metav1.ObjectMeta{ - Name: endpointsName, - Namespace: namespace, - }}) + if tc.service != nil { + tc.service.Name = endpointsName + tc.service.Namespace = namespace + esController.serviceStore.Add(tc.service) + } for _, epSlice := range tc.endpointSlices { epSlice.Namespace = namespace @@ -214,45 +251,23 @@ func TestSyncEndpoints(t *testing.T) { } func TestShouldMirror(t *testing.T) { - svcWithSelector := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "with-selector", - Namespace: "example1", - }, - Spec: v1.ServiceSpec{ - Selector: map[string]string{"with": "selector"}, - }, - } - svcWithoutSelector := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "without-selector", - Namespace: "example1", - }, - Spec: v1.ServiceSpec{}, - } - testCases := []struct { testName string endpoints *v1.Endpoints - service *v1.Service shouldMirror bool }{{ - testName: "Service without selector with matching endpoints", - service: svcWithoutSelector, + testName: "Standard Endpoints", endpoints: &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ - Name: svcWithoutSelector.Name, - Namespace: svcWithoutSelector.Namespace, + Name: "test-endpoints", }, }, shouldMirror: true, }, { - testName: "Service without selector, matching Endpoints with skip-mirror=true", - service: svcWithoutSelector, + testName: "Endpoints with skip-mirror=true", endpoints: &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ - Name: svcWithSelector.Name, - Namespace: svcWithSelector.Namespace, + Name: "test-endpoints", Labels: map[string]string{ discovery.LabelSkipMirror: "true", }, @@ -260,12 +275,10 @@ func TestShouldMirror(t *testing.T) { }, shouldMirror: false, }, { - testName: "Service without selector, matching Endpoints with skip-mirror=invalid", - service: svcWithoutSelector, + testName: "Endpoints with skip-mirror=invalid", endpoints: &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ - Name: svcWithoutSelector.Name, - Namespace: svcWithoutSelector.Namespace, + Name: "test-endpoints", Labels: map[string]string{ discovery.LabelSkipMirror: "invalid", }, @@ -273,43 +286,16 @@ func TestShouldMirror(t *testing.T) { }, shouldMirror: true, }, { - testName: "Service without selector, matching Endpoints with leader election annotation", - service: svcWithoutSelector, + testName: "Endpoints with leader election annotation", endpoints: &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ - Name: svcWithSelector.Name, - Namespace: svcWithSelector.Namespace, + Name: "test-endpoints", Annotations: map[string]string{ resourcelock.LeaderElectionRecordAnnotationKey: "", }, }, }, shouldMirror: false, - }, { - testName: "Service without selector, matching Endpoints without skip label in different namespace", - service: svcWithSelector, - endpoints: &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: svcWithSelector.Name, - Namespace: svcWithSelector.Namespace + "different", - }, - }, - shouldMirror: false, - }, { - testName: "Service without selector or matching endpoints", - service: svcWithoutSelector, - endpoints: nil, - shouldMirror: false, - }, { - testName: "Endpoints without matching Service", - service: nil, - endpoints: &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: svcWithoutSelector.Name, - Namespace: svcWithoutSelector.Namespace, - }, - }, - shouldMirror: false, }} for _, tc := range testCases { @@ -323,13 +309,6 @@ func TestShouldMirror(t *testing.T) { } } - if tc.service != nil { - err := c.serviceStore.Add(tc.service) - if err != nil { - t.Fatalf("Error adding Service to store: %v", err) - } - } - shouldMirror := c.shouldMirror(tc.endpoints) if shouldMirror != tc.shouldMirror { diff --git a/pkg/controller/endpointslicemirroring/utils.go b/pkg/controller/endpointslicemirroring/utils.go index aa1d860da04..ca4d8f1a22f 100644 --- a/pkg/controller/endpointslicemirroring/utils.go +++ b/pkg/controller/endpointslicemirroring/utils.go @@ -185,14 +185,35 @@ func objectRefPtrEqual(ref1, ref2 *corev1.ObjectReference) bool { return true } +// getServiceFromDeleteAction parses a Service resource from a delete +// action. +func getServiceFromDeleteAction(obj interface{}) *corev1.Service { + if service, ok := obj.(*corev1.Service); ok { + return service + } + // If we reached here it means the Service was deleted but its final state + // is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return nil + } + service, ok := tombstone.Obj.(*corev1.Service) + if !ok { + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Service resource: %#v", obj)) + return nil + } + return service +} + // getEndpointsFromDeleteAction parses an Endpoints resource from a delete // action. func getEndpointsFromDeleteAction(obj interface{}) *corev1.Endpoints { - if endpointSlice, ok := obj.(*corev1.Endpoints); ok { - return endpointSlice + if endpoints, ok := obj.(*corev1.Endpoints); ok { + return endpoints } - // If we reached here it means the EndpointSlice was deleted but its final - // state is unrecorded. + // If we reached here it means the Endpoints resource was deleted but its + // final state is unrecorded. tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) diff --git a/test/integration/endpointslice/endpointslicemirroring_test.go b/test/integration/endpointslice/endpointslicemirroring_test.go index 76367146d20..2f8816bd421 100644 --- a/test/integration/endpointslice/endpointslicemirroring_test.go +++ b/test/integration/endpointslice/endpointslicemirroring_test.go @@ -156,6 +156,14 @@ func TestEndpointSliceMirroring(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "test-123", }, + Subsets: []corev1.EndpointSubset{{ + Ports: []corev1.EndpointPort{{ + Port: 80, + }}, + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.0.1", + }}, + }}, }, expectEndpointSlice: false, }}