From d9f2cc0c955a923cc0515d370ac60fa335b5fab0 Mon Sep 17 00:00:00 2001 From: John Howard Date: Tue, 15 Nov 2022 10:40:23 -0800 Subject: [PATCH] endpoints: remove obsolete ServiceSelectorCache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since https://github.com/kubernetes/kubernetes/pull/112648, we can efficiently handle selectors from pre-existing `map[string]string`, making the cache obsolete. Benchmark: ``` name old time/op new time/op delta GetPodServiceMemberships-48 189µs ± 1% 193µs ± 1% +2.10% (p=0.000 n=10+10) name old alloc/op new alloc/op delta GetPodServiceMemberships-48 59.0kB ± 0% 58.9kB ± 0% -0.09% (p=0.000 n=9+9) name old allocs/op new allocs/op delta GetPodServiceMemberships-48 1.02k ± 0% 1.02k ± 0% ~ (all equal) ``` --- .../endpoint/endpoints_controller.go | 14 +---- .../endpointslice/endpointslice_controller.go | 11 +--- .../util/endpoint/controller_utils.go | 56 ++--------------- .../util/endpoint/controller_utils_test.go | 61 +------------------ 4 files changed, 12 insertions(+), 130 deletions(-) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index d003cf1b71f..1ceff54e0e9 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -110,8 +110,6 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod - e.serviceSelectorCache = endpointutil.NewServiceSelectorCache() - return e } @@ -157,10 +155,6 @@ type Controller struct { triggerTimeTracker *endpointutil.TriggerTimeTracker endpointUpdatesBatchPeriod time.Duration - - // serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls - // to AsSelectorPreValidated (see #73527) - serviceSelectorCache *endpointutil.ServiceSelectorCache } // Run will not return until stopCh is closed. workers determines how many @@ -198,7 +192,7 @@ func (e *Controller) Run(ctx context.Context, workers int) { // enqueue them. obj must have *v1.Pod type. func (e *Controller) addPod(obj interface{}) { pod := obj.(*v1.Pod) - services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod) + services, err := endpointutil.GetPodServiceMemberships(e.serviceLister, pod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err)) return @@ -267,7 +261,7 @@ func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointA // and what services it will be a member of, and enqueue the union of these. // old and cur must be *v1.Pod types. func (e *Controller) updatePod(old, cur interface{}) { - services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur) + services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur) for key := range services { e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod) } @@ -289,8 +283,6 @@ func (e *Controller) onServiceUpdate(obj interface{}) { utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } - - _ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector) e.queue.Add(key) } @@ -301,8 +293,6 @@ func (e *Controller) onServiceDelete(obj interface{}) { utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } - - e.serviceSelectorCache.Delete(key) e.queue.Add(key) } diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index f4767f240de..5b9db597346 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -145,7 +145,6 @@ func NewController(podInformer coreinformers.PodInformer, c.eventRecorder = recorder c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod - c.serviceSelectorCache = endpointutil.NewServiceSelectorCache() if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -234,10 +233,6 @@ type Controller struct { // This can be used to reduce overall number of all endpoint slice updates. endpointUpdatesBatchPeriod time.Duration - // serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls - // to AsSelectorPreValidated (see #73527) - serviceSelectorCache *endpointutil.ServiceSelectorCache - // topologyCache tracks the distribution of Nodes and endpoints across zones // to enable TopologyAwareHints. topologyCache *topologycache.TopologyCache @@ -395,7 +390,6 @@ func (c *Controller) onServiceUpdate(obj interface{}) { return } - _ = c.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector) c.queue.Add(key) } @@ -407,7 +401,6 @@ func (c *Controller) onServiceDelete(obj interface{}) { return } - c.serviceSelectorCache.Delete(key) c.queue.Add(key) } @@ -486,7 +479,7 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.Endpo func (c *Controller) addPod(obj interface{}) { pod := obj.(*v1.Pod) - services, err := c.serviceSelectorCache.GetPodServiceMemberships(c.serviceLister, pod) + services, err := endpointutil.GetPodServiceMemberships(c.serviceLister, pod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err)) return @@ -497,7 +490,7 @@ func (c *Controller) addPod(obj interface{}) { } func (c *Controller) updatePod(old, cur interface{}) { - services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur) + services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur) for key := range services { c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod) } diff --git a/pkg/controller/util/endpoint/controller_utils.go b/pkg/controller/util/endpoint/controller_utils.go index 24410bf8fa0..f33d5f320e4 100644 --- a/pkg/controller/util/endpoint/controller_utils.go +++ b/pkg/controller/util/endpoint/controller_utils.go @@ -22,7 +22,6 @@ import ( "fmt" "reflect" "sort" - "sync" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" @@ -49,54 +48,15 @@ var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie( }, ) -// ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527) -type ServiceSelectorCache struct { - lock sync.RWMutex - cache map[string]labels.Selector -} - -// NewServiceSelectorCache init ServiceSelectorCache for both endpoint controller and endpointSlice controller. -func NewServiceSelectorCache() *ServiceSelectorCache { - return &ServiceSelectorCache{ - cache: map[string]labels.Selector{}, - } -} - -// Get return selector and existence in ServiceSelectorCache by key. -func (sc *ServiceSelectorCache) Get(key string) (labels.Selector, bool) { - sc.lock.RLock() - selector, ok := sc.cache[key] - // fine-grained lock improves GetPodServiceMemberships performance(16.5%) than defer measured by BenchmarkGetPodServiceMemberships - sc.lock.RUnlock() - return selector, ok -} - -// Update can update or add a selector in ServiceSelectorCache while service's selector changed. -func (sc *ServiceSelectorCache) Update(key string, rawSelector map[string]string) labels.Selector { - sc.lock.Lock() - defer sc.lock.Unlock() - selector := labels.Set(rawSelector).AsSelectorPreValidated() - sc.cache[key] = selector - return selector -} - -// Delete can delete selector which exist in ServiceSelectorCache. -func (sc *ServiceSelectorCache) Delete(key string) { - sc.lock.Lock() - defer sc.lock.Unlock() - delete(sc.cache, key) -} - // GetPodServiceMemberships returns a set of Service keys for Services that have // a selector matching the given pod. -func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) { +func GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) { set := sets.String{} services, err := serviceLister.Services(pod.Namespace).List(labels.Everything()) if err != nil { return set, err } - var selector labels.Selector for _, service := range services { if service.Spec.Selector == nil { // if the service has a nil selector this means selectors match nothing, not everything. @@ -106,13 +66,7 @@ func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers if err != nil { return nil, err } - if v, ok := sc.Get(key); ok { - selector = v - } else { - selector = sc.Update(key, service.Spec.Selector) - } - - if selector.Matches(labels.Set(pod.Labels)) { + if labels.ValidatedSetSelector(service.Spec.Selector).Matches(labels.Set(pod.Labels)) { set.Insert(key) } } @@ -206,7 +160,7 @@ func podEndpointsChanged(oldPod, newPod *v1.Pod) (bool, bool) { // GetServicesToUpdateOnPodChange returns a set of Service keys for Services // that have potentially been affected by a change to this pod. -func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}) sets.String { +func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, old, cur interface{}) sets.String { newPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) if newPod.ResourceVersion == oldPod.ResourceVersion { @@ -222,14 +176,14 @@ func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selec return sets.String{} } - services, err := selectorCache.GetPodServiceMemberships(serviceLister, newPod) + services, err := GetPodServiceMemberships(serviceLister, newPod) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err)) return sets.String{} } if labelsChanged { - oldServices, err := selectorCache.GetPodServiceMemberships(serviceLister, oldPod) + oldServices, err := GetPodServiceMemberships(serviceLister, oldPod) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err)) } diff --git a/pkg/controller/util/endpoint/controller_utils_test.go b/pkg/controller/util/endpoint/controller_utils_test.go index 07580b55ce1..43f27f1d3ae 100644 --- a/pkg/controller/util/endpoint/controller_utils_test.go +++ b/pkg/controller/util/endpoint/controller_utils_test.go @@ -18,14 +18,12 @@ package endpoint import ( "fmt" - "reflect" "testing" "time" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -330,7 +328,7 @@ func genSimpleSvc(namespace, name string) *v1.Service { } } -func TestServiceSelectorCache_GetPodServiceMemberships(t *testing.T) { +func TestGetPodServiceMemberships(t *testing.T) { fakeInformerFactory := informers.NewSharedInformerFactory(&fake.Clientset{}, 0*time.Second) for i := 0; i < 3; i++ { service := &v1.Service{ @@ -361,7 +359,6 @@ func TestServiceSelectorCache_GetPodServiceMemberships(t *testing.T) { pods = append(pods, pod) } - cache := NewServiceSelectorCache() tests := []struct { name string pod *v1.Pod @@ -395,7 +392,7 @@ func TestServiceSelectorCache_GetPodServiceMemberships(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - services, err := cache.GetPodServiceMemberships(fakeInformerFactory.Core().V1().Services().Lister(), test.pod) + services, err := GetPodServiceMemberships(fakeInformerFactory.Core().V1().Services().Lister(), test.pod) if err != nil { t.Errorf("Error from cache.GetPodServiceMemberships: %v", err) } else if !services.Equal(test.expect) { @@ -405,57 +402,6 @@ func TestServiceSelectorCache_GetPodServiceMemberships(t *testing.T) { } } -func TestServiceSelectorCache_Update(t *testing.T) { - var selectors []labels.Selector - for i := 0; i < 5; i++ { - selector := labels.Set(map[string]string{"app": fmt.Sprintf("test-%d", i)}).AsSelectorPreValidated() - selectors = append(selectors, selector) - } - tests := []struct { - name string - key string - cache *ServiceSelectorCache - update map[string]string - expect labels.Selector - }{ - { - name: "add test/service-0", - key: "test/service-0", - cache: generateServiceSelectorCache(map[string]labels.Selector{}), - update: map[string]string{"app": "test-0"}, - expect: selectors[0], - }, - { - name: "add test/service-1", - key: "test/service-1", - cache: generateServiceSelectorCache(map[string]labels.Selector{"test/service-0": selectors[0]}), - update: map[string]string{"app": "test-1"}, - expect: selectors[1], - }, - { - name: "update test/service-2", - key: "test/service-2", - cache: generateServiceSelectorCache(map[string]labels.Selector{"test/service-2": selectors[2]}), - update: map[string]string{"app": "test-0"}, - expect: selectors[0], - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - selector := test.cache.Update(test.key, test.update) - if !reflect.DeepEqual(selector, test.expect) { - t.Errorf("Expect selector %v , but got %v", test.expect, selector) - } - }) - } -} - -func generateServiceSelectorCache(cache map[string]labels.Selector) *ServiceSelectorCache { - return &ServiceSelectorCache{ - cache: cache, - } -} - func BenchmarkGetPodServiceMemberships(b *testing.B) { // init fake service informer. fakeInformerFactory := informers.NewSharedInformerFactory(&fake.Clientset{}, 0*time.Second) @@ -484,11 +430,10 @@ func BenchmarkGetPodServiceMemberships(b *testing.B) { }, } - cache := NewServiceSelectorCache() expect := sets.NewString("test/service-0") b.ResetTimer() for i := 0; i < b.N; i++ { - services, err := cache.GetPodServiceMemberships(fakeInformerFactory.Core().V1().Services().Lister(), pod) + services, err := GetPodServiceMemberships(fakeInformerFactory.Core().V1().Services().Lister(), pod) if err != nil { b.Fatalf("Error from GetPodServiceMemberships(): %v", err) }