From 49934c05c087c52c538c3df7b439007154da0f1c Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 4 May 2016 13:32:05 +0200 Subject: [PATCH] Avoid creating NodeInfoMap from scratch on every scheduling. --- plugin/pkg/scheduler/generic_scheduler.go | 17 ++++--- plugin/pkg/scheduler/schedulercache/cache.go | 14 ++++-- .../scheduler/schedulercache/cache_test.go | 46 +++++++------------ .../pkg/scheduler/schedulercache/interface.go | 7 +-- .../pkg/scheduler/schedulercache/node_info.go | 10 ++++ plugin/pkg/scheduler/testing/fake_cache.go | 4 +- plugin/pkg/scheduler/testing/pods_to_cache.go | 5 +- 7 files changed, 56 insertions(+), 47 deletions(-) diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index a2799fad74d..dbeabe9a399 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -62,6 +62,8 @@ type genericScheduler struct { pods algorithm.PodLister lastNodeIndexLock sync.Mutex lastNodeIndex uint64 + + cachedNodeInfoMap map[string]*schedulercache.NodeInfo } // Schedule tries to schedule the given pod to one of node in the node list. @@ -85,13 +87,13 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe } // Used for all fit and priority funcs. - nodeNameToInfo, err := g.cache.GetNodeNameToInfoMap() + err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap) if err != nil { return "", err } trace.Step("Computing predicates") - filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders) + filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, g.predicates, nodes, g.extenders) if err != nil { return "", err } @@ -104,7 +106,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe } trace.Step("Prioritizing") - priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders) + priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders) if err != nil { return "", err } @@ -329,9 +331,10 @@ func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInf func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm { return &genericScheduler{ - cache: cache, - predicates: predicates, - prioritizers: prioritizers, - extenders: extenders, + cache: cache, + predicates: predicates, + prioritizers: prioritizers, + extenders: extenders, + cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo), } } diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go index c8ffe18ff02..58c94ac69ad 100644 --- a/plugin/pkg/scheduler/schedulercache/cache.go +++ b/plugin/pkg/scheduler/schedulercache/cache.go @@ -74,14 +74,20 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul } } -func (cache *schedulerCache) GetNodeNameToInfoMap() (map[string]*NodeInfo, error) { - nodeNameToInfo := make(map[string]*NodeInfo) +func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error { cache.mu.Lock() defer cache.mu.Unlock() for name, info := range cache.nodes { - nodeNameToInfo[name] = info.Clone() + if current, ok := nodeNameToInfo[name]; !ok || current.generation != info.generation { + nodeNameToInfo[name] = info.Clone() + } } - return nodeNameToInfo, nil + for name := range nodeNameToInfo { + if _, ok := cache.nodes[name]; !ok { + delete(nodeNameToInfo, name) + } + } + return nil } func (cache *schedulerCache) List(selector labels.Selector) ([]*api.Pod, error) { diff --git a/plugin/pkg/scheduler/schedulercache/cache_test.go b/plugin/pkg/scheduler/schedulercache/cache_test.go index 2484fcf589a..4077326f89a 100644 --- a/plugin/pkg/scheduler/schedulercache/cache_test.go +++ b/plugin/pkg/scheduler/schedulercache/cache_test.go @@ -28,6 +28,16 @@ import ( priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" ) +func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *NodeInfo) { + // Ignore generation field. + if actual != nil { + actual.generation = 0 + } + if !reflect.DeepEqual(actual, expected) { + t.Errorf("#%d: node info get=%s, want=%s", testcase, actual, expected) + } +} + // TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated // on node level. func TestAssumePodScheduled(t *testing.T) { @@ -92,9 +102,7 @@ func TestAssumePodScheduled(t *testing.T) { } } n := cache.nodes[nodeName] - if !reflect.DeepEqual(n, tt.wNodeInfo) { - t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo) - } + deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo) } } @@ -154,9 +162,7 @@ func TestExpirePod(t *testing.T) { // pods that have assumedTime + ttl < cleanupTime will get expired and removed cache.cleanupAssumedPods(tt.cleanupTime) n := cache.nodes[nodeName] - if !reflect.DeepEqual(n, tt.wNodeInfo) { - t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo) - } + deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo) } } @@ -207,9 +213,7 @@ func TestAddPodWillConfirm(t *testing.T) { cache.cleanupAssumedPods(now.Add(2 * ttl)) // check after expiration. confirmed pods shouldn't be expired. n := cache.nodes[nodeName] - if !reflect.DeepEqual(n, tt.wNodeInfo) { - t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo) - } + deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo) } } @@ -254,9 +258,7 @@ func TestAddPodAfterExpiration(t *testing.T) { } // check after expiration. confirmed pods shouldn't be expired. n = cache.nodes[nodeName] - if !reflect.DeepEqual(n, tt.wNodeInfo) { - t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo) - } + deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo) } } @@ -317,9 +319,7 @@ func TestUpdatePod(t *testing.T) { } // check after expiration. confirmed pods shouldn't be expired. n := cache.nodes[nodeName] - if !reflect.DeepEqual(n, tt.wNodeInfo[i-1]) { - t.Errorf("#%d: node info get=%s, want=%s", i-1, n, tt.wNodeInfo) - } + deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1]) } } } @@ -390,9 +390,7 @@ func TestExpireAddUpdatePod(t *testing.T) { } // check after expiration. confirmed pods shouldn't be expired. n := cache.nodes[nodeName] - if !reflect.DeepEqual(n, tt.wNodeInfo[i-1]) { - t.Errorf("#%d: node info get=%s, want=%s", i-1, n, tt.wNodeInfo) - } + deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1]) } } } @@ -426,9 +424,7 @@ func TestRemovePod(t *testing.T) { t.Fatalf("AddPod failed: %v", err) } n := cache.nodes[nodeName] - if !reflect.DeepEqual(n, tt.wNodeInfo) { - t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo) - } + deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo) if err := cache.RemovePod(tt.pod); err != nil { t.Fatalf("RemovePod failed: %v", err) @@ -441,14 +437,6 @@ func TestRemovePod(t *testing.T) { } } -func BenchmarkGetNodeNameToInfoMap1kNodes30kPods(b *testing.B) { - cache := setupCacheOf1kNodes30kPods(b) - b.ResetTimer() - for n := 0; n < b.N; n++ { - cache.GetNodeNameToInfoMap() - } -} - func BenchmarkList1kNodes30kPods(b *testing.B) { cache := setupCacheOf1kNodes30kPods(b) b.ResetTimer() diff --git a/plugin/pkg/scheduler/schedulercache/interface.go b/plugin/pkg/scheduler/schedulercache/interface.go index 604e7cedbd8..962002d7c19 100644 --- a/plugin/pkg/scheduler/schedulercache/interface.go +++ b/plugin/pkg/scheduler/schedulercache/interface.go @@ -80,9 +80,10 @@ type Cache interface { // RemoveNode removes overall information about node. RemoveNode(node *api.Node) error - // GetNodeNameToInfoMap returns a map of node names to node info. The node info contains - // aggregated information of pods scheduled (including assumed to be) on this node. - GetNodeNameToInfoMap() (map[string]*NodeInfo, error) + // UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache. + // The node info contains aggregated information of pods scheduled (including assumed to be) + // on this node. + UpdateNodeNameToInfoMap(infoMap map[string]*NodeInfo) error // List lists all cached pods (including assumed ones). List(labels.Selector) ([]*api.Pod, error) diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index d66a6ad4b52..94950d8679d 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -39,6 +39,10 @@ type NodeInfo struct { requestedResource *Resource pods []*api.Pod nonzeroRequest *Resource + + // Whenever NodeInfo changes, generation is bumped. + // This is used to avoid cloning it if the object didn't change. + generation int64 } // Resource is a collection of compute resource. @@ -55,6 +59,7 @@ func NewNodeInfo(pods ...*api.Pod) *NodeInfo { ni := &NodeInfo{ requestedResource: &Resource{}, nonzeroRequest: &Resource{}, + generation: 0, } for _, pod := range pods { ni.addPod(pod) @@ -101,6 +106,7 @@ func (n *NodeInfo) Clone() *NodeInfo { requestedResource: &(*n.requestedResource), nonzeroRequest: &(*n.nonzeroRequest), pods: pods, + generation: n.generation, } return clone } @@ -123,6 +129,7 @@ func (n *NodeInfo) addPod(pod *api.Pod) { n.nonzeroRequest.MilliCPU += non0_cpu n.nonzeroRequest.Memory += non0_mem n.pods = append(n.pods, pod) + n.generation++ } // removePod subtracts pod information to this NodeInfo. @@ -149,6 +156,7 @@ func (n *NodeInfo) removePod(pod *api.Pod) error { n.requestedResource.NvidiaGPU -= nvidia_gpu n.nonzeroRequest.MilliCPU -= non0_cpu n.nonzeroRequest.Memory -= non0_mem + n.generation++ return nil } } @@ -173,6 +181,7 @@ func calculateResource(pod *api.Pod) (cpu int64, mem int64, nvidia_gpu int64, no // Sets the overall node information. func (n *NodeInfo) SetNode(node *api.Node) error { n.node = node + n.generation++ return nil } @@ -183,6 +192,7 @@ func (n *NodeInfo) RemoveNode(node *api.Node) error { // and thus can potentially be observed later, even though they happened before // node removal. This is handled correctly in cache.go file. n.node = nil + n.generation++ return nil } diff --git a/plugin/pkg/scheduler/testing/fake_cache.go b/plugin/pkg/scheduler/testing/fake_cache.go index 7f98e7e1e7c..2748e6c6e28 100644 --- a/plugin/pkg/scheduler/testing/fake_cache.go +++ b/plugin/pkg/scheduler/testing/fake_cache.go @@ -44,8 +44,8 @@ func (f *FakeCache) UpdateNode(oldNode, newNode *api.Node) error { return nil } func (f *FakeCache) RemoveNode(node *api.Node) error { return nil } -func (f *FakeCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) { - return nil, nil +func (f *FakeCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error { + return nil } func (f *FakeCache) List(s labels.Selector) ([]*api.Pod, error) { return nil, nil } diff --git a/plugin/pkg/scheduler/testing/pods_to_cache.go b/plugin/pkg/scheduler/testing/pods_to_cache.go index b43a5da4422..1fec9e56a71 100644 --- a/plugin/pkg/scheduler/testing/pods_to_cache.go +++ b/plugin/pkg/scheduler/testing/pods_to_cache.go @@ -41,8 +41,9 @@ func (p PodsToCache) UpdateNode(oldNode, newNode *api.Node) error { return nil } func (p PodsToCache) RemoveNode(node *api.Node) error { return nil } -func (p PodsToCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) { - return schedulercache.CreateNodeNameToInfoMap(p), nil +func (p PodsToCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error { + infoMap = schedulercache.CreateNodeNameToInfoMap(p) + return nil } func (p PodsToCache) List(s labels.Selector) (selected []*api.Pod, err error) {