Merge pull request #84669 from draveness/feature/interpod-affinity-with-map-reduce-pattern

feat: update interpod affinity priority with map reduce pattern
This commit is contained in:
Kubernetes Prow Robot 2019-11-07 04:35:40 -08:00 committed by GitHub
commit 204fc6bd28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 168 additions and 104 deletions

View File

@ -18,6 +18,7 @@ package priorities
import (
"context"
"fmt"
"sync"
v1 "k8s.io/api/core/v1"
@ -26,24 +27,12 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/klog"
)
// InterPodAffinity contains information to calculate inter pod affinity.
// It is the receiver of CalculateInterPodAffinityPriority.
type InterPodAffinity struct {
// hardPodAffinityWeight is the weight handed to processTerm for each
// RequiredDuringSchedulingIgnoredDuringExecution pod-affinity term of an
// existing pod; those terms are only processed when this value is > 0.
hardPodAffinityWeight int32
}
// NewInterPodAffinityPriority builds an InterPodAffinity that carries the given
// hardPodAffinityWeight and returns its CalculateInterPodAffinityPriority
// method, bound to that instance, as the PriorityFunction.
func NewInterPodAffinityPriority(hardPodAffinityWeight int32) PriorityFunction {
	ipa := InterPodAffinity{hardPodAffinityWeight: hardPodAffinityWeight}
	// The method value binds a pointer to ipa, exactly as calling it on &InterPodAffinity{...} would.
	return (&ipa).CalculateInterPodAffinityPriority
}
// topologyPairToScore holds scores indexed first by topology key (matched
// against a node's label keys) and then by topology value (the node's value
// for that label). The int64 is the score contributed to a node carrying that
// key/value pair.
type topologyPairToScore map[string]map[string]int64
type podAffinityPriorityMap struct {
@ -94,33 +83,97 @@ func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm
return nil
}
// CalculateInterPodAffinityPriority computes a sum by iterating through the elements of weightedPodAffinityTerm and adding
// "weight" to the sum if the corresponding PodAffinityTerm is satisfied for
// that node; the node(s) with the highest sum are the most preferred.
// Symmetry needs to be considered for preferredDuringSchedulingIgnoredDuringExecution from podAffinity & podAntiAffinity;
// symmetry also needs to be considered for hard requirements from podAffinity.
func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
// CalculateInterPodAffinityPriorityMap computes the raw (not yet normalized)
// inter-pod affinity score for the node held by nodeInfo.
//
// For every topology key in the metadata's topologyScore that is also a label
// key on the node, the score recorded for the node's label value is added to
// the total. The pod argument is not consulted here.
//
// If meta is not a *priorityMetadata there are no topology scores to apply and
// the node gets a score of zero. An error is returned only when nodeInfo holds
// no node.
func CalculateInterPodAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}

	nodeScore := framework.NodeScore{Name: node.Name}

	priorityMeta, ok := meta.(*priorityMetadata)
	if !ok {
		// Without usable metadata nothing can be summed; the score stays at zero.
		return nodeScore, nil
	}

	for topologyKey, scoreByValue := range priorityMeta.topologyScore {
		labelValue, hasLabel := node.Labels[topologyKey]
		if !hasLabel {
			continue
		}
		// A value absent from scoreByValue contributes zero.
		nodeScore.Score += scoreByValue[labelValue]
	}
	return nodeScore, nil
}
// CalculateInterPodAffinityPriorityReduce normalizes, in place, the raw scores
// in result so that they fall within [0, framework.MaxNodeScore].
// The basic rule is: the bigger the raw score of a node is, the bigger its
// final normalized score will be.
//
// The minimum and maximum used for scaling both start at zero, so the scaling
// range always includes zero: if every raw score is positive the minimum stays
// at 0, and if every raw score is negative the maximum stays at 0.
//
// When meta is not a *priorityMetadata, or its topologyScore is empty, result
// is left untouched. When all raw scores are zero (max == min) every node is
// assigned 0. The pod and sharedLister arguments are not consulted. The
// function always returns nil.
func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister,
	result framework.NodeScoreList) error {
	var topologyScore topologyPairToScore
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		topologyScore = priorityMeta.topologyScore
	}
	if len(topologyScore) == 0 {
		return nil
	}

	// Deliberately zero-initialized (not seeded from result[0]) so that zero
	// is always part of the range being scaled.
	var maxCount, minCount int64
	for i := range result {
		score := result[i].Score
		if score > maxCount {
			maxCount = score
		}
		if score < minCount {
			minCount = score
		}
	}

	maxMinDiff := maxCount - minCount
	for i := range result {
		fScore := float64(0)
		// Guard against division by zero when every raw score is zero.
		if maxMinDiff > 0 {
			fScore = float64(framework.MaxNodeScore) * (float64(result[i].Score-minCount) / float64(maxMinDiff))
		}
		// Conversion truncates toward zero.
		result[i].Score = int64(fScore)
	}

	return nil
}
func buildTopologyPairToScore(
pod *v1.Pod,
sharedLister schedulerlisters.SharedLister,
filteredNodes []*v1.Node,
hardPodAffinityWeight int32,
) topologyPairToScore {
if sharedLister == nil {
klog.Error("BuildTopologyPairToScore with empty shared lister")
return nil
}
affinity := pod.Spec.Affinity
hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
// pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node.
pm := newPodAffinityPriorityMap(nodes)
pm := newPodAffinityPriorityMap(filteredNodes)
allNodes, err := sharedLister.NodeInfos().HavePodsWithAffinityList()
if err != nil {
return nil, err
klog.Errorf("get pods with affinity list error, err: %v", err)
return nil
}
if hasAffinityConstraints || hasAntiAffinityConstraints {
allNodes, err = sharedLister.NodeInfos().List()
if err != nil {
return nil, err
klog.Errorf("get all nodes from shared lister error, err: %v", err)
return nil
}
}
// convert the topology key based weights to the node name based weights
var maxCount, minCount int64
processPod := func(existingPod *v1.Pod) error {
existingPodNodeInfo, err := sharedLister.NodeInfos().Get(existingPod.Spec.NodeName)
if err != nil {
@ -155,14 +208,14 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, shar
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
if ipa.hardPodAffinityWeight > 0 {
if hardPodAffinityWeight > 0 {
terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, term := range terms {
if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight)); err != nil {
if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(hardPodAffinityWeight)); err != nil {
return err
}
}
@ -214,38 +267,9 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, shar
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
if err := errCh.ReceiveError(); err != nil {
return nil, err
klog.Error(err)
return nil
}
counts := make([]int64, len(nodes))
for i := range nodes {
if nodes[i].Labels != nil {
for tpKey, tpValues := range pm.topologyScore {
if v, exist := nodes[i].Labels[tpKey]; exist {
counts[i] += tpValues[v]
}
}
}
if counts[i] > maxCount {
maxCount = counts[i]
}
if counts[i] < minCount {
minCount = counts[i]
}
}
// calculate final priority score for each node
result := make(framework.NodeScoreList, 0, len(nodes))
maxMinDiff := maxCount - minCount
for i, node := range nodes {
fScore := float64(0)
if maxMinDiff > 0 {
fScore = float64(framework.MaxNodeScore) * (float64(counts[i]-minCount) / float64(maxCount-minCount))
}
result = append(result, framework.NodeScore{Name: node.Name, Score: int64(fScore)})
if klog.V(10) {
klog.Infof("%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
}
}
return result, nil
return pm.topologyScore
}

View File

@ -514,16 +514,25 @@ func TestInterPodAffinityPriority(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
allNodes := append([]*v1.Node{}, test.nodes...)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
interPodAffinity := InterPodAffinity{
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
meta := &priorityMetadata{
topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, v1.DefaultHardPodAffinitySymmetricWeight),
}
list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.Name
nodeScore, err := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName])
if err != nil {
t.Error(err)
}
gotList = append(gotList, nodeScore)
}
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("expected \n\t%#v, \ngot \n\t%#v\n", test.expectedList, list)
CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList)
if !reflect.DeepEqual(gotList, test.expectedList) {
t.Errorf("CalculateInterPodAffinityPriority() = %#v, want %#v", gotList, test.expectedList)
}
})
}
@ -601,16 +610,25 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
allNodes := append([]*v1.Node{}, test.nodes...)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
ipa := InterPodAffinity{
hardPodAffinityWeight: test.hardPodAffinityWeight,
meta := &priorityMetadata{
topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, test.hardPodAffinityWeight),
}
list, err := ipa.CalculateInterPodAffinityPriority(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.Name
nodeScore, err := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName])
if err != nil {
t.Error(err)
}
gotList = append(gotList, nodeScore)
}
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("expected \n\t%#v, \ngot \n\t%#v\n", test.expectedList, list)
CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList)
if !reflect.DeepEqual(gotList, test.expectedList) {
t.Errorf("CalculateInterPodAffinityPriority() = %#v, want %#v", gotList, test.expectedList)
}
})
}
@ -654,16 +672,24 @@ func BenchmarkInterPodAffinityPriority(b *testing.B) {
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes := tt.prepFunc(tt.existingPodsNum, tt.allNodesNum)
for _, test := range tests {
b.Run(test.name, func(b *testing.B) {
existingPods, allNodes := test.prepFunc(test.existingPodsNum, test.allNodesNum)
snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes)
interPodAffinity := InterPodAffinity{
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
meta := &priorityMetadata{
topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, v1.DefaultHardPodAffinitySymmetricWeight),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
interPodAffinity.CalculateInterPodAffinityPriority(tt.pod, snapshot, allNodes)
var gotList framework.NodeScoreList
for _, n := range allNodes {
nodeName := n.Name
nodeScore, _ := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName])
gotList = append(gotList, nodeScore)
}
CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList)
}
})
}

