Migrate EvenPodsSpread Priority as Score plugin in map/reduce style

Wei Huang 2019-10-30 22:21:57 -07:00
parent 4d55d1d695
commit 90603728fb
9 changed files with 237 additions and 138 deletions
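
For orientation, "map/reduce style" here means the single all-nodes priority function is split into a per-node map step and a once-per-cycle reduce step. Below is a minimal sketch of that shape, using the signatures introduced in this diff; the package name, example function names, and empty bodies are placeholders, not part of the commit.

package sketch

import (
	"k8s.io/api/core/v1"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// The "map" half runs once per filtered node and returns that node's raw score.
func exampleMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	return framework.NodeScore{Name: nodeInfo.Node().Name, Score: 0}, nil
}

// The "reduce" half runs once per scheduling cycle and rewrites the raw scores
// in place, normalizing them to the [0, framework.MaxNodeScore] range.
func exampleReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
	return nil
}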

View File

@@ -18,6 +18,7 @@ package priorities
import (
"context"
"fmt"
"math"
"sync/atomic"
@@ -26,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/klog"
@@ -36,67 +38,32 @@ type topologyPair struct {
value string
}
type topologySpreadConstraintsMap struct {
// nodeNameToPodCounts is keyed with node name, and valued with the number of matching pods.
nodeNameToPodCounts map[string]int32
type podTopologySpreadMap struct {
// nodeNameSet is a string set holding all node names which have all constraints[*].topologyKey present.
nodeNameSet map[string]struct{}
// topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
topologyPairToPodCounts map[topologyPair]*int32
topologyPairToPodCounts map[topologyPair]*int64
}
func newTopologySpreadConstraintsMap() *topologySpreadConstraintsMap {
return &topologySpreadConstraintsMap{
nodeNameToPodCounts: make(map[string]int32),
topologyPairToPodCounts: make(map[topologyPair]*int32),
func newTopologySpreadConstraintsMap() *podTopologySpreadMap {
return &podTopologySpreadMap{
nodeNameSet: make(map[string]struct{}),
topologyPairToPodCounts: make(map[topologyPair]*int64),
}
}
// Note: the <nodes> passed in are the "filtered" nodes which have passed Predicates.
// This function iterates <nodes> to filter out the nodes which don't have required topologyKey(s),
// and initialize two maps:
// 1) t.topologyPairToPodCounts: keyed with both eligible topology pair and node names.
// 2) t.nodeNameToPodCounts: keyed with node name, and valued with a *int32 pointer for eligible node only.
func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node) {
constraints := getSoftTopologySpreadConstraints(pod)
for _, node := range nodes {
if !predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
continue
}
for _, constraint := range constraints {
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
if t.topologyPairToPodCounts[pair] == nil {
t.topologyPairToPodCounts[pair] = new(int32)
}
}
t.nodeNameToPodCounts[node.Name] = 0
// For those nodes which don't have all required topologyKeys present, it's intentional to keep
// those entries absent in nodeNameToPodCounts, so that we're able to score them to 0 afterwards.
}
}
// CalculateEvenPodsSpreadPriority computes a score by checking through the topologySpreadConstraints
// that are with WhenUnsatisfiable=ScheduleAnyway (a.k.a soft constraint).
// The function works as below:
// 1) In all nodes, calculate the number of pods which match <pod>'s soft topology spread constraints.
// 2) Group the number calculated in 1) by topologyPair, and sum up to corresponding candidate nodes.
// 3) Finally normalize the number to 0~10. The node with the highest score is the most preferred.
// Note: Symmetry is not applicable. We only weigh how incomingPod matches existingPod.
// Whether existingPod matches incomingPod doesn't contribute to the final score.
// This is different from the Affinity API.
func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := make(framework.NodeScoreList, len(nodes))
// buildPodTopologySpreadMap prepares the necessary data (a podTopologySpreadMap) for scoring the incoming pod on the filteredNodes.
// The map/reduce priority functions later use this podTopologySpreadMap to perform the scoring calculations.
func buildPodTopologySpreadMap(pod *v1.Pod, filteredNodes []*v1.Node, allNodes []*schedulernodeinfo.NodeInfo) *podTopologySpreadMap {
// Return early if the incoming pod has no soft topology spread constraints, or there is nothing to score.
constraints := getSoftTopologySpreadConstraints(pod)
if len(constraints) == 0 {
return result, nil
if len(constraints) == 0 || len(filteredNodes) == 0 || len(allNodes) == 0 {
return nil
}
allNodes, err := sharedLister.NodeInfos().List()
if err != nil {
return nil, err
}
t := newTopologySpreadConstraintsMap()
t.initialize(pod, nodes)
// initialize the podTopologySpreadMap, which will be used by the Score plugin.
m := newTopologySpreadConstraintsMap()
m.initialize(pod, filteredNodes)
errCh := schedutil.NewErrorChannel()
ctx, cancel := context.WithCancel(context.Background())
@@ -117,12 +84,12 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters.
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
// If current topology pair is not associated with any candidate node,
// continue to avoid unnecessary calculation.
if t.topologyPairToPodCounts[pair] == nil {
if m.topologyPairToPodCounts[pair] == nil {
continue
}
// <matchSum> indicates how many pods (on current node) match the <constraint>.
matchSum := int32(0)
matchSum := int64(0)
for _, existingPod := range nodeInfo.Pods() {
match, err := predicates.PodMatchesSpreadConstraint(existingPod.Labels, constraint)
if err != nil {
@@ -133,67 +100,132 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters.
matchSum++
}
}
atomic.AddInt32(t.topologyPairToPodCounts[pair], matchSum)
atomic.AddInt64(m.topologyPairToPodCounts[pair], matchSum)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode)
if err := errCh.ReceiveError(); err != nil {
return nil, err
klog.Error(err)
return nil
}
var minCount int32 = math.MaxInt32
// <total> sums up the number of matching pods on each qualified topology pair
var total int32
for _, node := range nodes {
if _, ok := t.nodeNameToPodCounts[node.Name]; !ok {
return m
}
// initialize iterates "filteredNodes" to filter out the nodes which don't have the required topologyKey(s),
// and initializes two data structures:
// 1) m.topologyPairToPodCounts: keyed with each eligible topology pair, valued with a pointer to the count of matching pods.
// 2) m.nodeNameSet: a set of the node names which have all required topologyKeys present.
func (m *podTopologySpreadMap) initialize(pod *v1.Pod, filteredNodes []*v1.Node) {
constraints := getSoftTopologySpreadConstraints(pod)
for _, node := range filteredNodes {
if !predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
continue
}
// For each present <pair>, current node gets a credit of <matchSum>.
// And we add <matchSum> to <t.total> to reverse the final score later.
for _, constraint := range constraints {
if tpVal, ok := node.Labels[constraint.TopologyKey]; ok {
pair := topologyPair{key: constraint.TopologyKey, value: tpVal}
matchSum := *t.topologyPairToPodCounts[pair]
t.nodeNameToPodCounts[node.Name] += matchSum
total += matchSum
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
if m.topologyPairToPodCounts[pair] == nil {
m.topologyPairToPodCounts[pair] = new(int64)
}
}
if t.nodeNameToPodCounts[node.Name] < minCount {
minCount = t.nodeNameToPodCounts[node.Name]
m.nodeNameSet[node.Name] = struct{}{}
// For those nodes which don't have all required topologyKeys present, it's intentional to leave
// their entries absent from nodeNameSet, so that we can score them as 0 afterwards.
}
}
// CalculateEvenPodsSpreadPriorityMap calculates the number of matching pods on the passed-in "node",
// and returns that number as the Score.
func CalculateEvenPodsSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
var m *podTopologySpreadMap
if priorityMeta, ok := meta.(*priorityMetadata); ok {
m = priorityMeta.podTopologySpreadMap
}
if m == nil {
return framework.NodeScore{}, nil
}
// no need to continue if the node is not qualified.
if _, ok := m.nodeNameSet[node.Name]; !ok {
return framework.NodeScore{Name: node.Name, Score: 0}, nil
}
constraints := getSoftTopologySpreadConstraints(pod)
// For each present <pair>, current node gets a credit of <matchSum>.
// And we sum up <matchSum> and return it as this node's score.
var score int64
for _, constraint := range constraints {
if tpVal, ok := node.Labels[constraint.TopologyKey]; ok {
pair := topologyPair{key: constraint.TopologyKey, value: tpVal}
matchSum := *m.topologyPairToPodCounts[pair]
score += matchSum
}
}
return framework.NodeScore{Name: node.Name, Score: score}, nil
}
// CalculateEvenPodsSpreadPriorityReduce normalizes the score for each filtered node.
// The basic rule is: the bigger the score (i.e. the number of matching pods), the smaller the
// final normalized score.
func CalculateEvenPodsSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister,
result framework.NodeScoreList) error {
var m *podTopologySpreadMap
if priorityMeta, ok := meta.(*priorityMetadata); ok {
m = priorityMeta.podTopologySpreadMap
}
if m == nil {
return nil
}
// Calculate the summed <total> score and <minScore>.
var minScore int64 = math.MaxInt64
var total int64
for _, score := range result {
// it's mandatory to check if <score.Name> is present in m.nodeNameSet
if _, ok := m.nodeNameSet[score.Name]; !ok {
continue
}
total += score.Score
if score.Score < minScore {
minScore = score.Score
}
}
// Calculate the final priority score for each node.
// TODO(Huang-Wei): in the alpha version, we keep the formula as simple as possible.
// The current version ranks the nodes properly, but it doesn't take MaxSkew into
// consideration; we may come up with a better formula in the future.
maxMinDiff := total - minCount
for i := range nodes {
node := nodes[i]
result[i].Name = node.Name
// debugging purpose: print the value for each node
// score must be pointer here, otherwise it's always 0
maxMinDiff := total - minScore
for i := range result {
nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
if err != nil {
return err
}
node := nodeInfo.Node()
// Debugging purpose: print the score for each node.
// Score must be a pointer here, otherwise it's always 0.
if klog.V(10) {
defer func(score *int64, nodeName string) {
klog.Infof("%v -> %v: EvenPodsSpreadPriority, Score: (%d)", pod.Name, nodeName, *score)
klog.Infof("%v -> %v: PodTopologySpread NormalizeScore, Score: (%d)", pod.Name, nodeName, *score)
}(&result[i].Score, node.Name)
}
if _, ok := t.nodeNameToPodCounts[node.Name]; !ok {
result[i].Score = 0
continue
}
if maxMinDiff == 0 {
result[i].Score = framework.MaxNodeScore
continue
}
fScore := float64(framework.MaxNodeScore) * (float64(total-t.nodeNameToPodCounts[node.Name]) / float64(maxMinDiff))
if _, ok := m.nodeNameSet[node.Name]; !ok {
result[i].Score = 0
continue
}
flippedScore := total - result[i].Score
fScore := float64(framework.MaxNodeScore) * (float64(flippedScore) / float64(maxMinDiff))
result[i].Score = int64(fScore)
}
return result, nil
return nil
}
// TODO(Huang-Wei): combine this with getHardTopologySpreadConstraints() in predicates package
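
To make the reduce-side normalization concrete, here is a small, self-contained sketch (not part of the commit) with made-up node names and raw match counts; it assumes framework.MaxNodeScore is 100 and skips the nodeNameSet and maxMinDiff == 0 special cases handled by the real code.

package main

import (
	"fmt"
	"math"
)

func main() {
	const maxNodeScore = int64(100)
	// Raw "map" scores: the number of matching pods credited to each node.
	raw := []struct {
		name  string
		score int64
	}{
		{"node-a", 4},
		{"node-b", 1},
		{"node-x", 7},
	}
	var total int64
	var minScore int64 = math.MaxInt64
	for _, r := range raw {
		total += r.score
		if r.score < minScore {
			minScore = r.score
		}
	}
	maxMinDiff := total - minScore // 12 - 1 = 11
	// The more matching pods a node already has, the lower its final score.
	for _, r := range raw {
		flipped := total - r.score
		normalized := int64(float64(maxNodeScore) * float64(flipped) / float64(maxMinDiff))
		fmt.Printf("%s: raw=%d, normalized=%d\n", r.name, r.score, normalized)
	}
	// node-b (fewest matching pods) ends up with 100, node-a with 72, node-x with 45.
}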

View File

@@ -26,13 +26,13 @@ import (
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
func Test_podTopologySpreadMap_initialize(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
nodes []*v1.Node
wantNodeNameMap map[string]int32
wantTopologyPairMap map[topologyPair]*int32
wantNodeNameSet map[string]struct{}
wantTopologyPairMap map[topologyPair]*int64
}{
{
name: "normal case",
@@ -45,17 +45,17 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
},
wantNodeNameMap: map[string]int32{
"node-a": 0,
"node-b": 0,
"node-x": 0,
wantNodeNameSet: map[string]struct{}{
"node-a": {},
"node-b": {},
"node-x": {},
},
wantTopologyPairMap: map[topologyPair]*int32{
{key: "zone", value: "zone1"}: new(int32),
{key: "zone", value: "zone2"}: new(int32),
{key: "node", value: "node-a"}: new(int32),
{key: "node", value: "node-b"}: new(int32),
{key: "node", value: "node-x"}: new(int32),
wantTopologyPairMap: map[topologyPair]*int64{
{key: "zone", value: "zone1"}: new(int64),
{key: "zone", value: "zone2"}: new(int64),
{key: "node", value: "node-a"}: new(int64),
{key: "node", value: "node-b"}: new(int64),
{key: "node", value: "node-x"}: new(int64),
},
},
{
@@ -69,26 +69,26 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
st.MakeNode().Name("node-x").Label("node", "node-x").Obj(),
},
wantNodeNameMap: map[string]int32{
"node-a": 0,
"node-b": 0,
wantNodeNameSet: map[string]struct{}{
"node-a": {},
"node-b": {},
},
wantTopologyPairMap: map[topologyPair]*int32{
{key: "zone", value: "zone1"}: new(int32),
{key: "node", value: "node-a"}: new(int32),
{key: "node", value: "node-b"}: new(int32),
wantTopologyPairMap: map[topologyPair]*int64{
{key: "zone", value: "zone1"}: new(int64),
{key: "node", value: "node-a"}: new(int64),
{key: "node", value: "node-b"}: new(int64),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tMap := newTopologySpreadConstraintsMap()
tMap.initialize(tt.pod, tt.nodes)
if !reflect.DeepEqual(tMap.nodeNameToPodCounts, tt.wantNodeNameMap) {
t.Errorf("initilize().nodeNameToPodCounts = %#v, want %#v", tMap.nodeNameToPodCounts, tt.wantNodeNameMap)
m := newTopologySpreadConstraintsMap()
m.initialize(tt.pod, tt.nodes)
if !reflect.DeepEqual(m.nodeNameSet, tt.wantNodeNameSet) {
t.Errorf("initilize().nodeNameSet = %#v, want %#v", m.nodeNameSet, tt.wantNodeNameSet)
}
if !reflect.DeepEqual(tMap.topologyPairToPodCounts, tt.wantTopologyPairMap) {
t.Errorf("initilize().topologyPairToPodCounts = %#v, want %#v", tMap.topologyPairToPodCounts, tt.wantTopologyPairMap)
if !reflect.DeepEqual(m.topologyPairToPodCounts, tt.wantTopologyPairMap) {
t.Errorf("initilize().topologyPairToPodCounts = %#v, want %#v", m.topologyPairToPodCounts, tt.wantTopologyPairMap)
}
})
}
@@ -435,9 +435,23 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
allNodes := append([]*v1.Node{}, tt.nodes...)
allNodes = append(allNodes, tt.failedNodes...)
snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, allNodes)
got, _ := CalculateEvenPodsSpreadPriority(tt.pod, snapshot, tt.nodes)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("CalculateEvenPodsSpreadPriority() = %#v, want %#v", got, tt.want)
meta := &priorityMetadata{
podTopologySpreadMap: buildPodTopologySpreadMap(tt.pod, tt.nodes, snapshot.NodeInfoList),
}
var gotList framework.NodeScoreList
for _, n := range tt.nodes {
nodeName := n.Name
nodeScore, err := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName])
if err != nil {
t.Error(err)
}
gotList = append(gotList, nodeScore)
}
CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList)
if !reflect.DeepEqual(gotList, tt.want) {
t.Errorf("CalculateEvenPodsSpreadPriorityReduce() = %#v, want %#v", gotList, tt.want)
}
})
}
@@ -484,9 +498,19 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes)
meta := &priorityMetadata{
podTopologySpreadMap: buildPodTopologySpreadMap(tt.pod, filteredNodes, snapshot.NodeInfoList),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
CalculateEvenPodsSpreadPriority(tt.pod, snapshot, filteredNodes)
var gotList framework.NodeScoreList
for _, n := range filteredNodes {
nodeName := n.Name
nodeScore, _ := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName])
gotList = append(gotList, nodeScore)
}
CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList)
}
})
}

View File

@@ -59,18 +59,21 @@ type priorityMetadata struct {
controllerRef *metav1.OwnerReference
podFirstServiceSelector labels.Selector
totalNumNodes int
podTopologySpreadMap *podTopologySpreadMap
}
// PriorityMetadata is a PriorityMetadataProducer. Node info can be nil.
func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, _ []*v1.Node, sharedLister schedulerlisters.SharedLister) interface{} {
func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, filteredNodes []*v1.Node, sharedLister schedulerlisters.SharedLister) interface{} {
// If we cannot compute metadata, just return nil
if pod == nil {
return nil
}
totalNumNodes := 0
var allNodes []*schedulernodeinfo.NodeInfo
if sharedLister != nil {
if l, err := sharedLister.NodeInfos().List(); err == nil {
totalNumNodes = len(l)
allNodes = l
}
}
return &priorityMetadata{
@@ -81,6 +84,7 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, _ []*v1.Node,
controllerRef: metav1.GetControllerOf(pod),
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
totalNumNodes: totalNumNodes,
podTopologySpreadMap: buildPodTopologySpreadMap(pod, filteredNodes, allNodes),
}
}

View File

@@ -68,7 +68,12 @@ func ApplyFeatureGates() (restore func()) {
scheduler.RegisterFitPredicate(predicates.EvenPodsSpreadPred, predicates.EvenPodsSpreadPredicate)
// register priority
scheduler.InsertPriorityKeyToAlgorithmProviderMap(priorities.EvenPodsSpreadPriority)
scheduler.RegisterPriorityFunction(priorities.EvenPodsSpreadPriority, priorities.CalculateEvenPodsSpreadPriority, 1)
scheduler.RegisterPriorityMapReduceFunction(
priorities.EvenPodsSpreadPriority,
priorities.CalculateEvenPodsSpreadPriorityMap,
priorities.CalculateEvenPodsSpreadPriorityReduce,
1,
)
}
// Prioritizes nodes that satisfy pod's resource limits

View File

@@ -1196,15 +1196,15 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
featureGates: map[featuregate.Feature]bool{
features.EvenPodsSpread: true,
},
wantPrioritizers: sets.NewString(
"EvenPodsSpreadPriority",
),
wantPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "TaintToleration"},
{Name: "PodTopologySpread"},
},
"ScorePlugin": {
{Name: "PodTopologySpread", Weight: 2},
},
},
},
{

View File

@@ -229,25 +229,26 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
plugins.Score = appendToPluginSet(plugins.Score, nodepreferavoidpods.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.MostRequestedPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.MostAllocatedName, &args.Weight)
return
})
registry.RegisterPriority(priorities.BalancedResourceAllocation,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.BalancedAllocationName, &args.Weight)
return
})
registry.RegisterPriority(priorities.LeastRequestedPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.LeastAllocatedName, &args.Weight)
return
})
registry.RegisterPriority(priorities.EvenPodsSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight)
return
})
registry.RegisterPriority(requestedtocapacityratio.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, requestedtocapacityratio.Name, &args.Weight)

View File

@@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",

View File

@@ -23,15 +23,19 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// PodTopologySpread is a plugin that ensures pod's topologySpreadConstraints is satisfied.
type PodTopologySpread struct{}
type PodTopologySpread struct {
handle framework.FrameworkHandle
}
var _ framework.FilterPlugin = &PodTopologySpread{}
var _ framework.ScorePlugin = &PodTopologySpread{}
// Name is the name of the plugin used in the plugin registry and configurations.
const Name = "PodTopologySpread"
@@ -51,7 +55,33 @@ func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.C
return migration.PredicateResultToFrameworkStatus(reasons, err)
}
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &PodTopologySpread{}, nil
// Score invoked at the Score extension point.
// The "score" returned from this function is the number of matching pods on `nodeName`;
// it is normalized later by NormalizeScore.
func (pl *PodTopologySpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
meta := migration.PriorityMetadata(state)
s, err := priorities.CalculateEvenPodsSpreadPriorityMap(pod, meta, nodeInfo)
return s.Score, migration.ErrorToFrameworkStatus(err)
}
// NormalizeScore invoked after scoring all nodes.
func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
meta := migration.PriorityMetadata(state)
err := priorities.CalculateEvenPodsSpreadPriorityReduce(pod, meta, pl.handle.SnapshotSharedLister(), scores)
return migration.ErrorToFrameworkStatus(err)
}
// ScoreExtensions of the Score plugin.
func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions {
return pl
}
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
return &PodTopologySpread{handle: h}, nil
}
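
A plugin constructed this way receives its FrameworkHandle when the framework invokes New through its plugin registry. A hypothetical wiring sketch, assuming the v1alpha1 framework.Registry map type; the registry literal below is illustrative, not part of this diff:

// Hypothetical: register the factory under the plugin name; the framework then
// calls New with a real FrameworkHandle, which Score uses to reach the shared snapshot.
var _ = framework.Registry{
	Name: New,
}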

View File

@@ -23,14 +23,16 @@ import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
"k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
var hardSpread = v1.DoNotSchedule
var (
hardSpread = v1.DoNotSchedule
)
func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) {
func TestPodTopologySpread_Filter_SingleConstraint(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
@@ -269,7 +271,7 @@ func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes)
meta := predicates.GetPredicateMetadata(tt.pod, snapshot)
state := v1alpha1.NewCycleState()
state := framework.NewCycleState()
state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
plugin, _ := New(nil, nil)
for _, node := range tt.nodes {
@ -283,7 +285,7 @@ func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) {
}
}
func TestPodTopologySpreadFilter_MultipleConstraints(t *testing.T) {
func TestPodTopologySpread_Filter_MultipleConstraints(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
@@ -466,7 +468,7 @@ func TestPodTopologySpreadFilter_MultipleConstraints(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes)
meta := predicates.GetPredicateMetadata(tt.pod, snapshot)
state := v1alpha1.NewCycleState()
state := framework.NewCycleState()
state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
plugin, _ := New(nil, nil)
for _, node := range tt.nodes {