diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD index 82bc318322b..543eb3eae39 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD @@ -46,6 +46,7 @@ go_test( "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go index ea643f28cfd..d984a64536f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go @@ -20,6 +20,8 @@ import ( "fmt" "net/http" "net/url" + "reflect" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -72,6 +74,10 @@ type AvailableConditionController struct { syncFn func(key string) error queue workqueue.RateLimitingInterface + // map from service-namespace -> service-name -> apiservice names + cache map[string]map[string][]string + // this lock protects operations on the above cache + cacheLock sync.RWMutex } // NewAvailableConditionController returns a new AvailableConditionController. @@ -413,26 +419,23 @@ func (c *AvailableConditionController) processNextWorkItem() bool { return true } -func (c *AvailableConditionController) enqueue(obj *apiregistrationv1.APIService) { - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err != nil { - klog.Errorf("Couldn't get key for object %#v: %v", obj, err) - return - } - - c.queue.Add(key) -} - func (c *AvailableConditionController) addAPIService(obj interface{}) { castObj := obj.(*apiregistrationv1.APIService) klog.V(4).Infof("Adding %s", castObj.Name) - c.enqueue(castObj) + if castObj.Spec.Service != nil { + c.rebuildAPIServiceCache() + } + c.queue.Add(castObj.Name) } -func (c *AvailableConditionController) updateAPIService(obj, _ interface{}) { - castObj := obj.(*apiregistrationv1.APIService) - klog.V(4).Infof("Updating %s", castObj.Name) - c.enqueue(castObj) +func (c *AvailableConditionController) updateAPIService(oldObj, newObj interface{}) { + castObj := newObj.(*apiregistrationv1.APIService) + oldCastObj := oldObj.(*apiregistrationv1.APIService) + klog.V(4).Infof("Updating %s", oldCastObj.Name) + if !reflect.DeepEqual(castObj.Spec.Service, oldCastObj.Spec.Service) { + c.rebuildAPIServiceCache() + } + c.queue.Add(oldCastObj.Name) } func (c *AvailableConditionController) deleteAPIService(obj interface{}) { @@ -450,42 +453,55 @@ func (c *AvailableConditionController) deleteAPIService(obj interface{}) { } } klog.V(4).Infof("Deleting %q", castObj.Name) - c.enqueue(castObj) + if castObj.Spec.Service != nil { + c.rebuildAPIServiceCache() + } + c.queue.Add(castObj.Name) } -// there aren't very many apiservices, just check them all. -func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []*apiregistrationv1.APIService { +func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []string { metadata, err := meta.Accessor(obj) if err != nil { utilruntime.HandleError(err) return nil } + c.cacheLock.RLock() + defer c.cacheLock.RUnlock() + return c.cache[metadata.GetNamespace()][metadata.GetName()] +} - var ret []*apiregistrationv1.APIService +// if the service/endpoint handler wins the race against the cache rebuilding, it may queue a no-longer-relevant apiservice +// (which will get processed an extra time - this doesn't matter), +// and miss a newly relevant apiservice (which will get queued by the apiservice handler) +func (c *AvailableConditionController) rebuildAPIServiceCache() { apiServiceList, _ := c.apiServiceLister.List(labels.Everything()) + newCache := map[string]map[string][]string{} for _, apiService := range apiServiceList { if apiService.Spec.Service == nil { continue } - if apiService.Spec.Service.Namespace == metadata.GetNamespace() && apiService.Spec.Service.Name == metadata.GetName() { - ret = append(ret, apiService) + if newCache[apiService.Spec.Service.Namespace] == nil { + newCache[apiService.Spec.Service.Namespace] = map[string][]string{} } + newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name] = append(newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name], apiService.Name) } - return ret + c.cacheLock.Lock() + defer c.cacheLock.Unlock() + c.cache = newCache } // TODO, think of a way to avoid checking on every service manipulation func (c *AvailableConditionController) addService(obj interface{}) { for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) { - c.enqueue(apiService) + c.queue.Add(apiService) } } func (c *AvailableConditionController) updateService(obj, _ interface{}) { for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) { - c.enqueue(apiService) + c.queue.Add(apiService) } } @@ -504,19 +520,19 @@ func (c *AvailableConditionController) deleteService(obj interface{}) { } } for _, apiService := range c.getAPIServicesFor(castObj) { - c.enqueue(apiService) + c.queue.Add(apiService) } } func (c *AvailableConditionController) addEndpoints(obj interface{}) { for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) { - c.enqueue(apiService) + c.queue.Add(apiService) } } func (c *AvailableConditionController) updateEndpoints(obj, _ interface{}) { for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) { - c.enqueue(apiService) + c.queue.Add(apiService) } } @@ -535,6 +551,6 @@ func (c *AvailableConditionController) deleteEndpoints(obj interface{}) { } } for _, apiService := range c.getAPIServicesFor(castObj) { - c.enqueue(apiService) + c.queue.Add(apiService) } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go index 62f5a87ae5e..af1f94166f9 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go @@ -24,6 +24,7 @@ import ( "net/url" "strings" "testing" + "time" "github.com/davecgh/go-spew/spew" @@ -32,6 +33,7 @@ import ( v1listers "k8s.io/client-go/listers/core/v1" clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake" apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" @@ -103,6 +105,103 @@ func newRemoteAPIService(name string) *apiregistration.APIService { } } +func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableConditionController, *fake.Clientset) { + fakeClient := fake.NewSimpleClientset() + apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer testServer.Close() + + for _, o := range apiServices { + apiServiceIndexer.Add(o) + } + + c := AvailableConditionController{ + apiServiceClient: fakeClient.ApiregistrationV1(), + apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer), + serviceLister: v1listers.NewServiceLister(serviceIndexer), + endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer), + discoveryClient: testServer.Client(), + serviceResolver: &fakeServiceResolver{url: testServer.URL}, + queue: workqueue.NewNamedRateLimitingQueue( + // We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the + // service network, it is possible for an external, non-watchable factor to affect availability. This keeps + // the maximum disruption time to a minimum, but it does prevent hot loops. + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second), + "AvailableConditionController"), + } + for _, svc := range apiServices { + c.addAPIService(svc) + } + return &c, fakeClient +} + +func BenchmarkBuildCache(b *testing.B) { + apiServiceName := "remote.group" + // model 1 APIService pointing at a given service, and 30 pointing at local group/versions + apiServices := []*apiregistration.APIService{newRemoteAPIService(apiServiceName)} + for i := 0; i < 30; i++ { + apiServices = append(apiServices, newLocalAPIService(fmt.Sprintf("local.group%d", i))) + } + // model one service backing an API service, and 100 unrelated services + services := []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)} + for i := 0; i < 100; i++ { + services = append(services, newService("foo", fmt.Sprintf("bar%d", i), testServicePort, testServicePortName)) + } + c, _ := setupAPIServices(apiServices) + b.ReportAllocs() + b.ResetTimer() + for n := 1; n <= b.N; n++ { + for _, svc := range services { + c.addService(svc) + } + for _, svc := range services { + c.updateService(svc, svc) + } + for _, svc := range services { + c.deleteService(svc) + } + } +} + +func TestBuildCache(t *testing.T) { + tests := []struct { + name string + + apiServiceName string + apiServices []*apiregistration.APIService + services []*v1.Service + endpoints []*v1.Endpoints + + expectedAvailability apiregistration.APIServiceCondition + }{ + { + name: "api service", + apiServiceName: "remote.group", + apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + c, fakeClient := setupAPIServices(tc.apiServices) + for _, svc := range tc.services { + c.addService(svc) + } + + c.sync(tc.apiServiceName) + + // ought to have one action writing status + if e, a := 1, len(fakeClient.Actions()); e != a { + t.Fatalf("%v expected %v, got %v", tc.name, e, fakeClient.Actions()) + } + }) + } +} func TestSync(t *testing.T) { tests := []struct { name string