From 2133f9e9ff926ad40aa55b85b6242ca51ae4fe99 Mon Sep 17 00:00:00 2001 From: louisgong Date: Tue, 22 Oct 2019 19:57:28 +0800 Subject: [PATCH] add service selector cache in endpoint controller and endpointSlice controller --- .../endpoint/endpoints_controller.go | 36 +++- .../endpoint/endpoints_controller_test.go | 2 +- .../endpointslice/endpointslice_controller.go | 35 +++- pkg/controller/util/endpoint/BUILD | 4 + .../util/endpoint/controller_utils.go | 98 +++++++--- .../util/endpoint/controller_utils_test.go | 175 ++++++++++++++++++ 6 files changed, 309 insertions(+), 41 deletions(-) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 625f68da217..14996408cfa 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -46,8 +47,6 @@ import ( helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/controller" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" - - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" utilnet "k8s.io/utils/net" ) @@ -93,11 +92,11 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme } serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: e.enqueueService, + AddFunc: e.onServiceUpdate, UpdateFunc: func(old, cur interface{}) { - e.enqueueService(cur) + e.onServiceUpdate(cur) }, - DeleteFunc: e.enqueueService, + DeleteFunc: e.onServiceDelete, }) e.serviceLister = serviceInformer.Lister() e.servicesSynced = serviceInformer.Informer().HasSynced @@ -119,6 +118,8 @@ func 
NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod + e.serviceSelectorCache = endpointutil.NewServiceSelectorCache() + return e } @@ -164,6 +165,10 @@ type EndpointController struct { triggerTimeTracker *endpointutil.TriggerTimeTracker endpointUpdatesBatchPeriod time.Duration + + // serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls + // to AsSelectorPreValidated (see #73527) + serviceSelectorCache *endpointutil.ServiceSelectorCache } // Run will not return until stopCh is closed. workers determines how many @@ -195,7 +200,7 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { // enqueue them. obj must have *v1.Pod type. func (e *EndpointController) addPod(obj interface{}) { pod := obj.(*v1.Pod) - services, err := endpointutil.GetPodServiceMemberships(e.serviceLister, pod) + services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err)) return @@ -263,7 +268,7 @@ func endpointChanged(pod1, pod2 *v1.Pod) bool { // and what services it will be a member of, and enqueue the union of these. // old and cur must be *v1.Pod types. func (e *EndpointController) updatePod(old, cur interface{}) { - services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur, endpointChanged) + services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur, endpointChanged) for key := range services { e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod) } @@ -278,14 +283,27 @@ func (e *EndpointController) deletePod(obj interface{}) { } } -// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item. 
-func (e *EndpointController) enqueueService(obj interface{}) { +// onServiceUpdate updates the Service Selector in the cache and queues the Service for processing. +func (e *EndpointController) onServiceUpdate(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } + _ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector) + e.queue.Add(key) +} + +// onServiceDelete removes the Service Selector from the cache and queues the Service for processing. +func (e *EndpointController) onServiceDelete(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + return + } + + e.serviceSelectorCache.Delete(key) e.queue.Add(key) } diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 96ea347a5fa..f1cd5e01ac4 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -782,7 +782,7 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) { }, } endpoints.serviceStore.Add(service) - endpoints.enqueueService(service) + endpoints.onServiceUpdate(service) endpoints.podsSynced = test.podsSynced endpoints.servicesSynced = test.servicesSynced endpoints.endpointsSynced = test.endpointsSynced diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index 06ea773c600..62fc453ecfb 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -82,11 +82,11 @@ func NewController(podInformer coreinformers.PodInformer, } serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueueService, + AddFunc: c.onServiceUpdate, UpdateFunc: func(old, cur 
interface{}) { - c.enqueueService(cur) + c.onServiceUpdate(cur) }, - DeleteFunc: c.enqueueService, + DeleteFunc: c.onServiceDelete, }) c.serviceLister = serviceInformer.Lister() c.servicesSynced = serviceInformer.Informer().HasSynced @@ -118,6 +118,8 @@ func NewController(podInformer coreinformers.PodInformer, c.eventBroadcaster = broadcaster c.eventRecorder = recorder + c.serviceSelectorCache = endpointutil.NewServiceSelectorCache() + return c } @@ -176,6 +178,10 @@ type Controller struct { // workerLoopPeriod is the time between worker runs. The workers // process the queue of service and pod changes workerLoopPeriod time.Duration + + // serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls + // to AsSelectorPreValidated (see #73527) + serviceSelectorCache *endpointutil.ServiceSelectorCache } // Run will not return until stopCh is closed. @@ -307,20 +313,33 @@ func (c *Controller) syncService(key string) error { return nil } -// obj could be a *v1.Service or a DeletionalFinalStateUnknown marker item -func (c *Controller) enqueueService(obj interface{}) { +// onServiceUpdate updates the Service Selector in the cache and queues the Service for processing. +func (c *Controller) onServiceUpdate(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object")) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } + _ = c.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector) + c.queue.Add(key) +} + +// onServiceDelete removes the Service Selector from the cache and queues the Service for processing. 
+func (c *Controller) onServiceDelete(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + return + } + + c.serviceSelectorCache.Delete(key) c.queue.Add(key) } func (c *Controller) addPod(obj interface{}) { pod := obj.(*v1.Pod) - services, err := endpointutil.GetPodServiceMemberships(c.serviceLister, pod) + services, err := c.serviceSelectorCache.GetPodServiceMemberships(c.serviceLister, pod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err)) return @@ -331,7 +350,7 @@ func (c *Controller) addPod(obj interface{}) { } func (c *Controller) updatePod(old, cur interface{}) { - services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur, podEndpointChanged) + services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur, podEndpointChanged) for key := range services { c.queue.Add(key) } diff --git a/pkg/controller/util/endpoint/BUILD b/pkg/controller/util/endpoint/BUILD index c7c3c19a528..dc0331f6807 100644 --- a/pkg/controller/util/endpoint/BUILD +++ b/pkg/controller/util/endpoint/BUILD @@ -14,6 +14,7 @@ go_library( "//pkg/util/hash:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", @@ -31,8 +32,11 @@ go_test( deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", 
        "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/client-go/informers:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
    ],
)
diff --git a/pkg/controller/util/endpoint/controller_utils.go b/pkg/controller/util/endpoint/controller_utils.go
index 07d3f3e1cca..3c5383da842 100644
--- a/pkg/controller/util/endpoint/controller_utils.go
+++ b/pkg/controller/util/endpoint/controller_utils.go
@@ -22,9 +22,11 @@ import (
 	"fmt"
 	"reflect"
 	"sort"
+	"sync"
 
 	v1 "k8s.io/api/core/v1"
 	discovery "k8s.io/api/discovery/v1alpha1"
+	"k8s.io/apimachinery/pkg/labels"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
 	v1listers "k8s.io/client-go/listers/core/v1"
@@ -34,6 +36,76 @@ import (
 	"k8s.io/kubernetes/pkg/util/hash"
 )
 
+// ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527)
+type ServiceSelectorCache struct {
+	lock  sync.RWMutex
+	cache map[string]labels.Selector
+}
+
+// NewServiceSelectorCache initializes a ServiceSelectorCache for use by both the endpoint controller and the endpointSlice controller.
+func NewServiceSelectorCache() *ServiceSelectorCache {
+	return &ServiceSelectorCache{
+		cache: map[string]labels.Selector{},
+	}
+}
+
+// Get returns the cached selector for the given key, and whether it was present in the ServiceSelectorCache.
+func (sc *ServiceSelectorCache) Get(key string) (labels.Selector, bool) {
+	sc.lock.RLock()
+	selector, ok := sc.cache[key]
+	// fine-grained locking improves GetPodServiceMemberships performance by 16.5% compared to using defer, as measured by BenchmarkGetPodServiceMemberships
+	sc.lock.RUnlock()
+	return selector, ok
+}
+
+// Update adds or replaces the selector for the given key in the ServiceSelectorCache when the service's selector changes.
+func (sc *ServiceSelectorCache) Update(key string, rawSelector map[string]string) labels.Selector {
+	sc.lock.Lock()
+	defer sc.lock.Unlock()
+	selector := labels.Set(rawSelector).AsSelectorPreValidated()
+	sc.cache[key] = selector
+	return selector
+}
+
+// Delete removes the selector for the given key from the ServiceSelectorCache.
+func (sc *ServiceSelectorCache) Delete(key string) {
+	sc.lock.Lock()
+	defer sc.lock.Unlock()
+	delete(sc.cache, key)
+}
+
+// GetPodServiceMemberships returns a set of Service keys for Services that have
+// a selector matching the given pod.
+func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
+	set := sets.String{}
+	services, err := serviceLister.Services(pod.Namespace).List(labels.Everything())
+	if err != nil {
+		return set, err
+	}
+
+	var selector labels.Selector
+	for _, service := range services {
+		if service.Spec.Selector == nil {
+			// if the service has a nil selector this means selectors match nothing, not everything.
+			continue
+		}
+		key, err := controller.KeyFunc(service)
+		if err != nil {
+			return nil, err
+		}
+		if v, ok := sc.Get(key); ok {
+			selector = v
+		} else {
+			selector = sc.Update(key, service.Spec.Selector)
+		}
+
+		if selector.Matches(labels.Set(pod.Labels)) {
+			set.Insert(key)
+		}
+	}
+	return set, nil
+}
+
 // EndpointsMatch is a type of function that returns true if pod endpoints match.
 type EndpointsMatch func(*v1.Pod, *v1.Pod) bool
 
