add lookup cache for ReplicationController and ReplicaSet

mqliang 2016-02-23 23:17:27 +08:00
parent 7662a5ee54
commit d9a35a25d7
4 changed files with 313 additions and 4 deletions

@ -0,0 +1,92 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"hash/adler32"
"sync"
"github.com/golang/groupcache/lru"
"k8s.io/kubernetes/pkg/api/meta"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)
const DefaultCacheEntries = 4096
type objectWithMeta interface {
meta.Object
}
// keyFunc returns the key of an object, which is used to look up its matching object in the cache.
// Since objects are matched by namespace and labels/selector, two objects with the same namespace
// and labels will have the same key.
func keyFunc(obj objectWithMeta) uint64 {
hash := adler32.New()
hashutil.DeepHashObject(hash, &equivalenceLabelObj{
namespace: obj.GetNamespace(),
labels: obj.GetLabels(),
})
return uint64(hash.Sum32())
}
type equivalenceLabelObj struct {
namespace string
labels map[string]string
}
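
To make the equivalence concrete, here is a minimal companion test sketch (not part of this commit; the pod names, labels, and file placement are assumptions) demonstrating that keyFunc depends only on namespace and labels:

package controller

import (
	"testing"

	"k8s.io/kubernetes/pkg/api"
)

// Two pods that differ only by name share namespace and labels, so keyFunc maps them
// to the same key and they resolve to the same cache entry.
func TestKeyFuncIgnoresObjectName(t *testing.T) {
	podA := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "pod-a", Namespace: "default", Labels: map[string]string{"app": "web"}}}
	podB := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "pod-b", Namespace: "default", Labels: map[string]string{"app": "web"}}}
	if keyFunc(podA) != keyFunc(podB) {
		t.Errorf("expected identical keys for objects with the same namespace and labels")
	}
}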
// MatchingCache saves the label and selector matching relationship.
type MatchingCache struct {
mutex sync.RWMutex
cache *lru.Cache
}
// NewMatchingCache returns a MatchingCache, which saves the label and selector matching relationship.
func NewMatchingCache(maxCacheEntries int) *MatchingCache {
return &MatchingCache{
cache: lru.New(maxCacheEntries),
}
}
// Add will add matching information to the cache.
func (c *MatchingCache) Add(labelObj objectWithMeta, selectorObj objectWithMeta) {
key := keyFunc(labelObj)
c.mutex.Lock()
defer c.mutex.Unlock()
c.cache.Add(key, selectorObj)
}
// GetMatchingObject looks up the matching object for a given object.
// Note: the cached entry may be stale, since the matched controller may have been deleted or updated;
// callers need to check externally that the cached data is not dirty before trusting it.
func (c *MatchingCache) GetMatchingObject(labelObj objectWithMeta) (controller interface{}, exists bool) {
key := keyFunc(labelObj)
c.mutex.Lock()
defer c.mutex.Unlock()
return c.cache.Get(key)
}
// Update updates the cached matching information.
func (c *MatchingCache) Update(labelObj objectWithMeta, selectorObj objectWithMeta) {
c.Add(labelObj, selectorObj)
}
// InvalidateAll invalidates the whole cache.
func (c *MatchingCache) InvalidateAll() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.cache = lru.New(c.cache.MaxEntries)
}
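
Purely as illustration (not part of this commit), a minimal sketch of how a caller might drive MatchingCache; it mirrors the way the controllers below pass a *api.Pod as the label object and a controller as the selector object, and the concrete names and values are invented:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/controller"
)

func main() {
	cache := controller.NewMatchingCache(controller.DefaultCacheEntries)

	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "pod-0", Namespace: "default", Labels: map[string]string{"app": "web"}}}
	rc := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Name: "web", Namespace: "default"}}

	// Record that this pod's namespace+labels matched rc.
	cache.Add(pod, rc)

	// Any object with the same namespace and labels hits this entry; callers still need to
	// re-validate it (see isCacheValid in the controllers below) before trusting it.
	if obj, ok := cache.GetMatchingObject(pod); ok {
		fmt.Println("cached match:", obj.(*api.ReplicationController).Name)
	}

	// When a selector changes elsewhere, the whole cache must be dropped.
	cache.InvalidateAll()
}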

@ -19,6 +19,7 @@ limitations under the License.
package replicaset
import (
"fmt"
"reflect"
"sort"
"sync"
@ -34,6 +35,7 @@ import (
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@ -86,6 +88,8 @@ type ReplicaSetController struct {
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
lookupCache *controller.MatchingCache
// Controllers that need to be synced
queue *workqueue.Type
}
@ -122,6 +126,24 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
framework.ResourceEventHandlerFuncs{
AddFunc: rsc.enqueueReplicaSet,
UpdateFunc: func(old, cur interface{}) {
oldRS := old.(*extensions.ReplicaSet)
curRS := cur.(*extensions.ReplicaSet)
// We should invalidate the whole lookup cache if a RS's selector has been updated.
//
// Imagine that you have two RSs:
// * old RS1
// * new RS2
// You also have a pod that is attached to RS2 (because it doesn't match RS1 selector).
// Now imagine that you are changing RS1's selector so that it now matches that pod;
// in such a case we must invalidate the whole cache so that the pod can be adopted by RS1.
//
// This makes the lookup cache less helpful, but selector updates do not happen often,
// so it's not a big problem.
if !reflect.DeepEqual(oldRS.Spec.Selector, curRS.Spec.Selector) {
rsc.lookupCache.InvalidateAll()
}
// You might imagine that we only really need to enqueue the
// replica set when Spec changes, but it is safer to sync any
// time this function is triggered. That way a full informer
@ -134,8 +156,6 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
// this function), but in general extra resyncs shouldn't be
// that bad as ReplicaSets that haven't met expectations yet won't
// sync, and all the listing is done using local stores.
oldRS := old.(*extensions.ReplicaSet)
curRS := cur.(*extensions.ReplicaSet)
if oldRS.Status.Replicas != curRS.Status.Replicas {
glog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas)
}
@ -171,6 +191,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
rsc.syncHandler = rsc.syncReplicaSet
rsc.podStoreSynced = rsc.podController.HasSynced
rsc.lookupCache = controller.NewMatchingCache(controller.DefaultCacheEntries)
return rsc
}
@ -198,6 +219,20 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
// getPodReplicaSet returns the replica set managing the given pod.
// TODO: Surface that we are ignoring multiple replica sets for a single pod.
func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.ReplicaSet {
// Look up in the cache; if there is a valid cached entry, just return it.
if obj, cached := rsc.lookupCache.GetMatchingObject(pod); cached {
rs, ok := obj.(*extensions.ReplicaSet)
if !ok {
// This should not happen
glog.Errorf("lookup cache does not retuen a ReplicaSet object")
return nil
}
if rsc.isCacheValid(pod, rs) {
return rs
}
}
// If not cached, or the cached value is invalid, search all ReplicaSets for the matching one and update the cache.
rss, err := rsc.rsStore.GetPodReplicaSets(pod)
if err != nil {
glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name)
@ -215,9 +250,42 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl
glog.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels)
sort.Sort(overlappingReplicaSets(rss))
}
// update lookup cache
rsc.lookupCache.Update(pod, &rss[0])
return &rss[0]
}
// isCacheValid checks whether the cached ReplicaSet is still a valid match for the given pod.
func (rsc *ReplicaSetController) isCacheValid(pod *api.Pod, cachedRS *extensions.ReplicaSet) bool {
_, exists, err := rsc.rsStore.Get(cachedRS)
// The RS has been deleted or updated; the cache entry is invalid.
if err != nil || !exists || !isReplicaSetMatch(pod, cachedRS) {
return false
}
return true
}
// isReplicaSetMatch takes a Pod and a ReplicaSet and returns whether the Pod matches the ReplicaSet's selector.
// TODO(mqliang): This logic is a copy from GetPodReplicaSets(), remove the duplication
func isReplicaSetMatch(pod *api.Pod, rs *extensions.ReplicaSet) bool {
if rs.Namespace != pod.Namespace {
return false
}
selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
glog.Errorf("invalid selector: %v", err)
return false
}
// If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
return false
}
return true
}
// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
pod := obj.(*api.Pod)

@ -87,6 +87,8 @@ type ReplicationManager struct {
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
lookupCache *controller.MatchingCache
// Controllers that need to be synced
queue *workqueue.Type
}
@ -123,6 +125,24 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
framework.ResourceEventHandlerFuncs{
AddFunc: rm.enqueueController,
UpdateFunc: func(old, cur interface{}) {
oldRC := old.(*api.ReplicationController)
curRC := cur.(*api.ReplicationController)
// We should invalidate the whole lookup cache if a RC's selector has been updated.
//
// Imagine that you have two RCs:
// * old RC1
// * new RC2
// You also have a pod that is attached to RC2 (because it doesn't match RC1 selector).
// Now imagine that you are changing RC1's selector so that it now matches that pod;
// in such a case we must invalidate the whole cache so that the pod can be adopted by RC1.
//
// This makes the lookup cache less helpful, but selector updates do not happen often,
// so it's not a big problem.
if !reflect.DeepEqual(oldRC.Spec.Selector, curRC.Spec.Selector) {
rm.lookupCache.InvalidateAll()
}
// You might imagine that we only really need to enqueue the
// controller when Spec changes, but it is safer to sync any
// time this function is triggered. That way a full informer
@ -135,8 +155,6 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
// this function), but in general extra resyncs shouldn't be
// that bad as rcs that haven't met expectations yet won't
// sync, and all the listing is done using local stores.
oldRC := old.(*api.ReplicationController)
curRC := cur.(*api.ReplicationController)
if oldRC.Status.Replicas != curRC.Status.Replicas {
glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas)
}
@ -172,6 +190,7 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
rm.syncHandler = rm.syncReplicationController
rm.podStoreSynced = rm.podController.HasSynced
rm.lookupCache = controller.NewMatchingCache(controller.DefaultCacheEntries)
return rm
}
@ -200,6 +219,20 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
// getPodController returns the controller managing the given pod.
// TODO: Surface that we are ignoring multiple controllers for a single pod.
func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationController {
// Look up in the cache; if there is a valid cached entry, just return it.
if obj, cached := rm.lookupCache.GetMatchingObject(pod); cached {
controller, ok := obj.(*api.ReplicationController)
if !ok {
// This should not happen
glog.Errorf("lookup cache does not retuen a ReplicationController object")
return nil
}
if rm.isCacheValid(pod, controller) {
return controller
}
}
// If not cached, or the cached value is invalid, search all RCs for the matching one and update the cache.
controllers, err := rm.rcStore.GetPodControllers(pod)
if err != nil {
glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name)
@ -217,9 +250,39 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon
glog.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels)
sort.Sort(OverlappingControllers(controllers))
}
// update lookup cache
rm.lookupCache.Update(pod, &controllers[0])
return &controllers[0]
}
// isCacheValid checks whether the cached ReplicationController is still a valid match for the given pod.
func (rm *ReplicationManager) isCacheValid(pod *api.Pod, cachedRC *api.ReplicationController) bool {
_, exists, err := rm.rcStore.Get(cachedRC)
// The RC has been deleted or updated; the cache entry is invalid.
if err != nil || !exists || !isControllerMatch(pod, cachedRC) {
return false
}
return true
}
// isControllerMatch takes a Pod and a ReplicationController and returns whether the Pod matches the RC's selector.
// TODO(mqliang): This logic is a copy from GetPodControllers(), remove the duplication
func isControllerMatch(pod *api.Pod, rc *api.ReplicationController) bool {
if rc.Namespace != pod.Namespace {
return false
}
selector := labels.Set(rc.Spec.Selector).AsSelector()
// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
return false
}
return true
}
// When a pod is created, enqueue the controller that manages it and update its expectations.
func (rm *ReplicationManager) addPod(obj interface{}) {
pod := obj.(*api.Pod)

@ -892,3 +892,89 @@ func TestOverlappingRCs(t *testing.T) {
}
}
}
func BenchmarkGetPodControllerMultiNS(b *testing.B) {
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
const nsNum = 1000
pods := []api.Pod{}
for i := 0; i < nsNum; i++ {
ns := fmt.Sprintf("ns-%d", i)
for j := 0; j < 10; j++ {
rcName := fmt.Sprintf("rc-%d", j)
for k := 0; k < 10; k++ {
podName := fmt.Sprintf("pod-%d-%d", j, k)
pods = append(pods, api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{"rcName": rcName},
},
})
}
}
}
for i := 0; i < nsNum; i++ {
ns := fmt.Sprintf("ns-%d", i)
for j := 0; j < 10; j++ {
rcName := fmt.Sprintf("rc-%d", j)
manager.rcStore.Add(&api.ReplicationController{
ObjectMeta: api.ObjectMeta{Name: rcName, Namespace: ns},
Spec: api.ReplicationControllerSpec{
Selector: map[string]string{"rcName": rcName},
},
})
}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, pod := range pods {
manager.getPodController(&pod)
}
}
}
func BenchmarkGetPodControllerSingleNS(b *testing.B) {
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
const rcNum = 1000
const replicaNum = 3
pods := []api.Pod{}
for i := 0; i < rcNum; i++ {
rcName := fmt.Sprintf("rc-%d", i)
for j := 0; j < replicaNum; j++ {
podName := fmt.Sprintf("pod-%d-%d", i, j)
pods = append(pods, api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
Namespace: "foo",
Labels: map[string]string{"rcName": rcName},
},
})
}
}
for i := 0; i < rcNum; i++ {
rcName := fmt.Sprintf("rc-%d", i)
manager.rcStore.Add(&api.ReplicationController{
ObjectMeta: api.ObjectMeta{Name: rcName, Namespace: "foo"},
Spec: api.ReplicationControllerSpec{
Selector: map[string]string{"rcName": rcName},
},
})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, pod := range pods {
manager.getPodController(&pod)
}
}
}
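
As a usage note (not part of this commit): these benchmarks run with the standard Go tooling, e.g. go test -run NONE -bench GetPodController from the replication controller's package directory; the exact package path is not shown in this diff and is assumed from the tree layout of that era.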