Updating EndpointSliceMirroring controller to wait for cache to be updated

This matches the recent updates to the EndpointSliceTracker for the
EndpointSlice controller in #99345 that accomplished the same thing.
This commit is contained in:
Rob Scott 2021-03-03 19:41:54 -08:00
parent 4f9317596c
commit 06db357e06
No known key found for this signature in database
GPG Key ID: 90C19B2D4A99C91B
6 changed files with 381 additions and 173 deletions

View File

@ -19,102 +19,154 @@ package endpointslicemirroring
import (
"sync"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
)
// endpointSliceResourceVersions tracks expected EndpointSlice resource versions
// by EndpointSlice name.
type endpointSliceResourceVersions map[string]string
const (
deletionExpected = -1
)
// endpointSliceTracker tracks EndpointSlices and their associated resource
// versions to help determine if a change to an EndpointSlice has been processed
// by the EndpointSlice controller.
// generationsBySlice tracks expected EndpointSlice generations by EndpointSlice
// uid. A value of deletionExpected (-1) may be used here to indicate that we
// expect this EndpointSlice to be deleted.
type generationsBySlice map[types.UID]int64
// endpointSliceTracker tracks EndpointSlices and their associated generation to
// help determine if a change to an EndpointSlice has been processed by the
// EndpointSlice controller.
type endpointSliceTracker struct {
// lock protects resourceVersionsByService.
// lock protects generationsByService.
lock sync.Mutex
// resourceVersionsByService tracks the list of EndpointSlices and
// associated resource versions expected for a given Service.
resourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
// generationsByService tracks the generations of EndpointSlices for each
// Service.
generationsByService map[types.NamespacedName]generationsBySlice
}
// newEndpointSliceTracker creates and initializes a new endpointSliceTracker.
func newEndpointSliceTracker() *endpointSliceTracker {
return &endpointSliceTracker{
resourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{},
generationsByService: map[types.NamespacedName]generationsBySlice{},
}
}
// Has returns true if the endpointSliceTracker has a resource version for the
// Has returns true if the endpointSliceTracker has a generation for the
// provided EndpointSlice.
func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()
rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)
if !ok {
return false
}
_, ok = rrv[endpointSlice.Name]
_, ok = gfs[endpointSlice.UID]
return ok
}
// Stale returns true if this endpointSliceTracker does not have a resource
// version for the provided EndpointSlice or it does not match the resource
// version of the provided EndpointSlice.
func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool {
// ShouldSync returns true if this endpointSliceTracker does not have a
// generation for the provided EndpointSlice or it is greater than the
// generation of the tracked EndpointSlice.
func (est *endpointSliceTracker) ShouldSync(endpointSlice *discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()
rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)
if !ok {
return true
}
return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion
g, ok := gfs[endpointSlice.UID]
return !ok || endpointSlice.Generation > g
}
// Update adds or updates the resource version in this endpointSliceTracker for
// the provided EndpointSlice.
// StaleSlices returns true if one or more of the provided EndpointSlices
// have older generations than the corresponding tracked ones or if the tracker
// is expecting one or more of the provided EndpointSlices to be deleted.
func (est *endpointSliceTracker) StaleSlices(service *v1.Service, endpointSlices []*discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()
nn := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
gfs, ok := est.generationsByService[nn]
if !ok {
return false
}
for _, endpointSlice := range endpointSlices {
g, ok := gfs[endpointSlice.UID]
if ok && (g == deletionExpected || g > endpointSlice.Generation) {
return true
}
}
return false
}
// Update adds or updates the generation in this endpointSliceTracker for the
// provided EndpointSlice.
func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) {
est.lock.Lock()
defer est.lock.Unlock()
rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)
if !ok {
rrv = endpointSliceResourceVersions{}
est.resourceVersionsByService[getServiceNN(endpointSlice)] = rrv
gfs = generationsBySlice{}
est.generationsByService[getServiceNN(endpointSlice)] = gfs
}
rrv[endpointSlice.Name] = endpointSlice.ResourceVersion
gfs[endpointSlice.UID] = endpointSlice.Generation
}
// DeleteService removes the set of resource versions tracked for the Service.
// DeleteService removes the set of generations tracked for the Service.
func (est *endpointSliceTracker) DeleteService(namespace, name string) {
est.lock.Lock()
defer est.lock.Unlock()
serviceNN := types.NamespacedName{Name: name, Namespace: namespace}
delete(est.resourceVersionsByService, serviceNN)
delete(est.generationsByService, serviceNN)
}
// Delete removes the resource version in this endpointSliceTracker for the
// provided EndpointSlice.
func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) {
// ExpectDeletion sets the generation to deletionExpected in this
// endpointSliceTracker for the provided EndpointSlice.
func (est *endpointSliceTracker) ExpectDeletion(endpointSlice *discovery.EndpointSlice) {
est.lock.Lock()
defer est.lock.Unlock()
rrv, ok := est.relatedResourceVersions(endpointSlice)
if ok {
delete(rrv, endpointSlice.Name)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)
if !ok {
gfs = generationsBySlice{}
est.generationsByService[getServiceNN(endpointSlice)] = gfs
}
gfs[endpointSlice.UID] = deletionExpected
}
// relatedResourceVersions returns the set of resource versions tracked for the
// Service corresponding to the provided EndpointSlice, and a bool to indicate
// if it exists.
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) (endpointSliceResourceVersions, bool) {
// HandleDeletion removes the generation in this endpointSliceTracker for the
// provided EndpointSlice. This returns true if the tracker expected this
// EndpointSlice to be deleted and false if not.
func (est *endpointSliceTracker) HandleDeletion(endpointSlice *discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)
if ok {
g, ok := gfs[endpointSlice.UID]
delete(gfs, endpointSlice.UID)
if ok && g != deletionExpected {
return false
}
}
return true
}
// generationsForSliceUnsafe returns the generations for the Service
// corresponding to the provided EndpointSlice, and a bool to indicate if it
// exists. A lock must be applied before calling this function.
func (est *endpointSliceTracker) generationsForSliceUnsafe(endpointSlice *discovery.EndpointSlice) (generationsBySlice, bool) {
serviceNN := getServiceNN(endpointSlice)
vers, ok := est.resourceVersionsByService[serviceNN]
return vers, ok
generations, ok := est.generationsByService[serviceNN]
return generations, ok
}
// getServiceNN returns a namespaced name for the Service corresponding to the

