diff --git a/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/pkg/scheduler/algorithm/priorities/interpod_affinity.go
index 75188163fb3..0e45781ce57 100644
--- a/pkg/scheduler/algorithm/priorities/interpod_affinity.go
+++ b/pkg/scheduler/algorithm/priorities/interpod_affinity.go
@@ -18,6 +18,7 @@ package priorities
 
 import (
 	"context"
+	"fmt"
 	"sync"
 
 	v1 "k8s.io/api/core/v1"
@@ -26,24 +27,12 @@ import (
 	priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
+	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 
 	"k8s.io/klog"
 )
 
-// InterPodAffinity contains information to calculate inter pod affinity.
-type InterPodAffinity struct {
-	hardPodAffinityWeight int32
-}
-
-// NewInterPodAffinityPriority creates an InterPodAffinity.
-func NewInterPodAffinityPriority(hardPodAffinityWeight int32) PriorityFunction {
-	interPodAffinity := &InterPodAffinity{
-		hardPodAffinityWeight: hardPodAffinityWeight,
-	}
-	return interPodAffinity.CalculateInterPodAffinityPriority
-}
-
 type topologyPairToScore map[string]map[string]int64
 
 type podAffinityPriorityMap struct {
@@ -94,33 +83,97 @@ func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm
 	return nil
 }
 
-// CalculateInterPodAffinityPriority compute a sum by iterating through the elements of weightedPodAffinityTerm and adding
-// "weight" to the sum if the corresponding PodAffinityTerm is satisfied for
-// that node; the node(s) with the highest sum are the most preferred.
-// Symmetry need to be considered for preferredDuringSchedulingIgnoredDuringExecution from podAffinity & podAntiAffinity,
-// symmetry need to be considered for hard requirements from podAffinity
-func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
+// CalculateInterPodAffinityPriorityMap calculates the number of matching pods on the passed-in "node",
+// and returns that number as the node's Score.
+func CalculateInterPodAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return framework.NodeScore{}, fmt.Errorf("node not found")
+	}
+
+	var topologyScore topologyPairToScore
+	if priorityMeta, ok := meta.(*priorityMetadata); ok {
+		topologyScore = priorityMeta.topologyScore
+	}
+
+	var score int64
+	for tpKey, tpValues := range topologyScore {
+		if v, exist := node.Labels[tpKey]; exist {
+			score += tpValues[v]
+		}
+	}
+
+	return framework.NodeScore{Name: node.Name, Score: score}, nil
+}
+
+// CalculateInterPodAffinityPriorityReduce normalizes the score for each filteredNode.
+// The basic rule is: the bigger a node's raw score (its number of matching pods),
+// the higher its final normalized score.
+func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister,
+	result framework.NodeScoreList) error {
+	var topologyScore topologyPairToScore
+	if priorityMeta, ok := meta.(*priorityMetadata); ok {
+		topologyScore = priorityMeta.topologyScore
+	}
+	if len(topologyScore) == 0 {
+		return nil
+	}
+
+	var maxCount, minCount int64
+	for i := range result {
+		score := result[i].Score
+		if score > maxCount {
+			maxCount = score
+		}
+		if score < minCount {
+			minCount = score
+		}
+	}
+
+	maxMinDiff := maxCount - minCount
+	for i := range result {
+		fScore := float64(0)
+		if maxMinDiff > 0 {
+			fScore = float64(framework.MaxNodeScore) * (float64(result[i].Score-minCount) / float64(maxMinDiff))
+		}
+
+		result[i].Score = int64(fScore)
+	}
+
+	return nil
+}
+
+func buildTopologyPairToScore(
+	pod *v1.Pod,
+	sharedLister schedulerlisters.SharedLister,
+	filteredNodes []*v1.Node,
+	hardPodAffinityWeight int32,
+) topologyPairToScore {
+	if sharedLister == nil {
+		klog.Error("buildTopologyPairToScore called with a nil shared lister")
+		return nil
+	}
+
 	affinity := pod.Spec.Affinity
 	hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
 	hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
 
 	// pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node.
-	pm := newPodAffinityPriorityMap(nodes)
+	pm := newPodAffinityPriorityMap(filteredNodes)
 	allNodes, err := sharedLister.NodeInfos().HavePodsWithAffinityList()
 	if err != nil {
-		return nil, err
+		klog.Errorf("failed to get the list of pods with affinity: %v", err)
+		return nil
 	}
 	if hasAffinityConstraints || hasAntiAffinityConstraints {
 		allNodes, err = sharedLister.NodeInfos().List()
 		if err != nil {
-			return nil, err
+			klog.Errorf("failed to list all nodes from the shared lister: %v", err)
+			return nil
 		}
 	}
 
-	// convert the topology key based weights to the node name based weights
-	var maxCount, minCount int64
-
 	processPod := func(existingPod *v1.Pod) error {
 		existingPodNodeInfo, err := sharedLister.NodeInfos().Get(existingPod.Spec.NodeName)
 		if err != nil {
@@ -155,14 +208,14 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, shar
 			// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
 			// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
 			// value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
-			if ipa.hardPodAffinityWeight > 0 {
+			if hardPodAffinityWeight > 0 {
 				terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
 				// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
 				//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
 				//	terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
 				//}
 				for _, term := range terms {
-					if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight)); err != nil {
+					if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(hardPodAffinityWeight)); err != nil {
 						return err
 					}
 				}
@@ -214,38 +267,9 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, shar
 	}
 	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
 	if err := errCh.ReceiveError(); err != nil {
-		return nil, err
+		klog.Error(err)
+		return nil
 	}
 
-	counts := make([]int64, len(nodes))
-	for i := range nodes {
-		if nodes[i].Labels != nil {
-			for tpKey, tpValues := range pm.topologyScore {
-				if v, exist := nodes[i].Labels[tpKey]; exist {
-					counts[i] += tpValues[v]
-				}
-			}
-		}
-		if counts[i] > maxCount {
-			maxCount = counts[i]
-		}
-		if counts[i] < minCount {
-			minCount = counts[i]
-		}
-	}
-
-	// calculate final priority score for each node
-	result := make(framework.NodeScoreList, 0, len(nodes))
-	maxMinDiff := maxCount - minCount
-	for i, node := range nodes {
-		fScore := float64(0)
-		if maxMinDiff > 0 {
-			fScore = float64(framework.MaxNodeScore) * (float64(counts[i]-minCount) / float64(maxCount-minCount))
-		}
-		result = append(result, framework.NodeScore{Name: node.Name, Score: int64(fScore)})
-		if klog.V(10) {
-			klog.Infof("%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
-		}
-	}
-	return result, nil
+	return pm.topologyScore
 }
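The heart of the refactor is the split above: the expensive pod-affinity walk now happens once in `buildTopologyPairToScore` (cached in the priority metadata as a `topologyKey -> labelValue -> weight` map), the map step is a per-node label lookup, and the reduce step min-max normalizes. A minimal, self-contained sketch of that score-then-normalize flow — `nodeScore`, `maxNodeScore`, and the sample data below are illustrative stand-ins for `framework.NodeScore`, `framework.MaxNodeScore`, and real cluster state:

```go
package main

import "fmt"

// nodeScore stands in for framework.NodeScore.
type nodeScore struct {
	Name  string
	Score int64
}

// maxNodeScore stands in for framework.MaxNodeScore.
const maxNodeScore = 100

func main() {
	// What buildTopologyPairToScore precomputes once per scheduling cycle:
	// topology key -> label value -> accumulated affinity weight.
	topologyScore := map[string]map[string]int64{
		"zone": {"zone-a": 40, "zone-b": 10},
	}
	nodes := []struct {
		name   string
		labels map[string]string
	}{
		{"node-1", map[string]string{"zone": "zone-a"}},
		{"node-2", map[string]string{"zone": "zone-b"}},
		{"node-3", map[string]string{}}, // carries no matching topology pair
	}

	// Map phase: each node independently sums the weights of the topology
	// pairs it carries (what CalculateInterPodAffinityPriorityMap does).
	var result []nodeScore
	for _, n := range nodes {
		var score int64
		for tpKey, tpValues := range topologyScore {
			if v, ok := n.labels[tpKey]; ok {
				score += tpValues[v]
			}
		}
		result = append(result, nodeScore{Name: n.name, Score: score})
	}

	// Reduce phase: min-max normalize the raw sums onto [0, maxNodeScore]
	// (what CalculateInterPodAffinityPriorityReduce does).
	var maxCount, minCount int64
	for _, ns := range result {
		if ns.Score > maxCount {
			maxCount = ns.Score
		}
		if ns.Score < minCount {
			minCount = ns.Score
		}
	}
	maxMinDiff := maxCount - minCount
	for i := range result {
		fScore := float64(0)
		if maxMinDiff > 0 {
			fScore = float64(maxNodeScore) * float64(result[i].Score-minCount) / float64(maxMinDiff)
		}
		result[i].Score = int64(fScore)
	}

	fmt.Println(result) // [{node-1 100} {node-2 25} {node-3 0}]
}
```

As in the patch, `minCount` starts at zero, so it only drops below zero when anti-affinity terms contribute negative weights; with the sample weights above the three nodes normalize to 100, 25, and 0.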
diff --git a/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go b/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go
index b4010200a3d..fa2f8368c3c 100644
--- a/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go
+++ b/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go
@@ -514,16 +514,25 @@ func TestInterPodAffinityPriority(t *testing.T) {
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			allNodes := append([]*v1.Node{}, test.nodes...)
 			snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
-			interPodAffinity := InterPodAffinity{
-				hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
+
+			meta := &priorityMetadata{
+				topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, v1.DefaultHardPodAffinitySymmetricWeight),
 			}
-			list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, snapshot, test.nodes)
-			if err != nil {
-				t.Errorf("unexpected error: %v", err)
+			var gotList framework.NodeScoreList
+			for _, n := range test.nodes {
+				nodeName := n.Name
+				nodeScore, err := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName])
+				if err != nil {
+					t.Error(err)
+				}
+				gotList = append(gotList, nodeScore)
 			}
-			if !reflect.DeepEqual(test.expectedList, list) {
-				t.Errorf("expected \n\t%#v, \ngot \n\t%#v\n", test.expectedList, list)
+
+			CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList)
+			if !reflect.DeepEqual(gotList, test.expectedList) {
+				t.Errorf("CalculateInterPodAffinityPriority() = %#v, want %#v", gotList, test.expectedList)
 			}
 		})
 	}
@@ -601,16 +610,25 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			allNodes := append([]*v1.Node{}, test.nodes...)
 			snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
-			ipa := InterPodAffinity{
-				hardPodAffinityWeight: test.hardPodAffinityWeight,
+
+			meta := &priorityMetadata{
+				topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, test.hardPodAffinityWeight),
 			}
-			list, err := ipa.CalculateInterPodAffinityPriority(test.pod, snapshot, test.nodes)
-			if err != nil {
-				t.Errorf("unexpected error: %v", err)
+			var gotList framework.NodeScoreList
+			for _, n := range test.nodes {
+				nodeName := n.Name
+				nodeScore, err := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName])
+				if err != nil {
+					t.Error(err)
+				}
+				gotList = append(gotList, nodeScore)
 			}
-			if !reflect.DeepEqual(test.expectedList, list) {
-				t.Errorf("expected \n\t%#v, \ngot \n\t%#v\n", test.expectedList, list)
+
+			CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList)
+			if !reflect.DeepEqual(gotList, test.expectedList) {
+				t.Errorf("CalculateInterPodAffinityPriority() = %#v, want %#v", gotList, test.expectedList)
 			}
 		})
 	}
@@ -654,16 +672,24 @@ func BenchmarkInterPodAffinityPriority(b *testing.B) {
 		},
 	}
 
-	for _, tt := range tests {
-		b.Run(tt.name, func(b *testing.B) {
-			existingPods, allNodes := tt.prepFunc(tt.existingPodsNum, tt.allNodesNum)
+	for _, test := range tests {
+		b.Run(test.name, func(b *testing.B) {
+			existingPods, allNodes := test.prepFunc(test.existingPodsNum, test.allNodesNum)
 			snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes)
-			interPodAffinity := InterPodAffinity{
-				hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
+
+			meta := &priorityMetadata{
+				topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, v1.DefaultHardPodAffinitySymmetricWeight),
 			}
 			b.ResetTimer()
+
 			for i := 0; i < b.N; i++ {
-				interPodAffinity.CalculateInterPodAffinityPriority(tt.pod, snapshot, allNodes)
+				var gotList framework.NodeScoreList
+				for _, n := range allNodes {
+					nodeName := n.Name
+					nodeScore, _ := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName])
+					gotList = append(gotList, nodeScore)
+				}
+				CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList)
 			}
 		})
 	}
diff --git a/pkg/scheduler/algorithm/priorities/metadata.go b/pkg/scheduler/algorithm/priorities/metadata.go
index d33bf46db7a..31a577c1ce7 100644
--- a/pkg/scheduler/algorithm/priorities/metadata.go
+++ b/pkg/scheduler/algorithm/priorities/metadata.go
@@ -28,10 +28,11 @@ import (
 
 // PriorityMetadataFactory is a factory to produce PriorityMetadata.
 type PriorityMetadataFactory struct {
-	serviceLister     corelisters.ServiceLister
-	controllerLister  corelisters.ReplicationControllerLister
-	replicaSetLister  appslisters.ReplicaSetLister
-	statefulSetLister appslisters.StatefulSetLister
+	serviceLister         corelisters.ServiceLister
+	controllerLister      corelisters.ReplicationControllerLister
+	replicaSetLister      appslisters.ReplicaSetLister
+	statefulSetLister     appslisters.StatefulSetLister
+	hardPodAffinityWeight int32
 }
 
 // NewPriorityMetadataFactory creates a PriorityMetadataFactory.
@@ -40,12 +41,14 @@ func NewPriorityMetadataFactory(
 	controllerLister corelisters.ReplicationControllerLister,
 	replicaSetLister appslisters.ReplicaSetLister,
 	statefulSetLister appslisters.StatefulSetLister,
+	hardPodAffinityWeight int32,
 ) PriorityMetadataProducer {
 	factory := &PriorityMetadataFactory{
-		serviceLister:     serviceLister,
-		controllerLister:  controllerLister,
-		replicaSetLister:  replicaSetLister,
-		statefulSetLister: statefulSetLister,
+		serviceLister:         serviceLister,
+		controllerLister:      controllerLister,
+		replicaSetLister:      replicaSetLister,
+		statefulSetLister:     statefulSetLister,
+		hardPodAffinityWeight: hardPodAffinityWeight,
 	}
 	return factory.PriorityMetadata
 }
@@ -60,10 +63,15 @@ type priorityMetadata struct {
 	podFirstServiceSelector labels.Selector
 	totalNumNodes           int
 	podTopologySpreadMap    *podTopologySpreadMap
+	topologyScore           topologyPairToScore
 }
 
 // PriorityMetadata is a PriorityMetadataProducer. Node info can be nil.
-func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, filteredNodes []*v1.Node, sharedLister schedulerlisters.SharedLister) interface{} {
+func (pmf *PriorityMetadataFactory) PriorityMetadata(
+	pod *v1.Pod,
+	filteredNodes []*v1.Node,
+	sharedLister schedulerlisters.SharedLister,
+) interface{} {
 	// If we cannot compute metadata, just return nil
 	if pod == nil {
 		return nil
@@ -85,6 +93,7 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, filteredNodes
 		podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
 		totalNumNodes:           totalNumNodes,
 		podTopologySpreadMap:    buildPodTopologySpreadMap(pod, filteredNodes, allNodes),
+		topologyScore:           buildTopologyPairToScore(pod, sharedLister, filteredNodes, pmf.hardPodAffinityWeight),
 	}
 }
diff --git a/pkg/scheduler/algorithm/priorities/metadata_test.go b/pkg/scheduler/algorithm/priorities/metadata_test.go
index b1032785420..bea6cf300f8 100644
--- a/pkg/scheduler/algorithm/priorities/metadata_test.go
+++ b/pkg/scheduler/algorithm/priorities/metadata_test.go
@@ -170,6 +170,7 @@ func TestPriorityMetadata(t *testing.T) {
 		informerFactory.Core().V1().ReplicationControllers().Lister(),
 		informerFactory.Apps().V1().ReplicaSets().Lister(),
 		informerFactory.Apps().V1().StatefulSets().Lister(),
+		1,
 	)
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
diff --git a/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
index 3121837e43d..5760bc1e7c2 100644
--- a/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
+++ b/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
@@ -350,7 +350,9 @@ func TestSelectorSpreadPriority(t *testing.T) {
 				fakelisters.ServiceLister(test.services),
 				fakelisters.ControllerLister(test.rcs),
 				fakelisters.ReplicaSetLister(test.rss),
-				fakelisters.StatefulSetLister(test.sss))
+				fakelisters.StatefulSetLister(test.sss),
+				1,
+			)
 			metaData := metaDataProducer(test.pod, nodes, snapshot)
 
 			ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData)
@@ -587,7 +589,9 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
 				fakelisters.ServiceLister(test.services),
 				fakelisters.ControllerLister(test.rcs),
 				fakelisters.ReplicaSetLister(test.rss),
-				fakelisters.StatefulSetLister(test.sss))
+				fakelisters.StatefulSetLister(test.sss),
+				1,
+			)
 			metaData := metaDataProducer(test.pod, nodes, snapshot)
 			ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData)
 			list, err := ttp(test.pod, snapshot, makeLabeledNodeList(labeledNodes))
@@ -775,7 +779,9 @@ func TestZoneSpreadPriority(t *testing.T) {
 				fakelisters.ServiceLister(test.services),
 				fakelisters.ControllerLister(rcs),
 				fakelisters.ReplicaSetLister(rss),
-				fakelisters.StatefulSetLister(sss))
+				fakelisters.StatefulSetLister(sss),
+				1,
+			)
 			metaData := metaDataProducer(test.pod, nodes, snapshot)
 			ttp := priorityFunction(zoneSpread.CalculateAntiAffinityPriorityMap, zoneSpread.CalculateAntiAffinityPriorityReduce, metaData)
 			list, err := ttp(test.pod, snapshot, makeLabeledNodeList(test.nodes))
diff --git a/pkg/scheduler/algorithm/priorities/types.go b/pkg/scheduler/algorithm/priorities/types.go
index 02fdff9a4b4..e0ef90669eb 100644
--- a/pkg/scheduler/algorithm/priorities/types.go
+++ b/pkg/scheduler/algorithm/priorities/types.go
@@ -17,7 +17,7 @@ limitations under the License.
 package priorities
 
 import (
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
diff --git a/pkg/scheduler/algorithmprovider/defaults/register_priorities.go b/pkg/scheduler/algorithmprovider/defaults/register_priorities.go
index ee27d6a43de..8c82e769ee2 100644
--- a/pkg/scheduler/algorithmprovider/defaults/register_priorities.go
+++ b/pkg/scheduler/algorithmprovider/defaults/register_priorities.go
@@ -27,7 +27,7 @@ func init() {
 	// Register functions that extract metadata used by priorities computations.
 	scheduler.RegisterPriorityMetadataProducerFactory(
 		func(args scheduler.PluginFactoryArgs) priorities.PriorityMetadataProducer {
-			return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
+			return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister, args.HardPodAffinitySymmetricWeight)
 		})
 
 	// ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
@@ -66,15 +66,7 @@ func init() {
 	)
 	// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
 	// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
-	scheduler.RegisterPriorityConfigFactory(
-		priorities.InterPodAffinityPriority,
-		scheduler.PriorityConfigFactory{
-			Function: func(args scheduler.PluginFactoryArgs) priorities.PriorityFunction {
-				return priorities.NewInterPodAffinityPriority(args.HardPodAffinitySymmetricWeight)
-			},
-			Weight: 1,
-		},
-	)
+	scheduler.RegisterPriorityMapReduceFunction(priorities.InterPodAffinityPriority, priorities.CalculateInterPodAffinityPriorityMap, priorities.CalculateInterPodAffinityPriorityReduce, 1)
 
 	// Prioritize nodes by least requested utilization.
 	scheduler.RegisterPriorityMapReduceFunction(priorities.LeastRequestedPriority, priorities.LeastRequestedPriorityMap, nil, 1)
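Registration switches from a `PriorityConfigFactory` closure to the generic map/reduce registry because the hard-pod-affinity weight now reaches the priority through the metadata producer rather than through a constructor. For reference, the function shapes `RegisterPriorityMapReduceFunction` expects are the ones declared in `pkg/scheduler/algorithm/priorities/types.go` in this tree (reproduced here for context, not part of this diff; exact doc comments may differ):

```go
// PriorityMapFunction computes a per-node score for the given node.
type PriorityMapFunction func(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error)

// PriorityReduceFunction post-processes the per-node results, e.g. normalizing them.
type PriorityReduceFunction func(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error
```

`CalculateInterPodAffinityPriorityMap` and `CalculateInterPodAffinityPriorityReduce` in this patch match these two signatures exactly, which is what makes the one-line registration possible.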
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index ed5b54b9ba7..a241ab67fe8 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -1005,6 +1005,7 @@ func TestZeroRequest(t *testing.T) {
 				informerFactory.Core().V1().ReplicationControllers().Lister(),
 				informerFactory.Apps().V1().ReplicaSets().Lister(),
 				informerFactory.Apps().V1().StatefulSets().Lister(),
+				1,
 			)
 			metaData := metaDataProducer(test.pod, test.nodes, snapshot)
 
diff --git a/pkg/scheduler/framework/plugins/defaultpodtopologyspread/default_pod_topology_spread_test.go b/pkg/scheduler/framework/plugins/defaultpodtopologyspread/default_pod_topology_spread_test.go
index cc786f4864d..c514a68c62d 100644
--- a/pkg/scheduler/framework/plugins/defaultpodtopologyspread/default_pod_topology_spread_test.go
+++ b/pkg/scheduler/framework/plugins/defaultpodtopologyspread/default_pod_topology_spread_test.go
@@ -356,7 +356,9 @@ func TestDefaultPodTopologySpreadScore(t *testing.T) {
 				fakelisters.ServiceLister(test.services),
 				fakelisters.ControllerLister(test.rcs),
 				fakelisters.ReplicaSetLister(test.rss),
-				fakelisters.StatefulSetLister(test.sss))
+				fakelisters.StatefulSetLister(test.sss),
+				1,
+			)
 			metaData := metaDataProducer(test.pod, nodes, snapshot)
 
 			state := framework.NewCycleState()
@@ -612,7 +614,9 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
 				fakelisters.ServiceLister(test.services),
 				fakelisters.ControllerLister(test.rcs),
 				fakelisters.ReplicaSetLister(test.rss),
-				fakelisters.StatefulSetLister(test.sss))
+				fakelisters.StatefulSetLister(test.sss),
+				1,
+			)
 			metaData := metaDataProducer(test.pod, nodes, snapshot)
 
 			plugin := &DefaultPodTopologySpread{
diff --git a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go
index 34a93dda87e..a20c3fbe4e9 100644
--- a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go
+++ b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go
@@ -198,6 +198,7 @@ func TestImageLocalityPriority(t *testing.T) {
 		informerFactory.Core().V1().ReplicationControllers().Lister(),
 		informerFactory.Apps().V1().ReplicaSets().Lister(),
 		informerFactory.Apps().V1().StatefulSets().Lister(),
+		1,
 	)
 
 	snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
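End to end, callers drive the refactored priority the same way the updated tests do: build the metadata once, map over each node, then reduce. A sketch, assuming `pod`, `nodes`, and `snapshot` are in scope as in the tests above and that this runs inside a helper returning `error`:

```go
// Precompute the topology-pair weights once per scheduling cycle.
meta := &priorityMetadata{
	topologyScore: buildTopologyPairToScore(pod, snapshot, nodes, v1.DefaultHardPodAffinitySymmetricWeight),
}

// Map: a cheap label-lookup pass per node.
var scores framework.NodeScoreList
for _, n := range nodes {
	nodeScore, err := CalculateInterPodAffinityPriorityMap(pod, meta, snapshot.NodeInfoMap[n.Name])
	if err != nil {
		return err
	}
	scores = append(scores, nodeScore)
}

// Reduce: normalize the raw sums into [0, framework.MaxNodeScore] in place.
if err := CalculateInterPodAffinityPriorityReduce(pod, meta, snapshot, scores); err != nil {
	return err
}
```

The expensive part — walking every existing pod's affinity terms in parallel — happens once in `buildTopologyPairToScore` at metadata time; the per-node map step is then just a label lookup, which is what lets the scheduler score nodes independently.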