Merge pull request #86399 from ahg-g/ahg1-prioritymeta

InterPodAffinity Priority as Score plugin
This commit is contained in:
Kubernetes Prow Robot 2019-12-19 13:39:56 -08:00 committed by GitHub
commit 641d0290e4
15 changed files with 416 additions and 1106 deletions
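
For orientation: this change retires the priority map/reduce pair (CalculateInterPodAffinityPriorityMap and CalculateInterPodAffinityPriorityReduce, deleted below) and moves the logic into the InterPodAffinity plugin's PostFilter and Score extension points. A minimal sketch of the Score-side contract follows, with stand-in types for the real ones in pkg/scheduler/framework/v1alpha1 (method shapes assumed from how the plugin implements them, not copied from the framework):

package sketch

import "context"

// Stand-ins for the framework types; the real definitions live in
// pkg/scheduler/framework/v1alpha1.
type (
	Status     struct{}
	CycleState struct{}
	Pod        struct{ Name string }
	NodeScore  struct {
		Name  string
		Score int64
	}
	NodeScoreList []NodeScore
)

// ScorePlugin plays the role of the old priority "map" function: it is
// called once per filtered node and returns that node's raw score.
type ScorePlugin interface {
	Name() string
	Score(ctx context.Context, state *CycleState, p *Pod, nodeName string) (int64, *Status)
	ScoreExtensions() ScoreExtensions
}

// ScoreExtensions plays the role of the old priority "reduce" function:
// NormalizeScore rescales the raw scores across all nodes.
type ScoreExtensions interface {
	NormalizeScore(ctx context.Context, state *CycleState, p *Pod, scores NodeScoreList) *Status
}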

View File

@@ -21,6 +21,7 @@ go_library(
"//pkg/scheduler/apis/config/validation:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library",

View File

@@ -12,7 +12,6 @@ go_library(
"balanced_resource_allocation.go",
"even_pods_spread.go",
"image_locality.go",
"interpod_affinity.go",
"least_requested.go",
"metadata.go",
"most_requested.go",
@@ -38,14 +37,12 @@
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
@@ -60,7 +57,6 @@ go_test(
"balanced_resource_allocation_test.go",
"even_pods_spread_test.go",
"image_locality_test.go",
"interpod_affinity_test.go",
"least_requested_test.go",
"metadata_test.go",
"most_requested_test.go",

View File

@@ -1,317 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"context"
"fmt"
"sync"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/klog"
)
type topologyPairToScore map[string]map[string]int64
type podAffinityPriorityMap struct {
topologyScore topologyPairToScore
affinityTerms []*weightedAffinityTerm
antiAffinityTerms []*weightedAffinityTerm
hardPodAffinityWeight int32
sync.Mutex
}
// A "processed" representation of v1.WeightedAffinityTerm.
type weightedAffinityTerm struct {
namespaces sets.String
selector labels.Selector
weight int32
topologyKey string
}
func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32) (*weightedAffinityTerm, error) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
return &weightedAffinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey, weight: weight}, nil
}
func getProcessedTerms(pod *v1.Pod, terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) {
if terms == nil {
return nil, nil
}
var processedTerms []*weightedAffinityTerm
for i := range terms {
p, err := newWeightedAffinityTerm(pod, &terms[i].PodAffinityTerm, terms[i].Weight)
if err != nil {
return nil, err
}
processedTerms = append(processedTerms, p)
}
return processedTerms, nil
}
func (p *podAffinityPriorityMap) processTerm(term *weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
if len(fixedNode.Labels) == 0 {
return nil
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector)
tpValue, tpValueExist := fixedNode.Labels[term.topologyKey]
if match && tpValueExist {
p.Lock()
if p.topologyScore[term.topologyKey] == nil {
p.topologyScore[term.topologyKey] = make(map[string]int64)
}
p.topologyScore[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier))
p.Unlock()
}
return nil
}
func (p *podAffinityPriorityMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
for _, term := range terms {
if err := p.processTerm(term, podToCheck, fixedNode, multiplier); err != nil {
return err
}
}
return nil
}
// CalculateInterPodAffinityPriorityMap calculates the number of matching pods on the passed-in "node",
// and returns the number as Score.
func CalculateInterPodAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
var topologyScore topologyPairToScore
if priorityMeta, ok := meta.(*priorityMetadata); ok {
topologyScore = priorityMeta.topologyScore
}
var score int64
for tpKey, tpValues := range topologyScore {
if v, exist := node.Labels[tpKey]; exist {
score += tpValues[v]
}
}
return framework.NodeScore{Name: node.Name, Score: score}, nil
}
// CalculateInterPodAffinityPriorityReduce normalizes the score for each filteredNode.
// The basic rule is: the higher the raw score (number of matching pods) is, the higher the
// final normalized score will be.
func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister,
result framework.NodeScoreList) error {
var topologyScore topologyPairToScore
if priorityMeta, ok := meta.(*priorityMetadata); ok {
topologyScore = priorityMeta.topologyScore
}
if len(topologyScore) == 0 {
return nil
}
var maxCount, minCount int64
for i := range result {
score := result[i].Score
if score > maxCount {
maxCount = score
}
if score < minCount {
minCount = score
}
}
maxMinDiff := maxCount - minCount
for i := range result {
fScore := float64(0)
if maxMinDiff > 0 {
fScore = float64(framework.MaxNodeScore) * (float64(result[i].Score-minCount) / float64(maxMinDiff))
}
result[i].Score = int64(fScore)
}
return nil
}
func (p *podAffinityPriorityMap) processExistingPod(existingPod *v1.Pod, existingPodNodeInfo *schedulernodeinfo.NodeInfo, incomingPod *v1.Pod) error {
existingPodAffinity := existingPod.Spec.Affinity
existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
existingPodNode := existingPodNodeInfo.Node()
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
if err := p.processTerms(p.affinityTerms, existingPod, existingPodNode, 1); err != nil {
return err
}
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
if err := p.processTerms(p.antiAffinityTerms, existingPod, existingPodNode, -1); err != nil {
return err
}
if existingHasAffinityConstraints {
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <p.hardPodAffinityWeight>.
if p.hardPodAffinityWeight > 0 {
terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
// TODO: Uncomment this block when implementing RequiredDuringSchedulingRequiredDuringExecution.
//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for i := range terms {
term := &terms[i]
processedTerm, err := newWeightedAffinityTerm(existingPod, term, p.hardPodAffinityWeight)
if err != nil {
return err
}
if err := p.processTerm(processedTerm, incomingPod, existingPodNode, 1); err != nil {
return err
}
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
if err != nil {
klog.Error(err)
return nil
}
if err := p.processTerms(terms, incomingPod, existingPodNode, 1); err != nil {
return err
}
}
if existingHasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
if err != nil {
return err
}
if err := p.processTerms(terms, incomingPod, existingPodNode, -1); err != nil {
return err
}
}
return nil
}
func buildTopologyPairToScore(
pod *v1.Pod,
sharedLister schedulerlisters.SharedLister,
filteredNodes []*v1.Node,
hardPodAffinityWeight int32,
) topologyPairToScore {
if sharedLister == nil {
klog.Error("BuildTopologyPairToScore with empty shared lister")
return nil
}
affinity := pod.Spec.Affinity
hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
// Unless the pod being scheduled has affinity terms, we only
// need to process nodes hosting pods with affinity.
allNodes, err := sharedLister.NodeInfos().HavePodsWithAffinityList()
if err != nil {
klog.Errorf("get pods with affinity list error, err: %v", err)
return nil
}
if hasAffinityConstraints || hasAntiAffinityConstraints {
allNodes, err = sharedLister.NodeInfos().List()
if err != nil {
klog.Errorf("get all nodes from shared lister error, err: %v", err)
return nil
}
}
var affinityTerms []*weightedAffinityTerm
var antiAffinityTerms []*weightedAffinityTerm
if hasAffinityConstraints {
if affinityTerms, err = getProcessedTerms(pod, affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
klog.Error(err)
return nil
}
}
if hasAntiAffinityConstraints {
if antiAffinityTerms, err = getProcessedTerms(pod, affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
klog.Error(err)
return nil
}
}
pm := podAffinityPriorityMap{
topologyScore: make(topologyPairToScore),
affinityTerms: affinityTerms,
antiAffinityTerms: antiAffinityTerms,
hardPodAffinityWeight: hardPodAffinityWeight,
}
errCh := schedutil.NewErrorChannel()
ctx, cancel := context.WithCancel(context.Background())
processNode := func(i int) {
nodeInfo := allNodes[i]
if nodeInfo.Node() != nil {
// Unless the pod being scheduled has affinity terms, we only
// need to process pods with affinity in the node.
podsToProcess := nodeInfo.PodsWithAffinity()
if hasAffinityConstraints || hasAntiAffinityConstraints {
// We need to process all the pods.
podsToProcess = nodeInfo.Pods()
}
for _, existingPod := range podsToProcess {
if err := pm.processExistingPod(existingPod, nodeInfo, pod); err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}
}
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
if err := errCh.ReceiveError(); err != nil {
klog.Error(err)
return nil
}
return pm.topologyScore
}
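
A standalone worked example of the reduce step above, assuming framework.MaxNodeScore is 100: raw per-node scores, which can go negative through anti-affinity, are min-max scaled into [0, MaxNodeScore]. This is an illustrative sketch, not scheduler code:

package main

import "fmt"

type nodeScore struct {
	name  string
	score int64
}

const maxNodeScore = 100 // assumed value of framework.MaxNodeScore

// normalize mirrors CalculateInterPodAffinityPriorityReduce above: find the
// min and max raw scores, then min-max scale every score into [0, maxNodeScore].
func normalize(result []nodeScore) {
	var maxCount, minCount int64
	for _, r := range result {
		if r.score > maxCount {
			maxCount = r.score
		}
		if r.score < minCount {
			minCount = r.score
		}
	}
	diff := maxCount - minCount
	for i := range result {
		fScore := float64(0)
		if diff > 0 {
			fScore = float64(maxNodeScore) * float64(result[i].score-minCount) / float64(diff)
		}
		result[i].score = int64(fScore)
	}
}

func main() {
	scores := []nodeScore{{"machine1", 10}, {"machine2", -5}, {"machine3", 0}}
	normalize(scores)
	fmt.Println(scores) // [{machine1 100} {machine2 0} {machine3 33}]
}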

View File

@@ -1,696 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func TestInterPodAffinityPriority(t *testing.T) {
labelRgChina := map[string]string{
"region": "China",
}
labelRgIndia := map[string]string{
"region": "India",
}
labelAzAz1 := map[string]string{
"az": "az1",
}
labelAzAz2 := map[string]string{
"az": "az2",
}
labelRgChinaAzAz1 := map[string]string{
"region": "China",
"az": "az1",
}
podLabelSecurityS1 := map[string]string{
"security": "S1",
}
podLabelSecurityS2 := map[string]string{
"security": "S2",
}
// considered only preferredDuringSchedulingIgnoredDuringExecution in pod affinity
stayWithS1InRegion := &v1.Affinity{
PodAffinity: &v1.PodAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
{
Weight: 5,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S1"},
},
},
},
TopologyKey: "region",
},
},
},
},
}
stayWithS2InRegion := &v1.Affinity{
PodAffinity: &v1.PodAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
{
Weight: 6,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S2"},
},
},
},
TopologyKey: "region",
},
},
},
},
}
affinity3 := &v1.Affinity{
PodAffinity: &v1.PodAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
{
Weight: 8,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"S1"},
}, {
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S2"},
},
},
},
TopologyKey: "region",
},
}, {
Weight: 2,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpExists,
}, {
Key: "wrongkey",
Operator: metav1.LabelSelectorOpDoesNotExist,
},
},
},
TopologyKey: "region",
},
},
},
},
}
hardAffinity := &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S1", "value2"},
},
},
},
TopologyKey: "region",
}, {
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpExists,
}, {
Key: "wrongkey",
Operator: metav1.LabelSelectorOpDoesNotExist,
},
},
},
TopologyKey: "region",
},
},
},
}
awayFromS1InAz := &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
{
Weight: 5,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S1"},
},
},
},
TopologyKey: "az",
},
},
},
},
}
// to stay away from security S2 in any az.
awayFromS2InAz := &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
{
Weight: 5,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S2"},
},
},
},
TopologyKey: "az",
},
},
},
},
}
// to stay with security S1 in same region, stay away from security S2 in any az.
stayWithS1InRegionAwayFromS2InAz := &v1.Affinity{
PodAffinity: &v1.PodAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
{
Weight: 8,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S1"},
},
},
},
TopologyKey: "region",
},
},
},
},
PodAntiAffinity: &v1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
{
Weight: 5,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S2"},
},
},
},
TopologyKey: "az",
},
},
},
},
}
tests := []struct {
pod *v1.Pod
pods []*v1.Pod
nodes []*v1.Node
expectedList framework.NodeScoreList
name string
}{
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
name: "all machines are same priority as Affinity is nil",
},
// the node (machine1) that has the label {"region": "China"} (matching the topology key) and has existing pods that match the labelSelector gets a high score
// the node (machine3) that doesn't have the "region" label (mismatching the topology key) but has existing pods that match the labelSelector gets a low score
// the node (machine2) that has the label {"region": "India"} (matching the topology key) but has existing pods that mismatch the labelSelector gets a low score
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
name: "Affinity: pod that matches topology key & pods in nodes will get high score comparing to others" +
"which doesn't match either pods in nodes or in topology key",
},
// the node1 (machine1) that has the label {"region": "China"} (matching the topology key) and has existing pods that match the labelSelector gets a high score
// the node2 (machine2) that has the label {"region": "China"}, matches the topology key and has the same label value as node1, gets the same high score as node1
// the node3 (machine3) that has the label {"region": "India"}, matches the topology key but has a different label value, and doesn't have existing pods that match the labelSelector,
// gets a low score.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegion}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChinaAzAz1}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
name: "All the nodes that have the same topology key & label value with one of them has an existing pod that match the affinity rules, have the same score",
},
// there are 2 regions, say regionChina (machine1, machine3, machine4) and regionIndia (machine2, machine5); both regions have nodes that match the preference.
// But there are more nodes (actually more existing pods) in regionChina that match the preference than in regionIndia.
// Then, nodes in regionChina get a higher score than nodes in regionIndia, and all the nodes in regionChina should get the same (high) score,
// while all the nodes in regionIndia should get the same (low) score.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
{Spec: v1.PodSpec{NodeName: "machine4"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
{Spec: v1.PodSpec{NodeName: "machine5"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 50}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: framework.MaxNodeScore}, {Name: "machine5", Score: 50}},
name: "Affinity: nodes in one region has more matching pods comparing to other reqion, so the region which has more macthes will get high score",
},
// Test with the different operators and values for pod affinity scheduling preference, including some match failures.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: affinity3}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 20}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
name: "Affinity: different Label operators and values for pod affinity scheduling preference, including some match failures ",
},
// Test the symmetry cases for affinity: the difference between affinity and symmetry is that it is not the pod that wants to run together with some existing pods,
// but the existing pods that have the inter-pod affinity preference, while the pod to schedule satisfies the preference.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: stayWithS1InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
name: "Affinity symmetry: considered only the preferredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry",
},
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardAffinity}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardAffinity}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
name: "Affinity symmetry: considered RequiredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry",
},
// The pod to schedule prefers to stay away from some existing pods at the node level using pod anti-affinity.
// the nodes that have the label {"node": "bar"} (matching the topology key) and have existing pods that match the labelSelector get a low score
// the nodes that don't have the label "node" (mismatching the topology key) but have existing pods that match the labelSelector get a high score
// the nodes that have the label {"node": "bar"} (matching the topology key) but have existing pods that mismatch the labelSelector get a high score
// there are 2 nodes, say node1 and node2; both nodes have pods that match the labelSelector and have the topology-key in node.Labels.
// But there are more pods on node1 that match the preference than on node2. Then, node1 gets a lower score than node2.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelAzAz1}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "Anti Affinity: pod that doesnot match existing pods in node will get high score ",
},
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelAzAz1}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "Anti Affinity: pod that does not matches topology key & matches the pods in nodes will get higher score comparing to others ",
},
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelAzAz1}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "Anti Affinity: one node has more matching pods comparing to other node, so the node which has more unmacthes will get high score",
},
// Test the symmetry cases for anti affinity
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: awayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelAzAz1}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelAzAz2}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "Anti Affinity symmetry: the existing pods in node which has anti affinity match will get high score",
},
// Test both affinity and anti-affinity
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
name: "Affinity and Anti Affinity: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity",
},
// Combined cases considering both affinity and anti-affinity; the pod to schedule and existing pods have the same labels (they are in the same RC/service),
// the pod prefers to run together with its sibling pods in the same region, but wants to stay away from them at the node level,
// so that all the pods of an RC/service can stay in the same region while spreading across nodes.
// machine1, machine3 and machine4 are in the China region; machine2 and machine5 are in the India region.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine4"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine5"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChinaAzAz1}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 40}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: framework.MaxNodeScore}, {Name: "machine5", Score: 40}},
name: "Affinity and Anti Affinity: considering both affinity and anti-affinity, the pod to schedule and existing pods have the same labels",
},
// Consider Affinity, Anti Affinity and symmetry together.
// for Affinity, the weights are: 8, 0, 0, 0
// for Anti Affinity, the weights are: 0, -5, 0, 0
// for Affinity symmetry, the weights are: 0, 0, 8, 0
// for Anti Affinity symmetry, the weights are: 0, 0, 0, -5
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
{Spec: v1.PodSpec{NodeName: "machine3", Affinity: stayWithS1InRegionAwayFromS2InAz}},
{Spec: v1.PodSpec{NodeName: "machine4", Affinity: awayFromS1InAz}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelAzAz1}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labelAzAz2}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: 0}},
name: "Affinity and Anti Affinity and symmetry: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity & symmetry",
},
// Cover https://github.com/kubernetes/kubernetes/issues/82796, which panics when:
// 1. Some nodes in a topology don't have pods with affinity, but other nodes in the same topology do.
// 2. The incoming pod doesn't have affinity.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS1InRegionAwayFromS2InAz}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "Avoid panic when partial nodes in a topology don't have pods with affinity",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
allNodes := append([]*v1.Node{}, test.nodes...)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes))
meta := &priorityMetadata{
topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, v1.DefaultHardPodAffinitySymmetricWeight),
}
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.Name
nodeScore, err := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName])
if err != nil {
t.Error(err)
}
gotList = append(gotList, nodeScore)
}
CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList)
if !reflect.DeepEqual(gotList, test.expectedList) {
t.Errorf("CalculateInterPodAffinityPriority() = %#v, want %#v", gotList, test.expectedList)
}
})
}
}
func TestHardPodAffinitySymmetricWeight(t *testing.T) {
podLabelServiceS1 := map[string]string{
"service": "S1",
}
labelRgChina := map[string]string{
"region": "China",
}
labelRgIndia := map[string]string{
"region": "India",
}
labelAzAz1 := map[string]string{
"az": "az1",
}
hardPodAffinity := &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S1"},
},
},
},
TopologyKey: "region",
},
},
},
}
tests := []struct {
pod *v1.Pod
pods []*v1.Pod
nodes []*v1.Node
hardPodAffinityWeight int32
expectedList framework.NodeScoreList
name string
}{
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
name: "Hard Pod Affinity symmetry: hard pod affinity symmetry weights 1 by default, then nodes that match the hard pod affinity symmetry rules, get a high score",
},
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
hardPodAffinityWeight: 0,
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
name: "Hard Pod Affinity symmetry: hard pod affinity symmetry is closed(weights 0), then nodes that match the hard pod affinity symmetry rules, get same score with those not match",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
allNodes := append([]*v1.Node{}, test.nodes...)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes))
meta := &priorityMetadata{
topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, test.hardPodAffinityWeight),
}
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.Name
nodeScore, err := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName])
if err != nil {
t.Error(err)
}
gotList = append(gotList, nodeScore)
}
CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList)
if !reflect.DeepEqual(gotList, test.expectedList) {
t.Errorf("CalculateInterPodAffinityPriority() = %#v, want %#v", gotList, test.expectedList)
}
})
}
}
func BenchmarkInterPodAffinityPriority(b *testing.B) {
tests := []struct {
name string
pod *v1.Pod
existingPodsNum int
allNodesNum int
prepFunc func(existingPodsNum, allNodesNum int) (existingPods []*v1.Pod, allNodes []*v1.Node)
}{
{
name: "1000nodes/incoming pod without PodAffinity and existing pods without PodAffinity",
pod: st.MakePod().Name("p").Label("foo", "").Obj(),
existingPodsNum: 10000,
allNodesNum: 1000,
prepFunc: st.MakeNodesAndPods,
},
{
name: "1000nodes/incoming pod with PodAffinity and existing pods without PodAffinity",
pod: st.MakePod().Name("p").Label("foo", "").PodAffinityExists("foo", "zone", st.PodAffinityWithPreferredReq).Obj(),
existingPodsNum: 10000,
allNodesNum: 1000,
prepFunc: st.MakeNodesAndPods,
},
{
name: "1000nodes/incoming pod without PodAffinity and existing pods with PodAffinity",
pod: st.MakePod().Name("p").Label("foo", "").Obj(),
existingPodsNum: 10000,
allNodesNum: 1000,
prepFunc: st.MakeNodesAndPodsForPodAffinity,
},
{
name: "1000nodes/incoming pod with PodAffinity and existing pods with PodAffinity",
pod: st.MakePod().Name("p").Label("foo", "").PodAffinityExists("foo", "zone", st.PodAffinityWithPreferredReq).Obj(),
existingPodsNum: 10000,
allNodesNum: 1000,
prepFunc: st.MakeNodesAndPodsForPodAffinity,
},
}
for _, test := range tests {
b.Run(test.name, func(b *testing.B) {
existingPods, allNodes := test.prepFunc(test.existingPodsNum, test.allNodesNum)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(existingPods, allNodes))
meta := &priorityMetadata{
topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, v1.DefaultHardPodAffinitySymmetricWeight),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var gotList framework.NodeScoreList
for _, n := range allNodes {
nodeName := n.Name
nodeScore, _ := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName])
gotList = append(gotList, nodeScore)
}
CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList)
}
})
}
}
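
For reference, the map step exercised by these tests and benchmarks reduces to a lookup-and-sum over the precomputed topology map: a node's raw score is the sum of the weights recorded for its own label values. A self-contained sketch with made-up weights (the numbers are illustrative, not taken from the tests above):

package main

import "fmt"

func main() {
	// topologyScore as built by buildTopologyPairToScore:
	// topology key -> topology value -> accumulated (anti-)affinity weight.
	topologyScore := map[string]map[string]int64{
		"region": {"China": 16, "India": 6},
		"az":     {"az1": -5},
	}
	nodeLabels := map[string]string{"region": "China", "az": "az1"}

	// Mirrors CalculateInterPodAffinityPriorityMap: sum the entries that
	// match this node's own label values.
	var score int64
	for tpKey, tpValues := range topologyScore {
		if v, ok := nodeLabels[tpKey]; ok {
			score += tpValues[v]
		}
	}
	fmt.Println(score) // 16 + (-5) = 11
}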

View File

@@ -64,7 +64,6 @@ type priorityMetadata struct {
podFirstServiceSelector labels.Selector
totalNumNodes int
podTopologySpreadMap *podTopologySpreadMap
topologyScore topologyPairToScore
}
// PriorityMetadata is a MetadataProducer. Node info can be nil.
@@ -99,7 +98,6 @@ func (pmf *MetadataFactory) PriorityMetadata(
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
totalNumNodes: totalNumNodes,
podTopologySpreadMap: tpSpreadMap,
topologyScore: buildTopologyPairToScore(pod, sharedLister, filteredNodes, pmf.hardPodAffinityWeight),
}
}
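
With topologyScore removed from priorityMetadata here, the equivalent data now travels through the framework's per-cycle state: the plugin writes a postFilterState under postFilterStateKey in PostFilter and reads it back in Score, as the plugin code later in this diff shows. A minimal stand-in for that store (shapes assumed; the real CycleState lives in pkg/scheduler/framework/v1alpha1):

package sketch

import (
	"errors"
	"sync"
)

// StateData is anything a plugin stores for one scheduling cycle; Clone is
// required so the framework can snapshot state, mirroring postFilterState.Clone below.
type StateData interface{ Clone() StateData }

// CycleState is a concurrency-safe map carrying data between extension
// points (here: from PostFilter to Score) within a single scheduling cycle.
type CycleState struct {
	mu      sync.RWMutex
	storage map[string]StateData
}

func NewCycleState() *CycleState {
	return &CycleState{storage: make(map[string]StateData)}
}

func (c *CycleState) Write(key string, val StateData) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.storage[key] = val
}

func (c *CycleState) Read(key string) (StateData, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if v, ok := c.storage[key]; ok {
		return v, nil
	}
	return nil, errors.New("not found")
}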

View File

@@ -124,6 +124,9 @@ func TestCompatibility(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
@@ -161,6 +164,9 @@ func TestCompatibility(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
@@ -198,6 +204,9 @@ func TestCompatibility(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},

View File

@@ -70,7 +70,7 @@ func init() {
)
// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
scheduler.RegisterPriorityMapReduceFunction(priorities.InterPodAffinityPriority, priorities.CalculateInterPodAffinityPriorityMap, priorities.CalculateInterPodAffinityPriorityReduce, 1)
scheduler.RegisterPriorityMapReduceFunction(priorities.InterPodAffinityPriority, nil, nil, 1)
// Prioritize nodes by least requested utilization.
scheduler.RegisterPriorityMapReduceFunction(priorities.LeastRequestedPriority, priorities.LeastRequestedPriorityMap, nil, 1)

View File

@@ -313,6 +313,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
@@ -383,6 +386,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
@@ -464,6 +470,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
@@ -556,6 +565,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
@@ -650,6 +662,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
@@ -747,6 +762,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
@@ -856,6 +874,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
@@ -968,6 +989,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
@@ -1080,6 +1104,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
@@ -1196,6 +1223,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},

View File

@@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -113,11 +114,6 @@ type Configurator struct {
configProducerArgs *plugins.ConfigProducerArgs
}
// GetHardPodAffinitySymmetricWeight is exposed for testing.
func (c *Configurator) GetHardPodAffinitySymmetricWeight() int32 {
return c.hardPodAffinitySymmetricWeight
}
// Create creates a scheduler with the default algorithm provider.
func (c *Configurator) Create() (*Scheduler, error) {
return c.CreateFromProvider(schedulerapi.SchedulerDefaultProviderName)
@@ -221,8 +217,12 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler,
func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
if c.hardPodAffinitySymmetricWeight < 1 || c.hardPodAffinitySymmetricWeight > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.hardPodAffinitySymmetricWeight)
}
c.configProducerArgs.InterPodAffinityArgs = &interpodaffinity.Args{
HardPodAffinityWeight: c.hardPodAffinitySymmetricWeight,
}
predicateFuncs, pluginsForPredicates, pluginConfigForPredicates, err := c.getPredicateConfigs(predicateKeys)

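The HardPodAffinityWeight wired up here reaches the plugin as the Args struct introduced later in this diff. A standalone sketch of the wire shape, assuming the framework JSON-decodes plugin args into the struct (which the json tag on the field suggests):

package main

import (
	"encoding/json"
	"fmt"
)

// Args mirrors the plugin's Args struct introduced in this PR.
type Args struct {
	HardPodAffinityWeight int32 `json:"hardPodAffinityWeight,omitempty"`
}

func main() {
	// Hypothetical plugin config payload; the scheduler hands the same
	// shape to the InterPodAffinity plugin when it is constructed.
	raw := []byte(`{"hardPodAffinityWeight": 10}`)
	var args Args
	if err := json.Unmarshal(raw, &args); err != nil {
		panic(err)
	}
	fmt.Println(args.HardPodAffinityWeight) // 10
}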
View File

@@ -116,7 +116,7 @@ func TestCreateFromConfig(t *testing.T) {
if err != nil {
t.Fatalf("CreateFromConfig failed: %v", err)
}
hpa := factory.GetHardPodAffinitySymmetricWeight()
hpa := factory.hardPodAffinitySymmetricWeight
if hpa != v1.DefaultHardPodAffinitySymmetricWeight {
t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", v1.DefaultHardPodAffinitySymmetricWeight, hpa)
}
@@ -205,7 +205,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
t.Errorf("Invalid configuration: %v", err)
}
factory.CreateFromConfig(policy)
hpa := factory.GetHardPodAffinitySymmetricWeight()
hpa := factory.hardPodAffinitySymmetricWeight
if hpa != 10 {
t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", 10, hpa)
}

View File

@@ -101,6 +101,8 @@ type ConfigProducerArgs struct {
ServiceAffinityArgs *serviceaffinity.Args
// NodeResourcesFitArgs is the args for the NodeResources fit filter.
NodeResourcesFitArgs *noderesources.FitArgs
// InterPodAffinityArgs is the args for InterPodAffinity plugin
InterPodAffinityArgs *interpodaffinity.Args
}
// ConfigProducer produces a framework's configuration.
@@ -254,7 +256,9 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
})
registry.RegisterPriority(priorities.InterPodAffinityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, interpodaffinity.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, interpodaffinity.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(interpodaffinity.Name, args.InterPodAffinityArgs))
return
})
registry.RegisterPriority(priorities.NodePreferAvoidPodsPriority,

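Net effect of the producer registration above: requesting the legacy InterPodAffinityPriority key now enables the plugin at two extension points, PostFilter and Score. A sketch with stand-in config types (field shapes assumed from the compatibility-test expectations earlier in this diff):

package main

import "fmt"

// Stand-ins for the kube-scheduler config types (config.Plugins et al.).
type Plugin struct {
	Name   string
	Weight int32
}
type PluginSet struct{ Enabled []Plugin }
type Plugins struct {
	PostFilter *PluginSet
	Score      *PluginSet
}

func main() {
	// What the InterPodAffinityPriority config producer now emits: the
	// plugin runs as PostFilter (builds the topology score map) and as
	// Score (consumes it), carrying the priority's weight.
	p := Plugins{
		PostFilter: &PluginSet{Enabled: []Plugin{{Name: "InterPodAffinity"}}},
		Score:      &PluginSet{Enabled: []Plugin{{Name: "InterPodAffinity", Weight: 1}}},
	}
	fmt.Printf("%+v\n", p.Score.Enabled[0]) // {Name:InterPodAffinity Weight:1}
}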
View File

@@ -7,13 +7,18 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@@ -24,14 +29,11 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)

View File

@@ -19,36 +19,52 @@ package interpodaffinity
import (
"context"
"fmt"
"sync"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
// InterPodAffinity is a plugin that checks inter pod affinity
type InterPodAffinity struct {
snapshotSharedLister schedulerlisters.SharedLister
podAffinityChecker *predicates.PodAffinityChecker
sharedLister schedulerlisters.SharedLister
podAffinityChecker *predicates.PodAffinityChecker
hardPodAffinityWeight int32
sync.Mutex
}
// Args holds the args that are used to configure the plugin.
type Args struct {
HardPodAffinityWeight int32 `json:"hardPodAffinityWeight,omitempty"`
}
var _ framework.PreFilterPlugin = &InterPodAffinity{}
var _ framework.FilterPlugin = &InterPodAffinity{}
var _ framework.PostFilterPlugin = &InterPodAffinity{}
var _ framework.ScorePlugin = &InterPodAffinity{}
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = "InterPodAffinity"
// preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data.
// preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data for Filtering.
// Using the name of the plugin will likely help us avoid collisions with other plugins.
preFilterStateKey = "PreFilter" + Name
// postFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data for Scoring.
postFilterStateKey = "PostFilter" + Name
)
// preFilterState computed at PreFilter and used at Filter.
@@ -75,10 +91,10 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
var allNodes []*nodeinfo.NodeInfo
var havePodsWithAffinityNodes []*nodeinfo.NodeInfo
var err error
if allNodes, err = pl.snapshotSharedLister.NodeInfos().List(); err != nil {
if allNodes, err = pl.sharedLister.NodeInfos().List(); err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos: %v", err))
}
if havePodsWithAffinityNodes, err = pl.snapshotSharedLister.NodeInfos().HavePodsWithAffinityList(); err != nil {
if havePodsWithAffinityNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList(); err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos with pods with affinity: %v", err))
}
if meta, err = predicates.GetPodAffinityMetadata(pod, allNodes, havePodsWithAffinityNodes); err != nil {
@@ -143,25 +159,299 @@ func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.Cy
return migration.PredicateResultToFrameworkStatus(reasons, err)
}
// A "processed" representation of v1.WeightedAffinityTerm.
type weightedAffinityTerm struct {
namespaces sets.String
selector labels.Selector
weight int32
topologyKey string
}
// postFilterState computed at PostFilter and used at Score.
type postFilterState struct {
topologyScore map[string]map[string]int64
affinityTerms []*weightedAffinityTerm
antiAffinityTerms []*weightedAffinityTerm
}
// Clone implements the mandatory Clone interface. We don't really copy the data since
// there is no need for that.
func (s *postFilterState) Clone() framework.StateData {
return s
}
func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32) (*weightedAffinityTerm, error) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
return &weightedAffinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey, weight: weight}, nil
}
func getProcessedTerms(pod *v1.Pod, terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) {
if terms == nil {
return nil, nil
}
var processedTerms []*weightedAffinityTerm
for i := range terms {
p, err := newWeightedAffinityTerm(pod, &terms[i].PodAffinityTerm, terms[i].Weight)
if err != nil {
return nil, err
}
processedTerms = append(processedTerms, p)
}
return processedTerms, nil
}
func (pl *InterPodAffinity) processTerm(
state *postFilterState,
term *weightedAffinityTerm,
podToCheck *v1.Pod,
fixedNode *v1.Node,
multiplier int,
) {
if len(fixedNode.Labels) == 0 {
return
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector)
tpValue, tpValueExist := fixedNode.Labels[term.topologyKey]
if match && tpValueExist {
pl.Lock()
if state.topologyScore[term.topologyKey] == nil {
state.topologyScore[term.topologyKey] = make(map[string]int64)
}
state.topologyScore[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier))
pl.Unlock()
}
}
func (pl *InterPodAffinity) processTerms(state *postFilterState, terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
for _, term := range terms {
pl.processTerm(state, term, podToCheck, fixedNode, multiplier)
}
return nil
}
func (pl *InterPodAffinity) processExistingPod(state *postFilterState, existingPod *v1.Pod, existingPodNodeInfo *nodeinfo.NodeInfo, incomingPod *v1.Pod) error {
existingPodAffinity := existingPod.Spec.Affinity
existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
existingPodNode := existingPodNodeInfo.Node()
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
// increment <state.topologyScore> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
pl.processTerms(state, state.affinityTerms, existingPod, existingPodNode, 1)
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <state.topologyScore> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
pl.processTerms(state, state.antiAffinityTerms, existingPod, existingPodNode, -1)
if existingHasAffinityConstraints {
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <state.topologyScore> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <pl.hardPodAffinityWeight>.
if pl.hardPodAffinityWeight > 0 {
terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for i := range terms {
term := &terms[i]
processedTerm, err := newWeightedAffinityTerm(existingPod, term, pl.hardPodAffinityWeight)
if err != nil {
return err
}
pl.processTerm(state, processedTerm, incomingPod, existingPodNode, 1)
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <state.topologyScore> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
if err != nil {
klog.Error(err)
return err
}
pl.processTerms(state, terms, incomingPod, existingPodNode, 1)
}
if existingHasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <state.topologyScore> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
if err != nil {
return err
}
pl.processTerms(state, terms, incomingPod, existingPodNode, -1)
}
return nil
}
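The hard-affinity symmetry above is the subtle part; a numeric sketch with illustrative values:

// Existing pod E declares a *required* affinity term (topologyKey "zone")
// whose selector matches the incoming pod P, and E runs on a node labeled
// zone=z2. With pl.hardPodAffinityWeight = 10, processExistingPod adds
//
//	state.topologyScore["zone"]["z2"] += 10
//
// so every candidate node labeled zone=z2 gains 10 at Score time, pulling P
// toward E even though P itself declared no affinity term for E.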
// PostFilter builds and writes cycle state used by Score and NormalizeScore.
func (pl *InterPodAffinity) PostFilter(
pCtx context.Context,
cycleState *framework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
_ framework.NodeToStatusMap,
) *framework.Status {
if len(nodes) == 0 {
// No nodes to score.
return nil
}
if pl.sharedLister == nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("BuildTopologyPairToScore with empty shared lister"))
}
affinity := pod.Spec.Affinity
hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
// Unless the pod being scheduled has affinity terms, we only
// need to process nodes hosting pods with affinity.
allNodes, err := pl.sharedLister.NodeInfos().HavePodsWithAffinityList()
if err != nil {
framework.NewStatus(framework.Error, fmt.Sprintf("get pods with affinity list error, err: %v", err))
}
if hasAffinityConstraints || hasAntiAffinityConstraints {
allNodes, err = pl.sharedLister.NodeInfos().List()
if err != nil {
framework.NewStatus(framework.Error, fmt.Sprintf("get all nodes from shared lister error, err: %v", err))
}
}
var affinityTerms []*weightedAffinityTerm
var antiAffinityTerms []*weightedAffinityTerm
if hasAffinityConstraints {
if affinityTerms, err = getProcessedTerms(pod, affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
klog.Error(err)
return framework.NewStatus(framework.Error, err.Error())
}
}
if hasAntiAffinityConstraints {
if antiAffinityTerms, err = getProcessedTerms(pod, affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
klog.Error(err)
return framework.NewStatus(framework.Error, err.Error())
}
}
state := &postFilterState{
topologyScore: make(map[string]map[string]int64),
affinityTerms: affinityTerms,
antiAffinityTerms: antiAffinityTerms,
}
errCh := schedutil.NewErrorChannel()
ctx, cancel := context.WithCancel(pCtx)
defer cancel()
processNode := func(i int) {
nodeInfo := allNodes[i]
if nodeInfo.Node() == nil {
return
}
// Unless the pod being scheduled has affinity terms, we only
// need to process the pods with affinity on this node.
podsToProcess := nodeInfo.PodsWithAffinity()
if hasAffinityConstraints || hasAntiAffinityConstraints {
// We need to process all the pods.
podsToProcess = nodeInfo.Pods()
}
for _, existingPod := range podsToProcess {
if err := pl.processExistingPod(state, existingPod, nodeInfo, pod); err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
if err := errCh.ReceiveError(); err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
cycleState.Write(postFilterStateKey, state)
return nil
}
func getPostFilterState(cycleState *framework.CycleState) (*postFilterState, error) {
c, err := cycleState.Read(postFilterStateKey)
if err != nil {
return nil, fmt.Errorf("Error reading %q from cycleState: %v", preFilterStateKey, err)
}
s, ok := c.(*postFilterState)
if !ok {
return nil, fmt.Errorf("%+v convert to interpodaffinity.postFilterState error", c)
}
return s, nil
}
// Score invoked at the Score extension point.
// The "score" returned in this function is the matching number of pods on the `nodeName`,
// it is normalized later.
func (pl *InterPodAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.snapshotSharedLister.NodeInfos().Get(nodeName)
func (pl *InterPodAffinity) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
if err != nil || nodeInfo.Node() == nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
}
node := nodeInfo.Node()
s, err := getPostFilterState(cycleState)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
return 0, framework.NewStatus(framework.Error, err.Error())
}
var score int64
for tpKey, tpValues := range s.topologyScore {
if v, exist := node.Labels[tpKey]; exist {
score += tpValues[v]
}
}
meta := migration.PriorityMetadata(state)
s, err := priorities.CalculateInterPodAffinityPriorityMap(pod, meta, nodeInfo)
return s.Score, migration.ErrorToFrameworkStatus(err)
return score, nil
}
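A worked example of the lookup, with an illustrative state:

// Given state accumulated by PostFilter:
//	s.topologyScore = map[string]map[string]int64{"zone": {"z1": 8, "z2": 3}}
// a node labeled zone=z1 scores 8, a node labeled zone=z2 scores 3, and a
// node without a "zone" label scores 0.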
// NormalizeScore invoked after scoring all nodes.
func (pl *InterPodAffinity) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
meta := migration.PriorityMetadata(state)
err := priorities.CalculateInterPodAffinityPriorityReduce(pod, meta, pl.snapshotSharedLister, scores)
return migration.ErrorToFrameworkStatus(err)
// NormalizeScore normalizes the score of each filtered node.
// The basic rule is: the bigger the raw score (the accumulated weight of matching
// affinity terms) is, the higher the final normalized score will be.
func (pl *InterPodAffinity) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
s, err := getPostFilterState(cycleState)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if len(s.topologyScore) == 0 {
return nil
}
var maxCount, minCount int64
for i := range scores {
score := scores[i].Score
if score > maxCount {
maxCount = score
}
if score < minCount {
minCount = score
}
}
maxMinDiff := maxCount - minCount
for i := range scores {
fScore := float64(0)
if maxMinDiff > 0 {
fScore = float64(framework.MaxNodeScore) * (float64(scores[i].Score-minCount) / float64(maxMinDiff))
}
scores[i].Score = int64(fScore)
}
return nil
}
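A quick worked example of the normalization, assuming framework.MaxNodeScore is 100 in this release:

// Raw scores: nodeA = -5, nodeB = 0, nodeC = 10
// => minCount = -5, maxCount = 10, maxMinDiff = 15
// nodeA: 100 * (-5 - (-5)) / 15 = 0
// nodeB: 100 * ( 0 - (-5)) / 15 = 33 (33.3 truncated by the int64 conversion)
// nodeC: 100 * (10 - (-5)) / 15 = 100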
// ScoreExtensions of the Score plugin.
@@ -170,12 +460,19 @@ func (pl *InterPodAffinity) ScoreExtensions() framework.ScoreExtensions {
}
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
func New(plArgs *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
if h.SnapshotSharedLister() == nil {
return nil, fmt.Errorf("SnapshotSharedlister is nil")
}
args := &Args{}
if err := framework.DecodeInto(plArgs, args); err != nil {
return nil, err
}
return &InterPodAffinity{
snapshotSharedLister: h.SnapshotSharedLister(),
podAffinityChecker: predicates.NewPodAffinityChecker(h.SnapshotSharedLister()),
sharedLister: h.SnapshotSharedLister(),
podAffinityChecker: predicates.NewPodAffinityChecker(h.SnapshotSharedLister()),
hardPodAffinityWeight: args.HardPodAffinityWeight,
}, nil
}

View File

@@ -18,16 +18,14 @@ package interpodaffinity
import (
"context"
"fmt"
"reflect"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
@@ -57,7 +55,7 @@ func createPodWithAffinityTerms(namespace, nodeName string, labels map[string]st
}
func TestSingleNode(t *testing.T) {
func TestRequiredAffinitySingleNode(t *testing.T) {
podLabel := map[string]string{"service": "securityscan"}
labels1 := map[string]string{
"region": "r1",
@@ -783,8 +781,8 @@ func TestSingleNode(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, []*v1.Node{test.node}))
p := &InterPodAffinity{
snapshotSharedLister: snapshot,
podAffinityChecker: predicates.NewPodAffinityChecker(snapshot),
sharedLister: snapshot,
podAffinityChecker: predicates.NewPodAffinityChecker(snapshot),
}
state := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), state, test.pod)
@@ -799,7 +797,7 @@ func TestSingleNode(t *testing.T) {
}
}
func TestMultipleNodes(t *testing.T) {
func TestRequiredAffinityMultipleNodes(t *testing.T) {
podLabelA := map[string]string{
"foo": "bar",
}
@@ -1621,8 +1619,8 @@ func TestMultipleNodes(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes))
for indexNode, node := range test.nodes {
p := &InterPodAffinity{
snapshotSharedLister: snapshot,
podAffinityChecker: predicates.NewPodAffinityChecker(snapshot),
sharedLister: snapshot,
podAffinityChecker: predicates.NewPodAffinityChecker(snapshot),
}
state := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), state, test.pod)
@@ -1638,7 +1636,7 @@ func TestMultipleNodes(t *testing.T) {
}
}
func TestInterPodAffinityPriority(t *testing.T) {
func TestPreferredAffinity(t *testing.T) {
labelRgChina := map[string]string{
"region": "China",
}
@@ -2127,35 +2125,27 @@ func TestInterPodAffinityPriority(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState()
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
p := &InterPodAffinity{
sharedLister: snapshot,
podAffinityChecker: predicates.NewPodAffinityChecker(snapshot),
hardPodAffinityWeight: 1,
}
client := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
metaDataProducer := priorities.NewMetadataFactory(
informerFactory.Core().V1().Services().Lister(),
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
1,
)
metaData := metaDataProducer(test.pod, test.nodes, snapshot)
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: metaData})
p, _ := New(nil, fh)
status := p.PostFilter(context.Background(), state, test.pod, test.nodes, nil)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.ObjectMeta.Name
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName)
score, status := p.Score(context.Background(), state, test.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
status := p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
status = p.ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
@@ -2168,7 +2158,7 @@ func TestInterPodAffinityPriority(t *testing.T) {
}
}
func TestHardPodAffinitySymmetricWeight(t *testing.T) {
func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) {
podLabelServiceS1 := map[string]string{
"service": "S1",
}
@@ -2244,22 +2234,12 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
client := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
metaDataProducer := priorities.NewMetadataFactory(
informerFactory.Core().V1().Services().Lister(),
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
test.hardPodAffinityWeight,
)
metaData := metaDataProducer(test.pod, test.nodes, snapshot)
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: metaData})
p, _ := New(nil, fh)
args := &runtime.Unknown{Raw: []byte(fmt.Sprintf(`{"hardPodAffinityWeight":%d}`, test.hardPodAffinityWeight))}
p, _ := New(args, fh)
status := p.(framework.PostFilterPlugin).PostFilter(context.Background(), state, test.pod, test.nodes, nil)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.ObjectMeta.Name
@@ -2270,7 +2250,7 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
status := p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
@@ -2282,7 +2262,7 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
}
}
func TestStateAddRemovePod(t *testing.T) {
func TestPreFilterStateAddRemovePod(t *testing.T) {
var label1 = map[string]string{
"region": "r1",
"zone": "z11",
@@ -2511,8 +2491,8 @@ func TestStateAddRemovePod(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(pods, test.nodes))
p := &InterPodAffinity{
snapshotSharedLister: snapshot,
podAffinityChecker: predicates.NewPodAffinityChecker(snapshot),
sharedLister: snapshot,
podAffinityChecker: predicates.NewPodAffinityChecker(snapshot),
}
cycleState := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), cycleState, test.pendingPod)

View File

@@ -146,6 +146,9 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
@@ -222,6 +225,9 @@ kind: Policy
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},