mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Merge pull request #85703 from robscott/endpointslice-controller-race-fix
Fixing Potential Race Condition in EndpointSlice Controller.
This commit is contained in:
commit
d4ba10ec80
@ -5,6 +5,7 @@ go_library(
|
|||||||
srcs = [
|
srcs = [
|
||||||
"endpointset.go",
|
"endpointset.go",
|
||||||
"endpointslice_controller.go",
|
"endpointslice_controller.go",
|
||||||
|
"endpointslice_tracker.go",
|
||||||
"reconciler.go",
|
"reconciler.go",
|
||||||
"utils.go",
|
"utils.go",
|
||||||
],
|
],
|
||||||
@ -49,6 +50,7 @@ go_test(
|
|||||||
name = "go_default_test",
|
name = "go_default_test",
|
||||||
srcs = [
|
srcs = [
|
||||||
"endpointslice_controller_test.go",
|
"endpointslice_controller_test.go",
|
||||||
|
"endpointslice_tracker_test.go",
|
||||||
"reconciler_test.go",
|
"reconciler_test.go",
|
||||||
"utils_test.go",
|
"utils_test.go",
|
||||||
],
|
],
|
||||||
|
@ -63,7 +63,7 @@ const (
|
|||||||
func NewController(podInformer coreinformers.PodInformer,
|
func NewController(podInformer coreinformers.PodInformer,
|
||||||
serviceInformer coreinformers.ServiceInformer,
|
serviceInformer coreinformers.ServiceInformer,
|
||||||
nodeInformer coreinformers.NodeInformer,
|
nodeInformer coreinformers.NodeInformer,
|
||||||
esInformer discoveryinformers.EndpointSliceInformer,
|
endpointSliceInformer discoveryinformers.EndpointSliceInformer,
|
||||||
maxEndpointsPerSlice int32,
|
maxEndpointsPerSlice int32,
|
||||||
client clientset.Interface,
|
client clientset.Interface,
|
||||||
) *Controller {
|
) *Controller {
|
||||||
@ -105,8 +105,15 @@ func NewController(podInformer coreinformers.PodInformer,
|
|||||||
c.nodeLister = nodeInformer.Lister()
|
c.nodeLister = nodeInformer.Lister()
|
||||||
c.nodesSynced = nodeInformer.Informer().HasSynced
|
c.nodesSynced = nodeInformer.Informer().HasSynced
|
||||||
|
|
||||||
c.endpointSliceLister = esInformer.Lister()
|
endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
c.endpointSlicesSynced = esInformer.Informer().HasSynced
|
AddFunc: c.onEndpointSliceAdd,
|
||||||
|
UpdateFunc: c.onEndpointSliceUpdate,
|
||||||
|
DeleteFunc: c.onEndpointSliceDelete,
|
||||||
|
})
|
||||||
|
|
||||||
|
c.endpointSliceLister = endpointSliceInformer.Lister()
|
||||||
|
c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
|
||||||
|
c.endpointSliceTracker = newEndpointSliceTracker()
|
||||||
|
|
||||||
c.maxEndpointsPerSlice = maxEndpointsPerSlice
|
c.maxEndpointsPerSlice = maxEndpointsPerSlice
|
||||||
|
|
||||||
@ -114,6 +121,7 @@ func NewController(podInformer coreinformers.PodInformer,
|
|||||||
client: c.client,
|
client: c.client,
|
||||||
nodeLister: c.nodeLister,
|
nodeLister: c.nodeLister,
|
||||||
maxEndpointsPerSlice: c.maxEndpointsPerSlice,
|
maxEndpointsPerSlice: c.maxEndpointsPerSlice,
|
||||||
|
endpointSliceTracker: c.endpointSliceTracker,
|
||||||
metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice),
|
metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice),
|
||||||
}
|
}
|
||||||
c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
|
c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
|
||||||
@ -152,6 +160,10 @@ type Controller struct {
|
|||||||
// endpointSlicesSynced returns true if the endpoint slice shared informer has been synced at least once.
|
// endpointSlicesSynced returns true if the endpoint slice shared informer has been synced at least once.
|
||||||
// Added as a member to the struct to allow injection for testing.
|
// Added as a member to the struct to allow injection for testing.
|
||||||
endpointSlicesSynced cache.InformerSynced
|
endpointSlicesSynced cache.InformerSynced
|
||||||
|
// endpointSliceTracker tracks the list of EndpointSlices and associated
|
||||||
|
// resource versions expected for each Service. It can help determine if a
|
||||||
|
// cached EndpointSlice is out of date.
|
||||||
|
endpointSliceTracker *endpointSliceTracker
|
||||||
|
|
||||||
// nodeLister is able to list/get nodes and is populated by the
|
// nodeLister is able to list/get nodes and is populated by the
|
||||||
// shared informer passed to NewController
|
// shared informer passed to NewController
|
||||||
@ -343,6 +355,57 @@ func (c *Controller) onServiceDelete(obj interface{}) {
|
|||||||
c.queue.Add(key)
|
c.queue.Add(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// onEndpointSliceAdd queues a sync for the relevant Service for a sync if the
|
||||||
|
// EndpointSlice resource version does not match the expected version in the
|
||||||
|
// endpointSliceTracker.
|
||||||
|
func (c *Controller) onEndpointSliceAdd(obj interface{}) {
|
||||||
|
endpointSlice := obj.(*discovery.EndpointSlice)
|
||||||
|
if endpointSlice == nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) {
|
||||||
|
c.queueServiceForEndpointSlice(endpointSlice)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// onEndpointSliceUpdate queues a sync for the relevant Service for a sync if
|
||||||
|
// the EndpointSlice resource version does not match the expected version in the
|
||||||
|
// endpointSliceTracker or the managed-by value of the EndpointSlice has changed
|
||||||
|
// from or to this controller.
|
||||||
|
func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
|
||||||
|
prevEndpointSlice := obj.(*discovery.EndpointSlice)
|
||||||
|
endpointSlice := obj.(*discovery.EndpointSlice)
|
||||||
|
if endpointSlice == nil || prevEndpointSlice == nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) {
|
||||||
|
c.queueServiceForEndpointSlice(endpointSlice)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// onEndpointSliceDelete queues a sync for the relevant Service for a sync if the
|
||||||
|
// EndpointSlice resource version does not match the expected version in the
|
||||||
|
// endpointSliceTracker.
|
||||||
|
func (c *Controller) onEndpointSliceDelete(obj interface{}) {
|
||||||
|
endpointSlice := getEndpointSliceFromDeleteAction(obj)
|
||||||
|
if endpointSlice != nil && managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
|
||||||
|
c.queueServiceForEndpointSlice(endpointSlice)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// queueServiceForEndpointSlice attempts to queue the corresponding Service for
|
||||||
|
// the provided EndpointSlice.
|
||||||
|
func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
|
||||||
|
key, err := serviceControllerKey(endpointSlice)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.queue.Add(key)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Controller) addPod(obj interface{}) {
|
func (c *Controller) addPod(obj interface{}) {
|
||||||
pod := obj.(*v1.Pod)
|
pod := obj.(*v1.Pod)
|
||||||
services, err := c.serviceSelectorCache.GetPodServiceMemberships(c.serviceLister, pod)
|
services, err := c.serviceSelectorCache.GetPodServiceMemberships(c.serviceLister, pod)
|
||||||
|
123
pkg/controller/endpointslice/endpointslice_tracker.go
Normal file
123
pkg/controller/endpointslice/endpointslice_tracker.go
Normal file
@ -0,0 +1,123 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package endpointslice
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
discovery "k8s.io/api/discovery/v1beta1"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// endpointSliceResourceVersions tracks expected EndpointSlice resource versions
|
||||||
|
// by EndpointSlice name.
|
||||||
|
type endpointSliceResourceVersions map[string]string
|
||||||
|
|
||||||
|
// endpointSliceTracker tracks EndpointSlices and their associated resource
|
||||||
|
// versions to help determine if a change to an EndpointSlice has been processed
|
||||||
|
// by the EndpointSlice controller.
|
||||||
|
type endpointSliceTracker struct {
|
||||||
|
// lock protects resourceVersionsByService.
|
||||||
|
lock sync.Mutex
|
||||||
|
// resourceVersionsByService tracks the list of EndpointSlices and
|
||||||
|
// associated resource versions expected for a given Service.
|
||||||
|
resourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
|
||||||
|
}
|
||||||
|
|
||||||
|
// newEndpointSliceTracker creates and initializes a new endpointSliceTracker.
|
||||||
|
func newEndpointSliceTracker() *endpointSliceTracker {
|
||||||
|
return &endpointSliceTracker{
|
||||||
|
resourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Has returns true if the endpointSliceTracker has a resource version for the
|
||||||
|
// provided EndpointSlice.
|
||||||
|
func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool {
|
||||||
|
est.lock.Lock()
|
||||||
|
defer est.lock.Unlock()
|
||||||
|
|
||||||
|
rrv := est.relatedResourceVersions(endpointSlice)
|
||||||
|
_, ok := rrv[endpointSlice.Name]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stale returns true if this endpointSliceTracker does not have a resource
|
||||||
|
// version for the provided EndpointSlice or it does not match the resource
|
||||||
|
// version of the provided EndpointSlice.
|
||||||
|
func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool {
|
||||||
|
est.lock.Lock()
|
||||||
|
defer est.lock.Unlock()
|
||||||
|
|
||||||
|
rrv := est.relatedResourceVersions(endpointSlice)
|
||||||
|
return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update adds or updates the resource version in this endpointSliceTracker for
|
||||||
|
// the provided EndpointSlice.
|
||||||
|
func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) {
|
||||||
|
est.lock.Lock()
|
||||||
|
defer est.lock.Unlock()
|
||||||
|
|
||||||
|
rrv := est.relatedResourceVersions(endpointSlice)
|
||||||
|
rrv[endpointSlice.Name] = endpointSlice.ResourceVersion
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes the resource version in this endpointSliceTracker for the
|
||||||
|
// provided EndpointSlice.
|
||||||
|
func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) {
|
||||||
|
est.lock.Lock()
|
||||||
|
defer est.lock.Unlock()
|
||||||
|
|
||||||
|
rrv := est.relatedResourceVersions(endpointSlice)
|
||||||
|
delete(rrv, endpointSlice.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// relatedResourceVersions returns the set of resource versions tracked for the
|
||||||
|
// Service corresponding to the provided EndpointSlice. If no resource versions
|
||||||
|
// are currently tracked for this service, an empty set is initialized.
|
||||||
|
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) endpointSliceResourceVersions {
|
||||||
|
serviceNN := getServiceNN(endpointSlice)
|
||||||
|
vers, ok := est.resourceVersionsByService[serviceNN]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
vers = endpointSliceResourceVersions{}
|
||||||
|
est.resourceVersionsByService[serviceNN] = vers
|
||||||
|
}
|
||||||
|
|
||||||
|
return vers
|
||||||
|
}
|
||||||
|
|
||||||
|
// getServiceNN returns a namespaced name for the Service corresponding to the
|
||||||
|
// provided EndpointSlice.
|
||||||
|
func getServiceNN(endpointSlice *discovery.EndpointSlice) types.NamespacedName {
|
||||||
|
serviceName, _ := endpointSlice.Labels[discovery.LabelServiceName]
|
||||||
|
return types.NamespacedName{Name: serviceName, Namespace: endpointSlice.Namespace}
|
||||||
|
}
|
||||||
|
|
||||||
|
// managedByChanged returns true if one of the provided EndpointSlices is
|
||||||
|
// managed by the EndpointSlice controller while the other is not.
|
||||||
|
func managedByChanged(endpointSlice1, endpointSlice2 *discovery.EndpointSlice) bool {
|
||||||
|
return managedByController(endpointSlice1) != managedByController(endpointSlice2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// managedByController returns true if the controller of the provided
|
||||||
|
// EndpointSlices is the EndpointSlice controller.
|
||||||
|
func managedByController(endpointSlice *discovery.EndpointSlice) bool {
|
||||||
|
managedBy, _ := endpointSlice.Labels[discovery.LabelManagedBy]
|
||||||
|
return managedBy == controllerName
|
||||||
|
}
|
174
pkg/controller/endpointslice/endpointslice_tracker_test.go
Normal file
174
pkg/controller/endpointslice/endpointslice_tracker_test.go
Normal file
@ -0,0 +1,174 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package endpointslice
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
discovery "k8s.io/api/discovery/v1beta1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEndpointSliceTrackerUpdate(t *testing.T) {
|
||||||
|
epSlice1 := &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "example-1",
|
||||||
|
Namespace: "ns1",
|
||||||
|
ResourceVersion: "rv1",
|
||||||
|
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
epSlice1DifferentNS := epSlice1.DeepCopy()
|
||||||
|
epSlice1DifferentNS.Namespace = "ns2"
|
||||||
|
|
||||||
|
epSlice1DifferentService := epSlice1.DeepCopy()
|
||||||
|
epSlice1DifferentService.Labels[discovery.LabelServiceName] = "svc2"
|
||||||
|
|
||||||
|
epSlice1DifferentRV := epSlice1.DeepCopy()
|
||||||
|
epSlice1DifferentRV.ResourceVersion = "rv2"
|
||||||
|
|
||||||
|
testCases := map[string]struct {
|
||||||
|
updateParam *discovery.EndpointSlice
|
||||||
|
checksParam *discovery.EndpointSlice
|
||||||
|
expectHas bool
|
||||||
|
expectStale bool
|
||||||
|
}{
|
||||||
|
"same slice": {
|
||||||
|
updateParam: epSlice1,
|
||||||
|
checksParam: epSlice1,
|
||||||
|
expectHas: true,
|
||||||
|
expectStale: false,
|
||||||
|
},
|
||||||
|
"different namespace": {
|
||||||
|
updateParam: epSlice1,
|
||||||
|
checksParam: epSlice1DifferentNS,
|
||||||
|
expectHas: false,
|
||||||
|
expectStale: true,
|
||||||
|
},
|
||||||
|
"different service": {
|
||||||
|
updateParam: epSlice1,
|
||||||
|
checksParam: epSlice1DifferentService,
|
||||||
|
expectHas: false,
|
||||||
|
expectStale: true,
|
||||||
|
},
|
||||||
|
"different resource version": {
|
||||||
|
updateParam: epSlice1,
|
||||||
|
checksParam: epSlice1DifferentRV,
|
||||||
|
expectHas: true,
|
||||||
|
expectStale: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range testCases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
esTracker := newEndpointSliceTracker()
|
||||||
|
esTracker.Update(tc.updateParam)
|
||||||
|
if esTracker.Has(tc.checksParam) != tc.expectHas {
|
||||||
|
t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas)
|
||||||
|
}
|
||||||
|
if esTracker.Stale(tc.checksParam) != tc.expectStale {
|
||||||
|
t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEndpointSliceTrackerDelete(t *testing.T) {
|
||||||
|
epSlice1 := &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "example-1",
|
||||||
|
Namespace: "ns1",
|
||||||
|
ResourceVersion: "rv1",
|
||||||
|
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
epSlice1DifferentNS := epSlice1.DeepCopy()
|
||||||
|
epSlice1DifferentNS.Namespace = "ns2"
|
||||||
|
|
||||||
|
epSlice1DifferentService := epSlice1.DeepCopy()
|
||||||
|
epSlice1DifferentService.Labels[discovery.LabelServiceName] = "svc2"
|
||||||
|
|
||||||
|
epSlice1DifferentRV := epSlice1.DeepCopy()
|
||||||
|
epSlice1DifferentRV.ResourceVersion = "rv2"
|
||||||
|
|
||||||
|
testCases := map[string]struct {
|
||||||
|
deleteParam *discovery.EndpointSlice
|
||||||
|
checksParam *discovery.EndpointSlice
|
||||||
|
expectHas bool
|
||||||
|
expectStale bool
|
||||||
|
}{
|
||||||
|
"same slice": {
|
||||||
|
deleteParam: epSlice1,
|
||||||
|
checksParam: epSlice1,
|
||||||
|
expectHas: false,
|
||||||
|
expectStale: true,
|
||||||
|
},
|
||||||
|
"different namespace": {
|
||||||
|
deleteParam: epSlice1DifferentNS,
|
||||||
|
checksParam: epSlice1DifferentNS,
|
||||||
|
expectHas: false,
|
||||||
|
expectStale: true,
|
||||||
|
},
|
||||||
|
"different namespace, check original ep slice": {
|
||||||
|
deleteParam: epSlice1DifferentNS,
|
||||||
|
checksParam: epSlice1,
|
||||||
|
expectHas: true,
|
||||||
|
expectStale: false,
|
||||||
|
},
|
||||||
|
"different service": {
|
||||||
|
deleteParam: epSlice1DifferentService,
|
||||||
|
checksParam: epSlice1DifferentService,
|
||||||
|
expectHas: false,
|
||||||
|
expectStale: true,
|
||||||
|
},
|
||||||
|
"different service, check original ep slice": {
|
||||||
|
deleteParam: epSlice1DifferentService,
|
||||||
|
checksParam: epSlice1,
|
||||||
|
expectHas: true,
|
||||||
|
expectStale: false,
|
||||||
|
},
|
||||||
|
"different resource version": {
|
||||||
|
deleteParam: epSlice1DifferentRV,
|
||||||
|
checksParam: epSlice1DifferentRV,
|
||||||
|
expectHas: false,
|
||||||
|
expectStale: true,
|
||||||
|
},
|
||||||
|
"different resource version, check original ep slice": {
|
||||||
|
deleteParam: epSlice1DifferentRV,
|
||||||
|
checksParam: epSlice1,
|
||||||
|
expectHas: false,
|
||||||
|
expectStale: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range testCases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
esTracker := newEndpointSliceTracker()
|
||||||
|
esTracker.Update(epSlice1)
|
||||||
|
|
||||||
|
esTracker.Delete(tc.deleteParam)
|
||||||
|
if esTracker.Has(tc.checksParam) != tc.expectHas {
|
||||||
|
t.Errorf("esTracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas)
|
||||||
|
}
|
||||||
|
if esTracker.Stale(tc.checksParam) != tc.expectStale {
|
||||||
|
t.Errorf("esTracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -40,6 +40,7 @@ type reconciler struct {
|
|||||||
client clientset.Interface
|
client clientset.Interface
|
||||||
nodeLister corelisters.NodeLister
|
nodeLister corelisters.NodeLister
|
||||||
maxEndpointsPerSlice int32
|
maxEndpointsPerSlice int32
|
||||||
|
endpointSliceTracker *endpointSliceTracker
|
||||||
metricsCache *metrics.Cache
|
metricsCache *metrics.Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -212,6 +213,7 @@ func (r *reconciler) finalize(
|
|||||||
}
|
}
|
||||||
errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err))
|
errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err))
|
||||||
} else {
|
} else {
|
||||||
|
r.endpointSliceTracker.Update(endpointSlice)
|
||||||
metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
|
metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -222,6 +224,7 @@ func (r *reconciler) finalize(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
|
errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
|
||||||
} else {
|
} else {
|
||||||
|
r.endpointSliceTracker.Update(endpointSlice)
|
||||||
metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
|
metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -231,6 +234,7 @@ func (r *reconciler) finalize(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
|
errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
|
||||||
} else {
|
} else {
|
||||||
|
r.endpointSliceTracker.Delete(endpointSlice)
|
||||||
metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
|
metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -752,6 +752,7 @@ func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPer
|
|||||||
client: client,
|
client: client,
|
||||||
nodeLister: corelisters.NewNodeLister(indexer),
|
nodeLister: corelisters.NewNodeLister(indexer),
|
||||||
maxEndpointsPerSlice: maxEndpointsPerSlice,
|
maxEndpointsPerSlice: maxEndpointsPerSlice,
|
||||||
|
endpointSliceTracker: newEndpointSliceTracker(),
|
||||||
metricsCache: metrics.NewCache(maxEndpointsPerSlice),
|
metricsCache: metrics.NewCache(maxEndpointsPerSlice),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,8 @@ import (
|
|||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
api "k8s.io/kubernetes/pkg/apis/core"
|
api "k8s.io/kubernetes/pkg/apis/core"
|
||||||
@ -236,6 +238,27 @@ func getSliceToFill(endpointSlices []*discovery.EndpointSlice, numEndpoints, max
|
|||||||
return closestSlice
|
return closestSlice
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action.
|
||||||
|
func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice {
|
||||||
|
if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok {
|
||||||
|
// Enqueue all the services that the pod used to be a member of.
|
||||||
|
// This is the same thing we do when we add a pod.
|
||||||
|
return endpointSlice
|
||||||
|
}
|
||||||
|
// If we reached here it means the pod was deleted but its final state is unrecorded.
|
||||||
|
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||||
|
if !ok {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice)
|
||||||
|
if !ok {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return endpointSlice
|
||||||
|
}
|
||||||
|
|
||||||
// addTriggerTimeAnnotation adds a triggerTime annotation to an EndpointSlice
|
// addTriggerTimeAnnotation adds a triggerTime annotation to an EndpointSlice
|
||||||
func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTime time.Time) {
|
func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTime time.Time) {
|
||||||
if endpointSlice.Annotations == nil {
|
if endpointSlice.Annotations == nil {
|
||||||
@ -249,6 +272,19 @@ func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTim
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serviceControllerKey returns a controller key for a Service but derived from
|
||||||
|
// an EndpointSlice.
|
||||||
|
func serviceControllerKey(endpointSlice *discovery.EndpointSlice) (string, error) {
|
||||||
|
if endpointSlice == nil {
|
||||||
|
return "", fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()")
|
||||||
|
}
|
||||||
|
serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName]
|
||||||
|
if !ok || serviceName == "" {
|
||||||
|
return "", fmt.Errorf("EndpointSlice missing %s label", discovery.LabelServiceName)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%s/%s", endpointSlice.Namespace, serviceName), nil
|
||||||
|
}
|
||||||
|
|
||||||
// endpointSliceEndpointLen helps sort endpoint slices by the number of
|
// endpointSliceEndpointLen helps sort endpoint slices by the number of
|
||||||
// endpoints they contain.
|
// endpoints they contain.
|
||||||
type endpointSliceEndpointLen []*discovery.EndpointSlice
|
type endpointSliceEndpointLen []*discovery.EndpointSlice
|
||||||
|
@ -310,6 +310,49 @@ func TestPodChangedWithPodEndpointChanged(t *testing.T) {
|
|||||||
newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
|
newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServiceControllerKey(t *testing.T) {
|
||||||
|
testCases := map[string]struct {
|
||||||
|
endpointSlice *discovery.EndpointSlice
|
||||||
|
expectedKey string
|
||||||
|
expectedErr error
|
||||||
|
}{
|
||||||
|
"nil EndpointSlice": {
|
||||||
|
endpointSlice: nil,
|
||||||
|
expectedKey: "",
|
||||||
|
expectedErr: fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()"),
|
||||||
|
},
|
||||||
|
"empty EndpointSlice": {
|
||||||
|
endpointSlice: &discovery.EndpointSlice{},
|
||||||
|
expectedKey: "",
|
||||||
|
expectedErr: fmt.Errorf("EndpointSlice missing kubernetes.io/service-name label"),
|
||||||
|
},
|
||||||
|
"valid EndpointSlice": {
|
||||||
|
endpointSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "ns",
|
||||||
|
Labels: map[string]string{
|
||||||
|
discovery.LabelServiceName: "svc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedKey: "ns/svc",
|
||||||
|
expectedErr: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range testCases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
actualKey, actualErr := serviceControllerKey(tc.endpointSlice)
|
||||||
|
if !reflect.DeepEqual(actualErr, tc.expectedErr) {
|
||||||
|
t.Errorf("Expected %s, got %s", tc.expectedErr, actualErr)
|
||||||
|
}
|
||||||
|
if actualKey != tc.expectedKey {
|
||||||
|
t.Errorf("Expected %s, got %s", tc.expectedKey, actualKey)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Test helpers
|
// Test helpers
|
||||||
|
|
||||||
func newPod(n int, namespace string, ready bool, nPorts int) *v1.Pod {
|
func newPod(n int, namespace string, ready bool, nPorts int) *v1.Pod {
|
||||||
|
Loading…
Reference in New Issue
Block a user