diff --git a/pkg/controller/endpointslicemirroring/BUILD b/pkg/controller/endpointslicemirroring/BUILD index ade8a8bfef6..831f8ab3472 100644 --- a/pkg/controller/endpointslicemirroring/BUILD +++ b/pkg/controller/endpointslicemirroring/BUILD @@ -64,6 +64,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", diff --git a/pkg/controller/endpointslicemirroring/endpointslice_tracker.go b/pkg/controller/endpointslicemirroring/endpointslice_tracker.go index 17449202d3c..c16df7c10b8 100644 --- a/pkg/controller/endpointslicemirroring/endpointslice_tracker.go +++ b/pkg/controller/endpointslicemirroring/endpointslice_tracker.go @@ -45,61 +45,76 @@ func newEndpointSliceTracker() *endpointSliceTracker { } } -// has returns true if the endpointSliceTracker has a resource version for the +// Has returns true if the endpointSliceTracker has a resource version for the // provided EndpointSlice. -func (est *endpointSliceTracker) has(endpointSlice *discovery.EndpointSlice) bool { +func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool { est.lock.Lock() defer est.lock.Unlock() - rrv := est.relatedResourceVersions(endpointSlice) - _, ok := rrv[endpointSlice.Name] + rrv, ok := est.relatedResourceVersions(endpointSlice) + if !ok { + return false + } + _, ok = rrv[endpointSlice.Name] return ok } -// stale returns true if this endpointSliceTracker does not have a resource +// Stale returns true if this endpointSliceTracker does not have a resource // version for the provided EndpointSlice or it does not match the resource // version of the provided EndpointSlice. -func (est *endpointSliceTracker) stale(endpointSlice *discovery.EndpointSlice) bool { +func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool { est.lock.Lock() defer est.lock.Unlock() - rrv := est.relatedResourceVersions(endpointSlice) + rrv, ok := est.relatedResourceVersions(endpointSlice) + if !ok { + return true + } return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion } -// update adds or updates the resource version in this endpointSliceTracker for +// Update adds or updates the resource version in this endpointSliceTracker for // the provided EndpointSlice. -func (est *endpointSliceTracker) update(endpointSlice *discovery.EndpointSlice) { +func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) { est.lock.Lock() defer est.lock.Unlock() - rrv := est.relatedResourceVersions(endpointSlice) + rrv, ok := est.relatedResourceVersions(endpointSlice) + if !ok { + rrv = endpointSliceResourceVersions{} + est.resourceVersionsByService[getServiceNN(endpointSlice)] = rrv + } rrv[endpointSlice.Name] = endpointSlice.ResourceVersion } -// delete removes the resource version in this endpointSliceTracker for the -// provided EndpointSlice. -func (est *endpointSliceTracker) delete(endpointSlice *discovery.EndpointSlice) { +// DeleteService removes the set of resource versions tracked for the Service. +func (est *endpointSliceTracker) DeleteService(namespace, name string) { est.lock.Lock() defer est.lock.Unlock() - rrv := est.relatedResourceVersions(endpointSlice) - delete(rrv, endpointSlice.Name) + serviceNN := types.NamespacedName{Name: name, Namespace: namespace} + delete(est.resourceVersionsByService, serviceNN) +} + +// Delete removes the resource version in this endpointSliceTracker for the +// provided EndpointSlice. +func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) { + est.lock.Lock() + defer est.lock.Unlock() + + rrv, ok := est.relatedResourceVersions(endpointSlice) + if ok { + delete(rrv, endpointSlice.Name) + } } // relatedResourceVersions returns the set of resource versions tracked for the -// Service corresponding to the provided EndpointSlice. If no resource versions -// are currently tracked for this service, an empty set is initialized. -func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) endpointSliceResourceVersions { +// Service corresponding to the provided EndpointSlice, and a bool to indicate +// if it exists. +func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) (endpointSliceResourceVersions, bool) { serviceNN := getServiceNN(endpointSlice) vers, ok := est.resourceVersionsByService[serviceNN] - - if !ok { - vers = endpointSliceResourceVersions{} - est.resourceVersionsByService[serviceNN] = vers - } - - return vers + return vers, ok } // getServiceNN returns a namespaced name for the Service corresponding to the diff --git a/pkg/controller/endpointslicemirroring/endpointslice_tracker_test.go b/pkg/controller/endpointslicemirroring/endpointslice_tracker_test.go index 1a5f7fbe9d1..12d56b66348 100644 --- a/pkg/controller/endpointslicemirroring/endpointslice_tracker_test.go +++ b/pkg/controller/endpointslicemirroring/endpointslice_tracker_test.go @@ -19,8 +19,11 @@ package endpointslicemirroring import ( "testing" + "github.com/stretchr/testify/assert" + discovery "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) func TestEndpointSliceTrackerUpdate(t *testing.T) { @@ -43,47 +46,69 @@ func TestEndpointSliceTrackerUpdate(t *testing.T) { epSlice1DifferentRV.ResourceVersion = "rv2" testCases := map[string]struct { - updateParam *discovery.EndpointSlice - checksParam *discovery.EndpointSlice - expectHas bool - expectStale bool + updateParam *discovery.EndpointSlice + checksParam *discovery.EndpointSlice + expectHas bool + expectStale bool + expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions }{ "same slice": { updateParam: epSlice1, checksParam: epSlice1, expectHas: true, expectStale: false, + expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ + {Namespace: epSlice1.Namespace, Name: "svc1"}: { + epSlice1.Name: epSlice1.ResourceVersion, + }, + }, }, "different namespace": { updateParam: epSlice1, checksParam: epSlice1DifferentNS, expectHas: false, expectStale: true, + expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ + {Namespace: epSlice1.Namespace, Name: "svc1"}: { + epSlice1.Name: epSlice1.ResourceVersion, + }, + }, }, "different service": { updateParam: epSlice1, checksParam: epSlice1DifferentService, expectHas: false, expectStale: true, + expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ + {Namespace: epSlice1.Namespace, Name: "svc1"}: { + epSlice1.Name: epSlice1.ResourceVersion, + }, + }, }, "different resource version": { updateParam: epSlice1, checksParam: epSlice1DifferentRV, expectHas: true, expectStale: true, + expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ + {Namespace: epSlice1.Namespace, Name: "svc1"}: { + epSlice1.Name: epSlice1.ResourceVersion, + }, + }, }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { esTracker := newEndpointSliceTracker() - esTracker.update(tc.updateParam) - if esTracker.has(tc.checksParam) != tc.expectHas { - t.Errorf("tc.tracker.has(%+v) == %t, expected %t", tc.checksParam, esTracker.has(tc.checksParam), tc.expectHas) + esTracker.Update(tc.updateParam) + if esTracker.Has(tc.checksParam) != tc.expectHas { + t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas) } - if esTracker.stale(tc.checksParam) != tc.expectStale { - t.Errorf("tc.tracker.stale(%+v) == %t, expected %t", tc.checksParam, esTracker.stale(tc.checksParam), tc.expectStale) + if esTracker.Stale(tc.checksParam) != tc.expectStale { + t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale) } + assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService) }) } } @@ -160,15 +185,81 @@ func TestEndpointSliceTrackerDelete(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { esTracker := newEndpointSliceTracker() - esTracker.update(epSlice1) + esTracker.Update(epSlice1) - esTracker.delete(tc.deleteParam) - if esTracker.has(tc.checksParam) != tc.expectHas { - t.Errorf("esTracker.has(%+v) == %t, expected %t", tc.checksParam, esTracker.has(tc.checksParam), tc.expectHas) + esTracker.Delete(tc.deleteParam) + if esTracker.Has(tc.checksParam) != tc.expectHas { + t.Errorf("esTracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas) } - if esTracker.stale(tc.checksParam) != tc.expectStale { - t.Errorf("esTracker.stale(%+v) == %t, expected %t", tc.checksParam, esTracker.stale(tc.checksParam), tc.expectStale) + if esTracker.Stale(tc.checksParam) != tc.expectStale { + t.Errorf("esTracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale) } }) } } + +func TestEndpointSliceTrackerDeleteService(t *testing.T) { + svcName1, svcNS1 := "svc1", "ns1" + svcName2, svcNS2 := "svc2", "ns2" + epSlice1 := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "example-1", + Namespace: svcNS1, + ResourceVersion: "rv1", + Labels: map[string]string{discovery.LabelServiceName: svcName1}, + }, + } + + testCases := map[string]struct { + updateParam *discovery.EndpointSlice + deleteServiceParam *types.NamespacedName + expectHas bool + expectStale bool + expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions + }{ + "same service": { + updateParam: epSlice1, + deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName1}, + expectHas: false, + expectStale: true, + expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{}, + }, + "different namespace": { + updateParam: epSlice1, + deleteServiceParam: &types.NamespacedName{Namespace: svcNS2, Name: svcName1}, + expectHas: true, + expectStale: false, + expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ + {Namespace: epSlice1.Namespace, Name: "svc1"}: { + epSlice1.Name: epSlice1.ResourceVersion, + }, + }, + }, + "different service": { + updateParam: epSlice1, + deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName2}, + expectHas: true, + expectStale: false, + expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{ + {Namespace: epSlice1.Namespace, Name: "svc1"}: { + epSlice1.Name: epSlice1.ResourceVersion, + }, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + esTracker := newEndpointSliceTracker() + esTracker.Update(tc.updateParam) + esTracker.DeleteService(tc.deleteServiceParam.Namespace, tc.deleteServiceParam.Name) + if esTracker.Has(tc.updateParam) != tc.expectHas { + t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.updateParam, esTracker.Has(tc.updateParam), tc.expectHas) + } + if esTracker.Stale(tc.updateParam) != tc.expectStale { + t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.updateParam, esTracker.Stale(tc.updateParam), tc.expectStale) + } + assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService) + }) + } +} diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go index bc05ddaacdb..4d9221e8bb1 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go @@ -285,6 +285,7 @@ func (c *Controller) syncEndpoints(key string) error { endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name) if err != nil || !c.shouldMirror(endpoints) { if apierrors.IsNotFound(err) || !c.shouldMirror(endpoints) { + c.endpointSliceTracker.DeleteService(namespace, name) return c.reconciler.deleteEndpoints(namespace, name, endpointSlices) } return err @@ -389,7 +390,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) { utilruntime.HandleError(fmt.Errorf("onEndpointSliceAdd() expected type discovery.EndpointSlice, got %T", obj)) return } - if managedByController(endpointSlice) && c.endpointSliceTracker.stale(endpointSlice) { + if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) { c.queueEndpointsForEndpointSlice(endpointSlice) } } @@ -405,7 +406,7 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) { utilruntime.HandleError(fmt.Errorf("onEndpointSliceUpdated() expected type discovery.EndpointSlice, got %T, %T", prevObj, obj)) return } - if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.stale(endpointSlice)) { + if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) { c.queueEndpointsForEndpointSlice(endpointSlice) } } @@ -419,7 +420,7 @@ func (c *Controller) onEndpointSliceDelete(obj interface{}) { utilruntime.HandleError(fmt.Errorf("onEndpointSliceDelete() expected type discovery.EndpointSlice, got %T", obj)) return } - if managedByController(endpointSlice) && c.endpointSliceTracker.has(endpointSlice) { + if managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) { c.queueEndpointsForEndpointSlice(endpointSlice) } } diff --git a/pkg/controller/endpointslicemirroring/reconciler.go b/pkg/controller/endpointslicemirroring/reconciler.go index 3aafa42c567..00debd967c4 100644 --- a/pkg/controller/endpointslicemirroring/reconciler.go +++ b/pkg/controller/endpointslicemirroring/reconciler.go @@ -246,7 +246,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction } errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Endpoints %s/%s: %v", endpoints.Namespace, endpoints.Name, err)) } else { - r.endpointSliceTracker.update(createdSlice) + r.endpointSliceTracker.Update(createdSlice) metrics.EndpointSliceChanges.WithLabelValues("create").Inc() } } @@ -257,7 +257,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction if err != nil { errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err)) } else { - r.endpointSliceTracker.update(updatedSlice) + r.endpointSliceTracker.Update(updatedSlice) metrics.EndpointSliceChanges.WithLabelValues("update").Inc() } } @@ -267,7 +267,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction if err != nil { errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err)) } else { - r.endpointSliceTracker.delete(endpointSlice) + r.endpointSliceTracker.Delete(endpointSlice) metrics.EndpointSliceChanges.WithLabelValues("delete").Inc() } }