@@ -100,29 +172,9 @@ func PodChanged(oldPod, newPod *v1.Pod, endpointChanged EndpointsMatch) (bool, b
 	return endpointChanged(newPod, oldPod), labelsChanged
 }
 
-// GetPodServiceMemberships returns a set of Service keys for Services that have
-// a selector matching the given pod.
-func GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) { - set := sets.String{} - services, err := serviceLister.GetPodServices(pod) - if err != nil { - // don't log this error because this function makes pointless - // errors when no services match - return set, nil - } - for i := range services { - key, err := controller.KeyFunc(services[i]) - if err != nil { - return nil, err - } - set.Insert(key) - } - return set, nil -} - // GetServicesToUpdateOnPodChange returns a set of Service keys for Services // that have potentially been affected by a change to this pod. -func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, old, cur interface{}, endpointChanged EndpointsMatch) sets.String { +func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}, endpointChanged EndpointsMatch) sets.String { newPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) if newPod.ResourceVersion == oldPod.ResourceVersion { @@ -138,14 +190,14 @@ func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, old, return sets.String{} } - services, err := GetPodServiceMemberships(serviceLister, newPod) + services, err := selectorCache.GetPodServiceMemberships(serviceLister, newPod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err)) return sets.String{} } if labelsChanged { - oldServices, err := GetPodServiceMemberships(serviceLister, oldPod) + oldServices, err := selectorCache.GetPodServiceMemberships(serviceLister, oldPod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err)) } diff --git a/pkg/controller/util/endpoint/controller_utils_test.go b/pkg/controller/util/endpoint/controller_utils_test.go index 1f12fbcb83c..bccfd66ae74 100644 --- 
a/pkg/controller/util/endpoint/controller_utils_test.go +++ b/pkg/controller/util/endpoint/controller_utils_test.go @@ -17,10 +17,17 @@ limitations under the License. package endpoint import ( + "fmt" + "reflect" "testing" + "time" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" ) func TestDetermineNeededServiceUpdates(t *testing.T) { @@ -224,3 +231,171 @@ func TestShouldPodBeInEndpoints(t *testing.T) { } } } + +func TestServiceSelectorCache_GetPodServiceMemberships(t *testing.T) { + fakeInformerFactory := informers.NewSharedInformerFactory(&fake.Clientset{}, 0*time.Second) + for i := 0; i < 3; i++ { + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("service-%d", i), + Namespace: "test", + }, + Spec: v1.ServiceSpec{ + Selector: map[string]string{ + "app": fmt.Sprintf("test-%d", i), + }, + }, + } + fakeInformerFactory.Core().V1().Services().Informer().GetStore().Add(service) + } + var pods []*v1.Pod + for i := 0; i < 5; i++ { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: fmt.Sprintf("test-pod-%d", i), + Labels: map[string]string{ + "app": fmt.Sprintf("test-%d", i), + "label": fmt.Sprintf("label-%d", i), + }, + }, + } + pods = append(pods, pod) + } + + cache := NewServiceSelectorCache() + tests := []struct { + name string + pod *v1.Pod + expect sets.String + }{ + { + name: "get servicesMemberships for pod-0", + pod: pods[0], + expect: sets.NewString("test/service-0"), + }, + { + name: "get servicesMemberships for pod-1", + pod: pods[1], + expect: sets.NewString("test/service-1"), + }, + { + name: "get servicesMemberships for pod-2", + pod: pods[2], + expect: sets.NewString("test/service-2"), + }, + { + name: "get servicesMemberships for pod-3", + pod: pods[3], + expect: sets.NewString(), + }, + { + name: "get servicesMemberships for pod-4", + 
pod: pods[4], + expect: sets.NewString(), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + services, err := cache.GetPodServiceMemberships(fakeInformerFactory.Core().V1().Services().Lister(), test.pod) + if err != nil { + t.Errorf("Error from cache.GetPodServiceMemberships: %v", err) + } else if !services.Equal(test.expect) { + t.Errorf("Expect service %v, but got %v", test.expect, services) + } + }) + } +} + +func TestServiceSelectorCache_Update(t *testing.T) { + var selectors []labels.Selector + for i := 0; i < 5; i++ { + selector := labels.Set(map[string]string{"app": fmt.Sprintf("test-%d", i)}).AsSelectorPreValidated() + selectors = append(selectors, selector) + } + tests := []struct { + name string + key string + cache *ServiceSelectorCache + update map[string]string + expect labels.Selector + }{ + { + name: "add test/service-0", + key: "test/service-0", + cache: generateServiceSelectorCache(map[string]labels.Selector{}), + update: map[string]string{"app": "test-0"}, + expect: selectors[0], + }, + { + name: "add test/service-1", + key: "test/service-1", + cache: generateServiceSelectorCache(map[string]labels.Selector{"test/service-0": selectors[0]}), + update: map[string]string{"app": "test-1"}, + expect: selectors[1], + }, + { + name: "update test/service-2", + key: "test/service-2", + cache: generateServiceSelectorCache(map[string]labels.Selector{"test/service-2": selectors[2]}), + update: map[string]string{"app": "test-0"}, + expect: selectors[0], + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + selector := test.cache.Update(test.key, test.update) + if !reflect.DeepEqual(selector, test.expect) { + t.Errorf("Expect selector %v , but got %v", test.expect, selector) + } + }) + } +} + +func generateServiceSelectorCache(cache map[string]labels.Selector) *ServiceSelectorCache { + return &ServiceSelectorCache{ + cache: cache, + } +} + +func BenchmarkGetPodServiceMemberships(b *testing.B) { + // init 
fake service informer. + fakeInformerFactory := informers.NewSharedInformerFactory(&fake.Clientset{}, 0*time.Second) + for i := 0; i < 1000; i++ { + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("service-%d", i), + Namespace: "test", + }, + Spec: v1.ServiceSpec{ + Selector: map[string]string{ + "app": fmt.Sprintf("test-%d", i), + }, + }, + } + fakeInformerFactory.Core().V1().Services().Informer().GetStore().Add(service) + } + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "test-pod-0", + Labels: map[string]string{ + "app": "test-0", + }, + }, + } + + cache := NewServiceSelectorCache() + expect := sets.NewString("test/service-0") + b.ResetTimer() + for i := 0; i < b.N; i++ { + services, err := cache.GetPodServiceMemberships(fakeInformerFactory.Core().V1().Services().Lister(), pod) + if err != nil { + b.Fatalf("Error from GetPodServiceMemberships(): %v", err) + } + if len(services) != len(expect) { + b.Errorf("Expect services size %d, but got: %v", len(expect), len(services)) + } + } +}