Merge pull request #28680 from wojtek-t/advanced_node_info_map_copy

Automatic merge from submit-queue

Avoid creating NodeInfoMap from scratch on every scheduling.

Ref #28590
k8s-merge-robot authored 2016-07-09 03:03:05 -07:00; committed by GitHub
commit 91226f77a1
7 changed files with 56 additions and 47 deletions
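
The change replaces the per-scheduling-cycle rebuild of the node-info map with a long-lived map that is reconciled incrementally, using a per-NodeInfo generation counter to skip cloning unchanged entries. A minimal, self-contained sketch of the idea, using simplified stand-in types (nodeInfo and cache below are illustrative, not the real schedulercache types):

package main

import "fmt"

// nodeInfo is a simplified stand-in for schedulercache.NodeInfo: its
// generation is bumped on every mutation so consumers can detect change.
type nodeInfo struct {
    pods       int
    generation int64
}

func (n *nodeInfo) clone() *nodeInfo {
    c := *n
    return &c
}

// cache is a simplified stand-in for the scheduler cache.
type cache struct {
    nodes map[string]*nodeInfo
}

// updateSnapshot reconciles a caller-owned snapshot map in place: entries are
// re-cloned only when their generation changed, and entries for nodes that no
// longer exist in the cache are deleted.
func (c *cache) updateSnapshot(snapshot map[string]*nodeInfo) {
    for name, info := range c.nodes {
        if cur, ok := snapshot[name]; !ok || cur.generation != info.generation {
            snapshot[name] = info.clone()
        }
    }
    for name := range snapshot {
        if _, ok := c.nodes[name]; !ok {
            delete(snapshot, name)
        }
    }
}

func main() {
    c := &cache{nodes: map[string]*nodeInfo{
        "node-1": {pods: 3, generation: 7},
        "node-2": {pods: 1, generation: 2},
    }}
    snapshot := map[string]*nodeInfo{}

    c.updateSnapshot(snapshot) // first call clones every node

    c.nodes["node-1"].pods = 4
    c.nodes["node-1"].generation++ // only node-1 changed
    c.updateSnapshot(snapshot)     // second call re-clones node-1 only

    fmt.Println(snapshot["node-1"].pods, snapshot["node-2"].pods) // 4 1
}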

View File

@@ -62,6 +62,8 @@ type genericScheduler struct {
pods algorithm.PodLister
lastNodeIndexLock sync.Mutex
lastNodeIndex uint64
+ cachedNodeInfoMap map[string]*schedulercache.NodeInfo
}
// Schedule tries to schedule the given pod to one of node in the node list.
@@ -85,13 +87,13 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
}
// Used for all fit and priority funcs.
- nodeNameToInfo, err := g.cache.GetNodeNameToInfoMap()
+ err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
return "", err
}
trace.Step("Computing predicates")
- filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders)
+ filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, g.predicates, nodes, g.extenders)
if err != nil {
return "", err
}
@@ -104,7 +106,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
}
trace.Step("Prioritizing")
- priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
+ priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
if err != nil {
return "", err
}
@@ -329,9 +331,10 @@ func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInf
func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
return &genericScheduler{
- cache: cache,
- predicates: predicates,
- prioritizers: prioritizers,
- extenders: extenders,
+ cache:             cache,
+ predicates:        predicates,
+ prioritizers:      prioritizers,
+ extenders:         extenders,
+ cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
}
}
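
The constructor signature is unchanged by this PR; the cached map is an internal detail initialized inside NewGenericScheduler. A hedged sketch of the intended usage pattern (the helper and variable names below are illustrative, not code from this change):

// illustrativeLoop is not code from this PR: it shows that callers build the
// scheduler once and call Schedule repeatedly, so the internal
// cachedNodeInfoMap is reused across scheduling cycles instead of being
// rebuilt from scratch each time.
func illustrativeLoop(
    cache schedulercache.Cache,
    predicates map[string]algorithm.FitPredicate,
    prioritizers []algorithm.PriorityConfig,
    nodeLister algorithm.NodeLister,
    pods <-chan *api.Pod,
) {
    sched := NewGenericScheduler(cache, predicates, prioritizers, nil)
    for pod := range pods {
        host, err := sched.Schedule(pod, nodeLister)
        if err != nil {
            // the real scheduler requeues the pod and records an event here
            continue
        }
        _ = host // the real scheduler proceeds to bind pod to host
    }
}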

View File

@@ -74,14 +74,20 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
}
}
- func (cache *schedulerCache) GetNodeNameToInfoMap() (map[string]*NodeInfo, error) {
- nodeNameToInfo := make(map[string]*NodeInfo)
+ func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
cache.mu.Lock()
defer cache.mu.Unlock()
for name, info := range cache.nodes {
- nodeNameToInfo[name] = info.Clone()
+ if current, ok := nodeNameToInfo[name]; !ok || current.generation != info.generation {
+ nodeNameToInfo[name] = info.Clone()
+ }
}
- return nodeNameToInfo, nil
+ for name := range nodeNameToInfo {
+ if _, ok := cache.nodes[name]; !ok {
+ delete(nodeNameToInfo, name)
+ }
+ }
+ return nil
}
func (cache *schedulerCache) List(selector labels.Selector) ([]*api.Pod, error) {
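
One way to state the contract of the new method: after it returns, the caller's map mirrors cache.nodes exactly, entry for entry, at matching generations. A test-style sketch of that invariant (not part of this PR; it assumes it lives in the schedulercache package next to the code above and runs single-threaded, so cache.mu is not taken):

// checkSnapshotMatchesCache is a hypothetical helper that asserts the
// invariant UpdateNodeNameToInfoMap is supposed to maintain.
func checkSnapshotMatchesCache(t *testing.T, cache *schedulerCache, snapshot map[string]*NodeInfo) {
    if len(snapshot) != len(cache.nodes) {
        t.Fatalf("snapshot has %d entries, cache has %d", len(snapshot), len(cache.nodes))
    }
    for name, info := range cache.nodes {
        got, ok := snapshot[name]
        if !ok {
            t.Fatalf("node %q missing from snapshot", name)
        }
        if got.generation != info.generation {
            t.Fatalf("node %q: snapshot generation %d, cache generation %d", name, got.generation, info.generation)
        }
    }
}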

View File

@@ -28,6 +28,16 @@ import (
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
)
+ func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *NodeInfo) {
+ // Ignore generation field.
+ if actual != nil {
+ actual.generation = 0
+ }
+ if !reflect.DeepEqual(actual, expected) {
+ t.Errorf("#%d: node info get=%s, want=%s", testcase, actual, expected)
+ }
+ }
// TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
// on node level.
func TestAssumePodScheduled(t *testing.T) {
@@ -92,9 +102,7 @@ func TestAssumePodScheduled(t *testing.T) {
}
}
n := cache.nodes[nodeName]
- if !reflect.DeepEqual(n, tt.wNodeInfo) {
- t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
- }
+ deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
}
}
@@ -154,9 +162,7 @@ func TestExpirePod(t *testing.T) {
// pods that have assumedTime + ttl < cleanupTime will get expired and removed
cache.cleanupAssumedPods(tt.cleanupTime)
n := cache.nodes[nodeName]
- if !reflect.DeepEqual(n, tt.wNodeInfo) {
- t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
- }
+ deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
}
}
@@ -207,9 +213,7 @@ func TestAddPodWillConfirm(t *testing.T) {
cache.cleanupAssumedPods(now.Add(2 * ttl))
// check after expiration. confirmed pods shouldn't be expired.
n := cache.nodes[nodeName]
- if !reflect.DeepEqual(n, tt.wNodeInfo) {
- t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
- }
+ deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
}
}
@@ -254,9 +258,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
}
// check after expiration. confirmed pods shouldn't be expired.
n = cache.nodes[nodeName]
- if !reflect.DeepEqual(n, tt.wNodeInfo) {
- t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
- }
+ deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
}
}
@@ -317,9 +319,7 @@ func TestUpdatePod(t *testing.T) {
}
// check after expiration. confirmed pods shouldn't be expired.
n := cache.nodes[nodeName]
- if !reflect.DeepEqual(n, tt.wNodeInfo[i-1]) {
- t.Errorf("#%d: node info get=%s, want=%s", i-1, n, tt.wNodeInfo)
- }
+ deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1])
}
}
}
@@ -390,9 +390,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
}
// check after expiration. confirmed pods shouldn't be expired.
n := cache.nodes[nodeName]
- if !reflect.DeepEqual(n, tt.wNodeInfo[i-1]) {
- t.Errorf("#%d: node info get=%s, want=%s", i-1, n, tt.wNodeInfo)
- }
+ deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1])
}
}
}
@@ -426,9 +424,7 @@ func TestRemovePod(t *testing.T) {
t.Fatalf("AddPod failed: %v", err)
}
n := cache.nodes[nodeName]
- if !reflect.DeepEqual(n, tt.wNodeInfo) {
- t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
- }
+ deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
if err := cache.RemovePod(tt.pod); err != nil {
t.Fatalf("RemovePod failed: %v", err)
@@ -441,14 +437,6 @@ func TestRemovePod(t *testing.T) {
}
}
- func BenchmarkGetNodeNameToInfoMap1kNodes30kPods(b *testing.B) {
- cache := setupCacheOf1kNodes30kPods(b)
- b.ResetTimer()
- for n := 0; n < b.N; n++ {
- cache.GetNodeNameToInfoMap()
- }
- }
func BenchmarkList1kNodes30kPods(b *testing.B) {
cache := setupCacheOf1kNodes30kPods(b)
b.ResetTimer()
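
The removed benchmark only exercised the deleted full-rebuild path. If benchmark coverage for the new path were wanted, a sketch along these lines could sit in the same test file (hypothetical, not part of this PR; it reuses the existing setupCacheOf1kNodes30kPods helper):

// Hypothetical replacement benchmark: measures repeated in-place updates of a
// caller-owned map. After the first iteration most entries are unchanged, so
// the steady-state cost is generation comparisons rather than cloning.
func BenchmarkUpdateNodeNameToInfoMap1kNodes30kPods(b *testing.B) {
    cache := setupCacheOf1kNodes30kPods(b)
    infoMap := make(map[string]*NodeInfo)
    b.ResetTimer()
    for n := 0; n < b.N; n++ {
        cache.UpdateNodeNameToInfoMap(infoMap)
    }
}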

View File

@@ -80,9 +80,10 @@ type Cache interface {
// RemoveNode removes overall information about node.
RemoveNode(node *api.Node) error
- // GetNodeNameToInfoMap returns a map of node names to node info. The node info contains
- // aggregated information of pods scheduled (including assumed to be) on this node.
- GetNodeNameToInfoMap() (map[string]*NodeInfo, error)
+ // UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
+ // The node info contains aggregated information of pods scheduled (including assumed to be)
+ // on this node.
+ UpdateNodeNameToInfoMap(infoMap map[string]*NodeInfo) error
// List lists all cached pods (including assumed ones).
List(labels.Selector) ([]*api.Pod, error)
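
From a caller's perspective, the interface change swaps a per-call allocation for an in-place update of a caller-owned map. A hedged before/after sketch of a call site (the surrounding helper is illustrative, not code from this PR):

func refreshSnapshot(c Cache, infoMap map[string]*NodeInfo) error {
    // Old contract: a fresh map of cloned NodeInfos was allocated on every call:
    //   infoMap, err := c.GetNodeNameToInfoMap()
    // New contract: the caller keeps one long-lived map and the cache reconciles
    // it in place, re-cloning only entries whose NodeInfo generation changed.
    return c.UpdateNodeNameToInfoMap(infoMap)
}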

View File

@@ -39,6 +39,10 @@ type NodeInfo struct {
requestedResource *Resource
pods []*api.Pod
nonzeroRequest *Resource
+ // Whenever NodeInfo changes, generation is bumped.
+ // This is used to avoid cloning it if the object didn't change.
+ generation int64
}
// Resource is a collection of compute resource.
@@ -55,6 +59,7 @@ func NewNodeInfo(pods ...*api.Pod) *NodeInfo {
ni := &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
+ generation: 0,
}
for _, pod := range pods {
ni.addPod(pod)
@@ -101,6 +106,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
requestedResource: &(*n.requestedResource),
nonzeroRequest: &(*n.nonzeroRequest),
pods: pods,
+ generation: n.generation,
}
return clone
}
@@ -123,6 +129,7 @@ func (n *NodeInfo) addPod(pod *api.Pod) {
n.nonzeroRequest.MilliCPU += non0_cpu
n.nonzeroRequest.Memory += non0_mem
n.pods = append(n.pods, pod)
+ n.generation++
}
// removePod subtracts pod information to this NodeInfo.
@@ -149,6 +156,7 @@ func (n *NodeInfo) removePod(pod *api.Pod) error {
n.requestedResource.NvidiaGPU -= nvidia_gpu
n.nonzeroRequest.MilliCPU -= non0_cpu
n.nonzeroRequest.Memory -= non0_mem
+ n.generation++
return nil
}
}
@@ -173,6 +181,7 @@ func calculateResource(pod *api.Pod) (cpu int64, mem int64, nvidia_gpu int64, no
// Sets the overall node information.
func (n *NodeInfo) SetNode(node *api.Node) error {
n.node = node
+ n.generation++
return nil
}
@@ -183,6 +192,7 @@ func (n *NodeInfo) RemoveNode(node *api.Node) error {
// and thus can potentially be observed later, even though they happened before
// node removal. This is handled correctly in cache.go file.
n.node = nil
+ n.generation++
return nil
}
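
The generation counter only works if every mutation bumps it and Clone carries it over, which is exactly what the hunks above add. A test-style sketch of that discipline (hypothetical, not part of this PR; assumes it lives in the schedulercache package of this era, where api is k8s.io/kubernetes/pkg/api):

func TestGenerationBumpsOnMutation(t *testing.T) {
    ni := NewNodeInfo()
    start := ni.generation

    node := &api.Node{ObjectMeta: api.ObjectMeta{Name: "node-1"}}
    if err := ni.SetNode(node); err != nil {
        t.Fatalf("SetNode failed: %v", err)
    }
    if ni.generation <= start {
        t.Errorf("SetNode did not bump generation: %d -> %d", start, ni.generation)
    }

    clone := ni.Clone()
    if clone.generation != ni.generation {
        t.Errorf("Clone changed generation: clone=%d, original=%d", clone.generation, ni.generation)
    }

    if err := ni.RemoveNode(node); err != nil {
        t.Fatalf("RemoveNode failed: %v", err)
    }
    if ni.generation <= clone.generation {
        t.Errorf("RemoveNode did not bump generation")
    }
}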

View File

@@ -44,8 +44,8 @@ func (f *FakeCache) UpdateNode(oldNode, newNode *api.Node) error { return nil }
func (f *FakeCache) RemoveNode(node *api.Node) error { return nil }
- func (f *FakeCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
- return nil, nil
+ func (f *FakeCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
+ return nil
}
func (f *FakeCache) List(s labels.Selector) ([]*api.Pod, error) { return nil, nil }

View File

@@ -41,8 +41,9 @@ func (p PodsToCache) UpdateNode(oldNode, newNode *api.Node) error { return nil }
func (p PodsToCache) RemoveNode(node *api.Node) error { return nil }
- func (p PodsToCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
- return schedulercache.CreateNodeNameToInfoMap(p), nil
+ func (p PodsToCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
+ infoMap = schedulercache.CreateNodeNameToInfoMap(p)
+ return nil
}
func (p PodsToCache) List(s labels.Selector) (selected []*api.Pod, err error) {