diff --git a/pkg/scheduler/algorithm/priorities/even_pods_spread.go b/pkg/scheduler/algorithm/priorities/even_pods_spread.go
index 6045934f601..54022043655 100644
--- a/pkg/scheduler/algorithm/priorities/even_pods_spread.go
+++ b/pkg/scheduler/algorithm/priorities/even_pods_spread.go
@@ -15,3 +15,196 @@ limitations under the License.
 */
 
 package priorities
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
+	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
+	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+
+	"k8s.io/klog"
+)
+
+type topologyPair struct {
+	key   string
+	value string
+}
+
+type topologySpreadConstraintsMap struct {
+	// The first error encountered.
+	firstError error
+	sync.Mutex
+
+	// counts stores the mapping from node name to the so-far computed score of
+	// the node.
+	counts map[string]*int64
+	// total is the sum of the matching pods added to all qualified nodes.
+	total int64
+	// topologyPairToNodeNames stores the mapping from a potential
+	// <topologyKey, value> pair to node names.
+	topologyPairToNodeNames map[topologyPair][]string
+}
+
+func newTopologySpreadConstraintsMap(len int) *topologySpreadConstraintsMap {
+	return &topologySpreadConstraintsMap{
+		counts:                  make(map[string]*int64, len),
+		topologyPairToNodeNames: make(map[topologyPair][]string),
+	}
+}
+
+func (t *topologySpreadConstraintsMap) setError(err error) {
+	t.Lock()
+	if t.firstError == nil {
+		t.firstError = err
+	}
+	t.Unlock()
+}
+
+func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node) {
+	constraints := getSoftTopologySpreadConstraints(pod)
+	for _, node := range nodes {
+		labelSet := labels.Set(node.Labels)
+		allMatch := true
+		var pairs []topologyPair
+		for _, constraint := range constraints {
+			tpKey := constraint.TopologyKey
+			if !labelSet.Has(tpKey) {
+				allMatch = false
+				break
+			}
+			pairs = append(pairs, topologyPair{key: tpKey, value: node.Labels[tpKey]})
+		}
+		if allMatch {
+			for _, pair := range pairs {
+				t.topologyPairToNodeNames[pair] = append(t.topologyPairToNodeNames[pair], node.Name)
+			}
+			t.counts[node.Name] = new(int64)
+		}
+		// For nodes which don't have all the required topologyKeys present, it's intentional to
+		// leave counts[nodeName] as nil, so that we're able to score these nodes to 0 afterwards.
+	}
+}
+
+// CalculateEvenPodsSpreadPriority computes a score by checking through the topologySpreadConstraints
+// that have WhenUnsatisfiable=ScheduleAnyway (a.k.a. soft constraints).
+// For each node (not only the nodes "filtered" by Predicates), it adds the number of matching pods
+// (all soft topologySpreadConstraints must be satisfied) as a "weight" to any "filtered" node
+// which has the pair present.
+// Then the summed "weight" is normalized to 0~10, and the node(s) with the highest score are
+// the most preferred.
+// Symmetry is not considered.
+func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
+	nodesLen := len(nodes)
+	result := make(schedulerapi.HostPriorityList, nodesLen)
+	// return early if the incoming pod doesn't have soft topology spread constraints
+	constraints := getSoftTopologySpreadConstraints(pod)
+	if len(constraints) == 0 {
+		return result, nil
+	}
+
+	t := newTopologySpreadConstraintsMap(len(nodes))
+	t.initialize(pod, nodes)
+
+	allNodeNames := make([]string, 0, len(nodeNameToInfo))
+	for name := range nodeNameToInfo {
+		allNodeNames = append(allNodeNames, name)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	processNode := func(i int) {
+		nodeInfo := nodeNameToInfo[allNodeNames[i]]
+		if node := nodeInfo.Node(); node != nil {
+			// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
+			// (2) All topologyKeys need to be present in `node`
+			if !predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
+				!predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
+				return
+			}
+			matchCount := 0
+			for _, existingPod := range nodeInfo.Pods() {
+				match, err := predicates.PodMatchesAllSpreadConstraints(existingPod, pod.Namespace, constraints)
+				if err != nil {
+					t.setError(err)
+					cancel()
+					return
+				}
+				if match {
+					matchCount++
+				}
+			}
+			// add matchCount to EACH node which shares at least one topology domain
+			// with the current node
+			for _, constraint := range constraints {
+				tpKey := constraint.TopologyKey
+				pair := topologyPair{key: tpKey, value: node.Labels[tpKey]}
+				for _, nodeName := range t.topologyPairToNodeNames[pair] {
+					atomic.AddInt64(t.counts[nodeName], int64(matchCount))
+					atomic.AddInt64(&t.total, int64(matchCount))
+				}
+			}
+		}
+	}
+	workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
+	if t.firstError != nil {
+		return nil, t.firstError
+	}
+
+	var maxCount, minCount int64
+	for _, node := range nodes {
+		if t.counts[node.Name] == nil {
+			continue
+		}
+		// reverse the count so that fewer matching pods means a higher value
+		count := t.total - *t.counts[node.Name]
+		if count > maxCount {
+			maxCount = count
+		} else if count < minCount {
+			minCount = count
+		}
+		t.counts[node.Name] = &count
+	}
+	// calculate final priority score for each node
+	// TODO(Huang-Wei): in the alpha version, we keep the formula as simple as possible.
+	// The current version ranks the nodes properly, but it doesn't take MaxSkew into
+	// consideration; we may come up with a better formula in the future.
+	maxMinDiff := maxCount - minCount
+	for i := range nodes {
+		node := nodes[i]
+		result[i].Host = node.Name
+		if t.counts[node.Name] == nil {
+			result[i].Score = 0
+			continue
+		}
+		if maxMinDiff == 0 {
+			result[i].Score = schedulerapi.MaxPriority
+			continue
+		}
+		fScore := float64(schedulerapi.MaxPriority) * (float64(*t.counts[node.Name]-minCount) / float64(maxMinDiff))
+		// counts have been "reversed" above (total - count), so a node with fewer
+		// matching pods ends up with a larger fScore, i.e. a higher priority
+		result[i].Score = int(fScore)
+		if klog.V(10) {
+			klog.Infof("%v -> %v: EvenPodsSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
+		}
+	}
+
+	return result, nil
+}
+
+// TODO(Huang-Wei): combine this with getHardTopologySpreadConstraints() in the predicates package
+func getSoftTopologySpreadConstraints(pod *v1.Pod) (constraints []v1.TopologySpreadConstraint) {
+	if pod != nil {
+		for _, constraint := range pod.Spec.TopologySpreadConstraints {
+			if constraint.WhenUnsatisfiable == v1.ScheduleAnyway {
+				constraints = append(constraints, constraint)
+			}
+		}
+	}
+	return
+}
diff --git a/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go b/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go
new file mode 100644
index 00000000000..99225669b16
--- /dev/null
+++ b/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go
@@ -0,0 +1,302 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package priorities
+
+import (
+	"reflect"
+	"testing"
+
+	"k8s.io/api/core/v1"
+	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
+	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+	u "k8s.io/kubernetes/pkg/scheduler/util"
+)
+
+func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
+	tests := []struct {
+		name  string
+		pod   *v1.Pod
+		nodes []*v1.Node
+		want  map[topologyPair][]string
+	}{
+		{
+			name: "normal case",
+			pod: u.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, "zone", softSpread, u.MakeLabelSelector().Exists("foo").Obj()).
+				SpreadConstraint(1, "node", softSpread, u.MakeLabelSelector().Exists("foo").Obj()).
+				Obj(),
+			nodes: []*v1.Node{
+				u.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
+				u.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
+				u.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
+			},
+			want: map[topologyPair][]string{
+				{key: "zone", value: "zone1"}: {"node-a", "node-b"},
+				{key: "zone", value: "zone2"}: {"node-x"},
+				{key: "node", value: "node-a"}: {"node-a"},
+				{key: "node", value: "node-b"}: {"node-b"},
+				{key: "node", value: "node-x"}: {"node-x"},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tMap := newTopologySpreadConstraintsMap(len(tt.nodes))
+			tMap.initialize(tt.pod, tt.nodes)
+			if !reflect.DeepEqual(tMap.topologyPairToNodeNames, tt.want) {
+				t.Errorf("initialize().topologyPairToNodeNames = %#v, want %#v", tMap.topologyPairToNodeNames, tt.want)
+			}
+		})
+	}
+}
+
+func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
+	tests := []struct {
+		name         string
+		pod          *v1.Pod
+		existingPods []*v1.Pod
+		nodes        []*v1.Node
+		failedNodes  []*v1.Node // nodes + failedNodes = all nodes
+		want         schedulerapi.HostPriorityList
+	}{
+		// Explanation of the legend:
+		// a) X/Y means there are X matching pods on node1 and Y on node2, both nodes are candidates
+		//    (i.e. they have passed all predicates)
+		// b) X/~Y~ means there are X matching pods on node1 and Y on node2, but node2 is NOT a candidate
+		// c) X/?Y? means there are X matching pods on node1 and Y on node2, both nodes are candidates
+		//    but node2 either i) doesn't have all required topologyKeys present, or ii) doesn't match
+		//    incoming pod's nodeSelector/nodeAffinity
+		{
+			// if there are no matching existing pods, all candidate nodes should be scored to 10
+			name: "one constraint on node, no existing pods",
+			pod: u.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, "node", softSpread, u.MakeLabelSelector().Exists("foo").Obj()).
+				Obj(),
+			nodes: []*v1.Node{
+				u.MakeNode().Name("node-a").Label("node", "node-a").Obj(),
+				u.MakeNode().Name("node-b").Label("node", "node-b").Obj(),
+			},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 10},
+				{Host: "node-b", Score: 10},
+			},
+		},
+		{
+			// if there is only one candidate node, it should be scored to 10
+			name: "one constraint on node, only one node is candidate",
+			pod: u.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, "node", softSpread, u.MakeLabelSelector().Exists("foo").Obj()).
+				Obj(),
+			existingPods: []*v1.Pod{
+				u.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a2").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Obj(),
+			},
+			nodes: []*v1.Node{
+				u.MakeNode().Name("node-a").Label("node", "node-a").Obj(),
+			},
+			failedNodes: []*v1.Node{
+				u.MakeNode().Name("node-b").Label("node", "node-b").Obj(),
+			},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 10},
+			},
+		},
+		{
+			// matching pods spread as 2/1/0/3, total = 6
+			// after reversing, it's 4/5/6/3
+			// so scores = 40/6, 50/6, 60/6, 30/6
+			name: "one constraint on node, all 4 nodes are candidates",
+			pod: u.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, "node", softSpread, u.MakeLabelSelector().Exists("foo").Obj()).
+				Obj(),
+			existingPods: []*v1.Pod{
+				u.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a2").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Obj(),
+				u.MakePod().Name("p-d1").Node("node-d").Label("foo", "").Obj(),
+				u.MakePod().Name("p-d2").Node("node-d").Label("foo", "").Obj(),
+				u.MakePod().Name("p-d3").Node("node-d").Label("foo", "").Obj(),
+			},
+			nodes: []*v1.Node{
+				u.MakeNode().Name("node-a").Label("node", "node-a").Obj(),
+				u.MakeNode().Name("node-b").Label("node", "node-b").Obj(),
+				u.MakeNode().Name("node-c").Label("node", "node-c").Obj(),
+				u.MakeNode().Name("node-d").Label("node", "node-d").Obj(),
+			},
+			failedNodes: []*v1.Node{},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 6},
+				{Host: "node-b", Score: 8},
+				{Host: "node-c", Score: 10},
+				{Host: "node-d", Score: 5},
+			},
+		},
+		{
+			// matching pods spread as 4/2/1/~3~, total = 4+2+1 = 7 (as node4 is not a candidate)
+			// after reversing, it's 3/5/6
+			// so scores = 30/6, 50/6, 60/6
+			name: "one constraint on node, 3 out of 4 nodes are candidates",
+			pod: u.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, "node", softSpread, u.MakeLabelSelector().Exists("foo").Obj()).
+				Obj(),
+			existingPods: []*v1.Pod{
+				u.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a2").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a3").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a4").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Obj(),
+				u.MakePod().Name("p-b2").Node("node-b").Label("foo", "").Obj(),
+				u.MakePod().Name("p-x1").Node("node-x").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y1").Node("node-y").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y2").Node("node-y").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y3").Node("node-y").Label("foo", "").Obj(),
+			},
+			nodes: []*v1.Node{
+				u.MakeNode().Name("node-a").Label("node", "node-a").Obj(),
+				u.MakeNode().Name("node-b").Label("node", "node-b").Obj(),
+				u.MakeNode().Name("node-x").Label("node", "node-x").Obj(),
+			},
+			failedNodes: []*v1.Node{
+				u.MakeNode().Name("node-y").Label("node", "node-y").Obj(),
+			},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 5},
+				{Host: "node-b", Score: 8},
+				{Host: "node-x", Score: 10},
+			},
+		},
+		{
+			// matching pods spread as 4/?2?/1/~3~, total = 4+?+1 = 5 (as node2 doesn't have the required topologyKey)
+			// after reversing, it's 1/?/4
+			// so scores = 10/4, 0, 40/4
+			name: "one constraint on node, 3 out of 4 nodes are candidates, one doesn't have the required label",
+			pod: u.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, "node", softSpread, u.MakeLabelSelector().Exists("foo").Obj()).
+				Obj(),
+			existingPods: []*v1.Pod{
+				u.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a2").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a3").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a4").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Obj(),
+				u.MakePod().Name("p-b2").Node("node-b").Label("foo", "").Obj(),
+				u.MakePod().Name("p-x1").Node("node-x").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y1").Node("node-y").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y2").Node("node-y").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y3").Node("node-y").Label("foo", "").Obj(),
+			},
+			nodes: []*v1.Node{
+				u.MakeNode().Name("node-a").Label("node", "node-a").Obj(),
+				u.MakeNode().Name("node-b").Label("n", "node-b").Obj(), // label `n` doesn't match the topologyKey `node`
+				u.MakeNode().Name("node-x").Label("node", "node-x").Obj(),
+			},
+			failedNodes: []*v1.Node{
+				u.MakeNode().Name("node-y").Label("node", "node-y").Obj(),
+			},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 2},
+				{Host: "node-b", Score: 0},
+				{Host: "node-x", Score: 10},
+			},
+		},
+		{
+			// matching pods spread as 4/2/1/~3~, total = 6+6+4 = 16 (as topologyKey is zone instead of node)
+			// after reversing, it's 10/10/12
+			// so scores = 100/12, 100/12, 120/12
+			name: "one constraint on zone, 3 out of 4 nodes are candidates",
+			pod: u.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, "zone", softSpread, u.MakeLabelSelector().Exists("foo").Obj()).
+				Obj(),
+			existingPods: []*v1.Pod{
+				u.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a2").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a3").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a4").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Obj(),
+				u.MakePod().Name("p-b2").Node("node-b").Label("foo", "").Obj(),
+				u.MakePod().Name("p-x1").Node("node-x").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y1").Node("node-y").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y2").Node("node-y").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y3").Node("node-y").Label("foo", "").Obj(),
+			},
+			nodes: []*v1.Node{
+				u.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
+				u.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
+				u.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
+			},
+			failedNodes: []*v1.Node{
+				u.MakeNode().Name("node-y").Label("zone", "zone2").Label("node", "node-y").Obj(),
+			},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 8},
+				{Host: "node-b", Score: 8},
+				{Host: "node-x", Score: 10},
+			},
+		},
+		{
+			// matching pods spread as 2/~1~/2/~4~, total = 2+3 + 2+6 = 13 (both the zone and node constraints are summed up)
+			// after reversing, it's 8/5
+			// so scores = 80/8, 50/8
+			name: "two constraints on zone and node, 2 out of 4 nodes are candidates",
+			pod: u.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, "zone", softSpread, u.MakeLabelSelector().Exists("foo").Obj()).
+				SpreadConstraint(1, "node", softSpread, u.MakeLabelSelector().Exists("foo").Obj()).
+				Obj(),
+			existingPods: []*v1.Pod{
+				u.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-a2").Node("node-a").Label("foo", "").Obj(),
+				u.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Obj(),
+				u.MakePod().Name("p-x1").Node("node-x").Label("foo", "").Obj(),
+				u.MakePod().Name("p-x2").Node("node-x").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y1").Node("node-y").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y2").Node("node-y").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y3").Node("node-y").Label("foo", "").Obj(),
+				u.MakePod().Name("p-y4").Node("node-y").Label("foo", "").Obj(),
+			},
+			nodes: []*v1.Node{
+				u.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
+				u.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
+			},
+			failedNodes: []*v1.Node{
+				u.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
+				u.MakeNode().Name("node-y").Label("zone", "zone2").Label("node", "node-y").Obj(),
+			},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 10},
+				{Host: "node-x", Score: 6},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			allNodes := append([]*v1.Node{}, tt.nodes...)
+			allNodes = append(allNodes, tt.failedNodes...)
+			nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, allNodes)
+
+			got, _ := CalculateEvenPodsSpreadPriority(tt.pod, nodeNameToInfo, tt.nodes)
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("CalculateEvenPodsSpreadPriority() = %#v, want %#v", got, tt.want)
+			}
+		})
+	}
+}
+
+var (
+	hardSpread = v1.DoNotSchedule
+	softSpread = v1.ScheduleAnyway
+)
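For reviewers skimming the patch, the scoring at the end of CalculateEvenPodsSpreadPriority can be reproduced outside the scheduler. The sketch below is a minimal, standalone illustration of the reverse-and-normalize step only, assuming the per-node match counts have already been gathered; the helper name normalizeReversedCounts, the maxPriority constant, and the hard-coded counts are illustrative and not part of this patch. Fed the 2/1/0/3 spread from the "all 4 nodes are candidates" test case, it prints the expected 6/8/10/5 scores.

package main

import "fmt"

const maxPriority = 10

// normalizeReversedCounts mirrors the final scoring step of CalculateEvenPodsSpreadPriority:
// each node's count of matching pods is "reversed" (total - count) so that fewer matching
// pods yields a higher value, then the values are scaled to the 0..maxPriority range.
func normalizeReversedCounts(counts map[string]int64) map[string]int {
	var total, maxCount, minCount int64
	for _, c := range counts {
		total += c
	}
	reversed := make(map[string]int64, len(counts))
	for name, c := range counts {
		r := total - c
		if r > maxCount {
			maxCount = r
		} else if r < minCount {
			minCount = r
		}
		reversed[name] = r
	}
	scores := make(map[string]int, len(counts))
	for name, r := range reversed {
		if maxCount == minCount {
			// all reversed counts are equal, so every node gets the top score
			scores[name] = maxPriority
			continue
		}
		scores[name] = int(float64(maxPriority) * float64(r-minCount) / float64(maxCount-minCount))
	}
	return scores
}

func main() {
	// Matching pods spread as 2/1/0/3 across four candidate nodes; expect
	// map[node-a:6 node-b:8 node-c:10 node-d:5], matching the test above.
	fmt.Println(normalizeReversedCounts(map[string]int64{
		"node-a": 2, "node-b": 1, "node-c": 0, "node-d": 3,
	}))
}

Note that, as in the patch, minCount effectively stays 0 because the reversed counts are never negative, so the formula reduces to maxPriority * reversed / maxReversed, truncated to an integer.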