From e1542606c275129ebcb9ffdb67696f8cc9b638d6 Mon Sep 17 00:00:00 2001 From: Rob Scott Date: Tue, 28 Jul 2020 18:14:40 -0700 Subject: [PATCH] Updating EndpointSlice controller to wait for cache to be updated This updates the EndpointSlice controller to make use of the EndpointSlice tracker to identify when expected changes are not present in the cache yet. If this is detected, the controller will wait to sync until all expected updates have been received. This should help avoid race conditions that would result in duplicate EndpointSlices or failed attempts to update stale EndpointSlices. To simplify this logic, this also moves the EndpointSlice tracker from relying on resource versions to generations. --- .../endpointslice/endpointslice_controller.go | 25 +- .../endpointslice_controller_test.go | 75 ++++ .../endpointslice/endpointslice_tracker.go | 132 +++++-- .../endpointslice_tracker_test.go | 366 ++++++++++++------ pkg/controller/endpointslice/errors.go | 30 ++ pkg/controller/endpointslice/reconciler.go | 4 +- .../endpointslice/reconciler_test.go | 21 +- pkg/controller/endpointslice/utils_test.go | 4 +- 8 files changed, 474 insertions(+), 183 deletions(-) create mode 100644 pkg/controller/endpointslice/errors.go diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index 4021e1b45ec..87b26e123f5 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -346,6 +346,10 @@ func (c *Controller) syncService(key string) error { return err } + if c.endpointSliceTracker.StaleSlices(service, endpointSlices) { + return &StaleInformerCache{"EndpointSlice informer cache is out of date"} + } + // We call ComputeEndpointLastChangeTriggerTime here to make sure that the // state of the trigger time tracker gets updated even if the sync turns out // to be no-op and we don't update the EndpointSlice objects. @@ -395,7 +399,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) { utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()")) return } - if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) { + if managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) { c.queueServiceForEndpointSlice(endpointSlice) } } @@ -411,7 +415,18 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) { utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()")) return } - if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) { + // EndpointSlice generation does not change when labels change. Although the + // controller will never change LabelServiceName, users might. This check + // ensures that we handle changes to this label. + svcName := endpointSlice.Labels[discovery.LabelServiceName] + prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName] + if svcName != prevSvcName { + klog.Warningf("%s label changed from %s to %s for %s", discovery.LabelServiceName, prevSvcName, svcName, endpointSlice.Name) + c.queueServiceForEndpointSlice(endpointSlice) + c.queueServiceForEndpointSlice(prevEndpointSlice) + return + } + if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) { c.queueServiceForEndpointSlice(endpointSlice) } } @@ -422,7 +437,11 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) { func (c *Controller) onEndpointSliceDelete(obj interface{}) { endpointSlice := getEndpointSliceFromDeleteAction(obj) if endpointSlice != nil && managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) { - c.queueServiceForEndpointSlice(endpointSlice) + // This returns false if we didn't expect the EndpointSlice to be + // deleted. If that is the case, we queue the Service for another sync. + if !c.endpointSliceTracker.HandleDeletion(endpointSlice) { + c.queueServiceForEndpointSlice(endpointSlice) + } } } diff --git a/pkg/controller/endpointslice/endpointslice_controller_test.go b/pkg/controller/endpointslice/endpointslice_controller_test.go index 3ec5a303aad..819b4746906 100644 --- a/pkg/controller/endpointslice/endpointslice_controller_test.go +++ b/pkg/controller/endpointslice/endpointslice_controller_test.go @@ -1426,6 +1426,81 @@ func TestPodDeleteBatching(t *testing.T) { } } +func TestSyncServiceStaleInformer(t *testing.T) { + testcases := []struct { + name string + informerGenerationNumber int64 + trackerGenerationNumber int64 + expectError bool + }{ + { + name: "informer cache outdated", + informerGenerationNumber: 10, + trackerGenerationNumber: 12, + expectError: true, + }, + { + name: "cache and tracker synced", + informerGenerationNumber: 10, + trackerGenerationNumber: 10, + expectError: false, + }, + { + name: "tracker outdated", + informerGenerationNumber: 10, + trackerGenerationNumber: 1, + expectError: false, + }, + } + + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + _, esController := newController([]string{"node-1"}, time.Duration(0)) + ns := metav1.NamespaceDefault + serviceName := "testing-1" + + // Store Service in the cache + esController.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}}, + }, + }) + + // Create EndpointSlice in the informer cache with informerGenerationNumber + epSlice1 := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "matching-1", + Namespace: ns, + Generation: testcase.informerGenerationNumber, + Labels: map[string]string{ + discovery.LabelServiceName: serviceName, + discovery.LabelManagedBy: controllerName, + }, + }, + AddressType: discovery.AddressTypeIPv4, + } + err := esController.endpointSliceStore.Add(epSlice1) + if err != nil { + t.Fatalf("Expected no error adding EndpointSlice: %v", err) + } + + // Create EndpointSlice in the tracker with trackerGenerationNumber + epSlice2 := epSlice1.DeepCopy() + epSlice2.Generation = testcase.trackerGenerationNumber + esController.endpointSliceTracker.Update(epSlice2) + + err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName)) + // Check if we got a StaleInformerCache error + if isStaleInformerCacheErr(err) != testcase.expectError { + t.Fatalf("Expected error because informer cache is outdated") + } + + }) + } +} + // Test helpers func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) { t.Helper() diff --git a/pkg/controller/endpointslice/endpointslice_tracker.go b/pkg/controller/endpointslice/endpointslice_tracker.go index d404edd353a..16cd4bccae5 100644 --- a/pkg/controller/endpointslice/endpointslice_tracker.go +++ b/pkg/controller/endpointslice/endpointslice_tracker.go @@ -19,102 +19,154 @@ package endpointslice import ( "sync" + "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/types" ) -// endpointSliceResourceVersions tracks expected EndpointSlice resource versions -// by EndpointSlice name. -type endpointSliceResourceVersions map[string]string +const ( + deletionExpected = -1 +) -// endpointSliceTracker tracks EndpointSlices and their associated resource -// versions to help determine if a change to an EndpointSlice has been processed -// by the EndpointSlice controller. +// generationsBySlice tracks expected EndpointSlice generations by EndpointSlice +// uid. A value of deletionExpected (-1) may be used here to indicate that we +// expect this EndpointSlice to be deleted. +type generationsBySlice map[types.UID]int64 + +// endpointSliceTracker tracks EndpointSlices and their associated generation to +// help determine if a change to an EndpointSlice has been processed by the +// EndpointSlice controller. type endpointSliceTracker struct { - // lock protects resourceVersionsByService. + // lock protects generationsByService. lock sync.Mutex - // resourceVersionsByService tracks the list of EndpointSlices and - // associated resource versions expected for a given Service. - resourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions + // generationsByService tracks the generations of EndpointSlices for each + // Service. + generationsByService map[types.NamespacedName]generationsBySlice } // newEndpointSliceTracker creates and initializes a new endpointSliceTracker. func newEndpointSliceTracker() *endpointSliceTracker { return &endpointSliceTracker{ - resourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{}, + generationsByService: map[types.NamespacedName]generationsBySlice{}, } } -// Has returns true if the endpointSliceTracker has a resource version for the +// Has returns true if the endpointSliceTracker has a generation for the // provided EndpointSlice. func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool { est.lock.Lock() defer est.lock.Unlock() - rrv, ok := est.relatedResourceVersions(endpointSlice) + gfs, ok := est.generationsForSliceUnsafe(endpointSlice) if !ok { return false } - _, ok = rrv[endpointSlice.Name] + _, ok = gfs[endpointSlice.UID] return ok } -// Stale returns true if this endpointSliceTracker does not have a resource -// version for the provided EndpointSlice or it does not match the resource -// version of the provided EndpointSlice. -func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool { +// ShouldSync returns true if this endpointSliceTracker does not have a +// generation for the provided EndpointSlice or it is greater than the +// generation of the tracked EndpointSlice. +func (est *endpointSliceTracker) ShouldSync(endpointSlice *discovery.EndpointSlice) bool { est.lock.Lock() defer est.lock.Unlock() - rrv, ok := est.relatedResourceVersions(endpointSlice) + gfs, ok := est.generationsForSliceUnsafe(endpointSlice) if !ok { return true } - return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion + g, ok := gfs[endpointSlice.UID] + return !ok || endpointSlice.Generation > g } -// Update adds or updates the resource version in this endpointSliceTracker for -// the provided EndpointSlice. +// StaleSlices returns true if one or more of the provided EndpointSlices +// have older generations than the corresponding tracked ones or if the tracker +// is expecting one or more of the provided EndpointSlices to be deleted. +func (est *endpointSliceTracker) StaleSlices(service *v1.Service, endpointSlices []*discovery.EndpointSlice) bool { + est.lock.Lock() + defer est.lock.Unlock() + + nn := types.NamespacedName{Name: service.Name, Namespace: service.Namespace} + gfs, ok := est.generationsByService[nn] + if !ok { + return false + } + for _, endpointSlice := range endpointSlices { + g, ok := gfs[endpointSlice.UID] + if ok && (g == deletionExpected || g > endpointSlice.Generation) { + return true + } + } + return false +} + +// Update adds or updates the generation in this endpointSliceTracker for the +// provided EndpointSlice. func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) { est.lock.Lock() defer est.lock.Unlock() - rrv, ok := est.relatedResourceVersions(endpointSlice) + gfs, ok := est.generationsForSliceUnsafe(endpointSlice) + if !ok { - rrv = endpointSliceResourceVersions{} - est.resourceVersionsByService[getServiceNN(endpointSlice)] = rrv + gfs = generationsBySlice{} + est.generationsByService[getServiceNN(endpointSlice)] = gfs } - rrv[endpointSlice.Name] = endpointSlice.ResourceVersion + gfs[endpointSlice.UID] = endpointSlice.Generation } -// DeleteService removes the set of resource versions tracked for the Service. +// DeleteService removes the set of generations tracked for the Service. func (est *endpointSliceTracker) DeleteService(namespace, name string) { est.lock.Lock() defer est.lock.Unlock() serviceNN := types.NamespacedName{Name: name, Namespace: namespace} - delete(est.resourceVersionsByService, serviceNN) + delete(est.generationsByService, serviceNN) } -// Delete removes the resource version in this endpointSliceTracker for the -// provided EndpointSlice. -func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) { +// ExpectDeletion sets the generation to deletionExpected in this +// endpointSliceTracker for the provided EndpointSlice. +func (est *endpointSliceTracker) ExpectDeletion(endpointSlice *discovery.EndpointSlice) { est.lock.Lock() defer est.lock.Unlock() - rrv, ok := est.relatedResourceVersions(endpointSlice) - if ok { - delete(rrv, endpointSlice.Name) + gfs, ok := est.generationsForSliceUnsafe(endpointSlice) + + if !ok { + gfs = generationsBySlice{} + est.generationsByService[getServiceNN(endpointSlice)] = gfs } + gfs[endpointSlice.UID] = deletionExpected } -// relatedResourceVersions returns the set of resource versions tracked for the -// Service corresponding to the provided EndpointSlice, and a bool to indicate -// if it exists. -func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) (endpointSliceResourceVersions, bool) { +// HandleDeletion removes the generation in this endpointSliceTracker for the +// provided EndpointSlice. This returns true if the tracker expected this +// EndpointSlice to be deleted and false if not. +func (est *endpointSliceTracker) HandleDeletion(endpointSlice *discovery.EndpointSlice) bool { + est.lock.Lock() + defer est.lock.Unlock() + + gfs, ok := est.generationsForSliceUnsafe(endpointSlice) + + if ok { + g, ok := gfs[endpointSlice.UID] + delete(gfs, endpointSlice.UID) + if ok && g != deletionExpected { + return false + } + } + + return true +} + +// generationsForSliceUnsafe returns the generations for the Service +// corresponding to the provided EndpointSlice, and a bool to indicate if it +// exists. A lock must be applied before calling this function. +func (est *endpointSliceTracker) generationsForSliceUnsafe(endpointSlice *discovery.EndpointSlice) (generationsBySlice, bool) { serviceNN := getServiceNN(endpointSlice) - vers, ok := est.resourceVersionsByService[serviceNN] - return vers, ok + generations, ok := est.generationsByService[serviceNN] + return generations, ok } // getServiceNN returns a namespaced name for the Service corresponding to the diff --git a/pkg/controller/endpointslice/endpointslice_tracker_test.go b/pkg/controller/endpointslice/endpointslice_tracker_test.go index 500fa847556..2aa56d3809f 100644 --- a/pkg/controller/endpointslice/endpointslice_tracker_test.go +++ b/pkg/controller/endpointslice/endpointslice_tracker_test.go @@ -19,8 +19,7 @@ package endpointslice import ( "testing" - "github.com/stretchr/testify/assert" - + "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -29,72 +28,59 @@ import ( func TestEndpointSliceTrackerUpdate(t *testing.T) { epSlice1 := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: "example-1", - Namespace: "ns1", - ResourceVersion: "rv1", - Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + Name: "example-1", + Namespace: "ns1", + UID: "original", + Generation: 1, + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, } epSlice1DifferentNS := epSlice1.DeepCopy() epSlice1DifferentNS.Namespace = "ns2" + epSlice1DifferentNS.UID = "diff-ns" epSlice1DifferentService := epSlice1.DeepCopy() epSlice1DifferentService.Labels[discovery.LabelServiceName] = "svc2" + epSlice1DifferentService.UID = "diff-svc" - epSlice1DifferentRV := epSlice1.DeepCopy() - epSlice1DifferentRV.ResourceVersion = "rv2" + epSlice1NewerGen := epSlice1.DeepCopy() + epSlice1NewerGen.Generation = 2 testCases := map[string]struct { - updateParam *discovery.EndpointSlice - checksParam *discovery.EndpointSlice - expectHas bool - expectStale bool - expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions + updateParam *discovery.EndpointSlice + checksParam *discovery.EndpointSlice + expectHas bool + expectShouldSync bool + expectGeneration int64 }{ "same slice": { - updateParam: epSlice1, - checksParam: epSlice1, - expectHas: true, - expectStale: false, - expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ - {Namespace: epSlice1.Namespace, Name: "svc1"}: { - epSlice1.Name: epSlice1.ResourceVersion, - }, - }, + updateParam: epSlice1, + checksParam: epSlice1, + expectHas: true, + expectShouldSync: false, + expectGeneration: epSlice1.Generation, }, "different namespace": { - updateParam: epSlice1, - checksParam: epSlice1DifferentNS, - expectHas: false, - expectStale: true, - expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ - {Namespace: epSlice1.Namespace, Name: "svc1"}: { - epSlice1.Name: epSlice1.ResourceVersion, - }, - }, + updateParam: epSlice1, + checksParam: epSlice1DifferentNS, + expectHas: false, + expectShouldSync: true, + expectGeneration: epSlice1.Generation, }, "different service": { - updateParam: epSlice1, - checksParam: epSlice1DifferentService, - expectHas: false, - expectStale: true, - expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ - {Namespace: epSlice1.Namespace, Name: "svc1"}: { - epSlice1.Name: epSlice1.ResourceVersion, - }, - }, + updateParam: epSlice1, + checksParam: epSlice1DifferentService, + expectHas: false, + expectShouldSync: true, + expectGeneration: epSlice1.Generation, }, - "different resource version": { - updateParam: epSlice1, - checksParam: epSlice1DifferentRV, - expectHas: true, - expectStale: true, - expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ - {Namespace: epSlice1.Namespace, Name: "svc1"}: { - epSlice1.Name: epSlice1.ResourceVersion, - }, - }, + "newer generation": { + updateParam: epSlice1, + checksParam: epSlice1NewerGen, + expectHas: true, + expectShouldSync: true, + expectGeneration: epSlice1.Generation, }, } @@ -105,80 +91,195 @@ func TestEndpointSliceTrackerUpdate(t *testing.T) { if esTracker.Has(tc.checksParam) != tc.expectHas { t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas) } - if esTracker.Stale(tc.checksParam) != tc.expectStale { - t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale) + if esTracker.ShouldSync(tc.checksParam) != tc.expectShouldSync { + t.Errorf("tc.tracker.ShouldSync(%+v) == %t, expected %t", tc.checksParam, esTracker.ShouldSync(tc.checksParam), tc.expectShouldSync) + } + serviceNN := types.NamespacedName{Namespace: epSlice1.Namespace, Name: "svc1"} + gfs, ok := esTracker.generationsByService[serviceNN] + if !ok { + t.Fatalf("expected tracker to have generations for %s Service", serviceNN.Name) + } + generation, ok := gfs[epSlice1.UID] + if !ok { + t.Fatalf("expected tracker to have generation for %s EndpointSlice", epSlice1.Name) + } + if tc.expectGeneration != generation { + t.Fatalf("expected generation to be %d, got %d", tc.expectGeneration, generation) } - assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService) }) } } -func TestEndpointSliceTrackerDelete(t *testing.T) { +func TestEndpointSliceTrackerStaleSlices(t *testing.T) { epSlice1 := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: "example-1", - Namespace: "ns1", - ResourceVersion: "rv1", - Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + Name: "example-1", + Namespace: "ns1", + UID: "original", + Generation: 1, + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + } + + epSlice1NewerGen := epSlice1.DeepCopy() + epSlice1NewerGen.Generation = 2 + + testCases := []struct { + name string + tracker *endpointSliceTracker + serviceParam *v1.Service + slicesParam []*discovery.EndpointSlice + expectNewer bool + }{{ + name: "empty tracker", + tracker: &endpointSliceTracker{ + generationsByService: map[types.NamespacedName]generationsBySlice{}, + }, + serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}}, + slicesParam: []*discovery.EndpointSlice{}, + expectNewer: false, + }, { + name: "empty slices", + tracker: &endpointSliceTracker{ + generationsByService: map[types.NamespacedName]generationsBySlice{ + {Name: "svc1", Namespace: "ns1"}: {}, + }, + }, + serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}}, + slicesParam: []*discovery.EndpointSlice{}, + expectNewer: false, + }, { + name: "matching slices", + tracker: &endpointSliceTracker{ + generationsByService: map[types.NamespacedName]generationsBySlice{ + {Name: "svc1", Namespace: "ns1"}: { + epSlice1.UID: epSlice1.Generation, + }, + }, + }, + serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}}, + slicesParam: []*discovery.EndpointSlice{epSlice1}, + expectNewer: false, + }, { + name: "newer slice in tracker", + tracker: &endpointSliceTracker{ + generationsByService: map[types.NamespacedName]generationsBySlice{ + {Name: "svc1", Namespace: "ns1"}: { + epSlice1.UID: epSlice1NewerGen.Generation, + }, + }, + }, + serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}}, + slicesParam: []*discovery.EndpointSlice{epSlice1}, + expectNewer: true, + }, { + name: "newer slice in params", + tracker: &endpointSliceTracker{ + generationsByService: map[types.NamespacedName]generationsBySlice{ + {Name: "svc1", Namespace: "ns1"}: { + epSlice1.UID: epSlice1.Generation, + }, + }, + }, + serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}}, + slicesParam: []*discovery.EndpointSlice{epSlice1NewerGen}, + expectNewer: false, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualNewer := tc.tracker.StaleSlices(tc.serviceParam, tc.slicesParam) + if actualNewer != tc.expectNewer { + t.Errorf("Expected %t, got %t", tc.expectNewer, actualNewer) + } + }) + } +} +func TestEndpointSliceTrackerDeletion(t *testing.T) { + epSlice1 := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "example-1", + Namespace: "ns1", + UID: "original", + Generation: 1, + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, }, } epSlice1DifferentNS := epSlice1.DeepCopy() epSlice1DifferentNS.Namespace = "ns2" + epSlice1DifferentNS.UID = "diff-ns" epSlice1DifferentService := epSlice1.DeepCopy() epSlice1DifferentService.Labels[discovery.LabelServiceName] = "svc2" + epSlice1DifferentService.UID = "diff-svc" - epSlice1DifferentRV := epSlice1.DeepCopy() - epSlice1DifferentRV.ResourceVersion = "rv2" + epSlice1NewerGen := epSlice1.DeepCopy() + epSlice1NewerGen.Generation = 2 testCases := map[string]struct { - deleteParam *discovery.EndpointSlice - checksParam *discovery.EndpointSlice - expectHas bool - expectStale bool + expectDeletionParam *discovery.EndpointSlice + checksParam *discovery.EndpointSlice + deleteParam *discovery.EndpointSlice + expectHas bool + expectShouldSync bool + expectedHandleDeletionResp bool }{ "same slice": { - deleteParam: epSlice1, - checksParam: epSlice1, - expectHas: false, - expectStale: true, + expectDeletionParam: epSlice1, + checksParam: epSlice1, + deleteParam: epSlice1, + expectHas: true, + expectShouldSync: true, + expectedHandleDeletionResp: true, }, "different namespace": { - deleteParam: epSlice1DifferentNS, - checksParam: epSlice1DifferentNS, - expectHas: false, - expectStale: true, + expectDeletionParam: epSlice1DifferentNS, + checksParam: epSlice1DifferentNS, + deleteParam: epSlice1DifferentNS, + expectHas: true, + expectShouldSync: true, + expectedHandleDeletionResp: false, }, "different namespace, check original ep slice": { - deleteParam: epSlice1DifferentNS, - checksParam: epSlice1, - expectHas: true, - expectStale: false, + expectDeletionParam: epSlice1DifferentNS, + checksParam: epSlice1, + deleteParam: epSlice1DifferentNS, + expectHas: true, + expectShouldSync: false, + expectedHandleDeletionResp: false, }, "different service": { - deleteParam: epSlice1DifferentService, - checksParam: epSlice1DifferentService, - expectHas: false, - expectStale: true, + expectDeletionParam: epSlice1DifferentService, + checksParam: epSlice1DifferentService, + deleteParam: epSlice1DifferentService, + expectHas: true, + expectShouldSync: true, + expectedHandleDeletionResp: false, }, - "different service, check original ep slice": { - deleteParam: epSlice1DifferentService, - checksParam: epSlice1, - expectHas: true, - expectStale: false, + "expectDelete different service, check original ep slice, delete original": { + expectDeletionParam: epSlice1DifferentService, + checksParam: epSlice1, + deleteParam: epSlice1, + expectHas: true, + expectShouldSync: false, + expectedHandleDeletionResp: false, }, - "different resource version": { - deleteParam: epSlice1DifferentRV, - checksParam: epSlice1DifferentRV, - expectHas: false, - expectStale: true, + "different generation": { + expectDeletionParam: epSlice1NewerGen, + checksParam: epSlice1NewerGen, + deleteParam: epSlice1NewerGen, + expectHas: true, + expectShouldSync: true, + expectedHandleDeletionResp: true, }, - "different resource version, check original ep slice": { - deleteParam: epSlice1DifferentRV, - checksParam: epSlice1, - expectHas: false, - expectStale: true, + "expectDelete different generation, check original ep slice, delete original": { + expectDeletionParam: epSlice1NewerGen, + checksParam: epSlice1, + deleteParam: epSlice1, + expectHas: true, + expectShouldSync: true, + expectedHandleDeletionResp: true, }, } @@ -187,13 +288,20 @@ func TestEndpointSliceTrackerDelete(t *testing.T) { esTracker := newEndpointSliceTracker() esTracker.Update(epSlice1) - esTracker.Delete(tc.deleteParam) + esTracker.ExpectDeletion(tc.expectDeletionParam) if esTracker.Has(tc.checksParam) != tc.expectHas { t.Errorf("esTracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas) } - if esTracker.Stale(tc.checksParam) != tc.expectStale { - t.Errorf("esTracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale) + if esTracker.ShouldSync(tc.checksParam) != tc.expectShouldSync { + t.Errorf("esTracker.ShouldSync(%+v) == %t, expected %t", tc.checksParam, esTracker.ShouldSync(tc.checksParam), tc.expectShouldSync) } + if esTracker.HandleDeletion(epSlice1) != tc.expectedHandleDeletionResp { + t.Errorf("esTracker.ShouldSync(%+v) == %t, expected %t", epSlice1, esTracker.HandleDeletion(epSlice1), tc.expectedHandleDeletionResp) + } + if esTracker.Has(epSlice1) != false { + t.Errorf("esTracker.Has(%+v) == %t, expected false", epSlice1, esTracker.Has(epSlice1)) + } + }) } } @@ -203,48 +311,39 @@ func TestEndpointSliceTrackerDeleteService(t *testing.T) { svcName2, svcNS2 := "svc2", "ns2" epSlice1 := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: "example-1", - Namespace: svcNS1, - ResourceVersion: "rv1", - Labels: map[string]string{discovery.LabelServiceName: svcName1}, + Name: "example-1", + Namespace: svcNS1, + Generation: 1, + Labels: map[string]string{discovery.LabelServiceName: svcName1}, }, } testCases := map[string]struct { - updateParam *discovery.EndpointSlice - deleteServiceParam *types.NamespacedName - expectHas bool - expectStale bool - expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions + updateParam *discovery.EndpointSlice + deleteServiceParam *types.NamespacedName + expectHas bool + expectShouldSync bool + expectGeneration int64 }{ "same service": { - updateParam: epSlice1, - deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName1}, - expectHas: false, - expectStale: true, - expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{}, + updateParam: epSlice1, + deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName1}, + expectHas: false, + expectShouldSync: true, }, "different namespace": { updateParam: epSlice1, deleteServiceParam: &types.NamespacedName{Namespace: svcNS2, Name: svcName1}, expectHas: true, - expectStale: false, - expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ - {Namespace: epSlice1.Namespace, Name: "svc1"}: { - epSlice1.Name: epSlice1.ResourceVersion, - }, - }, + expectShouldSync: false, + expectGeneration: epSlice1.Generation, }, "different service": { updateParam: epSlice1, deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName2}, expectHas: true, - expectStale: false, - expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ - {Namespace: epSlice1.Namespace, Name: "svc1"}: { - epSlice1.Name: epSlice1.ResourceVersion, - }, - }, + expectShouldSync: false, + expectGeneration: epSlice1.Generation, }, } @@ -256,10 +355,23 @@ func TestEndpointSliceTrackerDeleteService(t *testing.T) { if esTracker.Has(tc.updateParam) != tc.expectHas { t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.updateParam, esTracker.Has(tc.updateParam), tc.expectHas) } - if esTracker.Stale(tc.updateParam) != tc.expectStale { - t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.updateParam, esTracker.Stale(tc.updateParam), tc.expectStale) + if esTracker.ShouldSync(tc.updateParam) != tc.expectShouldSync { + t.Errorf("tc.tracker.ShouldSync(%+v) == %t, expected %t", tc.updateParam, esTracker.ShouldSync(tc.updateParam), tc.expectShouldSync) + } + if tc.expectGeneration != 0 { + serviceNN := types.NamespacedName{Namespace: epSlice1.Namespace, Name: "svc1"} + gfs, ok := esTracker.generationsByService[serviceNN] + if !ok { + t.Fatalf("expected tracker to have status for %s Service", serviceNN.Name) + } + generation, ok := gfs[epSlice1.UID] + if !ok { + t.Fatalf("expected tracker to have generation for %s EndpointSlice", epSlice1.Name) + } + if tc.expectGeneration != generation { + t.Fatalf("expected generation to be %d, got %d", tc.expectGeneration, generation) + } } - assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService) }) } } diff --git a/pkg/controller/endpointslice/errors.go b/pkg/controller/endpointslice/errors.go new file mode 100644 index 00000000000..f7bcb20c673 --- /dev/null +++ b/pkg/controller/endpointslice/errors.go @@ -0,0 +1,30 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpointslice + +// StaleInformerCache errors indicate that the informer cache includes out of +// date resources. +type StaleInformerCache struct { + msg string +} + +func (e *StaleInformerCache) Error() string { return e.msg } + +func isStaleInformerCacheErr(err error) bool { + _, ok := err.(*StaleInformerCache) + return ok +} diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go index 1694844dd30..d6e8ff418e3 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/pkg/controller/endpointslice/reconciler.go @@ -101,7 +101,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis if err != nil { errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", sliceToDelete.Name, service.Namespace, service.Name, err)) } else { - r.endpointSliceTracker.Delete(sliceToDelete) + r.endpointSliceTracker.ExpectDeletion(sliceToDelete) metrics.EndpointSliceChanges.WithLabelValues("delete").Inc() } } @@ -293,7 +293,7 @@ func (r *reconciler) finalize( if err != nil { return fmt.Errorf("failed to delete %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err) } - r.endpointSliceTracker.Delete(endpointSlice) + r.endpointSliceTracker.ExpectDeletion(endpointSlice) metrics.EndpointSliceChanges.WithLabelValues("delete").Inc() } diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go index 522881dda83..4851ac9e1c4 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/pkg/controller/endpointslice/reconciler_test.go @@ -65,7 +65,7 @@ func TestReconcileEmpty(t *testing.T) { assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports) assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints) - expectTrackedResourceVersion(t, r.endpointSliceTracker, &slices[0], "100") + expectTrackedGeneration(t, r.endpointSliceTracker, &slices[0], 1) expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0}) } @@ -473,7 +473,7 @@ func TestReconcile1Pod(t *testing.T) { t.Fatalf("Expected endpoint: %+v, got: %+v", expectedEndPointList[0], endpoint) } - expectTrackedResourceVersion(t, r.endpointSliceTracker, &slice, "100") + expectTrackedGeneration(t, r.endpointSliceTracker, &slice, 1) expectMetrics(t, expectedMetrics{ @@ -516,7 +516,7 @@ func TestReconcile1EndpointSlice(t *testing.T) { assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports) assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints) - expectTrackedResourceVersion(t, r.endpointSliceTracker, &slices[0], "200") + expectTrackedGeneration(t, r.endpointSliceTracker, &slices[0], 1) expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0}) } @@ -1436,14 +1436,17 @@ func expectActions(t *testing.T, actions []k8stesting.Action, num int, verb, res } } -func expectTrackedResourceVersion(t *testing.T, tracker *endpointSliceTracker, slice *discovery.EndpointSlice, expectedRV string) { - rrv, _ := tracker.relatedResourceVersions(slice) - rv, tracked := rrv[slice.Name] - if !tracked { +func expectTrackedGeneration(t *testing.T, tracker *endpointSliceTracker, slice *discovery.EndpointSlice, expectedGeneration int64) { + gfs, ok := tracker.generationsForSliceUnsafe(slice) + if !ok { + t.Fatalf("Expected Service to be tracked for EndpointSlices %s", slice.Name) + } + generation, ok := gfs[slice.UID] + if !ok { t.Fatalf("Expected EndpointSlice %s to be tracked", slice.Name) } - if rv != expectedRV { - t.Errorf("Expected ResourceVersion of %s to be %s, got %s", slice.Name, expectedRV, rv) + if generation != expectedGeneration { + t.Errorf("Expected Generation of %s to be %d, got %d", slice.Name, expectedGeneration, generation) } } diff --git a/pkg/controller/endpointslice/utils_test.go b/pkg/controller/endpointslice/utils_test.go index 5c61dc3946f..c38cbaa5055 100644 --- a/pkg/controller/endpointslice/utils_test.go +++ b/pkg/controller/endpointslice/utils_test.go @@ -1001,13 +1001,13 @@ func newClientset() *fake.Clientset { endpointSlice.ObjectMeta.Name = fmt.Sprintf("%s-%s", endpointSlice.ObjectMeta.GenerateName, rand.String(8)) endpointSlice.ObjectMeta.GenerateName = "" } - endpointSlice.ObjectMeta.ResourceVersion = "100" + endpointSlice.Generation = 1 return false, endpointSlice, nil })) client.PrependReactor("update", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) { endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice) - endpointSlice.ObjectMeta.ResourceVersion = "200" + endpointSlice.Generation++ return false, endpointSlice, nil }))