From 02c0181f26bd08a66b0619490e8453c8658ca02f Mon Sep 17 00:00:00 2001 From: deads2k Date: Mon, 2 May 2016 10:48:06 -0400 Subject: [PATCH] reduce conflict retries --- pkg/storage/etcd/api_object_versioner.go | 26 ++++++++++- pkg/storage/etcd/api_object_versioner_test.go | 17 +++++++ .../pkg/admission/resourcequota/controller.go | 46 ++++++++++++++++++- 3 files changed, 86 insertions(+), 3 deletions(-) diff --git a/pkg/storage/etcd/api_object_versioner.go b/pkg/storage/etcd/api_object_versioner.go index 94cdd3595cf..639f24a8afd 100644 --- a/pkg/storage/etcd/api_object_versioner.go +++ b/pkg/storage/etcd/api_object_versioner.go @@ -71,4 +71,28 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e } // APIObjectVersioner implements Versioner -var _ storage.Versioner = APIObjectVersioner{} +var Versioner storage.Versioner = APIObjectVersioner{} + +// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings, +// but etcd resource versions are special, they're actually ints, so we can easily compare them. +func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int { + lhsVersion, err := Versioner.ObjectResourceVersion(lhs) + if err != nil { + // coder error + panic(err) + } + rhsVersion, err := Versioner.ObjectResourceVersion(rhs) + if err != nil { + // coder error + panic(err) + } + + if lhsVersion == rhsVersion { + return 0 + } + if lhsVersion < rhsVersion { + return -1 + } + + return 1 +} diff --git a/pkg/storage/etcd/api_object_versioner_test.go b/pkg/storage/etcd/api_object_versioner_test.go index 3a32fd9491e..86767c8e07a 100644 --- a/pkg/storage/etcd/api_object_versioner_test.go +++ b/pkg/storage/etcd/api_object_versioner_test.go @@ -39,3 +39,20 @@ func TestObjectVersioner(t *testing.T) { t.Errorf("unexpected resource version: %#v", obj) } } + +func TestCompareResourceVersion(t *testing.T) { + five := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "5"}} + six := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "6"}} + + versioner := APIObjectVersioner{} + + if e, a := -1, versioner.CompareResourceVersion(five, six); e != a { + t.Errorf("expected %v got %v", e, a) + } + if e, a := 1, versioner.CompareResourceVersion(six, five); e != a { + t.Errorf("expected %v got %v", e, a) + } + if e, a := 0, versioner.CompareResourceVersion(six, six); e != a { + t.Errorf("expected %v got %v", e, a) + } +} diff --git a/plugin/pkg/admission/resourcequota/controller.go b/plugin/pkg/admission/resourcequota/controller.go index 02c304795ac..495e076f5ed 100644 --- a/plugin/pkg/admission/resourcequota/controller.go +++ b/plugin/pkg/admission/resourcequota/controller.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage/etcd" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -52,6 +53,10 @@ type quotaEvaluator struct { // We track the lookup result here so that for repeated requests, we don't look it up very often. liveLookupCache *lru.Cache liveTTL time.Duration + // updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to + // back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions + // for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts + updatedQuotas *lru.Cache // TODO these are used together to bucket items by namespace and then batch them up for processing. // The technique is valuable for rollup activities to avoid fanout and reduce resource contention. @@ -101,6 +106,10 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu if err != nil { return nil, err } + updatedCache, err := lru.New(100) + if err != nil { + return nil, err + } lw := &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return client.Core().ResourceQuotas(api.NamespaceAll).List(options) @@ -118,6 +127,7 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu registry: registry, liveLookupCache: liveLookupCache, liveTTL: time.Duration(30 * time.Second), + updatedQuotas: updatedCache, queue: workqueue.New(), work: map[string][]*admissionWaiter{}, @@ -247,9 +257,14 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib continue } - if _, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil { + if updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil { updatedFailedQuotas = append(updatedFailedQuotas, newQuota) lastErr = err + + } else { + // update our cache + e.updateCache(updatedQuota) + } } @@ -472,6 +487,31 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) { return ns, []*admissionWaiter{}, false } +func (e *quotaEvaluator) updateCache(quota *api.ResourceQuota) { + key := quota.Namespace + "/" + quota.Name + e.updatedQuotas.Add(key, quota) +} + +var etcdVersioner = etcd.APIObjectVersioner{} + +// checkCache compares the passed quota against the value in the look-aside cache and returns the newer +// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions +// being monotonically increasing integers +func (e *quotaEvaluator) checkCache(quota *api.ResourceQuota) *api.ResourceQuota { + key := quota.Namespace + "/" + quota.Name + uncastCachedQuota, ok := e.updatedQuotas.Get(key) + if !ok { + return quota + } + cachedQuota := uncastCachedQuota.(*api.ResourceQuota) + + if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 { + e.updatedQuotas.Remove(key) + return quota + } + return cachedQuota +} + func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) { // determine if there are any quotas in this namespace // if there are no quotas, we don't need to do anything @@ -508,8 +548,10 @@ func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error resourceQuotas := []api.ResourceQuota{} for i := range items { + quota := items[i].(*api.ResourceQuota) + quota = e.checkCache(quota) // always make a copy. We're going to muck around with this and we should never mutate the originals - resourceQuotas = append(resourceQuotas, *items[i].(*api.ResourceQuota)) + resourceQuotas = append(resourceQuotas, *quota) } return resourceQuotas, nil