Introduce cache for APIServices

This commit is contained in:
Ted Yu 2019-08-12 20:25:35 -07:00 committed by Ted Yu
parent 29669a5e21
commit 7db2c859bc
3 changed files with 144 additions and 28 deletions

View File

@ -46,6 +46,7 @@ go_test(
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1:go_default_library",

View File

@ -20,6 +20,8 @@ import (
"fmt"
"net/http"
"net/url"
"reflect"
"sync"
"time"
v1 "k8s.io/api/core/v1"
@ -72,6 +74,10 @@ type AvailableConditionController struct {
syncFn func(key string) error
queue workqueue.RateLimitingInterface
// map from service-namespace -> service-name -> apiservice names
cache map[string]map[string][]string
// this lock protects operations on the above cache
cacheLock sync.RWMutex
}
// NewAvailableConditionController returns a new AvailableConditionController.
@ -413,26 +419,23 @@ func (c *AvailableConditionController) processNextWorkItem() bool {
return true
}
func (c *AvailableConditionController) enqueue(obj *apiregistrationv1.APIService) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
klog.Errorf("Couldn't get key for object %#v: %v", obj, err)
return
}
c.queue.Add(key)
}
func (c *AvailableConditionController) addAPIService(obj interface{}) {
castObj := obj.(*apiregistrationv1.APIService)
klog.V(4).Infof("Adding %s", castObj.Name)
c.enqueue(castObj)
if castObj.Spec.Service != nil {
c.rebuildAPIServiceCache()
}
c.queue.Add(castObj.Name)
}
func (c *AvailableConditionController) updateAPIService(obj, _ interface{}) {
castObj := obj.(*apiregistrationv1.APIService)
klog.V(4).Infof("Updating %s", castObj.Name)
c.enqueue(castObj)
func (c *AvailableConditionController) updateAPIService(oldObj, newObj interface{}) {
castObj := newObj.(*apiregistrationv1.APIService)
oldCastObj := oldObj.(*apiregistrationv1.APIService)
klog.V(4).Infof("Updating %s", oldCastObj.Name)
if !reflect.DeepEqual(castObj.Spec.Service, oldCastObj.Spec.Service) {
c.rebuildAPIServiceCache()
}
c.queue.Add(oldCastObj.Name)
}
func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
@ -450,42 +453,55 @@ func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
}
}
klog.V(4).Infof("Deleting %q", castObj.Name)
c.enqueue(castObj)
if castObj.Spec.Service != nil {
c.rebuildAPIServiceCache()
}
c.queue.Add(castObj.Name)
}
// there aren't very many apiservices, just check them all.
func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []*apiregistrationv1.APIService {
func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []string {
metadata, err := meta.Accessor(obj)
if err != nil {
utilruntime.HandleError(err)
return nil
}
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()
return c.cache[metadata.GetNamespace()][metadata.GetName()]
}
var ret []*apiregistrationv1.APIService
// if the service/endpoint handler wins the race against the cache rebuilding, it may queue a no-longer-relevant apiservice
// (which will get processed an extra time - this doesn't matter),
// and miss a newly relevant apiservice (which will get queued by the apiservice handler)
func (c *AvailableConditionController) rebuildAPIServiceCache() {
apiServiceList, _ := c.apiServiceLister.List(labels.Everything())
newCache := map[string]map[string][]string{}
for _, apiService := range apiServiceList {
if apiService.Spec.Service == nil {
continue
}
if apiService.Spec.Service.Namespace == metadata.GetNamespace() && apiService.Spec.Service.Name == metadata.GetName() {
ret = append(ret, apiService)
if newCache[apiService.Spec.Service.Namespace] == nil {
newCache[apiService.Spec.Service.Namespace] = map[string][]string{}
}
newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name] = append(newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name], apiService.Name)
}
return ret
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
c.cache = newCache
}
// TODO, think of a way to avoid checking on every service manipulation
func (c *AvailableConditionController) addService(obj interface{}) {
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}
func (c *AvailableConditionController) updateService(obj, _ interface{}) {
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}
@ -504,19 +520,19 @@ func (c *AvailableConditionController) deleteService(obj interface{}) {
}
}
for _, apiService := range c.getAPIServicesFor(castObj) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}
func (c *AvailableConditionController) addEndpoints(obj interface{}) {
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}
func (c *AvailableConditionController) updateEndpoints(obj, _ interface{}) {
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}
@ -535,6 +551,6 @@ func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
}
}
for _, apiService := range c.getAPIServicesFor(castObj) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}

View File

@ -24,6 +24,7 @@ import (
"net/url"
"strings"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
@ -32,6 +33,7 @@ import (
v1listers "k8s.io/client-go/listers/core/v1"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
@ -103,6 +105,103 @@ func newRemoteAPIService(name string) *apiregistration.APIService {
}
}
func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableConditionController, *fake.Clientset) {
fakeClient := fake.NewSimpleClientset()
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer testServer.Close()
for _, o := range apiServices {
apiServiceIndexer.Add(o)
}
c := AvailableConditionController{
apiServiceClient: fakeClient.ApiregistrationV1(),
apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer),
serviceLister: v1listers.NewServiceLister(serviceIndexer),
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
discoveryClient: testServer.Client(),
serviceResolver: &fakeServiceResolver{url: testServer.URL},
queue: workqueue.NewNamedRateLimitingQueue(
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
// the maximum disruption time to a minimum, but it does prevent hot loops.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
"AvailableConditionController"),
}
for _, svc := range apiServices {
c.addAPIService(svc)
}
return &c, fakeClient
}
func BenchmarkBuildCache(b *testing.B) {
apiServiceName := "remote.group"
// model 1 APIService pointing at a given service, and 30 pointing at local group/versions
apiServices := []*apiregistration.APIService{newRemoteAPIService(apiServiceName)}
for i := 0; i < 30; i++ {
apiServices = append(apiServices, newLocalAPIService(fmt.Sprintf("local.group%d", i)))
}
// model one service backing an API service, and 100 unrelated services
services := []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}
for i := 0; i < 100; i++ {
services = append(services, newService("foo", fmt.Sprintf("bar%d", i), testServicePort, testServicePortName))
}
c, _ := setupAPIServices(apiServices)
b.ReportAllocs()
b.ResetTimer()
for n := 1; n <= b.N; n++ {
for _, svc := range services {
c.addService(svc)
}
for _, svc := range services {
c.updateService(svc, svc)
}
for _, svc := range services {
c.deleteService(svc)
}
}
}
func TestBuildCache(t *testing.T) {
tests := []struct {
name string
apiServiceName string
apiServices []*apiregistration.APIService
services []*v1.Service
endpoints []*v1.Endpoints
expectedAvailability apiregistration.APIServiceCondition
}{
{
name: "api service",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
c, fakeClient := setupAPIServices(tc.apiServices)
for _, svc := range tc.services {
c.addService(svc)
}
c.sync(tc.apiServiceName)
// ought to have one action writing status
if e, a := 1, len(fakeClient.Actions()); e != a {
t.Fatalf("%v expected %v, got %v", tc.name, e, fakeClient.Actions())
}
})
}
}
func TestSync(t *testing.T) {
tests := []struct {
name string