From a63cccfafc4f57adb3b4655d906c6ac5d101edc8 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 22 Jul 2016 12:37:21 +0200 Subject: [PATCH 1/3] Cache pods with pod (anti)affinity constraints --- .../pkg/scheduler/schedulercache/node_info.go | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index 4ce1cdd0a10..4db5f3cb16f 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -33,11 +33,13 @@ type NodeInfo struct { // Overall node information. node *api.Node + pods []*api.Pod + podsWithAffinity []*api.Pod + // Total requested resource of all pods on this node. // It includes assumed pods which scheduler sends binding to apiserver but // didn't get it as scheduled yet. requestedResource *Resource - pods []*api.Pod nonzeroRequest *Resource // We store allocatedResources (which is Node.Status.Allocatable.*) explicitly // as int64, to avoid conversions and accessing map. @@ -126,13 +128,19 @@ func (n *NodeInfo) Clone() *NodeInfo { pods := append([]*api.Pod(nil), n.pods...) clone := &NodeInfo{ node: n.node, + pods: pods, requestedResource: &(*n.requestedResource), nonzeroRequest: &(*n.nonzeroRequest), allocatableResource: &(*n.allocatableResource), allowedPodNumber: n.allowedPodNumber, - pods: pods, generation: n.generation, } + if len(n.pods) > 0 { + clone.pods = append([]*api.Pod(nil), n.pods...) + } + if len(n.podsWithAffinity) > 0 { + clone.podsWithAffinity = append([]*api.Pod(nil), n.podsWithAffinity...) + } return clone } @@ -154,6 +162,10 @@ func (n *NodeInfo) addPod(pod *api.Pod) { n.nonzeroRequest.MilliCPU += non0_cpu n.nonzeroRequest.Memory += non0_mem n.pods = append(n.pods, pod) + // TODO: This should return pointer to avoid allocations. 
+ if affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations); err == nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil) { + n.podsWithAffinity = append(n.podsWithAffinity, pod) + } n.generation++ } @@ -164,6 +176,19 @@ func (n *NodeInfo) removePod(pod *api.Pod) error { return err } + for i := range n.podsWithAffinity { + k2, err := getPodKey(n.podsWithAffinity[i]) + if err != nil { + glog.Errorf("Cannot get pod key, err: %v", err) + continue + } + if k1 == k2 { + // delete the element + n.podsWithAffinity[i] = n.podsWithAffinity[len(n.podsWithAffinity)-1] + n.podsWithAffinity = n.podsWithAffinity[:len(n.podsWithAffinity)-1] + break + } + } for i := range n.pods { k2, err := getPodKey(n.pods[i]) if err != nil { From 898a6444e3a0325b38dba8d77b2bc37d758c289c Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 22 Jul 2016 12:48:35 +0200 Subject: [PATCH 2/3] Return pointer for Affinity in api helper --- pkg/api/helpers.go | 9 +++++---- pkg/api/validation/validation.go | 3 +++ plugin/pkg/admission/antiaffinity/admission.go | 2 +- .../scheduler/algorithm/predicates/predicates.go | 16 ++++++++++++---- .../algorithm/predicates/predicates_test.go | 2 +- .../algorithm/priorities/interpod_affinity.go | 8 ++++---- .../algorithm/priorities/node_affinity.go | 2 +- plugin/pkg/scheduler/schedulercache/node_info.go | 11 +++++++++-- 8 files changed, 36 insertions(+), 17 deletions(-) diff --git a/pkg/api/helpers.go b/pkg/api/helpers.go index b1881914231..08245e50069 100644 --- a/pkg/api/helpers.go +++ b/pkg/api/helpers.go @@ -436,15 +436,16 @@ const ( // GetAffinityFromPod gets the json serialized affinity data from Pod.Annotations // and converts it to the Affinity type in api. -func GetAffinityFromPodAnnotations(annotations map[string]string) (Affinity, error) { - var affinity Affinity +func GetAffinityFromPodAnnotations(annotations map[string]string) (*Affinity, error) { if len(annotations) > 0 && annotations[AffinityAnnotationKey] != "" { + var affinity Affinity err := json.Unmarshal([]byte(annotations[AffinityAnnotationKey]), &affinity) if err != nil { - return affinity, err + return nil, err } + return &affinity, nil } - return affinity, nil + return nil, nil } // GetTolerationsFromPodAnnotations gets the json serialized tolerations data from Pod.Annotations diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 0fe02c5dc8d..e08bb1e5d60 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -1863,6 +1863,9 @@ func ValidateAffinityInPodAnnotations(annotations map[string]string, fldPath *fi allErrs = append(allErrs, field.Invalid(fldPath, api.AffinityAnnotationKey, err.Error())) return allErrs } + if affinity == nil { + return allErrs + } affinityFldPath := fldPath.Child(api.AffinityAnnotationKey) if affinity.NodeAffinity != nil { diff --git a/plugin/pkg/admission/antiaffinity/admission.go b/plugin/pkg/admission/antiaffinity/admission.go index f211022b3a3..50cb5a5f8cb 100644 --- a/plugin/pkg/admission/antiaffinity/admission.go +++ b/plugin/pkg/admission/antiaffinity/admission.go @@ -63,7 +63,7 @@ func (p *plugin) Admit(attributes admission.Attributes) (err error) { glog.V(5).Infof("Invalid Affinity detected, but we will leave handling of this to validation phase") return nil } - if affinity.PodAntiAffinity != nil { + if affinity != nil && affinity.PodAntiAffinity != nil { var podAntiAffinityTerms []api.PodAffinityTerm if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 
0 { podAntiAffinityTerms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 71b96c228f3..161e668f19e 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -512,7 +512,7 @@ func podMatchesNodeLabels(pod *api.Pod, node *api.Node) bool { // 5. zero-length non-nil []NodeSelectorRequirement matches no nodes also, just for simplicity // 6. non-nil empty NodeSelectorRequirement is not allowed nodeAffinityMatches := true - if affinity.NodeAffinity != nil { + if affinity != nil && affinity.NodeAffinity != nil { nodeAffinity := affinity.NodeAffinity // if no required NodeAffinity requirements, will do no-op, means select all nodes. // TODO: Replace next line with subsequent commented-out line when implement RequiredDuringSchedulingRequiredDuringExecution. @@ -809,14 +809,19 @@ func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta in // Check if the current node match the inter-pod affinity scheduling constraints. // Hard inter-pod affinity is not symmetric, check only when affinity.PodAffinity exists. - if affinity.PodAffinity != nil { + if affinity != nil && affinity.PodAffinity != nil { if !checker.NodeMatchesHardPodAffinity(pod, allPods, node, affinity.PodAffinity) { return false, ErrPodAffinityNotMatch } } - // Hard inter-pod anti-affinity is symmetric, we should always check it. - if !checker.NodeMatchesHardPodAntiAffinity(pod, allPods, node, affinity.PodAntiAffinity) { + // Hard inter-pod anti-affinity is symmetric, we should always check it + // (also when affinity or affinity.PodAntiAffinity is nil). 
+ var antiAffinity *api.PodAntiAffinity + if affinity != nil { + antiAffinity = affinity.PodAntiAffinity + } + if !checker.NodeMatchesHardPodAntiAffinity(pod, allPods, node, antiAffinity) { return false, ErrPodAffinityNotMatch } @@ -933,6 +938,9 @@ func (checker *PodAffinityChecker) NodeMatchesHardPodAntiAffinity(pod *api.Pod, glog.V(10).Infof("Failed to get Affinity from Pod %+v, err: %+v", podName(pod), err) return false } + if epAffinity == nil { + continue + } epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName) if err != nil { glog.V(10).Infof("Failed to get node from Pod %+v, err: %+v", podName(ep), err) diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go index 48295b01da1..caaef840b2e 100755 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -2500,7 +2500,7 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if affinity.NodeAffinity != nil { + if affinity != nil && affinity.NodeAffinity != nil { nodeInfo := schedulercache.NewNodeInfo() nodeInfo.SetNode(&node) fits2, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod), nodeInfo) diff --git a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go index 269762a59d8..dd4d2ea0811 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go +++ b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go @@ -127,14 +127,14 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod return nil, err } - if affinity.PodAffinity != nil { + if affinity != nil && affinity.PodAffinity != nil { // For every soft pod affinity term of , if matches the term, // increment for every node in the cluster with the same // value as that of `s node by the term`s weight. terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution pm.processTerms(terms, pod, existingPod, existingPodNode, 1) } - if affinity.PodAntiAffinity != nil { + if affinity != nil && affinity.PodAntiAffinity != nil { // For every soft pod anti-affinity term of , if matches the term, // decrement for every node in the cluster with the same // value as that of `s node by the term`s weight. @@ -142,7 +142,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod pm.processTerms(terms, pod, existingPod, existingPodNode, -1) } - if existingPodAffinity.PodAffinity != nil { + if existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil { // For every hard pod affinity term of , if matches the term, // increment for every node in the cluster with the same // value as that of 's node by the constant @@ -162,7 +162,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution pm.processTerms(terms, existingPod, pod, existingPodNode, 1) } - if existingPodAffinity.PodAntiAffinity != nil { + if existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil { // For every soft pod anti-affinity term of , if matches the term, // decrement for every node in the cluster with the same // value as that of 's node by the term's weight. 
diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go b/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go index 53b8b87ffc1..97a2c9b6544 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go +++ b/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go @@ -41,7 +41,7 @@ func CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*sche // A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects. // An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an // empty PreferredSchedulingTerm matches all objects. - if affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil { + if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil { // Match PreferredDuringSchedulingIgnoredDuringExecution term by term. for _, preferredSchedulingTerm := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution { if preferredSchedulingTerm.Weight == 0 { diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index 4db5f3cb16f..5d041b0cd0d 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -153,6 +153,14 @@ func (n *NodeInfo) String() string { return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v}", podKeys, n.requestedResource, n.nonzeroRequest) } +func hasPodAffinityConstraints(pod *api.Pod) bool { + affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations) + if err != nil || affinity == nil { + return false + } + return affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil +} + // addPod adds pod information to this NodeInfo. func (n *NodeInfo) addPod(pod *api.Pod) { cpu, mem, nvidia_gpu, non0_cpu, non0_mem := calculateResource(pod) @@ -162,8 +170,7 @@ func (n *NodeInfo) addPod(pod *api.Pod) { n.nonzeroRequest.MilliCPU += non0_cpu n.nonzeroRequest.Memory += non0_mem n.pods = append(n.pods, pod) - // TODO: This should return pointer to avoid allocations. - if affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations); err == nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil) { + if hasPodAffinityConstraints(pod) { n.podsWithAffinity = append(n.podsWithAffinity, pod) } n.generation++ From d3b9d583a2f7b03f409592169a2066219b25795f Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 27 Jul 2016 17:42:27 +0200 Subject: [PATCH 3/3] Optimize PodAffinity priority function. --- .../algorithm/priorities/interpod_affinity.go | 78 ++++++++++++++----- .../priorities/interpod_affinity_test.go | 6 +- .../priorities/node_affinity_test.go | 2 +- .../algorithm/priorities/priorities_test.go | 26 +------ .../priorities/selector_spreading_test.go | 6 +- .../priorities/taint_toleration_test.go | 2 +- .../pkg/scheduler/schedulercache/node_info.go | 8 ++ plugin/pkg/scheduler/schedulercache/util.go | 16 ++-- plugin/pkg/scheduler/testing/pods_to_cache.go | 2 +- 9 files changed, 90 insertions(+), 56 deletions(-) diff --git a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go index dd4d2ea0811..5889402b2a4 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go +++ b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go @@ -17,9 +17,11 @@ limitations under the License. 
package priorities import ( + "sync" + "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" @@ -52,6 +54,8 @@ func NewInterPodAffinityPriority( } type podAffinityPriorityMap struct { + sync.Mutex + // nodes contain all nodes that should be considered nodes []*api.Node // counts store the mapping from node name to so-far computed score of @@ -71,20 +75,30 @@ func newPodAffinityPriorityMap(nodes []*api.Node, failureDomains priorityutil.To } } +func (p *podAffinityPriorityMap) setError(err error) { + p.Lock() + defer p.Unlock() + if p.firstError == nil { + p.firstError = err + } +} + func (p *podAffinityPriorityMap) processTerm(term *api.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *api.Pod, fixedNode *api.Node, weight float64) { match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, podDefiningAffinityTerm, term) if err != nil { - if p.firstError == nil { - p.firstError = err - } + p.setError(err) return } if match { - for _, node := range p.nodes { - if p.failureDomains.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) { - p.counts[node.Name] += weight + func() { + p.Lock() + defer p.Unlock() + for _, node := range p.nodes { + if p.failureDomains.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) { + p.counts[node.Name] += weight + } } - } + }() } } @@ -101,14 +115,17 @@ func (p *podAffinityPriorityMap) processTerms(terms []api.WeightedPodAffinityTer // Symmetry need to be considered for preferredDuringSchedulingIgnoredDuringExecution from podAffinity & podAntiAffinity, // symmetry need to be considered for hard requirements from podAffinity func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) { - allPods, err := ipa.podLister.List(labels.Everything()) - if err != nil { - return nil, err - } affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations) if err != nil { return nil, err } + hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil + hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil + + allNodeNames := make([]string, 0, len(nodeNameToInfo)) + for name := range nodeNameToInfo { + allNodeNames = append(allNodeNames, name) + } // convert the topology key based weights to the node name based weights var maxCount float64 @@ -117,24 +134,26 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod // the node. 
pm := newPodAffinityPriorityMap(nodes, ipa.failureDomains) - for _, existingPod := range allPods { + processPod := func(existingPod *api.Pod) error { existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName) if err != nil { - return nil, err + return err } existingPodAffinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations) if err != nil { - return nil, err + return err } + existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil + existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil - if affinity != nil && affinity.PodAffinity != nil { + if hasAffinityConstraints { // For every soft pod affinity term of , if matches the term, // increment for every node in the cluster with the same // value as that of `s node by the term`s weight. terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution pm.processTerms(terms, pod, existingPod, existingPodNode, 1) } - if affinity != nil && affinity.PodAntiAffinity != nil { + if hasAntiAffinityConstraints { // For every soft pod anti-affinity term of , if matches the term, // decrement for every node in the cluster with the same // value as that of `s node by the term`s weight. @@ -142,7 +161,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod pm.processTerms(terms, pod, existingPod, existingPodNode, -1) } - if existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil { + if existingHasAffinityConstraints { // For every hard pod affinity term of , if matches the term, // increment for every node in the cluster with the same // value as that of 's node by the constant @@ -162,14 +181,35 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution pm.processTerms(terms, existingPod, pod, existingPodNode, 1) } - if existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil { + if existingHasAntiAffinityConstraints { // For every soft pod anti-affinity term of , if matches the term, // decrement for every node in the cluster with the same // value as that of 's node by the term's weight. terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution pm.processTerms(terms, existingPod, pod, existingPodNode, -1) } + return nil } + processNode := func(i int) { + nodeInfo := nodeNameToInfo[allNodeNames[i]] + if hasAffinityConstraints || hasAntiAffinityConstraints { + // We need to process all the nodes. + for _, existingPod := range nodeInfo.Pods() { + if err := processPod(existingPod); err != nil { + pm.setError(err) + } + } + } else { + // The pod doesn't have any constraints - we need to check only existing + // ones that have some. 
+ for _, existingPod := range nodeInfo.PodsWithAffinity() { + if err := processPod(existingPod); err != nil { + pm.setError(err) + } + } + } + } + workqueue.Parallelize(16, len(allNodeNames), processNode) if pm.firstError != nil { return nil, pm.firstError } diff --git a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go index aaed1661cd7..64225311027 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go @@ -496,7 +496,7 @@ func TestInterPodAffinityPriority(t *testing.T) { }, } for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) interPodAffinity := InterPodAffinity{ info: FakeNodeListInfo(test.nodes), nodeLister: algorithm.FakeNodeLister(test.nodes), @@ -585,7 +585,7 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) { }, } for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) ipa := InterPodAffinity{ info: FakeNodeListInfo(test.nodes), nodeLister: algorithm.FakeNodeLister(test.nodes), @@ -669,7 +669,7 @@ func TestSoftPodAntiAffinityWithFailureDomains(t *testing.T) { }, } for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) ipa := InterPodAffinity{ info: FakeNodeListInfo(test.nodes), nodeLister: algorithm.FakeNodeLister(test.nodes), diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go index b6bf33c626f..d2efbc0137f 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go @@ -155,7 +155,7 @@ func TestNodeAffinityPriority(t *testing.T) { } for _, test := range tests { - list, err := CalculateNodeAffinityPriority(test.pod, schedulercache.CreateNodeNameToInfoMap(nil), test.nodes) + list, err := CalculateNodeAffinityPriority(test.pod, schedulercache.CreateNodeNameToInfoMap(nil, test.nodes), test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go index 898797f9e6e..eb78285256c 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go @@ -138,13 +138,7 @@ func TestZeroRequest(t *testing.T) { const expectedPriority int = 25 for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) - for _, node := range test.nodes { - if _, ok := nodeNameToInfo[node.Name]; !ok { - nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo() - } - nodeNameToInfo[node.Name].SetNode(node) - } + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) list, err := scheduler.PrioritizeNodes( test.pod, nodeNameToInfo, @@ -395,13 +389,7 @@ func TestLeastRequested(t *testing.T) { } for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) - for _, node := range test.nodes { - if _, ok := nodeNameToInfo[node.Name]; !ok { - nodeNameToInfo[node.Name] = 
schedulercache.NewNodeInfo() - } - nodeNameToInfo[node.Name].SetNode(node) - } + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) list, err := LeastRequestedPriority(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) @@ -734,13 +722,7 @@ func TestBalancedResourceAllocation(t *testing.T) { } for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) - for _, node := range test.nodes { - if _, ok := nodeNameToInfo[node.Name]; !ok { - nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo() - } - nodeNameToInfo[node.Name].SetNode(node) - } + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) list, err := BalancedResourceAllocation(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) @@ -884,7 +866,7 @@ func TestImageLocalityPriority(t *testing.T) { } for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) list, err := ImageLocalityPriority(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index 5f8c3f2ab61..302a7de11f0 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -276,7 +276,7 @@ func TestSelectorSpreadPriority(t *testing.T) { } for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil) selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs), replicaSetLister: algorithm.FakeReplicaSetLister(test.rss)} list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeNodeList(test.nodes)) if err != nil { @@ -477,7 +477,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { } for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil) selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs), replicaSetLister: algorithm.FakeReplicaSetLister(test.rss)} list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes)) if err != nil { @@ -649,7 +649,7 @@ func TestZoneSpreadPriority(t *testing.T) { } for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil) zoneSpread := ServiceAntiAffinity{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), label: "zone"} list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(test.nodes)) if err != nil { diff --git a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go index e2f77ea72d0..f0b19a71204 
100644 --- a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go @@ -210,7 +210,7 @@ func TestTaintAndToleration(t *testing.T) { }, } for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap([]*api.Pod{{}}) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, nil) list, err := ComputeTaintTolerationPriority(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("%s, unexpected error: %v", test.test, err) diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index 5d041b0cd0d..4f0bf9de070 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -93,6 +93,14 @@ func (n *NodeInfo) Pods() []*api.Pod { return n.pods } +// PodsWithAffinity return all pods with (anti)affinity constraints on this node. +func (n *NodeInfo) PodsWithAffinity() []*api.Pod { + if n == nil { + return nil + } + return n.podsWithAffinity +} + func (n *NodeInfo) AllowedPodNumber() int { if n == nil { return 0 diff --git a/plugin/pkg/scheduler/schedulercache/util.go b/plugin/pkg/scheduler/schedulercache/util.go index d39d0caf30b..12e6848bc57 100644 --- a/plugin/pkg/scheduler/schedulercache/util.go +++ b/plugin/pkg/scheduler/schedulercache/util.go @@ -20,16 +20,20 @@ import "k8s.io/kubernetes/pkg/api" // CreateNodeNameToInfoMap obtains a list of pods and pivots that list into a map where the keys are node names // and the values are the aggregated information for that node. -func CreateNodeNameToInfoMap(pods []*api.Pod) map[string]*NodeInfo { +func CreateNodeNameToInfoMap(pods []*api.Pod, nodes []*api.Node) map[string]*NodeInfo { nodeNameToInfo := make(map[string]*NodeInfo) for _, pod := range pods { nodeName := pod.Spec.NodeName - nodeInfo, ok := nodeNameToInfo[nodeName] - if !ok { - nodeInfo = NewNodeInfo() - nodeNameToInfo[nodeName] = nodeInfo + if _, ok := nodeNameToInfo[nodeName]; !ok { + nodeNameToInfo[nodeName] = NewNodeInfo() } - nodeInfo.addPod(pod) + nodeNameToInfo[nodeName].addPod(pod) + } + for _, node := range nodes { + if _, ok := nodeNameToInfo[node.Name]; !ok { + nodeNameToInfo[node.Name] = NewNodeInfo() + } + nodeNameToInfo[node.Name].SetNode(node) } return nodeNameToInfo } diff --git a/plugin/pkg/scheduler/testing/pods_to_cache.go b/plugin/pkg/scheduler/testing/pods_to_cache.go index 40281bc4cf6..2eec3c5c5c4 100644 --- a/plugin/pkg/scheduler/testing/pods_to_cache.go +++ b/plugin/pkg/scheduler/testing/pods_to_cache.go @@ -42,7 +42,7 @@ func (p PodsToCache) UpdateNode(oldNode, newNode *api.Node) error { return nil } func (p PodsToCache) RemoveNode(node *api.Node) error { return nil } func (p PodsToCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error { - infoMap = schedulercache.CreateNodeNameToInfoMap(p) + infoMap = schedulercache.CreateNodeNameToInfoMap(p, nil) return nil }
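
The core idea of the first patch is to keep a second slice, podsWithAffinity, in sync with pods inside NodeInfo, so that callers that only care about pods carrying inter-pod (anti)affinity annotations can skip everything else; the PodsWithAffinity() accessor added in the last patch is what the priority function iterates when the incoming pod has no constraints of its own. A minimal, self-contained sketch of that bookkeeping follows — the cachedNode and pod types are illustrative stand-ins, not the scheduler's real NodeInfo:

```go
package main

import "fmt"

type pod struct {
	name        string
	hasAffinity bool // stands in for a non-nil PodAffinity/PodAntiAffinity
}

// cachedNode mirrors the NodeInfo idea: the full pod list plus a filtered
// sub-slice that is maintained incrementally on add/remove.
type cachedNode struct {
	pods             []*pod
	podsWithAffinity []*pod
}

func (n *cachedNode) addPod(p *pod) {
	n.pods = append(n.pods, p)
	if p.hasAffinity {
		n.podsWithAffinity = append(n.podsWithAffinity, p)
	}
}

func (n *cachedNode) removePod(name string) {
	remove := func(s []*pod) []*pod {
		for i := range s {
			if s[i].name == name {
				// Same trick as the patch: overwrite with the last element and shrink.
				s[i] = s[len(s)-1]
				return s[:len(s)-1]
			}
		}
		return s
	}
	n.podsWithAffinity = remove(n.podsWithAffinity)
	n.pods = remove(n.pods)
}

func main() {
	n := &cachedNode{}
	n.addPod(&pod{name: "a", hasAffinity: true})
	n.addPod(&pod{name: "b"})
	n.removePod("a")
	fmt.Println(len(n.pods), len(n.podsWithAffinity)) // 1 0
}
```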
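
The third patch fans the per-node work out with workqueue.Parallelize(16, len(allNodeNames), processNode) and serializes writes to the shared score map behind a mutex (the sync.Mutex embedded in podAffinityPriorityMap). Below is a minimal sketch of that concurrency pattern; the parallelize helper and scoreMap type are illustrative substitutes written for this example, not the scheduler's actual workqueue or priority-map code:

```go
package main

import (
	"fmt"
	"sync"
)

// parallelize mimics the workqueue.Parallelize call used in the patch:
// run doWorkPiece(i) for i in [0, pieces) across a fixed number of workers.
func parallelize(workers, pieces int, doWorkPiece func(piece int)) {
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				doWorkPiece(piece)
			}
		}()
	}
	wg.Wait()
}

// scoreMap plays the role of podAffinityPriorityMap: concurrent workers
// accumulate per-node scores, so writes are guarded by a mutex.
type scoreMap struct {
	sync.Mutex
	counts map[string]float64
}

func (s *scoreMap) add(node string, weight float64) {
	s.Lock()
	defer s.Unlock()
	s.counts[node] += weight
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3", "node-4"}
	sm := &scoreMap{counts: make(map[string]float64)}

	processNode := func(i int) {
		// In the real priority function this walks nodeInfo.Pods() or
		// nodeInfo.PodsWithAffinity() and calls processTerms; here we
		// simply record a constant weight per node.
		sm.add(nodes[i], 1)
	}
	parallelize(16, len(nodes), processNode)

	fmt.Println(sm.counts)
}
```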