From d3b9d583a2f7b03f409592169a2066219b25795f Mon Sep 17 00:00:00 2001
From: Wojciech Tyczynski
Date: Wed, 27 Jul 2016 17:42:27 +0200
Subject: [PATCH] Optimize PodAffinity priority function.

Instead of listing every pod in the cluster and walking that list serially,
compute the inter-pod affinity priority per node, in parallel, from the pods
already aggregated in the scheduler cache. When the pod being scheduled has
no (anti)affinity constraints of its own, only existing pods that carry such
constraints can influence the score, so only those are examined, via the new
NodeInfo.PodsWithAffinity() accessor. CreateNodeNameToInfoMap now also takes
the node list and backfills a NodeInfo entry for every node, which lets the
priority tests drop their hand-rolled backfill loops.
---
 .../algorithm/priorities/interpod_affinity.go | 78 ++++++++++++++-----
 .../priorities/interpod_affinity_test.go      |  6 +-
 .../priorities/node_affinity_test.go          |  2 +-
 .../algorithm/priorities/priorities_test.go   | 26 +------
 .../priorities/selector_spreading_test.go     |  6 +-
 .../priorities/taint_toleration_test.go       |  2 +-
 .../pkg/scheduler/schedulercache/node_info.go |  8 ++
 plugin/pkg/scheduler/schedulercache/util.go   | 16 ++--
 plugin/pkg/scheduler/testing/pods_to_cache.go |  2 +-
 9 files changed, 90 insertions(+), 56 deletions(-)
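The core of the change is the concurrency pattern in
CalculateInterPodAffinityPriority: fan the per-node work out with
workqueue.Parallelize, fold the per-node weights into a mutex-protected map,
and remember only the first error. The sketch below illustrates that pattern
in isolation; it is not the scheduler code itself. The inlined parallelize
helper stands in for k8s.io/kubernetes/pkg/util/workqueue.Parallelize, and
the node names, weights, and injected failure are invented for the example.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// parallelize mimics workqueue.Parallelize: it runs doWorkPiece for every
// index in [0, pieces) using at most `workers` goroutines.
func parallelize(workers, pieces int, doWorkPiece func(piece int)) {
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				doWorkPiece(piece)
			}
		}()
	}
	wg.Wait()
}

// resultMap plays the role of podAffinityPriorityMap: concurrent workers
// accumulate per-node scores and record only the first error they hit.
type resultMap struct {
	sync.Mutex
	counts     map[string]float64
	firstError error
}

func (r *resultMap) setError(err error) {
	r.Lock()
	defer r.Unlock()
	if r.firstError == nil {
		r.firstError = err
	}
}

func (r *resultMap) add(nodeName string, weight float64) {
	r.Lock()
	defer r.Unlock()
	r.counts[nodeName] += weight
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}
	rm := &resultMap{counts: make(map[string]float64)}

	processNode := func(i int) {
		if nodes[i] == "node-3" {
			rm.setError(errors.New("example failure"))
			return
		}
		rm.add(nodes[i], 1.0)
	}
	parallelize(16, len(nodes), processNode)

	// All workers have finished once parallelize returns, so counts and
	// firstError can be read without holding the lock - exactly what the
	// patch relies on before normalizing the scores.
	fmt.Println(rm.counts, rm.firstError)
}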
diff --git a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go
index dd4d2ea0811..5889402b2a4 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go
@@ -17,9 +17,11 @@ limitations under the License.
 package priorities
 
 import (
+	"sync"
+
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
@@ -52,6 +54,8 @@ func NewInterPodAffinityPriority(
 }
 
 type podAffinityPriorityMap struct {
+	sync.Mutex
+
 	// nodes contains all nodes that should be considered
 	nodes []*api.Node
 	// counts stores the mapping from node name to the so-far computed score of
@@ -71,20 +75,30 @@ func newPodAffinityPriorityMap(nodes []*api.Node, failureDomains priorityutil.To
 	}
 }
 
+func (p *podAffinityPriorityMap) setError(err error) {
+	p.Lock()
+	defer p.Unlock()
+	if p.firstError == nil {
+		p.firstError = err
+	}
+}
+
 func (p *podAffinityPriorityMap) processTerm(term *api.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *api.Pod, fixedNode *api.Node, weight float64) {
 	match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, podDefiningAffinityTerm, term)
 	if err != nil {
-		if p.firstError == nil {
-			p.firstError = err
-		}
+		p.setError(err)
 		return
 	}
 	if match {
-		for _, node := range p.nodes {
-			if p.failureDomains.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) {
-				p.counts[node.Name] += weight
+		func() {
+			p.Lock()
+			defer p.Unlock()
+			for _, node := range p.nodes {
+				if p.failureDomains.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) {
+					p.counts[node.Name] += weight
+				}
 			}
-		}
+		}()
 	}
 }
 
@@ -101,14 +115,17 @@ func (p *podAffinityPriorityMap) processTerms(terms []api.WeightedPodAffinityTer
 // Symmetry needs to be considered for preferredDuringSchedulingIgnoredDuringExecution from podAffinity & podAntiAffinity,
 // symmetry needs to be considered for hard requirements from podAffinity
 func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-	allPods, err := ipa.podLister.List(labels.Everything())
-	if err != nil {
-		return nil, err
-	}
 	affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
 	if err != nil {
 		return nil, err
 	}
+	hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
+	hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
+
+	allNodeNames := make([]string, 0, len(nodeNameToInfo))
+	for name := range nodeNameToInfo {
+		allNodeNames = append(allNodeNames, name)
+	}
 
 	// convert the topology key based weights to the node name based weights
 	var maxCount float64
@@ -117,24 +134,26 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod
 	// the node.
 	pm := newPodAffinityPriorityMap(nodes, ipa.failureDomains)
 
-	for _, existingPod := range allPods {
+	processPod := func(existingPod *api.Pod) error {
 		existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName)
 		if err != nil {
-			return nil, err
+			return err
 		}
 		existingPodAffinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations)
 		if err != nil {
-			return nil, err
+			return err
 		}
+		existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
+		existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
 
-		if affinity != nil && affinity.PodAffinity != nil {
+		if hasAffinityConstraints {
 			// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
 			// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
 			// value as that of <existingPod>'s node by the term's weight.
 			terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
 			pm.processTerms(terms, pod, existingPod, existingPodNode, 1)
 		}
-		if affinity != nil && affinity.PodAntiAffinity != nil {
+		if hasAntiAffinityConstraints {
 			// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
 			// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
 			// value as that of <existingPod>'s node by the term's weight.
@@ -142,7 +161,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod
 			pm.processTerms(terms, pod, existingPod, existingPodNode, -1)
 		}
 
-		if existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil {
+		if existingHasAffinityConstraints {
 			// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
 			// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
 			// value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
@@ -162,14 +181,35 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod
 			terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
 			pm.processTerms(terms, existingPod, pod, existingPodNode, 1)
 		}
-		if existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil {
+		if existingHasAntiAffinityConstraints {
 			// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
 			// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
 			// value as that of <existingPod>'s node by the term's weight.
 			terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
 			pm.processTerms(terms, existingPod, pod, existingPodNode, -1)
 		}
+		return nil
 	}
+
+	processNode := func(i int) {
+		nodeInfo := nodeNameToInfo[allNodeNames[i]]
+		if hasAffinityConstraints || hasAntiAffinityConstraints {
+			// The pod being scheduled has constraints - check every existing pod.
+			for _, existingPod := range nodeInfo.Pods() {
+				if err := processPod(existingPod); err != nil {
+					pm.setError(err)
+				}
+			}
+		} else {
+			// The pod doesn't have any constraints - we only need to check existing
+			// pods that carry some.
+			for _, existingPod := range nodeInfo.PodsWithAffinity() {
+				if err := processPod(existingPod); err != nil {
+					pm.setError(err)
+				}
+			}
+		}
+	}
+	workqueue.Parallelize(16, len(allNodeNames), processNode)
 	if pm.firstError != nil {
 		return nil, pm.firstError
 	}
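A note on why the PodsWithAffinity() fast path above is valid: when the pod
being scheduled declares no (anti)affinity of its own, an existing pod can
only influence the score through the symmetry rules, that is, through terms
the existing pod itself defines, so pods without affinity annotations can be
skipped outright. The toy types below model the bookkeeping that makes the
skip cheap; pod and nodeInfo are simplified stand-ins for *api.Pod and
schedulercache.NodeInfo, and hasAffinity abbreviates the
api.GetAffinityFromPodAnnotations check the real addPod performs.

package main

import "fmt"

// pod is a stand-in for *api.Pod; hasAffinity models the result of the
// affinity-annotation check coming back non-nil.
type pod struct {
	name        string
	hasAffinity bool
}

// nodeInfo mirrors the two slices schedulercache.NodeInfo keeps: every pod,
// plus the subset that declares (anti)affinity. addPod maintains both, which
// is what keeps the fast path proportional to the pods that actually matter.
type nodeInfo struct {
	pods             []*pod
	podsWithAffinity []*pod
}

func (n *nodeInfo) addPod(p *pod) {
	n.pods = append(n.pods, p)
	if p.hasAffinity {
		n.podsWithAffinity = append(n.podsWithAffinity, p)
	}
}

func main() {
	ni := &nodeInfo{}
	ni.addPod(&pod{name: "web-1", hasAffinity: true})
	ni.addPod(&pod{name: "batch-1"})
	ni.addPod(&pod{name: "batch-2"})

	// Incoming pod without constraints: only existing pods with affinity
	// can affect the score through symmetry, so scan the short slice.
	incomingHasConstraints := false
	toScan := ni.podsWithAffinity
	if incomingHasConstraints {
		toScan = ni.pods
	}
	for _, p := range toScan {
		fmt.Println("checking", p.name)
	}
}

In clusters where few pods carry affinity annotations, the short slice makes
the no-constraints case nearly free.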
diff --git a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go
index aaed1661cd7..64225311027 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go
@@ -496,7 +496,7 @@ func TestInterPodAffinityPriority(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
 		interPodAffinity := InterPodAffinity{
 			info:       FakeNodeListInfo(test.nodes),
 			nodeLister: algorithm.FakeNodeLister(test.nodes),
@@ -585,7 +585,7 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
 		ipa := InterPodAffinity{
 			info:       FakeNodeListInfo(test.nodes),
 			nodeLister: algorithm.FakeNodeLister(test.nodes),
@@ -669,7 +669,7 @@ func TestSoftPodAntiAffinityWithFailureDomains(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
 		ipa := InterPodAffinity{
 			info:       FakeNodeListInfo(test.nodes),
 			nodeLister: algorithm.FakeNodeLister(test.nodes),
diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go
index b6bf33c626f..d2efbc0137f 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go
@@ -155,7 +155,7 @@ func TestNodeAffinityPriority(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		list, err := CalculateNodeAffinityPriority(test.pod, schedulercache.CreateNodeNameToInfoMap(nil), test.nodes)
+		list, err := CalculateNodeAffinityPriority(test.pod, schedulercache.CreateNodeNameToInfoMap(nil, test.nodes), test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
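The call-site updates above, and the loops removed from priorities_test.go
below, lean on one new contract: CreateNodeNameToInfoMap takes the node list
as a second argument and guarantees a NodeInfo entry, with its node set, for
every listed node, even one that runs no pods. A trimmed-down model of that
contract, using simplified stand-ins for api.Pod, api.Node, and NodeInfo:

package main

import "fmt"

// Simplified shapes standing in for api.Pod, api.Node, and NodeInfo.
type pod struct{ nodeName string }
type node struct{ name string }
type info struct {
	node *node
	pods []*pod
}

// createNodeNameToInfoMap mirrors the new two-argument helper: pods are
// pivoted by their node name, then every node from the node list is
// backfilled so the map has an entry (with its node set) even for nodes
// that run nothing.
func createNodeNameToInfoMap(pods []*pod, nodes []*node) map[string]*info {
	m := make(map[string]*info)
	for _, p := range pods {
		if m[p.nodeName] == nil {
			m[p.nodeName] = &info{}
		}
		m[p.nodeName].pods = append(m[p.nodeName].pods, p)
	}
	for _, n := range nodes {
		if m[n.name] == nil {
			m[n.name] = &info{}
		}
		m[n.name].node = n
	}
	return m
}

func main() {
	pods := []*pod{{nodeName: "node-1"}, {nodeName: "node-1"}}
	nodes := []*node{{name: "node-1"}, {name: "node-2"}}
	m := createNodeNameToInfoMap(pods, nodes)
	// node-2 is present even though it runs no pods.
	fmt.Println(len(m), "entries;", len(m["node-1"].pods), "pods on node-1;",
		"node-2 present:", m["node-2"].node != nil)
}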
diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
index 898797f9e6e..eb78285256c 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
@@ -138,13 +138,7 @@ func TestZeroRequest(t *testing.T) {
 	const expectedPriority int = 25
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
-		for _, node := range test.nodes {
-			if _, ok := nodeNameToInfo[node.Name]; !ok {
-				nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo()
-			}
-			nodeNameToInfo[node.Name].SetNode(node)
-		}
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
 		list, err := scheduler.PrioritizeNodes(
 			test.pod,
 			nodeNameToInfo,
@@ -395,13 +389,7 @@ func TestLeastRequested(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
-		for _, node := range test.nodes {
-			if _, ok := nodeNameToInfo[node.Name]; !ok {
-				nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo()
-			}
-			nodeNameToInfo[node.Name].SetNode(node)
-		}
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
 		list, err := LeastRequestedPriority(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -734,13 +722,7 @@ func TestBalancedResourceAllocation(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
-		for _, node := range test.nodes {
-			if _, ok := nodeNameToInfo[node.Name]; !ok {
-				nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo()
-			}
-			nodeNameToInfo[node.Name].SetNode(node)
-		}
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
 		list, err := BalancedResourceAllocation(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -884,7 +866,7 @@ func TestImageLocalityPriority(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
 		list, err := ImageLocalityPriority(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
index 5f8c3f2ab61..302a7de11f0 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
@@ -276,7 +276,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
 		selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs), replicaSetLister: algorithm.FakeReplicaSetLister(test.rss)}
 		list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeNodeList(test.nodes))
 		if err != nil {
@@ -477,7 +477,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
 		selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs), replicaSetLister: algorithm.FakeReplicaSetLister(test.rss)}
 		list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes))
 		if err != nil {
@@ -649,7 +649,7 @@ func TestZoneSpreadPriority(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
 		zoneSpread := ServiceAntiAffinity{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), label: "zone"}
 		list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(test.nodes))
 		if err != nil {
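The spreading tests above pass nil for the new node-list argument. That is
safe because a range over a nil slice in Go runs zero iterations, so the
node backfill loop simply does nothing; a one-screen reminder:

package main

import "fmt"

func main() {
	// Ranging over a nil slice executes zero iterations, which is why
	// callers that only need the pod aggregation, like the spreading
	// tests, can pass nil for the node list without any guard.
	var nodes []string // nil
	count := 0
	for range nodes {
		count++
	}
	fmt.Println(count) // prints 0
}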
diff --git a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go
index e2f77ea72d0..f0b19a71204 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go
@@ -210,7 +210,7 @@ func TestTaintAndToleration(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap([]*api.Pod{{}})
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, nil)
 		list, err := ComputeTaintTolerationPriority(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("%s, unexpected error: %v", test.test, err)
diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go
index 5d041b0cd0d..4f0bf9de070 100644
--- a/plugin/pkg/scheduler/schedulercache/node_info.go
+++ b/plugin/pkg/scheduler/schedulercache/node_info.go
@@ -93,6 +93,14 @@ func (n *NodeInfo) Pods() []*api.Pod {
 	return n.pods
 }
 
+// PodsWithAffinity returns all pods with (anti)affinity constraints on this node.
+func (n *NodeInfo) PodsWithAffinity() []*api.Pod {
+	if n == nil {
+		return nil
+	}
+	return n.podsWithAffinity
+}
+
 func (n *NodeInfo) AllowedPodNumber() int {
 	if n == nil {
 		return 0
diff --git a/plugin/pkg/scheduler/schedulercache/util.go b/plugin/pkg/scheduler/schedulercache/util.go
index d39d0caf30b..12e6848bc57 100644
--- a/plugin/pkg/scheduler/schedulercache/util.go
+++ b/plugin/pkg/scheduler/schedulercache/util.go
@@ -20,16 +20,20 @@ import "k8s.io/kubernetes/pkg/api"
 
 // CreateNodeNameToInfoMap obtains a list of pods and pivots that list into a map where the keys are node names
 // and the values are the aggregated information for that node.
-func CreateNodeNameToInfoMap(pods []*api.Pod) map[string]*NodeInfo {
+func CreateNodeNameToInfoMap(pods []*api.Pod, nodes []*api.Node) map[string]*NodeInfo {
 	nodeNameToInfo := make(map[string]*NodeInfo)
 	for _, pod := range pods {
 		nodeName := pod.Spec.NodeName
-		nodeInfo, ok := nodeNameToInfo[nodeName]
-		if !ok {
-			nodeInfo = NewNodeInfo()
-			nodeNameToInfo[nodeName] = nodeInfo
+		if _, ok := nodeNameToInfo[nodeName]; !ok {
+			nodeNameToInfo[nodeName] = NewNodeInfo()
 		}
-		nodeInfo.addPod(pod)
+		nodeNameToInfo[nodeName].addPod(pod)
+	}
+	for _, node := range nodes {
+		if _, ok := nodeNameToInfo[node.Name]; !ok {
+			nodeNameToInfo[node.Name] = NewNodeInfo()
+		}
+		nodeNameToInfo[node.Name].SetNode(node)
 	}
 	return nodeNameToInfo
 }
diff --git a/plugin/pkg/scheduler/testing/pods_to_cache.go b/plugin/pkg/scheduler/testing/pods_to_cache.go
index 40281bc4cf6..2eec3c5c5c4 100644
--- a/plugin/pkg/scheduler/testing/pods_to_cache.go
+++ b/plugin/pkg/scheduler/testing/pods_to_cache.go
@@ -42,7 +42,7 @@ func (p PodsToCache) UpdateNode(oldNode, newNode *api.Node) error { return nil }
 func (p PodsToCache) RemoveNode(node *api.Node) error { return nil }
 
 func (p PodsToCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
-	infoMap = schedulercache.CreateNodeNameToInfoMap(p)
+	infoMap = schedulercache.CreateNodeNameToInfoMap(p, nil)
 	return nil
 }
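A remark on the PodsToCache stub in the last hunk: assigning the freshly
built map to the infoMap parameter only rebinds the stub's local variable,
so a caller never observes the result. That is presumably fine for a testing
stub that exists to satisfy the cache interface, but the semantics are easy
to trip over, so a minimal illustration:

package main

import "fmt"

// rebind mimics the UpdateNodeNameToInfoMap stub: assigning a new map to the
// parameter rebinds the local variable only; the map header was passed by
// value, so the caller keeps its original map.
func rebind(m map[string]int) {
	m = map[string]int{"fresh": 1}
	fmt.Println("inside rebind:", len(m)) // 1, but only locally
}

// mutate writes through the shared map header, which the caller does see.
func mutate(m map[string]int) {
	m["seen"] = 1
}

func main() {
	m := map[string]int{}
	rebind(m)
	fmt.Println("after rebind:", len(m)) // 0 - the new map never escaped
	mutate(m)
	fmt.Println("after mutate:", len(m)) // 1
}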