mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Merge pull request #25091 from deads2k/reduce-conflicts
Automatic merge from submit-queue reduce conflict retries Eliminates quota admission conflicts due to latent caches on the same API server. @derekwaynecarr
This commit is contained in:
commit
e5cb165ecc
@ -71,4 +71,28 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e
|
||||
}
|
||||
|
||||
// APIObjectVersioner implements Versioner
|
||||
var _ storage.Versioner = APIObjectVersioner{}
|
||||
var Versioner storage.Versioner = APIObjectVersioner{}
|
||||
|
||||
// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
|
||||
// but etcd resource versions are special, they're actually ints, so we can easily compare them.
|
||||
func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
|
||||
lhsVersion, err := Versioner.ObjectResourceVersion(lhs)
|
||||
if err != nil {
|
||||
// coder error
|
||||
panic(err)
|
||||
}
|
||||
rhsVersion, err := Versioner.ObjectResourceVersion(rhs)
|
||||
if err != nil {
|
||||
// coder error
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if lhsVersion == rhsVersion {
|
||||
return 0
|
||||
}
|
||||
if lhsVersion < rhsVersion {
|
||||
return -1
|
||||
}
|
||||
|
||||
return 1
|
||||
}
|
||||
|
@ -39,3 +39,20 @@ func TestObjectVersioner(t *testing.T) {
|
||||
t.Errorf("unexpected resource version: %#v", obj)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompareResourceVersion(t *testing.T) {
|
||||
five := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "5"}}
|
||||
six := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "6"}}
|
||||
|
||||
versioner := APIObjectVersioner{}
|
||||
|
||||
if e, a := -1, versioner.CompareResourceVersion(five, six); e != a {
|
||||
t.Errorf("expected %v got %v", e, a)
|
||||
}
|
||||
if e, a := 1, versioner.CompareResourceVersion(six, five); e != a {
|
||||
t.Errorf("expected %v got %v", e, a)
|
||||
}
|
||||
if e, a := 0, versioner.CompareResourceVersion(six, six); e != a {
|
||||
t.Errorf("expected %v got %v", e, a)
|
||||
}
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/quota"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
@ -52,6 +53,10 @@ type quotaEvaluator struct {
|
||||
// We track the lookup result here so that for repeated requests, we don't look it up very often.
|
||||
liveLookupCache *lru.Cache
|
||||
liveTTL time.Duration
|
||||
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
|
||||
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
|
||||
// for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
|
||||
updatedQuotas *lru.Cache
|
||||
|
||||
// TODO these are used together to bucket items by namespace and then batch them up for processing.
|
||||
// The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
|
||||
@ -101,6 +106,10 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
updatedCache, err := lru.New(100)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
|
||||
@ -118,6 +127,7 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu
|
||||
registry: registry,
|
||||
liveLookupCache: liveLookupCache,
|
||||
liveTTL: time.Duration(30 * time.Second),
|
||||
updatedQuotas: updatedCache,
|
||||
|
||||
queue: workqueue.New(),
|
||||
work: map[string][]*admissionWaiter{},
|
||||
@ -247,9 +257,14 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
|
||||
if updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
|
||||
updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
|
||||
lastErr = err
|
||||
|
||||
} else {
|
||||
// update our cache
|
||||
e.updateCache(updatedQuota)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -472,6 +487,31 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
|
||||
return ns, []*admissionWaiter{}, false
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) updateCache(quota *api.ResourceQuota) {
|
||||
key := quota.Namespace + "/" + quota.Name
|
||||
e.updatedQuotas.Add(key, quota)
|
||||
}
|
||||
|
||||
var etcdVersioner = etcd.APIObjectVersioner{}
|
||||
|
||||
// checkCache compares the passed quota against the value in the look-aside cache and returns the newer
|
||||
// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
|
||||
// being monotonically increasing integers
|
||||
func (e *quotaEvaluator) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
|
||||
key := quota.Namespace + "/" + quota.Name
|
||||
uncastCachedQuota, ok := e.updatedQuotas.Get(key)
|
||||
if !ok {
|
||||
return quota
|
||||
}
|
||||
cachedQuota := uncastCachedQuota.(*api.ResourceQuota)
|
||||
|
||||
if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
|
||||
e.updatedQuotas.Remove(key)
|
||||
return quota
|
||||
}
|
||||
return cachedQuota
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) {
|
||||
// determine if there are any quotas in this namespace
|
||||
// if there are no quotas, we don't need to do anything
|
||||
@ -508,8 +548,10 @@ func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error
|
||||
|
||||
resourceQuotas := []api.ResourceQuota{}
|
||||
for i := range items {
|
||||
quota := items[i].(*api.ResourceQuota)
|
||||
quota = e.checkCache(quota)
|
||||
// always make a copy. We're going to muck around with this and we should never mutate the originals
|
||||
resourceQuotas = append(resourceQuotas, *items[i].(*api.ResourceQuota))
|
||||
resourceQuotas = append(resourceQuotas, *quota)
|
||||
}
|
||||
|
||||
return resourceQuotas, nil
|
||||
|
Loading…
Reference in New Issue
Block a user