mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Merge pull request #8050 from fgrzadkowski/cache
Add cache with multiple shards to decrease lock contention
This commit is contained in:
commit
48dfad6701
@ -25,11 +25,11 @@ import (
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
@ -56,6 +56,18 @@ var (
|
||||
"because two concurrent threads can miss the cache and generate the same entry twice.",
|
||||
},
|
||||
)
|
||||
cacheGetLatency = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "etcd_request_cache_get_latencies_summary",
|
||||
Help: "Latency in microseconds of getting an object from etcd cache",
|
||||
},
|
||||
)
|
||||
cacheAddLatency = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "etcd_request_cache_add_latencies_summary",
|
||||
Help: "Latency in microseconds of adding an object to etcd cache",
|
||||
},
|
||||
)
|
||||
etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "etcd_request_latencies_summary",
|
||||
@ -65,6 +77,17 @@ var (
|
||||
)
|
||||
)
|
||||
|
||||
const maxEtcdCacheEntries int = 50000
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(cacheHitCounter)
|
||||
prometheus.MustRegister(cacheMissCounter)
|
||||
prometheus.MustRegister(cacheEntryCounter)
|
||||
prometheus.MustRegister(cacheAddLatency)
|
||||
prometheus.MustRegister(cacheGetLatency)
|
||||
prometheus.MustRegister(etcdRequestLatenciesSummary)
|
||||
}
|
||||
|
||||
func getTypeName(obj interface{}) string {
|
||||
return reflect.TypeOf(obj).String()
|
||||
}
|
||||
@ -73,13 +96,6 @@ func recordEtcdRequestLatency(verb, resource string, startTime time.Time) {
|
||||
etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
|
||||
}
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(cacheHitCounter)
|
||||
prometheus.MustRegister(cacheMissCounter)
|
||||
prometheus.MustRegister(cacheEntryCounter)
|
||||
prometheus.MustRegister(etcdRequestLatenciesSummary)
|
||||
}
|
||||
|
||||
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
||||
type EtcdHelper struct {
|
||||
Client EtcdGetSet
|
||||
@ -96,8 +112,7 @@ type EtcdHelper struct {
|
||||
// support multi-object transaction that will result in many objects with the same index.
|
||||
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
|
||||
// TODO: Measure how much this cache helps after the conversion code is optimized.
|
||||
cache map[uint64]runtime.Object
|
||||
mutex *sync.RWMutex
|
||||
cache util.Cache
|
||||
}
|
||||
|
||||
// NewEtcdHelper creates a helper that works against objects that use the internal
|
||||
@ -108,8 +123,7 @@ func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHe
|
||||
Codec: codec,
|
||||
Versioner: APIObjectVersioner{},
|
||||
PathPrefix: prefix,
|
||||
cache: make(map[uint64]runtime.Object),
|
||||
mutex: new(sync.RWMutex),
|
||||
cache: util.NewCache(maxEtcdCacheEntries),
|
||||
}
|
||||
}
|
||||
|
||||
@ -207,16 +221,13 @@ type etcdCache interface {
|
||||
addToCache(index uint64, obj runtime.Object)
|
||||
}
|
||||
|
||||
const maxEtcdCacheEntries int = 50000
|
||||
|
||||
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
|
||||
var obj runtime.Object
|
||||
func() {
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
obj = h.cache[index]
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
|
||||
}()
|
||||
if obj != nil {
|
||||
obj, found := h.cache.Get(index)
|
||||
if found {
|
||||
// We should not return the object itself to avoid poluting the cache if someone
|
||||
// modifies returned values.
|
||||
objCopy, err := conversion.DeepCopy(obj)
|
||||
@ -232,24 +243,19 @@ func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
|
||||
}
|
||||
|
||||
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
|
||||
}()
|
||||
objCopy, err := conversion.DeepCopy(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Error during DeepCopy of cached object: %q", err)
|
||||
return
|
||||
}
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
if _, found := h.cache[index]; !found {
|
||||
isOverwrite := h.cache.Add(index, objCopy)
|
||||
if !isOverwrite {
|
||||
cacheEntryCounter.Inc()
|
||||
}
|
||||
h.cache[index] = objCopy.(runtime.Object)
|
||||
if len(h.cache) > maxEtcdCacheEntries {
|
||||
var randomKey uint64
|
||||
for randomKey = range h.cache {
|
||||
break
|
||||
}
|
||||
delete(h.cache, randomKey)
|
||||
}
|
||||
}
|
||||
|
||||
// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
|
||||
|
80
pkg/util/cache.go
Normal file
80
pkg/util/cache.go
Normal file
@ -0,0 +1,80 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
shardsCount int = 32
|
||||
)
|
||||
|
||||
type Cache []*cacheShard
|
||||
|
||||
func NewCache(maxSize int) Cache {
|
||||
cache := make(Cache, shardsCount)
|
||||
for i := 0; i < shardsCount; i++ {
|
||||
cache[i] = &cacheShard{
|
||||
items: make(map[uint64]interface{}),
|
||||
maxSize: maxSize / shardsCount,
|
||||
}
|
||||
}
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c Cache) getShard(index uint64) *cacheShard {
|
||||
return c[index%uint64(shardsCount)]
|
||||
}
|
||||
|
||||
// Returns true if object already existed, false otherwise.
|
||||
func (c *Cache) Add(index uint64, obj interface{}) bool {
|
||||
return c.getShard(index).add(index, obj)
|
||||
}
|
||||
|
||||
func (c *Cache) Get(index uint64) (obj interface{}, found bool) {
|
||||
return c.getShard(index).get(index)
|
||||
}
|
||||
|
||||
type cacheShard struct {
|
||||
items map[uint64]interface{}
|
||||
sync.RWMutex
|
||||
maxSize int
|
||||
}
|
||||
|
||||
// Returns true if object already existed, false otherwise.
|
||||
func (s *cacheShard) add(index uint64, obj interface{}) bool {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
_, isOverwrite := s.items[index]
|
||||
s.items[index] = obj
|
||||
if len(s.items) > s.maxSize {
|
||||
var randomKey uint64
|
||||
for randomKey = range s.items {
|
||||
break
|
||||
}
|
||||
delete(s.items, randomKey)
|
||||
}
|
||||
return isOverwrite
|
||||
}
|
||||
|
||||
func (s *cacheShard) get(index uint64) (obj interface{}, found bool) {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
obj, found = s.items[index]
|
||||
return
|
||||
}
|
65
pkg/util/cache_test.go
Normal file
65
pkg/util/cache_test.go
Normal file
@ -0,0 +1,65 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
const (
|
||||
maxTestCacheSize int = shardsCount * 2
|
||||
)
|
||||
|
||||
func ExpectEntry(t *testing.T, cache Cache, index uint64, expectedValue interface{}) {
|
||||
elem, found := cache.Get(index)
|
||||
if !found {
|
||||
t.Error("Expected to find entry with key 1")
|
||||
} else if elem != expectedValue {
|
||||
t.Errorf("Expected to find %v, got %v", expectedValue, elem)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBasic(t *testing.T) {
|
||||
cache := NewCache(maxTestCacheSize)
|
||||
cache.Add(1, "xxx")
|
||||
ExpectEntry(t, cache, 1, "xxx")
|
||||
}
|
||||
|
||||
func TestOverflow(t *testing.T) {
|
||||
cache := NewCache(maxTestCacheSize)
|
||||
for i := 0; i < maxTestCacheSize+1; i++ {
|
||||
cache.Add(uint64(i), "xxx")
|
||||
}
|
||||
foundIndexes := make([]uint64, 0)
|
||||
for i := 0; i < maxTestCacheSize+1; i++ {
|
||||
_, found := cache.Get(uint64(i))
|
||||
if found {
|
||||
foundIndexes = append(foundIndexes, uint64(i))
|
||||
}
|
||||
}
|
||||
if len(foundIndexes) != maxTestCacheSize {
|
||||
t.Errorf("Expect to find %d elements, got %d %v", maxTestCacheSize, len(foundIndexes), foundIndexes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOverwrite(t *testing.T) {
|
||||
cache := NewCache(maxTestCacheSize)
|
||||
cache.Add(1, "xxx")
|
||||
ExpectEntry(t, cache, 1, "xxx")
|
||||
cache.Add(1, "yyy")
|
||||
ExpectEntry(t, cache, 1, "yyy")
|
||||
}
|
Loading…
Reference in New Issue
Block a user