diff --git a/pkg/controller/lookup_cache.go b/pkg/controller/lookup_cache.go
new file mode 100644
index 00000000000..3b914eff261
--- /dev/null
+++ b/pkg/controller/lookup_cache.go
@@ -0,0 +1,92 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+    "hash/adler32"
+    "sync"
+
+    "github.com/golang/groupcache/lru"
+    "k8s.io/kubernetes/pkg/api/meta"
+    hashutil "k8s.io/kubernetes/pkg/util/hash"
+)
+
+const DefaultCacheEntries = 4096
+
+type objectWithMeta interface {
+    meta.Object
+}
+
+// keyFunc returns the key of an object, which is used to look up its matching object in the cache.
+// Objects are matched by namespace and labels/selector, so two objects with the same namespace
+// and labels will have the same key.
+func keyFunc(obj objectWithMeta) uint64 {
+    hash := adler32.New()
+    hashutil.DeepHashObject(hash, &equivalenceLabelObj{
+        namespace: obj.GetNamespace(),
+        labels:    obj.GetLabels(),
+    })
+    return uint64(hash.Sum32())
+}
+
+type equivalenceLabelObj struct {
+    namespace string
+    labels    map[string]string
+}
+
+// MatchingCache saves the label and selector matching relationship.
+type MatchingCache struct {
+    mutex sync.RWMutex
+    cache *lru.Cache
+}
+
+// NewMatchingCache returns a MatchingCache, which saves the label and selector matching relationship.
+func NewMatchingCache(maxCacheEntries int) *MatchingCache {
+    return &MatchingCache{
+        cache: lru.New(maxCacheEntries),
+    }
+}
+
+// Add adds matching information to the cache.
+func (c *MatchingCache) Add(labelObj objectWithMeta, selectorObj objectWithMeta) {
+    key := keyFunc(labelObj)
+    c.mutex.Lock()
+    defer c.mutex.Unlock()
+    c.cache.Add(key, selectorObj)
+}
+
+// GetMatchingObject looks up the matching object for a given object.
+// Note: the cached information may be stale because the controller may have been deleted or
+// updated; callers must verify against the live state that the cached data is not dirty.
+func (c *MatchingCache) GetMatchingObject(labelObj objectWithMeta) (controller interface{}, exists bool) {
+    key := keyFunc(labelObj)
+    c.mutex.Lock()
+    defer c.mutex.Unlock()
+    return c.cache.Get(key)
+}
+
+// Update updates the cached matching information.
+func (c *MatchingCache) Update(labelObj objectWithMeta, selectorObj objectWithMeta) {
+    c.Add(labelObj, selectorObj)
+}
+
+// InvalidateAll invalidates the whole cache.
+func (c *MatchingCache) InvalidateAll() {
+    c.mutex.Lock()
+    defer c.mutex.Unlock()
+    c.cache = lru.New(c.cache.MaxEntries)
+}
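For orientation, here is a minimal standalone sketch (not part of this patch) of how the new MatchingCache is meant to be driven. The pod and ReplicaSet values are invented, and, as the comments in the file note, real callers must re-validate whatever the cache returns:

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/apis/extensions"
    "k8s.io/kubernetes/pkg/controller"
)

func main() {
    cache := controller.NewMatchingCache(controller.DefaultCacheEntries)

    pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "pod-0", Namespace: "demo", Labels: map[string]string{"app": "demo"}}}
    rs := &extensions.ReplicaSet{ObjectMeta: api.ObjectMeta{Name: "rs-0", Namespace: "demo"}}

    // Remember that this pod's namespace+labels map to this ReplicaSet.
    cache.Add(pod, rs)

    // Lookups key off namespace and labels only; the cached object may be stale,
    // so real callers re-check it against the store before trusting it.
    if obj, ok := cache.GetMatchingObject(pod); ok {
        fmt.Println("cached match:", obj.(*extensions.ReplicaSet).Name)
    }

    // Selector changes drop the whole cache rather than individual entries.
    cache.InvalidateAll()
    _, ok := cache.GetMatchingObject(pod)
    fmt.Println("still cached after InvalidateAll:", ok)
}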
diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index eb8ec2e6419..ba9bb1acdf2 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -19,6 +19,7 @@ limitations under the License.
 package replicaset
 
 import (
+    "fmt"
     "reflect"
     "sort"
     "sync"
@@ -34,6 +35,7 @@ import (
     unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
     "k8s.io/kubernetes/pkg/controller"
     "k8s.io/kubernetes/pkg/controller/framework"
+    "k8s.io/kubernetes/pkg/labels"
     "k8s.io/kubernetes/pkg/runtime"
     utilruntime "k8s.io/kubernetes/pkg/util/runtime"
     "k8s.io/kubernetes/pkg/util/wait"
@@ -86,6 +88,8 @@ type ReplicaSetController struct {
     // Added as a member to the struct to allow injection for testing.
     podStoreSynced func() bool
 
+    lookupCache *controller.MatchingCache
+
     // Controllers that need to be synced
     queue *workqueue.Type
 }
@@ -122,6 +126,24 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
         framework.ResourceEventHandlerFuncs{
             AddFunc: rsc.enqueueReplicaSet,
             UpdateFunc: func(old, cur interface{}) {
+                oldRS := old.(*extensions.ReplicaSet)
+                curRS := cur.(*extensions.ReplicaSet)
+
+                // We should invalidate the whole lookup cache if an RS's selector has been updated.
+                //
+                // Imagine that you have two RSs:
+                //  * old RS1
+                //  * new RS2
+                // You also have a pod that is attached to RS2 (because it doesn't match RS1's selector).
+                // Now imagine that you change RS1's selector so that it now matches that pod; in that
+                // case we must invalidate the whole cache so that the pod can be adopted by RS1.
+                //
+                // This makes the lookup cache less helpful, but selector updates do not happen often,
+                // so it's not a big problem.
+                if !reflect.DeepEqual(oldRS.Spec.Selector, curRS.Spec.Selector) {
+                    rsc.lookupCache.InvalidateAll()
+                }
+
                 // You might imagine that we only really need to enqueue the
                 // replica set when Spec changes, but it is safer to sync any
                 // time this function is triggered. That way a full informer
@@ -134,8 +156,6 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
                 // this function), but in general extra resyncs shouldn't be
                 // that bad as ReplicaSets that haven't met expectations yet won't
                 // sync, and all the listing is done using local stores.
-                oldRS := old.(*extensions.ReplicaSet)
-                curRS := cur.(*extensions.ReplicaSet)
                 if oldRS.Status.Replicas != curRS.Status.Replicas {
                     glog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas)
                 }
@@ -171,6 +191,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
 
     rsc.syncHandler = rsc.syncReplicaSet
     rsc.podStoreSynced = rsc.podController.HasSynced
+    rsc.lookupCache = controller.NewMatchingCache(controller.DefaultCacheEntries)
     return rsc
 }
 
@@ -198,6 +219,20 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
 
 // getPodReplicaSet returns the replica set managing the given pod.
 // TODO: Surface that we are ignoring multiple replica sets for a single pod.
 func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.ReplicaSet {
+    // Look up in the cache; if there is a valid cached entry, just return it.
+    if obj, cached := rsc.lookupCache.GetMatchingObject(pod); cached {
+        rs, ok := obj.(*extensions.ReplicaSet)
+        if !ok {
+            // This should not happen
+            glog.Errorf("lookup cache does not return a ReplicaSet object")
+            return nil
+        }
+        if rsc.isCacheValid(pod, rs) {
+            return rs
+        }
+    }
+
+    // If not cached, or the cached value is invalid, search all ReplicaSets for the matching one and update the cache.
     rss, err := rsc.rsStore.GetPodReplicaSets(pod)
     if err != nil {
         glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name)
@@ -215,9 +250,42 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl
         glog.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels)
         sort.Sort(overlappingReplicaSets(rss))
     }
+
+    // update the lookup cache
+    rsc.lookupCache.Update(pod, &rss[0])
+
     return &rss[0]
 }
 
+// isCacheValid checks whether the cached ReplicaSet is still valid for the given pod.
+func (rsc *ReplicaSetController) isCacheValid(pod *api.Pod, cachedRS *extensions.ReplicaSet) bool {
+    _, exists, err := rsc.rsStore.Get(cachedRS)
+    // the RS has been deleted or updated, so the cache entry is invalid
+    if err != nil || !exists || !isReplicaSetMatch(pod, cachedRS) {
+        return false
+    }
+    return true
+}
+
+// isReplicaSetMatch takes a Pod and a ReplicaSet and returns whether they match.
+// TODO(mqliang): This logic is a copy of GetPodReplicaSets(), remove the duplication
+func isReplicaSetMatch(pod *api.Pod, rs *extensions.ReplicaSet) bool {
+    if rs.Namespace != pod.Namespace {
+        return false
+    }
+    selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
+    if err != nil {
+        err = fmt.Errorf("invalid selector: %v", err)
+        return false
+    }
+
+    // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything.
+    if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
+        return false
+    }
+    return true
+}
+
 // When a pod is created, enqueue the replica set that manages it and update it's expectations.
 func (rsc *ReplicaSetController) addPod(obj interface{}) {
     pod := obj.(*api.Pod)
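Since isReplicaSetMatch above duplicates the selector handling from GetPodReplicaSets(), here is a small standalone sketch (not part of the patch) of what that conversion and match amount to; the label values are invented and the import paths are assumed to match this tree:

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/labels"
)

func main() {
    // A ReplicaSet-style selector: matchLabels app=web.
    sel := &unversioned.LabelSelector{MatchLabels: map[string]string{"app": "web"}}

    selector, err := unversioned.LabelSelectorAsSelector(sel)
    if err != nil {
        panic(err)
    }

    podLabels := labels.Set{"app": "web", "tier": "frontend"}

    // isReplicaSetMatch treats an empty selector as matching nothing; otherwise
    // a pod matches when its labels are a superset of the selector's matchLabels.
    fmt.Println(!selector.Empty() && selector.Matches(podLabels)) // true
}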
diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go
index 9aa8cab7a3f..1c5729622e5 100644
--- a/pkg/controller/replication/replication_controller.go
+++ b/pkg/controller/replication/replication_controller.go
@@ -87,6 +89,8 @@ type ReplicationManager struct {
     // Added as a member to the struct to allow injection for testing.
     podStoreSynced func() bool
 
+    lookupCache *controller.MatchingCache
+
     // Controllers that need to be synced
     queue *workqueue.Type
 }
@@ -123,6 +125,24 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
         framework.ResourceEventHandlerFuncs{
             AddFunc: rm.enqueueController,
             UpdateFunc: func(old, cur interface{}) {
+                oldRC := old.(*api.ReplicationController)
+                curRC := cur.(*api.ReplicationController)
+
+                // We should invalidate the whole lookup cache if an RC's selector has been updated.
+                //
+                // Imagine that you have two RCs:
+                //  * old RC1
+                //  * new RC2
+                // You also have a pod that is attached to RC2 (because it doesn't match RC1's selector).
+                // Now imagine that you change RC1's selector so that it now matches that pod; in that
+                // case we must invalidate the whole cache so that the pod can be adopted by RC1.
+                //
+                // This makes the lookup cache less helpful, but selector updates do not happen often,
+                // so it's not a big problem.
+                if !reflect.DeepEqual(oldRC.Spec.Selector, curRC.Spec.Selector) {
+                    rm.lookupCache.InvalidateAll()
+                }
+
                 // You might imagine that we only really need to enqueue the
                 // controller when Spec changes, but it is safer to sync any
                 // time this function is triggered. That way a full informer
@@ -135,8 +155,6 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
                 // this function), but in general extra resyncs shouldn't be
                 // that bad as rcs that haven't met expectations yet won't
                 // sync, and all the listing is done using local stores.
-                oldRC := old.(*api.ReplicationController)
-                curRC := cur.(*api.ReplicationController)
                 if oldRC.Status.Replicas != curRC.Status.Replicas {
                     glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas)
                 }
@@ -172,6 +190,7 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
 
     rm.syncHandler = rm.syncReplicationController
     rm.podStoreSynced = rm.podController.HasSynced
+    rm.lookupCache = controller.NewMatchingCache(controller.DefaultCacheEntries)
     return rm
 }
 
@@ -200,6 +219,20 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
 
 // getPodController returns the controller managing the given pod.
 // TODO: Surface that we are ignoring multiple controllers for a single pod.
 func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationController {
+    // Look up in the cache; if there is a valid cached entry, just return it.
+    if obj, cached := rm.lookupCache.GetMatchingObject(pod); cached {
+        controller, ok := obj.(*api.ReplicationController)
+        if !ok {
+            // This should not happen
+            glog.Errorf("lookup cache does not return a ReplicationController object")
+            return nil
+        }
+        if rm.isCacheValid(pod, controller) {
+            return controller
+        }
+    }
+
+    // If not cached, or the cached value is invalid, search all RCs for the matching one and update the cache.
     controllers, err := rm.rcStore.GetPodControllers(pod)
     if err != nil {
         glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name)
@@ -217,9 +250,39 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon
         glog.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels)
         sort.Sort(OverlappingControllers(controllers))
     }
+
+    // update the lookup cache
+    rm.lookupCache.Update(pod, &controllers[0])
+
     return &controllers[0]
 }
 
+// isCacheValid checks whether the cached ReplicationController is still valid for the given pod.
+func (rm *ReplicationManager) isCacheValid(pod *api.Pod, cachedRC *api.ReplicationController) bool {
+    _, exists, err := rm.rcStore.Get(cachedRC)
+    // the RC has been deleted or updated, so the cache entry is invalid
+    if err != nil || !exists || !isControllerMatch(pod, cachedRC) {
+        return false
+    }
+    return true
+}
+
+// isControllerMatch takes a Pod and a ReplicationController and returns whether they match.
+// TODO(mqliang): This logic is a copy of GetPodControllers(), remove the duplication
+func isControllerMatch(pod *api.Pod, rc *api.ReplicationController) bool {
+    if rc.Namespace != pod.Namespace {
+        return false
+    }
+    labelSet := labels.Set(rc.Spec.Selector)
+    selector := labels.Set(rc.Spec.Selector).AsSelector()
+
+    // If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
+    if labelSet.AsSelector().Empty() || !selector.Matches(labels.Set(pod.Labels)) {
+        return false
+    }
+    return true
+}
+
 // When a pod is created, enqueue the controller that manages it and update it's expectations.
 func (rm *ReplicationManager) addPod(obj interface{}) {
     pod := obj.(*api.Pod)
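The RC path above matches with a plain map selector rather than a LabelSelector. A similarly hedged standalone sketch (not from this patch) of that conversion and of the empty-selector guard in isControllerMatch; the label values are invented:

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/labels"
)

func main() {
    podLabels := labels.Set{"rcName": "rc-7"}

    // An RC selector is a plain map; labels.Set(...).AsSelector() turns it into
    // a selector that matches pods carrying at least those labels.
    selector := labels.Set(map[string]string{"rcName": "rc-7"}).AsSelector()
    fmt.Println(!selector.Empty() && selector.Matches(podLabels)) // true

    // An empty map yields an empty selector, which would otherwise match every
    // pod; isControllerMatch treats it as matching nothing instead.
    empty := labels.Set(map[string]string{}).AsSelector()
    fmt.Println(empty.Empty()) // true
}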
diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go
index f893499224b..a8227cc67ae 100644
--- a/pkg/controller/replication/replication_controller_test.go
+++ b/pkg/controller/replication/replication_controller_test.go
@@ -892,3 +892,89 @@ func TestOverlappingRCs(t *testing.T) {
         }
     }
 }
+
+func BenchmarkGetPodControllerMultiNS(b *testing.B) {
+    client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+    manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
+
+    const nsNum = 1000
+
+    pods := []api.Pod{}
+    for i := 0; i < nsNum; i++ {
+        ns := fmt.Sprintf("ns-%d", i)
+        for j := 0; j < 10; j++ {
+            rcName := fmt.Sprintf("rc-%d", j)
+            for k := 0; k < 10; k++ {
+                podName := fmt.Sprintf("pod-%d-%d", j, k)
+                pods = append(pods, api.Pod{
+                    ObjectMeta: api.ObjectMeta{
+                        Name:      podName,
+                        Namespace: ns,
+                        Labels:    map[string]string{"rcName": rcName},
+                    },
+                })
+            }
+        }
+    }
+
+    for i := 0; i < nsNum; i++ {
+        ns := fmt.Sprintf("ns-%d", i)
+        for j := 0; j < 10; j++ {
+            rcName := fmt.Sprintf("rc-%d", j)
+            manager.rcStore.Add(&api.ReplicationController{
+                ObjectMeta: api.ObjectMeta{Name: rcName, Namespace: ns},
+                Spec: api.ReplicationControllerSpec{
+                    Selector: map[string]string{"rcName": rcName},
+                },
+            })
+        }
+    }
+
+    b.ResetTimer()
+
+    for i := 0; i < b.N; i++ {
+        for _, pod := range pods {
+            manager.getPodController(&pod)
+        }
+    }
+}
+
+func BenchmarkGetPodControllerSingleNS(b *testing.B) {
+    client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+    manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
+
+    const rcNum = 1000
+    const replicaNum = 3
+
+    pods := []api.Pod{}
+    for i := 0; i < rcNum; i++ {
+        rcName := fmt.Sprintf("rc-%d", i)
+        for j := 0; j < replicaNum; j++ {
+            podName := fmt.Sprintf("pod-%d-%d", i, j)
+            pods = append(pods, api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    Name:      podName,
+                    Namespace: "foo",
+                    Labels:    map[string]string{"rcName": rcName},
+                },
+            })
+        }
+    }
+
+    for i := 0; i < rcNum; i++ {
+        rcName := fmt.Sprintf("rc-%d", i)
+        manager.rcStore.Add(&api.ReplicationController{
+            ObjectMeta: api.ObjectMeta{Name: rcName, Namespace: "foo"},
+            Spec: api.ReplicationControllerSpec{
+                Selector: map[string]string{"rcName": rcName},
+            },
+        })
+    }
+    b.ResetTimer()
+
+    for i := 0; i < b.N; i++ {
+        for _, pod := range pods {
+            manager.getPodController(&pod)
+        }
+    }
+}
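The patch adds benchmarks but no unit test for the cache itself. A possible test sketch (not included in the patch; the name and assertions are illustrative) that exercises the shared-key behavior through the exported API, assuming DeepHashObject hashes equal label maps identically, which the cache already relies on:

package controller

import (
    "testing"

    "k8s.io/kubernetes/pkg/api"
)

func TestMatchingCacheSharedEntries(t *testing.T) {
    cache := NewMatchingCache(DefaultCacheEntries)

    rc := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Name: "rc-0", Namespace: "ns"}}
    podA := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "pod-a", Namespace: "ns", Labels: map[string]string{"app": "demo"}}}
    podB := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "pod-b", Namespace: "ns", Labels: map[string]string{"app": "demo"}}}

    // The key is derived from namespace and labels only, so podB should hit the
    // entry that was added for podA.
    cache.Add(podA, rc)
    if obj, ok := cache.GetMatchingObject(podB); !ok || obj.(*api.ReplicationController).Name != "rc-0" {
        t.Errorf("expected pod-b to share pod-a's cache entry, got %v (ok=%v)", obj, ok)
    }

    // InvalidateAll drops every entry.
    cache.InvalidateAll()
    if _, ok := cache.GetMatchingObject(podA); ok {
        t.Errorf("expected cache miss after InvalidateAll")
    }
}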