diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go
index 2b5111c6f69..f6180eb6f96 100644
--- a/pkg/scheduler/algorithm/predicates/metadata.go
+++ b/pkg/scheduler/algorithm/predicates/metadata.go
@@ -245,7 +245,7 @@ func getTPMapMatchingSpreadConstraints(pod *v1.Pod, nodeInfoMap map[string]*sche
 			if existingPod.Namespace != pod.Namespace {
 				continue
 			}
-			ok, err := podMatchesSpreadConstraint(existingPod.Labels, constraint)
+			ok, err := PodMatchesSpreadConstraint(existingPod.Labels, constraint)
 			if err != nil {
 				errCh.SendErrorWithCancel(err, cancel)
 				return
@@ -304,10 +304,11 @@ func getHardTopologySpreadConstraints(pod *v1.Pod) (constraints []v1.TopologySpr
 	return
 }
 
-// some corner cases:
+// PodMatchesSpreadConstraint verifies if <podLabelSet> matches <constraint>.
+// Some corner cases:
 // 1. podLabelSet = nil => returns (false, nil)
 // 2. constraint.LabelSelector = nil => returns (false, nil)
-func podMatchesSpreadConstraint(podLabelSet labels.Set, constraint v1.TopologySpreadConstraint) (bool, error) {
+func PodMatchesSpreadConstraint(podLabelSet labels.Set, constraint v1.TopologySpreadConstraint) (bool, error) {
 	selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
 	if err != nil {
 		return false, err
@@ -318,7 +319,7 @@ func podMatchesSpreadConstraint(podLabelSet labels.Set, constraint v1.TopologySp
 	return true, nil
 }
 
-// check if ALL topology keys in spread constraints are present in node labels
+// NodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
 func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []v1.TopologySpreadConstraint) bool {
 	for _, constraint := range constraints {
 		if _, ok := nodeLabels[constraint.TopologyKey]; !ok {
@@ -396,7 +397,7 @@ func (m *topologyPairsPodSpreadMap) addPod(addedPod, preemptorPod *v1.Pod, node
 	minMatchNeedingUpdate := make(map[string]struct{})
 	podLabelSet := labels.Set(addedPod.Labels)
 	for _, constraint := range constraints {
-		if match, err := podMatchesSpreadConstraint(podLabelSet, constraint); err != nil {
+		if match, err := PodMatchesSpreadConstraint(podLabelSet, constraint); err != nil {
 			return err
 		} else if !match {
 			continue
diff --git a/pkg/scheduler/algorithm/predicates/metadata_test.go b/pkg/scheduler/algorithm/predicates/metadata_test.go
index 911c05922bb..70d80f9fc69 100644
--- a/pkg/scheduler/algorithm/predicates/metadata_test.go
+++ b/pkg/scheduler/algorithm/predicates/metadata_test.go
@@ -904,12 +904,12 @@ func TestPodMatchesSpreadConstraint(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			podLabelSet := labels.Set(tt.podLabels)
-			got, err := podMatchesSpreadConstraint(podLabelSet, tt.constraint)
+			got, err := PodMatchesSpreadConstraint(podLabelSet, tt.constraint)
 			if (err != nil) != tt.wantErr {
-				t.Errorf("podMatchesSpreadConstraint() error = %v, wantErr %v", err, tt.wantErr)
+				t.Errorf("PodMatchesSpreadConstraint() error = %v, wantErr %v", err, tt.wantErr)
 			}
 			if got != tt.want {
-				t.Errorf("podMatchesSpreadConstraint() = %v, want %v", got, tt.want)
+				t.Errorf("PodMatchesSpreadConstraint() = %v, want %v", got, tt.want)
 			}
 		})
 	}
diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go
index 6e265b14b59..2f6cbe3e76f 100644
--- a/pkg/scheduler/algorithm/predicates/predicates.go
+++ b/pkg/scheduler/algorithm/predicates/predicates.go
@@ -1748,7 +1748,7 @@ func EvenPodsSpreadPredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *sche
 			return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
 		}
-		selfMatch, err := podMatchesSpreadConstraint(podLabelSet, constraint)
+		selfMatch, err := PodMatchesSpreadConstraint(podLabelSet, constraint)
 		if err != nil {
 			return false, nil, err
 		}
diff --git a/pkg/scheduler/algorithm/priorities/even_pods_spread.go b/pkg/scheduler/algorithm/priorities/even_pods_spread.go
index 33559be1502..b02309ee3bb 100644
--- a/pkg/scheduler/algorithm/priorities/even_pods_spread.go
+++ b/pkg/scheduler/algorithm/priorities/even_pods_spread.go
@@ -18,7 +18,6 @@ package priorities
 
 import (
 	"context"
-	"sync"
 	"sync/atomic"
 
 	"k8s.io/api/core/v1"
@@ -27,6 +26,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 
 	"k8s.io/klog"
 )
@@ -36,14 +36,9 @@ type topologyPair struct {
 	value string
 }
 
-type topologySpreadConstrantsMap struct {
-	// The first error that we faced.
-	firstError error
-	sync.Mutex
-
-	// counts store the mapping from node name to so-far computed score of
-	// the node.
-	counts map[string]*int64
+type topologySpreadConstraintsMap struct {
+	// podCounts is keyed with node name, and valued with the number of matching pods.
+	podCounts map[string]*int64
 	// total number of matching pods on each qualified <topologyKey: value> pair
 	total int64
 	// topologyPairToNodeNames store the mapping from potential
@@ -51,64 +46,57 @@
 	topologyPairToNodeNames map[topologyPair][]string
 }
 
-func newTopologySpreadConstrantsMap(len int) *topologySpreadConstrantsMap {
-	return &topologySpreadConstrantsMap{
-		counts:                  make(map[string]*int64, len),
+func newTopologySpreadConstraintsMap(len int) *topologySpreadConstraintsMap {
+	return &topologySpreadConstraintsMap{
+		podCounts:               make(map[string]*int64, len),
 		topologyPairToNodeNames: make(map[topologyPair][]string),
 	}
 }
 
-func (t *topologySpreadConstrantsMap) setError(err error) {
-	t.Lock()
-	if t.firstError == nil {
-		t.firstError = err
-	}
-	t.Unlock()
-}
-
-func (t *topologySpreadConstrantsMap) initialize(pod *v1.Pod, nodes []*v1.Node) {
+func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node) {
 	constraints := getSoftTopologySpreadConstraints(pod)
 	for _, node := range nodes {
-		labelSet := labels.Set(node.Labels)
-		allMatch := true
+		match := true
 		var pairs []topologyPair
 		for _, constraint := range constraints {
 			tpKey := constraint.TopologyKey
-			if !labelSet.Has(tpKey) {
-				allMatch = false
+			if _, ok := node.Labels[tpKey]; !ok {
+				// Current node isn't qualified for the soft constraints,
+				// so break here and the node will hold default value (nil).
+				match = false
 				break
 			}
 			pairs = append(pairs, topologyPair{key: tpKey, value: node.Labels[tpKey]})
 		}
-		if allMatch {
+		if match {
			for _, pair := range pairs {
 				t.topologyPairToNodeNames[pair] = append(t.topologyPairToNodeNames[pair], node.Name)
 			}
-			t.counts[node.Name] = new(int64)
+			t.podCounts[node.Name] = new(int64)
 		}
-		// for those nodes which don't have all required topologyKeys present, it's intentional to
-		// leave counts[nodeName] as nil, so that we're able to score these nodes to 0 afterwards
+		// For those nodes which don't have all required topologyKeys present, it's intentional to
+		// leave podCounts[nodeName] as nil, so that we're able to score these nodes to 0 afterwards.
 	}
 }
 
 // CalculateEvenPodsSpreadPriority computes a score by checking through the topologySpreadConstraints
-// that are with WhenUnsatifiable=ScheduleAnyway (a.k.a soft constraint).
-// For each node (not only "filtered" nodes by Predicates), it adds the number of matching pods
-// (all topologySpreadConstraints must be satified) as a "weight" to any "filtered" node
-// which has the pair present.
-// Then the sumed "weight" are normalized to 0~10, and the node(s) with the highest score are
-// the most preferred.
-// Symmetry is not considered.
+// that are with WhenUnsatisfiable=ScheduleAnyway (a.k.a soft constraint).
+// The function works as below:
+// 1) In all nodes, calculate the number of pods which match <pod>'s soft topology spread constraints.
+// 2) Sum up the number to each node in <nodes> which has corresponding topologyPair present.
+// 3) Finally normalize the number to 0~10. The node with the highest score is the most preferred.
+// Note: Symmetry is not applicable. We only weigh how incomingPod matches existingPod.
+// Whether existingPod matches incomingPod doesn't contribute to the final score.
+// This is different from the Affinity API.
 func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
-	nodesLen := len(nodes)
-	result := make(schedulerapi.HostPriorityList, nodesLen)
-	// if incoming pod doesn't have soft topology spread constraints, return
+	result := make(schedulerapi.HostPriorityList, len(nodes))
+	// return if incoming pod doesn't have soft topology spread constraints.
 	constraints := getSoftTopologySpreadConstraints(pod)
 	if len(constraints) == 0 {
 		return result, nil
 	}
-	t := newTopologySpreadConstrantsMap(len(nodes))
+	t := newTopologySpreadConstraintsMap(len(nodes))
 	t.initialize(pod, nodes)
 
 	allNodeNames := make([]string, 0, len(nodeNameToInfo))
@@ -116,58 +104,66 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
 		allNodeNames = append(allNodeNames, name)
 	}
 
+	errCh := schedutil.NewErrorChannel()
 	ctx, cancel := context.WithCancel(context.Background())
 	processNode := func(i int) {
 		nodeInfo := nodeNameToInfo[allNodeNames[i]]
-		if node := nodeInfo.Node(); node != nil {
-			// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
-			// (2) All topologyKeys need to be present in `node`
-			if !predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
-				!predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
-				return
-			}
-			matchCount := 0
-			for _, existingPod := range nodeInfo.Pods() {
-				match, err := predicates.PodMatchesAllSpreadConstraints(existingPod, pod.Namespace, constraints)
+		node := nodeInfo.Node()
+		if node == nil {
+			return
+		}
+		// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
+		// (2) All topologyKeys need to be present in `node`
+		if !predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
+			!predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
+			return
+		}
+		// It's enough to use topologyKey as the "key" of the map.
+		matchCount := make(map[string]int64)
+		for _, existingPod := range nodeInfo.Pods() {
+			podLabelSet := labels.Set(existingPod.Labels)
+			// Matching on constraints is calculated independently.
+			for _, constraint := range constraints {
+				match, err := predicates.PodMatchesSpreadConstraint(podLabelSet, constraint)
 				if err != nil {
-					t.setError(err)
-					cancel()
+					errCh.SendErrorWithCancel(err, cancel)
 					return
 				}
 				if match {
-					matchCount++
+					matchCount[constraint.TopologyKey]++
 				}
 			}
-			// add matchCount up to EACH node which is at least in one topology domain
-			// with current node
-			for _, constraint := range constraints {
-				tpKey := constraint.TopologyKey
-				pair := topologyPair{key: tpKey, value: node.Labels[tpKey]}
-				for _, nodeName := range t.topologyPairToNodeNames[pair] {
-					atomic.AddInt64(t.counts[nodeName], int64(matchCount))
-					atomic.AddInt64(&t.total, int64(matchCount))
-				}
+		}
+		// Keys in t.podCounts have been ensured to contain "filtered" nodes only.
+		for _, constraint := range constraints {
+			tpKey := constraint.TopologyKey
+			pair := topologyPair{key: tpKey, value: node.Labels[tpKey]}
+			// For each <pair>, all matched nodes get the credit of summed matchCount.
+			// And we add matchCount to <t.total> to reverse the final score later.
+			for _, nodeName := range t.topologyPairToNodeNames[pair] {
+				atomic.AddInt64(t.podCounts[nodeName], matchCount[tpKey])
+				atomic.AddInt64(&t.total, matchCount[tpKey])
 			}
 		}
 	}
 	workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
-	if t.firstError != nil {
-		return nil, t.firstError
+	if err := errCh.ReceiveError(); err != nil {
+		return nil, err
 	}
 
 	var maxCount, minCount int64
 	for _, node := range nodes {
-		if t.counts[node.Name] == nil {
+		if t.podCounts[node.Name] == nil {
 			continue
 		}
 		// reverse
-		count := t.total - *t.counts[node.Name]
+		count := t.total - *t.podCounts[node.Name]
 		if count > maxCount {
 			maxCount = count
 		} else if count < minCount {
 			minCount = count
 		}
-		t.counts[node.Name] = &count
+		t.podCounts[node.Name] = &count
 	}
 	// calculate final priority score for each node
 	// TODO(Huang-Wei): in alpha version, we keep the formula as simple as possible.
@@ -186,7 +182,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
 			}(&result[i].Score, node.Name)
 		}
 
-		if t.counts[node.Name] == nil {
+		if t.podCounts[node.Name] == nil {
 			result[i].Score = 0
 			continue
 		}
@@ -194,9 +190,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
 			result[i].Score = schedulerapi.MaxPriority
 			continue
 		}
-		fScore := float64(schedulerapi.MaxPriority) * (float64(*t.counts[node.Name]-minCount) / float64(maxMinDiff))
-		// need to reverse b/c the more matching pods it has, the less qualified it is
-		// result[i].Score = schedulerapi.MaxPriority - int(fScore)
+		fScore := float64(schedulerapi.MaxPriority) * (float64(*t.podCounts[node.Name]-minCount) / float64(maxMinDiff))
 		result[i].Score = int(fScore)
 	}
 
diff --git a/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go b/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go
index 8db66cf904c..a9172c96b4c 100644
--- a/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go
+++ b/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go
@@ -26,7 +26,7 @@ import (
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 )
 
-func Test_topologySpreadConstrantsMap_initialize(t *testing.T) {
+func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
 	tests := []struct {
 		name  string
 		pod   *v1.Pod
@@ -52,10 +52,27 @@ func Test_topologySpreadConstrantsMap_initialize(t *testing.T) {
 				{key: "node", value: "node-x"}: {"node-x"},
 			},
 		},
+		{
+			name: "node-x doesn't have label zone",
+			pod: st.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, "zone", softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
+				SpreadConstraint(1, "node", softSpread, st.MakeLabelSelector().Exists("bar").Obj()).
+				Obj(),
+			nodes: []*v1.Node{
+				st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
+				st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
+				st.MakeNode().Name("node-x").Label("node", "node-x").Obj(),
+			},
+			want: map[topologyPair][]string{
+				{key: "zone", value: "zone1"}: {"node-a", "node-b"},
+				{key: "node", value: "node-a"}: {"node-a"},
+				{key: "node", value: "node-b"}: {"node-b"},
+			},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			tMap := newTopologySpreadConstrantsMap(len(tt.nodes))
+			tMap := newTopologySpreadConstraintsMap(len(tt.nodes))
 			tMap.initialize(tt.pod, tt.nodes)
 			if !reflect.DeepEqual(tMap.topologyPairToNodeNames, tt.want) {
 				t.Errorf("initilize().topologyPairToNodeNames = %#v, want %#v", tMap.topologyPairToNodeNames, tt.want)
@@ -249,10 +266,10 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
 			},
 		},
 		{
-			// matching pods spread as 2/~1~/2/~4~, total = 2+3 + 2+6 = 13 (zone and node should be both sumed up)
+			// matching pods spread as 2/~1~/2/~4~, total = 2+3 + 2+6 = 13 (zone and node should be both summed up)
 			// after reversing, it's 8/5
 			// so scores = 80/8, 50/8
-			name: "two constraint on zone and node, 2 out of 4 nodes are candidates",
+			name: "two constraints on zone and node, 2 out of 4 nodes are candidates",
 			pod: st.MakePod().Name("p").Label("foo", "").
 				SpreadConstraint(1, "zone", softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
 				SpreadConstraint(1, "node", softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
@@ -281,6 +298,76 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
 				{Host: "node-x", Score: 6},
 			},
 		},
+		{
+			// If constraints hold different labelSelectors, it's a little complex.
+			// +----------------------+------------------------+
+			// |        zone1         |          zone2         |
+			// +----------------------+------------------------+
+			// | node-a |   node-b    | node-x |    node-y     |
+			// +--------+-------------+--------+---------------+
+			// | P{foo} | P{foo, bar} |        | P{foo} P{bar} |
+			// +--------+-------------+--------+---------------+
+			// For the first constraint (zone): the matching pods spread as 2/2/1/1
+			// For the second constraint (node): the matching pods spread as 0/1/0/1
+			// sum them up gets: 2/3/1/2, and total number is 8.
+			// after reversing, it's 6/5/7/6
+			// so scores = 60/7, 50/7, 70/7, 60/7
+			name: "two constraints on zone and node, with different labelSelectors",
+			pod: st.MakePod().Name("p").Label("foo", "").Label("bar", "").
+				SpreadConstraint(1, "zone", softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
+				SpreadConstraint(1, "node", softSpread, st.MakeLabelSelector().Exists("bar").Obj()).
+				Obj(),
+			existingPods: []*v1.Pod{
+				st.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
+				st.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Label("bar", "").Obj(),
+				st.MakePod().Name("p-y1").Node("node-y").Label("foo", "").Obj(),
+				st.MakePod().Name("p-y2").Node("node-y").Label("bar", "").Obj(),
+			},
+			nodes: []*v1.Node{
+				st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
+				st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
+				st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
+				st.MakeNode().Name("node-y").Label("zone", "zone2").Label("node", "node-y").Obj(),
+			},
+			failedNodes: []*v1.Node{},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 8},
+				{Host: "node-b", Score: 7},
+				{Host: "node-x", Score: 10},
+				{Host: "node-y", Score: 8},
+			},
+		},
+		{
+			// For the first constraint (zone): the matching pods spread as 2/2/1/~1~
+			// For the second constraint (node): the matching pods spread as 0/1/0/~1~
+			// sum them up gets: 2/3/1, and total number is 6.
+			// after reversing, it's 4/3/5
+			// so scores = 40/5, 30/5, 50/5
+			name: "two constraints on zone and node, with different labelSelectors, 3 out of 4 nodes are candidates",
+			pod: st.MakePod().Name("p").Label("foo", "").Label("bar", "").
+				SpreadConstraint(1, "zone", softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
+				SpreadConstraint(1, "node", softSpread, st.MakeLabelSelector().Exists("bar").Obj()).
+				Obj(),
+			existingPods: []*v1.Pod{
+				st.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
+				st.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Label("bar", "").Obj(),
+				st.MakePod().Name("p-y1").Node("node-y").Label("foo", "").Obj(),
+				st.MakePod().Name("p-y2").Node("node-y").Label("bar", "").Obj(),
+			},
+			nodes: []*v1.Node{
+				st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
+				st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
+				st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
+			},
+			failedNodes: []*v1.Node{
+				st.MakeNode().Name("node-y").Label("zone", "zone2").Label("node", "node-y").Obj(),
+			},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 8},
+				{Host: "node-b", Score: 6},
+				{Host: "node-x", Score: 10},
+			},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
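
A quick way to sanity-check the reverse-and-normalize scoring described in the CalculateEvenPodsSpreadPriority comments is to replay it on the numbers from the "two constraints on zone and node, with different labelSelectors" case above. The sketch below is standalone and not part of the patch: maxPriority stands in for schedulerapi.MaxPriority, and maxMinDiff is assumed to be maxCount-minCount, as the variable name in the hunk suggests.

// Standalone sketch, not part of the patch: mirrors the reverse-and-normalize scoring.
package main

import "fmt"

const maxPriority = 10 // stands in for schedulerapi.MaxPriority

func main() {
	// Matching pods per candidate node, summed over both constraints: 2/3/1/2 (total 8),
	// taken from the "different labelSelectors" test case.
	podCounts := map[string]int64{"node-a": 2, "node-b": 3, "node-x": 1, "node-y": 2}

	var total int64
	for _, c := range podCounts {
		total += c
	}

	// Reverse: the fewer matching pods a node's topology pairs carry, the higher its count.
	var maxCount, minCount int64 // both start at zero, as in the patch
	reversed := make(map[string]int64, len(podCounts))
	for name, c := range podCounts {
		count := total - c // 6/5/7/6
		if count > maxCount {
			maxCount = count
		} else if count < minCount {
			minCount = count
		}
		reversed[name] = count
	}

	// Normalize to 0~10: fScore = maxPriority * (count - minCount) / (maxCount - minCount).
	maxMinDiff := maxCount - minCount // 7, because minCount never drops below zero
	for _, name := range []string{"node-a", "node-b", "node-x", "node-y"} {
		score := int(float64(maxPriority) * float64(reversed[name]-minCount) / float64(maxMinDiff))
		fmt.Printf("%s: %d\n", name, score) // node-a: 8, node-b: 7, node-x: 10, node-y: 8
	}
}

Because minCount never goes below zero, maxMinDiff equals the largest reversed count (7 here), which is why the expected scores land on 8/7/10/8 rather than being stretched over the full 0~10 range across the candidates.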