mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Merge pull request #113929 from howardjohn/endpointslice/use-optimized-set
endpoints: remove obsolete ServiceSelectorCache
This commit is contained in:
commit
f11e9aaf10
@ -110,8 +110,6 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
|
||||
|
||||
e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
|
||||
|
||||
e.serviceSelectorCache = endpointutil.NewServiceSelectorCache()
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
@ -157,10 +155,6 @@ type Controller struct {
|
||||
triggerTimeTracker *endpointutil.TriggerTimeTracker
|
||||
|
||||
endpointUpdatesBatchPeriod time.Duration
|
||||
|
||||
// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
|
||||
// to AsSelectorPreValidated (see #73527)
|
||||
serviceSelectorCache *endpointutil.ServiceSelectorCache
|
||||
}
|
||||
|
||||
// Run will not return until stopCh is closed. workers determines how many
|
||||
@ -198,7 +192,7 @@ func (e *Controller) Run(ctx context.Context, workers int) {
|
||||
// enqueue them. obj must have *v1.Pod type.
|
||||
func (e *Controller) addPod(obj interface{}) {
|
||||
pod := obj.(*v1.Pod)
|
||||
services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
|
||||
services, err := endpointutil.GetPodServiceMemberships(e.serviceLister, pod)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
|
||||
return
|
||||
@ -267,7 +261,7 @@ func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointA
|
||||
// and what services it will be a member of, and enqueue the union of these.
|
||||
// old and cur must be *v1.Pod types.
|
||||
func (e *Controller) updatePod(old, cur interface{}) {
|
||||
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur)
|
||||
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur)
|
||||
for key := range services {
|
||||
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
|
||||
}
|
||||
@ -289,8 +283,6 @@ func (e *Controller) onServiceUpdate(obj interface{}) {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
|
||||
return
|
||||
}
|
||||
|
||||
_ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
|
||||
e.queue.Add(key)
|
||||
}
|
||||
|
||||
@ -301,8 +293,6 @@ func (e *Controller) onServiceDelete(obj interface{}) {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
|
||||
return
|
||||
}
|
||||
|
||||
e.serviceSelectorCache.Delete(key)
|
||||
e.queue.Add(key)
|
||||
}
|
||||
|
||||
|
@ -145,7 +145,6 @@ func NewController(podInformer coreinformers.PodInformer,
|
||||
c.eventRecorder = recorder
|
||||
|
||||
c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
|
||||
c.serviceSelectorCache = endpointutil.NewServiceSelectorCache()
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
|
||||
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
@ -234,10 +233,6 @@ type Controller struct {
|
||||
// This can be used to reduce overall number of all endpoint slice updates.
|
||||
endpointUpdatesBatchPeriod time.Duration
|
||||
|
||||
// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
|
||||
// to AsSelectorPreValidated (see #73527)
|
||||
serviceSelectorCache *endpointutil.ServiceSelectorCache
|
||||
|
||||
// topologyCache tracks the distribution of Nodes and endpoints across zones
|
||||
// to enable TopologyAwareHints.
|
||||
topologyCache *topologycache.TopologyCache
|
||||
@ -395,7 +390,6 @@ func (c *Controller) onServiceUpdate(obj interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
_ = c.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
|
||||
c.queue.Add(key)
|
||||
}
|
||||
|
||||
@ -407,7 +401,6 @@ func (c *Controller) onServiceDelete(obj interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
c.serviceSelectorCache.Delete(key)
|
||||
c.queue.Add(key)
|
||||
}
|
||||
|
||||
@ -486,7 +479,7 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.Endpo
|
||||
|
||||
func (c *Controller) addPod(obj interface{}) {
|
||||
pod := obj.(*v1.Pod)
|
||||
services, err := c.serviceSelectorCache.GetPodServiceMemberships(c.serviceLister, pod)
|
||||
services, err := endpointutil.GetPodServiceMemberships(c.serviceLister, pod)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
|
||||
return
|
||||
@ -497,7 +490,7 @@ func (c *Controller) addPod(obj interface{}) {
|
||||
}
|
||||
|
||||
func (c *Controller) updatePod(old, cur interface{}) {
|
||||
services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur)
|
||||
services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
|
||||
for key := range services {
|
||||
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
|
||||
}
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
@ -49,54 +48,15 @@ var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie(
|
||||
},
|
||||
)
|
||||
|
||||
// ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527)
|
||||
type ServiceSelectorCache struct {
|
||||
lock sync.RWMutex
|
||||
cache map[string]labels.Selector
|
||||
}
|
||||
|
||||
// NewServiceSelectorCache init ServiceSelectorCache for both endpoint controller and endpointSlice controller.
|
||||
func NewServiceSelectorCache() *ServiceSelectorCache {
|
||||
return &ServiceSelectorCache{
|
||||
cache: map[string]labels.Selector{},
|
||||
}
|
||||
}
|
||||
|
||||
// Get return selector and existence in ServiceSelectorCache by key.
|
||||
func (sc *ServiceSelectorCache) Get(key string) (labels.Selector, bool) {
|
||||
sc.lock.RLock()
|
||||
selector, ok := sc.cache[key]
|
||||
// fine-grained lock improves GetPodServiceMemberships performance(16.5%) than defer measured by BenchmarkGetPodServiceMemberships
|
||||
sc.lock.RUnlock()
|
||||
return selector, ok
|
||||
}
|
||||
|
||||
// Update can update or add a selector in ServiceSelectorCache while service's selector changed.
|
||||
func (sc *ServiceSelectorCache) Update(key string, rawSelector map[string]string) labels.Selector {
|
||||
sc.lock.Lock()
|
||||
defer sc.lock.Unlock()
|
||||
selector := labels.Set(rawSelector).AsSelectorPreValidated()
|
||||
sc.cache[key] = selector
|
||||
return selector
|
||||
}
|
||||
|
||||
// Delete can delete selector which exist in ServiceSelectorCache.
|
||||
func (sc *ServiceSelectorCache) Delete(key string) {
|
||||
sc.lock.Lock()
|
||||
defer sc.lock.Unlock()
|
||||
delete(sc.cache, key)
|
||||
}
|
||||
|
||||
// GetPodServiceMemberships returns a set of Service keys for Services that have
|
||||
// a selector matching the given pod.
|
||||
func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
|
||||
func GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
|
||||
set := sets.String{}
|
||||
services, err := serviceLister.Services(pod.Namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return set, err
|
||||
}
|
||||
|
||||
var selector labels.Selector
|
||||
for _, service := range services {
|
||||
if service.Spec.Selector == nil {
|
||||
// if the service has a nil selector this means selectors match nothing, not everything.
|
||||
@ -106,13 +66,7 @@ func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if v, ok := sc.Get(key); ok {
|
||||
selector = v
|
||||
} else {
|
||||
selector = sc.Update(key, service.Spec.Selector)
|
||||
}
|
||||
|
||||
if selector.Matches(labels.Set(pod.Labels)) {
|
||||
if labels.ValidatedSetSelector(service.Spec.Selector).Matches(labels.Set(pod.Labels)) {
|
||||
set.Insert(key)
|
||||
}
|
||||
}
|
||||
@ -206,7 +160,7 @@ func podEndpointsChanged(oldPod, newPod *v1.Pod) (bool, bool) {
|
||||
|
||||
// GetServicesToUpdateOnPodChange returns a set of Service keys for Services
|
||||
// that have potentially been affected by a change to this pod.
|
||||
func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}) sets.String {
|
||||
func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, old, cur interface{}) sets.String {
|
||||
newPod := cur.(*v1.Pod)
|
||||
oldPod := old.(*v1.Pod)
|
||||
if newPod.ResourceVersion == oldPod.ResourceVersion {
|
||||
@ -222,14 +176,14 @@ func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selec
|
||||
return sets.String{}
|
||||
}
|
||||
|
||||
services, err := selectorCache.GetPodServiceMemberships(serviceLister, newPod)
|
||||
services, err := GetPodServiceMemberships(serviceLister, newPod)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
|
||||
return sets.String{}
|
||||
}
|
||||
|
||||
if labelsChanged {
|
||||
oldServices, err := selectorCache.GetPodServiceMemberships(serviceLister, oldPod)
|
||||
oldServices, err := GetPodServiceMemberships(serviceLister, oldPod)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
|
||||
}
|
||||
|
@ -18,14 +18,12 @@ package endpoint
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
@ -330,7 +328,7 @@ func genSimpleSvc(namespace, name string) *v1.Service {
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceSelectorCache_GetPodServiceMemberships(t *testing.T) {
|
||||
func TestGetPodServiceMemberships(t *testing.T) {
|
||||
fakeInformerFactory := informers.NewSharedInformerFactory(&fake.Clientset{}, 0*time.Second)
|
||||
for i := 0; i < 3; i++ {
|
||||
service := &v1.Service{
|
||||
@ -361,7 +359,6 @@ func TestServiceSelectorCache_GetPodServiceMemberships(t *testing.T) {
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
|
||||
cache := NewServiceSelectorCache()
|
||||
tests := []struct {
|
||||
name string
|
||||
pod *v1.Pod
|
||||
@ -395,7 +392,7 @@ func TestServiceSelectorCache_GetPodServiceMemberships(t *testing.T) {
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
services, err := cache.GetPodServiceMemberships(fakeInformerFactory.Core().V1().Services().Lister(), test.pod)
|
||||
services, err := GetPodServiceMemberships(fakeInformerFactory.Core().V1().Services().Lister(), test.pod)
|
||||
if err != nil {
|
||||
t.Errorf("Error from cache.GetPodServiceMemberships: %v", err)
|
||||
} else if !services.Equal(test.expect) {
|
||||
@ -405,57 +402,6 @@ func TestServiceSelectorCache_GetPodServiceMemberships(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceSelectorCache_Update(t *testing.T) {
|
||||
var selectors []labels.Selector
|
||||
for i := 0; i < 5; i++ {
|
||||
selector := labels.Set(map[string]string{"app": fmt.Sprintf("test-%d", i)}).AsSelectorPreValidated()
|
||||
selectors = append(selectors, selector)
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
key string
|
||||
cache *ServiceSelectorCache
|
||||
update map[string]string
|
||||
expect labels.Selector
|
||||
}{
|
||||
{
|
||||
name: "add test/service-0",
|
||||
key: "test/service-0",
|
||||
cache: generateServiceSelectorCache(map[string]labels.Selector{}),
|
||||
update: map[string]string{"app": "test-0"},
|
||||
expect: selectors[0],
|
||||
},
|
||||
{
|
||||
name: "add test/service-1",
|
||||
key: "test/service-1",
|
||||
cache: generateServiceSelectorCache(map[string]labels.Selector{"test/service-0": selectors[0]}),
|
||||
update: map[string]string{"app": "test-1"},
|
||||
expect: selectors[1],
|
||||
},
|
||||
{
|
||||
name: "update test/service-2",
|
||||
key: "test/service-2",
|
||||
cache: generateServiceSelectorCache(map[string]labels.Selector{"test/service-2": selectors[2]}),
|
||||
update: map[string]string{"app": "test-0"},
|
||||
expect: selectors[0],
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
selector := test.cache.Update(test.key, test.update)
|
||||
if !reflect.DeepEqual(selector, test.expect) {
|
||||
t.Errorf("Expect selector %v , but got %v", test.expect, selector)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func generateServiceSelectorCache(cache map[string]labels.Selector) *ServiceSelectorCache {
|
||||
return &ServiceSelectorCache{
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGetPodServiceMemberships(b *testing.B) {
|
||||
// init fake service informer.
|
||||
fakeInformerFactory := informers.NewSharedInformerFactory(&fake.Clientset{}, 0*time.Second)
|
||||
@ -484,11 +430,10 @@ func BenchmarkGetPodServiceMemberships(b *testing.B) {
|
||||
},
|
||||
}
|
||||
|
||||
cache := NewServiceSelectorCache()
|
||||
expect := sets.NewString("test/service-0")
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
services, err := cache.GetPodServiceMemberships(fakeInformerFactory.Core().V1().Services().Lister(), pod)
|
||||
services, err := GetPodServiceMemberships(fakeInformerFactory.Core().V1().Services().Lister(), pod)
|
||||
if err != nil {
|
||||
b.Fatalf("Error from GetPodServiceMemberships(): %v", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user