View File

@ -28,10 +28,11 @@ import (
// PriorityMetadataFactory is a factory to produce PriorityMetadata.
type PriorityMetadataFactory struct {
serviceLister corelisters.ServiceLister
controllerLister corelisters.ReplicationControllerLister
replicaSetLister appslisters.ReplicaSetLister
statefulSetLister appslisters.StatefulSetLister
serviceLister corelisters.ServiceLister
controllerLister corelisters.ReplicationControllerLister
replicaSetLister appslisters.ReplicaSetLister
statefulSetLister appslisters.StatefulSetLister
hardPodAffinityWeight int32
}
// NewPriorityMetadataFactory creates a PriorityMetadataFactory.
@ -40,12 +41,14 @@ func NewPriorityMetadataFactory(
controllerLister corelisters.ReplicationControllerLister,
replicaSetLister appslisters.ReplicaSetLister,
statefulSetLister appslisters.StatefulSetLister,
hardPodAffinityWeight int32,
) PriorityMetadataProducer {
factory := &PriorityMetadataFactory{
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
hardPodAffinityWeight: hardPodAffinityWeight,
}
return factory.PriorityMetadata
}
@ -60,10 +63,15 @@ type priorityMetadata struct {
podFirstServiceSelector labels.Selector
totalNumNodes int
podTopologySpreadMap *podTopologySpreadMap
topologyScore topologyPairToScore
}
// PriorityMetadata is a PriorityMetadataProducer. Node info can be nil.
func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, filteredNodes []*v1.Node, sharedLister schedulerlisters.SharedLister) interface{} {
func (pmf *PriorityMetadataFactory) PriorityMetadata(
pod *v1.Pod,
filteredNodes []*v1.Node,
sharedLister schedulerlisters.SharedLister,
) interface{} {
// If we cannot compute metadata, just return nil
if pod == nil {
return nil
@ -85,6 +93,7 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, filteredNodes
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
totalNumNodes: totalNumNodes,
podTopologySpreadMap: buildPodTopologySpreadMap(pod, filteredNodes, allNodes),
topologyScore: buildTopologyPairToScore(pod, sharedLister, filteredNodes, pmf.hardPodAffinityWeight),
}
}

View File

@ -170,6 +170,7 @@ func TestPriorityMetadata(t *testing.T) {
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
1,
)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {

View File

@ -350,7 +350,9 @@ func TestSelectorSpreadPriority(t *testing.T) {
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss))
fakelisters.StatefulSetLister(test.sss),
1,
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData)
@ -587,7 +589,9 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss))
fakelisters.StatefulSetLister(test.sss),
1,
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData)
list, err := ttp(test.pod, snapshot, makeLabeledNodeList(labeledNodes))
@ -775,7 +779,9 @@ func TestZoneSpreadPriority(t *testing.T) {
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(rcs),
fakelisters.ReplicaSetLister(rss),
fakelisters.StatefulSetLister(sss))
fakelisters.StatefulSetLister(sss),
1,
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
ttp := priorityFunction(zoneSpread.CalculateAntiAffinityPriorityMap, zoneSpread.CalculateAntiAffinityPriorityReduce, metaData)
list, err := ttp(test.pod, snapshot, makeLabeledNodeList(test.nodes))