View File

@ -19,8 +19,7 @@ package endpointslicemirroring
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -29,72 +28,59 @@ import (
func TestEndpointSliceTrackerUpdate(t *testing.T) {
epSlice1 := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "example-1",
Namespace: "ns1",
ResourceVersion: "rv1",
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
Name: "example-1",
Namespace: "ns1",
UID: "original",
Generation: 1,
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
},
}
epSlice1DifferentNS := epSlice1.DeepCopy()
epSlice1DifferentNS.Namespace = "ns2"
epSlice1DifferentNS.UID = "diff-ns"
epSlice1DifferentService := epSlice1.DeepCopy()
epSlice1DifferentService.Labels[discovery.LabelServiceName] = "svc2"
epSlice1DifferentService.UID = "diff-svc"
epSlice1DifferentRV := epSlice1.DeepCopy()
epSlice1DifferentRV.ResourceVersion = "rv2"
epSlice1NewerGen := epSlice1.DeepCopy()
epSlice1NewerGen.Generation = 2
testCases := map[string]struct {
updateParam *discovery.EndpointSlice
checksParam *discovery.EndpointSlice
expectHas bool
expectStale bool
expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
updateParam *discovery.EndpointSlice
checksParam *discovery.EndpointSlice
expectHas bool
expectShouldSync bool
expectGeneration int64
}{
"same slice": {
updateParam: epSlice1,
checksParam: epSlice1,
expectHas: true,
expectStale: false,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
updateParam: epSlice1,
checksParam: epSlice1,
expectHas: true,
expectShouldSync: false,
expectGeneration: epSlice1.Generation,
},
"different namespace": {
updateParam: epSlice1,
checksParam: epSlice1DifferentNS,
expectHas: false,
expectStale: true,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
updateParam: epSlice1,
checksParam: epSlice1DifferentNS,
expectHas: false,
expectShouldSync: true,
expectGeneration: epSlice1.Generation,
},
"different service": {
updateParam: epSlice1,
checksParam: epSlice1DifferentService,
expectHas: false,
expectStale: true,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
updateParam: epSlice1,
checksParam: epSlice1DifferentService,
expectHas: false,
expectShouldSync: true,
expectGeneration: epSlice1.Generation,
},
"different resource version": {
updateParam: epSlice1,
checksParam: epSlice1DifferentRV,
expectHas: true,
expectStale: true,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
"newer generation": {
updateParam: epSlice1,
checksParam: epSlice1NewerGen,
expectHas: true,
expectShouldSync: true,
expectGeneration: epSlice1.Generation,
},
}
@ -105,80 +91,195 @@ func TestEndpointSliceTrackerUpdate(t *testing.T) {
if esTracker.Has(tc.checksParam) != tc.expectHas {
t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas)
}
if esTracker.Stale(tc.checksParam) != tc.expectStale {
t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale)
if esTracker.ShouldSync(tc.checksParam) != tc.expectShouldSync {
t.Errorf("tc.tracker.ShouldSync(%+v) == %t, expected %t", tc.checksParam, esTracker.ShouldSync(tc.checksParam), tc.expectShouldSync)
}
serviceNN := types.NamespacedName{Namespace: epSlice1.Namespace, Name: "svc1"}
gfs, ok := esTracker.generationsByService[serviceNN]
if !ok {
t.Fatalf("expected tracker to have generations for %s Service", serviceNN.Name)
}
generation, ok := gfs[epSlice1.UID]
if !ok {
t.Fatalf("expected tracker to have generation for %s EndpointSlice", epSlice1.Name)
}
if tc.expectGeneration != generation {
t.Fatalf("expected generation to be %d, got %d", tc.expectGeneration, generation)
}
assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService)
})
}
}
func TestEndpointSliceTrackerDelete(t *testing.T) {
func TestEndpointSliceTrackerStaleSlices(t *testing.T) {
epSlice1 := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "example-1",
Namespace: "ns1",
ResourceVersion: "rv1",
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
Name: "example-1",
Namespace: "ns1",
UID: "original",
Generation: 1,
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
},
}
epSlice1NewerGen := epSlice1.DeepCopy()
epSlice1NewerGen.Generation = 2
testCases := []struct {
name string
tracker *endpointSliceTracker
serviceParam *v1.Service
slicesParam []*discovery.EndpointSlice
expectNewer bool
}{{
name: "empty tracker",
tracker: &endpointSliceTracker{
generationsByService: map[types.NamespacedName]generationsBySlice{},
},
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{},
expectNewer: false,
}, {
name: "empty slices",
tracker: &endpointSliceTracker{
generationsByService: map[types.NamespacedName]generationsBySlice{
{Name: "svc1", Namespace: "ns1"}: {},
},
},
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{},
expectNewer: false,
}, {
name: "matching slices",
tracker: &endpointSliceTracker{
generationsByService: map[types.NamespacedName]generationsBySlice{
{Name: "svc1", Namespace: "ns1"}: {
epSlice1.UID: epSlice1.Generation,
},
},
},
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{epSlice1},
expectNewer: false,
}, {
name: "newer slice in tracker",
tracker: &endpointSliceTracker{
generationsByService: map[types.NamespacedName]generationsBySlice{
{Name: "svc1", Namespace: "ns1"}: {
epSlice1.UID: epSlice1NewerGen.Generation,
},
},
},
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{epSlice1},
expectNewer: true,
}, {
name: "newer slice in params",
tracker: &endpointSliceTracker{
generationsByService: map[types.NamespacedName]generationsBySlice{
{Name: "svc1", Namespace: "ns1"}: {
epSlice1.UID: epSlice1.Generation,
},
},
},
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{epSlice1NewerGen},
expectNewer: false,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualNewer := tc.tracker.StaleSlices(tc.serviceParam, tc.slicesParam)
if actualNewer != tc.expectNewer {
t.Errorf("Expected %t, got %t", tc.expectNewer, actualNewer)
}
})
}
}
func TestEndpointSliceTrackerDeletion(t *testing.T) {
epSlice1 := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "example-1",
Namespace: "ns1",
UID: "original",
Generation: 1,
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
},
}
epSlice1DifferentNS := epSlice1.DeepCopy()
epSlice1DifferentNS.Namespace = "ns2"
epSlice1DifferentNS.UID = "diff-ns"
epSlice1DifferentService := epSlice1.DeepCopy()
epSlice1DifferentService.Labels[discovery.LabelServiceName] = "svc2"
epSlice1DifferentService.UID = "diff-svc"
epSlice1DifferentRV := epSlice1.DeepCopy()
epSlice1DifferentRV.ResourceVersion = "rv2"
epSlice1NewerGen := epSlice1.DeepCopy()
epSlice1NewerGen.Generation = 2
testCases := map[string]struct {
deleteParam *discovery.EndpointSlice
checksParam *discovery.EndpointSlice
expectHas bool
expectStale bool
expectDeletionParam *discovery.EndpointSlice
checksParam *discovery.EndpointSlice
deleteParam *discovery.EndpointSlice
expectHas bool
expectShouldSync bool
expectedHandleDeletionResp bool
}{
"same slice": {
deleteParam: epSlice1,
checksParam: epSlice1,
expectHas: false,
expectStale: true,
expectDeletionParam: epSlice1,
checksParam: epSlice1,
deleteParam: epSlice1,
expectHas: true,
expectShouldSync: true,
expectedHandleDeletionResp: true,
},
"different namespace": {
deleteParam: epSlice1DifferentNS,
checksParam: epSlice1DifferentNS,
expectHas: false,
expectStale: true,
expectDeletionParam: epSlice1DifferentNS,
checksParam: epSlice1DifferentNS,
deleteParam: epSlice1DifferentNS,
expectHas: true,
expectShouldSync: true,
expectedHandleDeletionResp: false,
},
"different namespace, check original ep slice": {
deleteParam: epSlice1DifferentNS,
checksParam: epSlice1,
expectHas: true,
expectStale: false,
expectDeletionParam: epSlice1DifferentNS,
checksParam: epSlice1,
deleteParam: epSlice1DifferentNS,
expectHas: true,
expectShouldSync: false,
expectedHandleDeletionResp: false,
},
"different service": {
deleteParam: epSlice1DifferentService,
checksParam: epSlice1DifferentService,
expectHas: false,
expectStale: true,
expectDeletionParam: epSlice1DifferentService,
checksParam: epSlice1DifferentService,
deleteParam: epSlice1DifferentService,
expectHas: true,
expectShouldSync: true,
expectedHandleDeletionResp: false,
},
"different service, check original ep slice": {
deleteParam: epSlice1DifferentService,
checksParam: epSlice1,
expectHas: true,
expectStale: false,
"expectDelete different service, check original ep slice, delete original": {
expectDeletionParam: epSlice1DifferentService,
checksParam: epSlice1,
deleteParam: epSlice1,
expectHas: true,
expectShouldSync: false,
expectedHandleDeletionResp: false,
},
"different resource version": {
deleteParam: epSlice1DifferentRV,
checksParam: epSlice1DifferentRV,
expectHas: false,
expectStale: true,
"different generation": {
expectDeletionParam: epSlice1NewerGen,
checksParam: epSlice1NewerGen,
deleteParam: epSlice1NewerGen,
expectHas: true,
expectShouldSync: true,
expectedHandleDeletionResp: true,
},
"different resource version, check original ep slice": {
deleteParam: epSlice1DifferentRV,
checksParam: epSlice1,
expectHas: false,
expectStale: true,
"expectDelete different generation, check original ep slice, delete original": {
expectDeletionParam: epSlice1NewerGen,
checksParam: epSlice1,
deleteParam: epSlice1,
expectHas: true,
expectShouldSync: true,
expectedHandleDeletionResp: true,
},
}
@ -187,13 +288,20 @@ func TestEndpointSliceTrackerDelete(t *testing.T) {
esTracker := newEndpointSliceTracker()
esTracker.Update(epSlice1)
esTracker.Delete(tc.deleteParam)
esTracker.ExpectDeletion(tc.expectDeletionParam)
if esTracker.Has(tc.checksParam) != tc.expectHas {
t.Errorf("esTracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas)
}
if esTracker.Stale(tc.checksParam) != tc.expectStale {
t.Errorf("esTracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale)
if esTracker.ShouldSync(tc.checksParam) != tc.expectShouldSync {
t.Errorf("esTracker.ShouldSync(%+v) == %t, expected %t", tc.checksParam, esTracker.ShouldSync(tc.checksParam), tc.expectShouldSync)
}
if esTracker.HandleDeletion(epSlice1) != tc.expectedHandleDeletionResp {
t.Errorf("esTracker.ShouldSync(%+v) == %t, expected %t", epSlice1, esTracker.HandleDeletion(epSlice1), tc.expectedHandleDeletionResp)
}
if esTracker.Has(epSlice1) != false {
t.Errorf("esTracker.Has(%+v) == %t, expected false", epSlice1, esTracker.Has(epSlice1))
}
})
}
}
@ -203,48 +311,39 @@ func TestEndpointSliceTrackerDeleteService(t *testing.T) {
svcName2, svcNS2 := "svc2", "ns2"
epSlice1 := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "example-1",
Namespace: svcNS1,
ResourceVersion: "rv1",
Labels: map[string]string{discovery.LabelServiceName: svcName1},
Name: "example-1",
Namespace: svcNS1,
Generation: 1,
Labels: map[string]string{discovery.LabelServiceName: svcName1},
},
}
testCases := map[string]struct {
updateParam *discovery.EndpointSlice
deleteServiceParam *types.NamespacedName
expectHas bool
expectStale bool
expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
updateParam *discovery.EndpointSlice
deleteServiceParam *types.NamespacedName
expectHas bool
expectShouldSync bool
expectGeneration int64
}{
"same service": {
updateParam: epSlice1,
deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName1},
expectHas: false,
expectStale: true,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{},
updateParam: epSlice1,
deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName1},
expectHas: false,
expectShouldSync: true,
},
"different namespace": {
updateParam: epSlice1,
deleteServiceParam: &types.NamespacedName{Namespace: svcNS2, Name: svcName1},
expectHas: true,
expectStale: false,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
expectShouldSync: false,
expectGeneration: epSlice1.Generation,
},
"different service": {
updateParam: epSlice1,
deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName2},
expectHas: true,
expectStale: false,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
expectShouldSync: false,
expectGeneration: epSlice1.Generation,
},
}
@ -256,10 +355,23 @@ func TestEndpointSliceTrackerDeleteService(t *testing.T) {
if esTracker.Has(tc.updateParam) != tc.expectHas {
t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.updateParam, esTracker.Has(tc.updateParam), tc.expectHas)
}
if esTracker.Stale(tc.updateParam) != tc.expectStale {
t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.updateParam, esTracker.Stale(tc.updateParam), tc.expectStale)
if esTracker.ShouldSync(tc.updateParam) != tc.expectShouldSync {
t.Errorf("tc.tracker.ShouldSync(%+v) == %t, expected %t", tc.updateParam, esTracker.ShouldSync(tc.updateParam), tc.expectShouldSync)
}
if tc.expectGeneration != 0 {
serviceNN := types.NamespacedName{Namespace: epSlice1.Namespace, Name: "svc1"}
gfs, ok := esTracker.generationsByService[serviceNN]
if !ok {
t.Fatalf("expected tracker to have status for %s Service", serviceNN.Name)
}
generation, ok := gfs[epSlice1.UID]
if !ok {
t.Fatalf("expected tracker to have generation for %s EndpointSlice", epSlice1.Name)
}
if tc.expectGeneration != generation {
t.Fatalf("expected generation to be %d, got %d", tc.expectGeneration, generation)
}
}
assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService)
})
}
}

