EvenPodsSpread: optimize Priority logic

Wei Huang 2019-07-25 15:18:20 -07:00
parent 26a45b2bd3
commit 762a7113a7
2 changed files with 141 additions and 81 deletions

pkg/scheduler/algorithm/priorities/even_pods_spread.go

@@ -18,10 +18,10 @@ package priorities

 import (
 	"context"
+	"math"
 	"sync/atomic"

 	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@@ -37,45 +37,39 @@ type topologyPair struct {
 }

 type topologySpreadConstraintsMap struct {
-	// podCounts is keyed with node name, and valued with the number of matching pods.
-	podCounts map[string]*int64
-	// total number of matching pods on each qualified <topologyKey:value> pair
-	total int64
-	// topologyPairToNodeNames store the mapping from potential <topologyKey:value>
-	// pair to node names
-	topologyPairToNodeNames map[topologyPair][]string
+	// nodeNameToPodCounts is keyed with node name, and valued with the number of matching pods.
+	nodeNameToPodCounts map[string]int64
+	// topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
+	topologyPairToPodCounts map[topologyPair]*int64
 }

-func newTopologySpreadConstraintsMap(len int) *topologySpreadConstraintsMap {
+func newTopologySpreadConstraintsMap() *topologySpreadConstraintsMap {
 	return &topologySpreadConstraintsMap{
-		podCounts:               make(map[string]*int64, len),
-		topologyPairToNodeNames: make(map[topologyPair][]string),
+		nodeNameToPodCounts:     make(map[string]int64),
+		topologyPairToPodCounts: make(map[topologyPair]*int64),
 	}
 }

+// Note: the <nodes> passed in are the "filtered" nodes which have passed Predicates.
+// This function iterates <nodes> to filter out the nodes which don't have required topologyKey(s),
+// and initializes two maps:
+// 1) t.topologyPairToPodCounts: keyed with each eligible <topologyKey: value> pair, valued with a *int64 counter.
+// 2) t.nodeNameToPodCounts: keyed with eligible node names only, valued with the number of matching pods.
 func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node) {
 	constraints := getSoftTopologySpreadConstraints(pod)
 	for _, node := range nodes {
-		match := true
-		var pairs []topologyPair
+		if !predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
+			continue
+		}
 		for _, constraint := range constraints {
-			tpKey := constraint.TopologyKey
-			if _, ok := node.Labels[tpKey]; !ok {
-				// Current node isn't qualified for the soft constraints,
-				// so break here and the node will hold default value (nil).
-				match = false
-				break
+			pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
+			if t.topologyPairToPodCounts[pair] == nil {
+				t.topologyPairToPodCounts[pair] = new(int64)
 			}
-			pairs = append(pairs, topologyPair{key: tpKey, value: node.Labels[tpKey]})
 		}
-		if match {
-			for _, pair := range pairs {
-				t.topologyPairToNodeNames[pair] = append(t.topologyPairToNodeNames[pair], node.Name)
-			}
-			t.podCounts[node.Name] = new(int64)
-		}
-		// For those nodes which don't have all required topologyKeys present, it's intentional to
-		// leave podCounts[nodeName] as nil, so that we're able to score these nodes to 0 afterwards.
+		t.nodeNameToPodCounts[node.Name] = 0
+		// For those nodes which don't have all required topologyKeys present, it's intentional to keep
+		// those entries absent in nodeNameToPodCounts, so that we're able to score them to 0 afterwards.
 	}
 }
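The payoff of the two-map layout above, as a minimal standalone sketch (hypothetical names and counts, not scheduler code): matching pods are counted once per <topologyKey: value> pair, and each candidate node then sums the counters of the pairs its labels form, instead of every node under a pair being credited individually during the counting pass.

package main

import "fmt"

type pair struct{ key, value string }

func main() {
	// Step 1: one counter per topology pair (replaces topologyPairToNodeNames,
	// which fanned every increment out to all nodes sharing the pair).
	pairCounts := map[pair]int64{
		{"zone", "zone1"}:  0,
		{"zone", "zone2"}:  2,
		{"node", "node-a"}: 0,
		{"node", "node-b"}: 1,
	}

	// Step 2: each candidate node sums the counters of its own pairs.
	nodePairs := map[string][]pair{
		"node-a": {{"zone", "zone1"}, {"node", "node-a"}},
		"node-b": {{"zone", "zone1"}, {"node", "node-b"}},
	}
	for name, pairs := range nodePairs {
		var sum int64
		for _, p := range pairs {
			sum += pairCounts[p]
		}
		fmt.Printf("%s: %d matching pods\n", name, sum)
	}
}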
@@ -83,11 +77,11 @@ func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node)
 // that are with WhenUnsatisfiable=ScheduleAnyway (a.k.a soft constraint).
 // The function works as below:
 // 1) In all nodes, calculate the number of pods which match <pod>'s soft topology spread constraints.
-// 2) Sum up the number to each node in <nodes> which has corresponding topologyPair present.
+// 2) Group the number calculated in 1) by topologyPair, and sum up to corresponding candidate nodes.
 // 3) Finally normalize the number to 0~10. The node with the highest score is the most preferred.
 // Note: Symmetry is not applicable. We only weigh how incomingPod matches existingPod.
 // Whether existingPod matches incomingPod doesn't contribute to the final score.
-// This is different with the Affinity API.
+// This is different from the Affinity API.
 func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
 	result := make(schedulerapi.HostPriorityList, len(nodes))
 	// return if incoming pod doesn't have soft topology spread constraints.
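For reference, the "soft constraint" the comment describes is a v1.TopologySpreadConstraint whose WhenUnsatisfiable field is ScheduleAnyway. A minimal sketch of such a pod spec using the core API types; the function name and values are illustrative, and the Exists("foo") selector mirrors the test helpers further below.

package example

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// softSpreadPod builds a pod with one soft topology spread constraint:
// spread pods carrying the "foo" label across zones, but schedule anyway
// if the constraint cannot be satisfied.
func softSpreadPod() *v1.Pod {
	return &v1.Pod{
		Spec: v1.PodSpec{
			TopologySpreadConstraints: []v1.TopologySpreadConstraint{{
				MaxSkew:           1,
				TopologyKey:       "zone",
				WhenUnsatisfiable: v1.ScheduleAnyway,
				LabelSelector: &metav1.LabelSelector{
					MatchExpressions: []metav1.LabelSelectorRequirement{
						{Key: "foo", Operator: metav1.LabelSelectorOpExists},
					},
				},
			}},
		},
	}
}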
@@ -96,7 +90,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
 		return result, nil
 	}

-	t := newTopologySpreadConstraintsMap(len(nodes))
+	t := newTopologySpreadConstraintsMap()
 	t.initialize(pod, nodes)

 	allNodeNames := make([]string, 0, len(nodeNameToInfo))
@@ -106,7 +100,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
 	errCh := schedutil.NewErrorChannel()
 	ctx, cancel := context.WithCancel(context.Background())

-	processNode := func(i int) {
+	processAllNode := func(i int) {
 		nodeInfo := nodeNameToInfo[allNodeNames[i]]
 		node := nodeInfo.Node()
 		if node == nil {
@@ -118,58 +112,63 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
 			!predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
 			return
 		}
-		// It's enough to use topologyKey as the "key" of the map.
-		matchCount := make(map[string]int64)
-		for _, existingPod := range nodeInfo.Pods() {
-			podLabelSet := labels.Set(existingPod.Labels)
-			// Matching on constraints is calculated independently.
-			for _, constraint := range constraints {
-				match, err := predicates.PodMatchesSpreadConstraint(podLabelSet, constraint)
+		for _, constraint := range constraints {
+			pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
+			// If current topology pair is not associated with any candidate node,
+			// continue to avoid unnecessary calculation.
+			if t.topologyPairToPodCounts[pair] == nil {
+				continue
+			}
+			// <matchSum> indicates how many pods (on current node) match the <constraint>.
+			matchSum := int64(0)
+			for _, existingPod := range nodeInfo.Pods() {
+				match, err := predicates.PodMatchesSpreadConstraint(existingPod.Labels, constraint)
 				if err != nil {
 					errCh.SendErrorWithCancel(err, cancel)
 					return
 				}
 				if match {
-					matchCount[constraint.TopologyKey]++
+					matchSum++
 				}
 			}
-		}
-		// Keys in t.podCounts have been ensured to contain "filtered" nodes only.
-		for _, constraint := range constraints {
-			tpKey := constraint.TopologyKey
-			pair := topologyPair{key: tpKey, value: node.Labels[tpKey]}
-			// For each <pair>, all matched nodes get the credit of summed matchCount.
-			// And we add matchCount to <t.total> to reverse the final score later.
-			for _, nodeName := range t.topologyPairToNodeNames[pair] {
-				atomic.AddInt64(t.podCounts[nodeName], matchCount[tpKey])
-				atomic.AddInt64(&t.total, matchCount[tpKey])
-			}
+			atomic.AddInt64(t.topologyPairToPodCounts[pair], matchSum)
 		}
 	}
-	workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
+	workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processAllNode)
 	if err := errCh.ReceiveError(); err != nil {
 		return nil, err
 	}

-	var maxCount, minCount int64
+	var minCount int64 = math.MaxInt64
+	// <total> sums up the number of matching pods on each qualified topology pair
+	var total int64
 	for _, node := range nodes {
-		if t.podCounts[node.Name] == nil {
+		if _, ok := t.nodeNameToPodCounts[node.Name]; !ok {
 			continue
 		}
-		// reverse
-		count := t.total - *t.podCounts[node.Name]
-		if count > maxCount {
-			maxCount = count
-		} else if count < minCount {
-			minCount = count
+		// For each present <pair>, current node gets a credit of <matchSum>.
+		// And we add <matchSum> to <total> to reverse the final score later.
+		for _, constraint := range constraints {
+			if tpVal, ok := node.Labels[constraint.TopologyKey]; ok {
+				pair := topologyPair{key: constraint.TopologyKey, value: tpVal}
+				matchSum := *t.topologyPairToPodCounts[pair]
+				t.nodeNameToPodCounts[node.Name] += matchSum
+				total += matchSum
+			}
+		}
+		if t.nodeNameToPodCounts[node.Name] < minCount {
+			minCount = t.nodeNameToPodCounts[node.Name]
 		}
-		t.podCounts[node.Name] = &count
 	}

 	// calculate final priority score for each node
 	// TODO(Huang-Wei): in alpha version, we keep the formula as simple as possible.
 	// current version ranks the nodes properly, but it doesn't take MaxSkew into
 	// consideration, we may come up with a better formula in the future.
-	maxMinDiff := maxCount - minCount
+	maxMinDiff := total - minCount
 	for i := range nodes {
 		node := nodes[i]
 		result[i].Host = node.Name
@@ -182,7 +181,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
 			}(&result[i].Score, node.Name)
 		}

-		if t.podCounts[node.Name] == nil {
+		if _, ok := t.nodeNameToPodCounts[node.Name]; !ok {
 			result[i].Score = 0
 			continue
 		}
@@ -190,7 +189,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
 			result[i].Score = schedulerapi.MaxPriority
 			continue
 		}
-		fScore := float64(schedulerapi.MaxPriority) * (float64(*t.podCounts[node.Name]-minCount) / float64(maxMinDiff))
+		fScore := float64(schedulerapi.MaxPriority) * (float64(total-t.nodeNameToPodCounts[node.Name]) / float64(maxMinDiff))
 		result[i].Score = int(fScore)
 	}
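Taken together, the reworked scoring can be traced with a standalone sketch (again not scheduler code) that reproduces the "matching pods spread as 2/1/0/3, total = 6" test case in the file below: minCount = 0, maxMinDiff = 6, and each node scores MaxPriority * (total - count) / maxMinDiff, so the least-crowded node scores highest.

package main

import (
	"fmt"
	"math"
)

func main() {
	const maxPriority = 10 // schedulerapi.MaxPriority in the real code

	// Matching pods spread as 2/1/0/3 across four nodes.
	counts := map[string]int64{"node-a": 2, "node-b": 1, "node-x": 0, "node-y": 3}

	var total int64
	var minCount int64 = math.MaxInt64
	for _, c := range counts {
		total += c
		if c < minCount {
			minCount = c
		}
	}

	// Reversing against <total> rewards nodes with fewer matching pods.
	maxMinDiff := total - minCount // 6 - 0 = 6
	for _, name := range []string{"node-a", "node-b", "node-x", "node-y"} {
		score := int(float64(maxPriority) * float64(total-counts[name]) / float64(maxMinDiff))
		fmt.Printf("%s: %d\n", name, score) // 6, 8, 10, 5 — i.e. 4/5/6/3 scaled by 10/6
	}
}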

pkg/scheduler/algorithm/priorities/even_pods_spread_test.go

@@ -29,10 +29,11 @@ import (
 func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
 	tests := []struct {
-		name  string
-		pod   *v1.Pod
-		nodes []*v1.Node
-		want  map[topologyPair][]string
+		name                string
+		pod                 *v1.Pod
+		nodes               []*v1.Node
+		wantNodeNameMap     map[string]int64
+		wantTopologyPairMap map[topologyPair]*int64
 	}{
 		{
 			name: "normal case",
@@ -45,12 +46,17 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
 				st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
 				st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
 			},
-			want: map[topologyPair][]string{
-				{key: "zone", value: "zone1"}:  {"node-a", "node-b"},
-				{key: "zone", value: "zone2"}:  {"node-x"},
-				{key: "node", value: "node-a"}: {"node-a"},
-				{key: "node", value: "node-b"}: {"node-b"},
-				{key: "node", value: "node-x"}: {"node-x"},
+			wantNodeNameMap: map[string]int64{
+				"node-a": 0,
+				"node-b": 0,
+				"node-x": 0,
+			},
+			wantTopologyPairMap: map[topologyPair]*int64{
+				{key: "zone", value: "zone1"}:  new(int64),
+				{key: "zone", value: "zone2"}:  new(int64),
+				{key: "node", value: "node-a"}: new(int64),
+				{key: "node", value: "node-b"}: new(int64),
+				{key: "node", value: "node-x"}: new(int64),
 			},
 		},
 		{
@@ -64,19 +70,26 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
 				st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
 				st.MakeNode().Name("node-x").Label("node", "node-x").Obj(),
 			},
-			want: map[topologyPair][]string{
-				{key: "zone", value: "zone1"}:  {"node-a", "node-b"},
-				{key: "node", value: "node-a"}: {"node-a"},
-				{key: "node", value: "node-b"}: {"node-b"},
+			wantNodeNameMap: map[string]int64{
+				"node-a": 0,
+				"node-b": 0,
+			},
+			wantTopologyPairMap: map[topologyPair]*int64{
+				{key: "zone", value: "zone1"}:  new(int64),
+				{key: "node", value: "node-a"}: new(int64),
+				{key: "node", value: "node-b"}: new(int64),
 			},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			tMap := newTopologySpreadConstraintsMap(len(tt.nodes))
+			tMap := newTopologySpreadConstraintsMap()
 			tMap.initialize(tt.pod, tt.nodes)
-			if !reflect.DeepEqual(tMap.topologyPairToNodeNames, tt.want) {
-				t.Errorf("initialize().topologyPairToNodeNames = %#v, want %#v", tMap.topologyPairToNodeNames, tt.want)
+			if !reflect.DeepEqual(tMap.nodeNameToPodCounts, tt.wantNodeNameMap) {
+				t.Errorf("initialize().nodeNameToPodCounts = %#v, want %#v", tMap.nodeNameToPodCounts, tt.wantNodeNameMap)
+			}
+			if !reflect.DeepEqual(tMap.topologyPairToPodCounts, tt.wantTopologyPairMap) {
+				t.Errorf("initialize().topologyPairToPodCounts = %#v, want %#v", tMap.topologyPairToPodCounts, tt.wantTopologyPairMap)
 			}
 		})
 	}
@@ -134,6 +147,24 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
 				{Host: "node-a", Score: 10},
 			},
 		},
+		{
+			name: "one constraint on node, all nodes have the same number of matching pods",
+			pod: st.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, "node", softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
+				Obj(),
+			existingPods: []*v1.Pod{
+				st.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
+				st.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Obj(),
+			},
+			nodes: []*v1.Node{
+				st.MakeNode().Name("node-a").Label("node", "node-a").Obj(),
+				st.MakeNode().Name("node-b").Label("node", "node-b").Obj(),
+			},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 10},
+				{Host: "node-b", Score: 10},
+			},
+		},
 		{
 			// matching pods spread as 2/1/0/3, total = 6
 			// after reversing, it's 4/5/6/3
@@ -338,6 +369,36 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
 				{Host: "node-y", Score: 8},
 			},
 		},
+		{
+			// For the first constraint (zone): the matching pods spread as 0/0/2/2
+			// For the second constraint (node): the matching pods spread as 0/1/0/1
+			// sum them up gets: 0/1/2/3, and total number is 6.
+			// after reversing, it's 6/5/4/3.
+			// so scores = 60/6, 50/6, 40/6, 30/6
+			name: "two constraints on zone and node, with different labelSelectors, some nodes have 0 pods",
+			pod: st.MakePod().Name("p").Label("foo", "").Label("bar", "").
+				SpreadConstraint(1, "zone", softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
+				SpreadConstraint(1, "node", softSpread, st.MakeLabelSelector().Exists("bar").Obj()).
+				Obj(),
+			existingPods: []*v1.Pod{
+				st.MakePod().Name("p-b1").Node("node-b").Label("bar", "").Obj(),
+				st.MakePod().Name("p-x1").Node("node-x").Label("foo", "").Obj(),
+				st.MakePod().Name("p-y1").Node("node-y").Label("foo", "").Label("bar", "").Obj(),
+			},
+			nodes: []*v1.Node{
+				st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
+				st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
+				st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
+				st.MakeNode().Name("node-y").Label("zone", "zone2").Label("node", "node-y").Obj(),
+			},
+			failedNodes: []*v1.Node{},
+			want: []schedulerapi.HostPriority{
+				{Host: "node-a", Score: 10},
+				{Host: "node-b", Score: 8},
+				{Host: "node-x", Score: 6},
+				{Host: "node-y", Score: 5},
+			},
+		},
 		{
 			// For the first constraint (zone): the matching pods spread as 2/2/1/~1~
 			// For the second constraint (node): the matching pods spread as 0/1/0/~1~