View File

@ -17,7 +17,7 @@ limitations under the License.
package priorities
import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"

View File

@ -27,7 +27,7 @@ func init() {
// Register functions that extract metadata used by priorities computations.
scheduler.RegisterPriorityMetadataProducerFactory(
func(args scheduler.PluginFactoryArgs) priorities.PriorityMetadataProducer {
return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister, args.HardPodAffinitySymmetricWeight)
})
// ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
@ -66,15 +66,7 @@ func init() {
)
// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
scheduler.RegisterPriorityConfigFactory(
priorities.InterPodAffinityPriority,
scheduler.PriorityConfigFactory{
Function: func(args scheduler.PluginFactoryArgs) priorities.PriorityFunction {
return priorities.NewInterPodAffinityPriority(args.HardPodAffinitySymmetricWeight)
},
Weight: 1,
},
)
scheduler.RegisterPriorityMapReduceFunction(priorities.InterPodAffinityPriority, priorities.CalculateInterPodAffinityPriorityMap, priorities.CalculateInterPodAffinityPriorityReduce, 1)
// Prioritize nodes by least requested utilization.
scheduler.RegisterPriorityMapReduceFunction(priorities.LeastRequestedPriority, priorities.LeastRequestedPriorityMap, nil, 1)

View File

@ -1005,6 +1005,7 @@ func TestZeroRequest(t *testing.T) {
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
1,
)
metaData := metaDataProducer(test.pod, test.nodes, snapshot)

View File

@ -356,7 +356,9 @@ func TestDefaultPodTopologySpreadScore(t *testing.T) {
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss))
fakelisters.StatefulSetLister(test.sss),
1,
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
state := framework.NewCycleState()
@ -612,7 +614,9 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss))
fakelisters.StatefulSetLister(test.sss),
1,
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
plugin := &DefaultPodTopologySpread{

View File

@ -198,6 +198,7 @@ func TestImageLocalityPriority(t *testing.T) {
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
1,
)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)