View File

@ -316,6 +316,10 @@ func (c *Controller) syncEndpoints(key string) error {
return err
}
if c.endpointSliceTracker.StaleSlices(svc, endpointSlices) {
return &StaleInformerCache{"EndpointSlice informer cache is out of date"}
}
err = c.reconciler.reconcile(endpoints, endpointSlices)
if err != nil {
return err
@ -439,7 +443,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) {
utilruntime.HandleError(fmt.Errorf("onEndpointSliceAdd() expected type discovery.EndpointSlice, got %T", obj))
return
}
if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) {
if managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) {
c.queueEndpointsForEndpointSlice(endpointSlice)
}
}
@ -455,7 +459,18 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
utilruntime.HandleError(fmt.Errorf("onEndpointSliceUpdated() expected type discovery.EndpointSlice, got %T, %T", prevObj, obj))
return
}
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) {
// EndpointSlice generation does not change when labels change. Although the
// controller will never change LabelServiceName, users might. This check
// ensures that we handle changes to this label.
svcName := endpointSlice.Labels[discovery.LabelServiceName]
prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName]
if svcName != prevSvcName {
klog.Warningf("%s label changed from %s to %s for %s", discovery.LabelServiceName, prevSvcName, svcName, endpointSlice.Name)
c.queueEndpointsForEndpointSlice(endpointSlice)
c.queueEndpointsForEndpointSlice(prevEndpointSlice)
return
}
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) {
c.queueEndpointsForEndpointSlice(endpointSlice)
}
}
@ -470,7 +485,11 @@ func (c *Controller) onEndpointSliceDelete(obj interface{}) {
return
}
if managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
c.queueEndpointsForEndpointSlice(endpointSlice)
// This returns false if we didn't expect the EndpointSlice to be
// deleted. If that is the case, we queue the Service for another sync.
if !c.endpointSliceTracker.HandleDeletion(endpointSlice) {
c.queueEndpointsForEndpointSlice(endpointSlice)
}
}
}

View File

@ -0,0 +1,25 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpointslicemirroring
// StaleInformerCache errors indicate that the informer cache includes out of
// date resources.
type StaleInformerCache struct {
msg string
}
func (e *StaleInformerCache) Error() string { return e.msg }

View File

@ -263,7 +263,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
if err != nil {
return fmt.Errorf("failed to delete %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err)
}
r.endpointSliceTracker.Delete(endpointSlice)
r.endpointSliceTracker.ExpectDeletion(endpointSlice)
metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
}

View File

@ -172,13 +172,13 @@ func newClientset() *fake.Clientset {
endpointSlice.ObjectMeta.Name = fmt.Sprintf("%s-%s", endpointSlice.ObjectMeta.GenerateName, rand.String(8))
endpointSlice.ObjectMeta.GenerateName = ""
}
endpointSlice.ObjectMeta.ResourceVersion = "100"
endpointSlice.ObjectMeta.Generation = 1
return false, endpointSlice, nil
}))
client.PrependReactor("update", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice)
endpointSlice.ObjectMeta.ResourceVersion = "200"
endpointSlice.ObjectMeta.Generation++
return false, endpointSlice, nil
}))