Merge pull request #100103 from robscott/stale-tracker-fix-v2

Updating EndpointSlice controllers to avoid duplicate creations
This commit is contained in:
Kubernetes Prow Robot 2021-03-10 22:42:55 -08:00 committed by GitHub
commit e2cdf0e3f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 111 additions and 8 deletions

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
@ -61,7 +62,8 @@ type endpointSliceController struct {
}
func newController(nodeNames []string, batchPeriod time.Duration) (*fake.Clientset, *endpointSliceController) {
client := newClientset()
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
nodeInformer := informerFactory.Core().V1().Nodes()
indexer := nodeInformer.Informer().GetIndexer()
@ -69,11 +71,36 @@ func newController(nodeNames []string, batchPeriod time.Duration) (*fake.Clients
indexer.Add(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
}
esInformer := informerFactory.Discovery().V1().EndpointSlices()
esIndexer := esInformer.Informer().GetIndexer()
// These reactors are required to mock functionality that would be covered
// automatically if we weren't using the fake client.
client.PrependReactor("create", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice)
if endpointSlice.ObjectMeta.GenerateName != "" {
endpointSlice.ObjectMeta.Name = fmt.Sprintf("%s-%s", endpointSlice.ObjectMeta.GenerateName, rand.String(8))
endpointSlice.ObjectMeta.GenerateName = ""
}
endpointSlice.Generation = 1
esIndexer.Add(endpointSlice)
return false, endpointSlice, nil
}))
client.PrependReactor("update", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice)
endpointSlice.Generation++
esIndexer.Update(endpointSlice)
return false, endpointSlice, nil
}))
esController := NewController(
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Services(),
nodeInformer,
informerFactory.Discovery().V1().EndpointSlices(),
esInformer,
int32(100),
client,
batchPeriod)

View File

@ -80,9 +80,12 @@ func (est *endpointSliceTracker) ShouldSync(endpointSlice *discovery.EndpointSli
return !ok || endpointSlice.Generation > g
}
// StaleSlices returns true if one or more of the provided EndpointSlices
// have older generations than the corresponding tracked ones or if the tracker
// is expecting one or more of the provided EndpointSlices to be deleted.
// StaleSlices returns true if any of the following are true:
// 1. One or more of the provided EndpointSlices have older generations than the
// corresponding tracked ones.
// 2. The tracker is expecting one or more of the provided EndpointSlices to be
// deleted.
// 3. The tracker is tracking EndpointSlices that have not been provided.
func (est *endpointSliceTracker) StaleSlices(service *v1.Service, endpointSlices []*discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()
@ -92,12 +95,23 @@ func (est *endpointSliceTracker) StaleSlices(service *v1.Service, endpointSlices
if !ok {
return false
}
providedSlices := map[types.UID]int64{}
for _, endpointSlice := range endpointSlices {
providedSlices[endpointSlice.UID] = endpointSlice.Generation
g, ok := gfs[endpointSlice.UID]
if ok && (g == deletionExpected || g > endpointSlice.Generation) {
return true
}
}
for uid, generation := range gfs {
if generation == deletionExpected {
continue
}
_, ok := providedSlices[uid]
if !ok {
return true
}
}
return false
}

View File

@ -184,6 +184,30 @@ func TestEndpointSliceTrackerStaleSlices(t *testing.T) {
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{epSlice1NewerGen},
expectNewer: false,
}, {
name: "slice in params is expected to be deleted",
tracker: &endpointSliceTracker{
generationsByService: map[types.NamespacedName]generationsBySlice{
{Name: "svc1", Namespace: "ns1"}: {
epSlice1.UID: deletionExpected,
},
},
},
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{epSlice1},
expectNewer: true,
}, {
name: "slice in tracker but not in params",
tracker: &endpointSliceTracker{
generationsByService: map[types.NamespacedName]generationsBySlice{
{Name: "svc1", Namespace: "ns1"}: {
epSlice1.UID: epSlice1.Generation,
},
},
},
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{},
expectNewer: true,
}}
for _, tc := range testCases {

View File

@ -80,9 +80,12 @@ func (est *endpointSliceTracker) ShouldSync(endpointSlice *discovery.EndpointSli
return !ok || endpointSlice.Generation > g
}
// StaleSlices returns true if one or more of the provided EndpointSlices
// have older generations than the corresponding tracked ones or if the tracker
// is expecting one or more of the provided EndpointSlices to be deleted.
// StaleSlices returns true if any of the following are true:
// 1. One or more of the provided EndpointSlices have older generations than the
// corresponding tracked ones.
// 2. The tracker is expecting one or more of the provided EndpointSlices to be
// deleted.
// 3. The tracker is tracking EndpointSlices that have not been provided.
func (est *endpointSliceTracker) StaleSlices(service *v1.Service, endpointSlices []*discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()
@ -92,12 +95,23 @@ func (est *endpointSliceTracker) StaleSlices(service *v1.Service, endpointSlices
if !ok {
return false
}
providedSlices := map[types.UID]int64{}
for _, endpointSlice := range endpointSlices {
providedSlices[endpointSlice.UID] = endpointSlice.Generation
g, ok := gfs[endpointSlice.UID]
if ok && (g == deletionExpected || g > endpointSlice.Generation) {
return true
}
}
for uid, generation := range gfs {
if generation == deletionExpected {
continue
}
_, ok := providedSlices[uid]
if !ok {
return true
}
}
return false
}

View File

@ -184,6 +184,30 @@ func TestEndpointSliceTrackerStaleSlices(t *testing.T) {
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{epSlice1NewerGen},
expectNewer: false,
}, {
name: "slice in params is expected to be deleted",
tracker: &endpointSliceTracker{
generationsByService: map[types.NamespacedName]generationsBySlice{
{Name: "svc1", Namespace: "ns1"}: {
epSlice1.UID: deletionExpected,
},
},
},
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{epSlice1},
expectNewer: true,
}, {
name: "slice in tracker but not in params",
tracker: &endpointSliceTracker{
generationsByService: map[types.NamespacedName]generationsBySlice{
{Name: "svc1", Namespace: "ns1"}: {
epSlice1.UID: epSlice1.Generation,
},
},
},
serviceParam: &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc1", Namespace: "ns1"}},
slicesParam: []*discovery.EndpointSlice{},
expectNewer: true,
}}
for _, tc := range testCases {