diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 90112ba941f..3ad9b9ae42b 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -21,6 +21,7 @@ go_library( "//pkg/scheduler/apis/config/validation:go_default_library", "//pkg/scheduler/core:go_default_library", "//pkg/scheduler/framework/plugins:go_default_library", + "//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library", "//pkg/scheduler/framework/plugins/nodelabel:go_default_library", "//pkg/scheduler/framework/plugins/noderesources:go_default_library", "//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library", diff --git a/pkg/scheduler/algorithm/priorities/BUILD b/pkg/scheduler/algorithm/priorities/BUILD index e47ce9355b0..f00e13f8499 100644 --- a/pkg/scheduler/algorithm/priorities/BUILD +++ b/pkg/scheduler/algorithm/priorities/BUILD @@ -12,7 +12,6 @@ go_library( "balanced_resource_allocation.go", "even_pods_spread.go", "image_locality.go", - "interpod_affinity.go", "least_requested.go", "metadata.go", "most_requested.go", @@ -38,14 +37,12 @@ go_library( "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", - "//pkg/scheduler/util:go_default_library", "//pkg/util/node:go_default_library", "//pkg/util/parsers:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", @@ -60,7 +57,6 @@ go_test( "balanced_resource_allocation_test.go", "even_pods_spread_test.go", 
"image_locality_test.go", - "interpod_affinity_test.go", "least_requested_test.go", "metadata_test.go", "most_requested_test.go", diff --git a/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/pkg/scheduler/algorithm/priorities/interpod_affinity.go deleted file mode 100644 index 107bbf8181a..00000000000 --- a/pkg/scheduler/algorithm/priorities/interpod_affinity.go +++ /dev/null @@ -1,317 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package priorities - -import ( - "context" - "fmt" - "sync" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/util/workqueue" - priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" - framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" - schedutil "k8s.io/kubernetes/pkg/scheduler/util" - - "k8s.io/klog" -) - -type topologyPairToScore map[string]map[string]int64 - -type podAffinityPriorityMap struct { - topologyScore topologyPairToScore - affinityTerms []*weightedAffinityTerm - antiAffinityTerms []*weightedAffinityTerm - hardPodAffinityWeight int32 - sync.Mutex -} - -// A "processed" representation of v1.WeightedAffinityTerm. 
-type weightedAffinityTerm struct { - namespaces sets.String - selector labels.Selector - weight int32 - topologyKey string -} - -func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32) (*weightedAffinityTerm, error) { - namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, term) - selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) - if err != nil { - return nil, err - } - return &weightedAffinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey, weight: weight}, nil -} - -func getProcessedTerms(pod *v1.Pod, terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) { - if terms == nil { - return nil, nil - } - - var processedTerms []*weightedAffinityTerm - for i := range terms { - p, err := newWeightedAffinityTerm(pod, &terms[i].PodAffinityTerm, terms[i].Weight) - if err != nil { - return nil, err - } - processedTerms = append(processedTerms, p) - } - return processedTerms, nil -} - -func (p *podAffinityPriorityMap) processTerm(term *weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error { - if len(fixedNode.Labels) == 0 { - return nil - } - - match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector) - tpValue, tpValueExist := fixedNode.Labels[term.topologyKey] - if match && tpValueExist { - p.Lock() - if p.topologyScore[term.topologyKey] == nil { - p.topologyScore[term.topologyKey] = make(map[string]int64) - } - p.topologyScore[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier)) - p.Unlock() - } - return nil -} - -func (p *podAffinityPriorityMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error { - for _, term := range terms { - if err := p.processTerm(term, podToCheck, fixedNode, multiplier); err != nil { - return err - } - } - return nil -} - -// CalculateInterPodAffinityPriorityMap calculate the number of matching pods on 
the passed-in "node", -// and return the number as Score. -func CalculateInterPodAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { - node := nodeInfo.Node() - if node == nil { - return framework.NodeScore{}, fmt.Errorf("node not found") - } - - var topologyScore topologyPairToScore - if priorityMeta, ok := meta.(*priorityMetadata); ok { - topologyScore = priorityMeta.topologyScore - } - - var score int64 - for tpKey, tpValues := range topologyScore { - if v, exist := node.Labels[tpKey]; exist { - score += tpValues[v] - } - } - - return framework.NodeScore{Name: node.Name, Score: score}, nil -} - -// CalculateInterPodAffinityPriorityReduce normalizes the score for each filteredNode, -// The basic rule is: the bigger the score(matching number of pods) is, the smaller the -// final normalized score will be. -func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, - result framework.NodeScoreList) error { - var topologyScore topologyPairToScore - if priorityMeta, ok := meta.(*priorityMetadata); ok { - topologyScore = priorityMeta.topologyScore - } - if len(topologyScore) == 0 { - return nil - } - - var maxCount, minCount int64 - for i := range result { - score := result[i].Score - if score > maxCount { - maxCount = score - } - if score < minCount { - minCount = score - } - } - - maxMinDiff := maxCount - minCount - for i := range result { - fScore := float64(0) - if maxMinDiff > 0 { - fScore = float64(framework.MaxNodeScore) * (float64(result[i].Score-minCount) / float64(maxMinDiff)) - } - - result[i].Score = int64(fScore) - } - - return nil -} - -func (p *podAffinityPriorityMap) processExistingPod(existingPod *v1.Pod, existingPodNodeInfo *schedulernodeinfo.NodeInfo, incomingPod *v1.Pod) error { - existingPodAffinity := existingPod.Spec.Affinity - existingHasAffinityConstraints := existingPodAffinity != nil && 
existingPodAffinity.PodAffinity != nil - existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil - existingPodNode := existingPodNodeInfo.Node() - - // For every soft pod affinity term of , if matches the term, - // increment for every node in the cluster with the same - // value as that of `s node by the term`s weight. - if err := p.processTerms(p.affinityTerms, existingPod, existingPodNode, 1); err != nil { - return err - } - - // For every soft pod anti-affinity term of , if matches the term, - // decrement for every node in the cluster with the same - // value as that of `s node by the term`s weight. - if err := p.processTerms(p.antiAffinityTerms, existingPod, existingPodNode, -1); err != nil { - return err - } - - if existingHasAffinityConstraints { - // For every hard pod affinity term of , if matches the term, - // increment for every node in the cluster with the same - // value as that of 's node by the constant - if p.hardPodAffinityWeight > 0 { - terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution - // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. - //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { - // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) - //} - for i := range terms { - term := &terms[i] - processedTerm, err := newWeightedAffinityTerm(existingPod, term, p.hardPodAffinityWeight) - if err != nil { - return err - } - if err := p.processTerm(processedTerm, incomingPod, existingPodNode, 1); err != nil { - return err - } - } - } - // For every soft pod affinity term of , if matches the term, - // increment for every node in the cluster with the same - // value as that of 's node by the term's weight. 
- terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) - if err != nil { - klog.Error(err) - return nil - } - - if err := p.processTerms(terms, incomingPod, existingPodNode, 1); err != nil { - return err - } - } - if existingHasAntiAffinityConstraints { - // For every soft pod anti-affinity term of , if matches the term, - // decrement for every node in the cluster with the same - // value as that of 's node by the term's weight. - terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution) - if err != nil { - return err - } - if err := p.processTerms(terms, incomingPod, existingPodNode, -1); err != nil { - return err - } - } - return nil -} - -func buildTopologyPairToScore( - pod *v1.Pod, - sharedLister schedulerlisters.SharedLister, - filteredNodes []*v1.Node, - hardPodAffinityWeight int32, -) topologyPairToScore { - if sharedLister == nil { - klog.Error("BuildTopologyPairToScore with empty shared lister") - return nil - } - - affinity := pod.Spec.Affinity - hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil - hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil - - // Unless the pod being scheduled has affinity terms, we only - // need to process nodes hosting pods with affinity. 
- allNodes, err := sharedLister.NodeInfos().HavePodsWithAffinityList() - if err != nil { - klog.Errorf("get pods with affinity list error, err: %v", err) - return nil - } - if hasAffinityConstraints || hasAntiAffinityConstraints { - allNodes, err = sharedLister.NodeInfos().List() - if err != nil { - klog.Errorf("get all nodes from shared lister error, err: %v", err) - return nil - } - } - - var affinityTerms []*weightedAffinityTerm - var antiAffinityTerms []*weightedAffinityTerm - if hasAffinityConstraints { - if affinityTerms, err = getProcessedTerms(pod, affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil { - klog.Error(err) - return nil - } - } - if hasAntiAffinityConstraints { - if antiAffinityTerms, err = getProcessedTerms(pod, affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil { - klog.Error(err) - return nil - } - } - - pm := podAffinityPriorityMap{ - topologyScore: make(topologyPairToScore), - affinityTerms: affinityTerms, - antiAffinityTerms: antiAffinityTerms, - hardPodAffinityWeight: hardPodAffinityWeight, - } - - errCh := schedutil.NewErrorChannel() - ctx, cancel := context.WithCancel(context.Background()) - processNode := func(i int) { - nodeInfo := allNodes[i] - if nodeInfo.Node() != nil { - // Unless the pod being scheduled has affinity terms, we only - // need to process pods with affinity in the node. - podsToProcess := nodeInfo.PodsWithAffinity() - if hasAffinityConstraints || hasAntiAffinityConstraints { - // We need to process all the pods. 
- podsToProcess = nodeInfo.Pods() - } - - for _, existingPod := range podsToProcess { - if err := pm.processExistingPod(existingPod, nodeInfo, pod); err != nil { - errCh.SendErrorWithCancel(err, cancel) - return - } - } - } - } - workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode) - if err := errCh.ReceiveError(); err != nil { - klog.Error(err) - return nil - } - - return pm.topologyScore -} diff --git a/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go b/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go deleted file mode 100644 index 5783a27ca7a..00000000000 --- a/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go +++ /dev/null @@ -1,696 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package priorities - -import ( - "reflect" - "testing" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" - st "k8s.io/kubernetes/pkg/scheduler/testing" -) - -func TestInterPodAffinityPriority(t *testing.T) { - labelRgChina := map[string]string{ - "region": "China", - } - labelRgIndia := map[string]string{ - "region": "India", - } - labelAzAz1 := map[string]string{ - "az": "az1", - } - labelAzAz2 := map[string]string{ - "az": "az2", - } - labelRgChinaAzAz1 := map[string]string{ - "region": "China", - "az": "az1", - } - podLabelSecurityS1 := map[string]string{ - "security": "S1", - } - podLabelSecurityS2 := map[string]string{ - "security": "S2", - } - // considered only preferredDuringSchedulingIgnoredDuringExecution in pod affinity - stayWithS1InRegion := &v1.Affinity{ - PodAffinity: &v1.PodAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ - { - Weight: 5, - PodAffinityTerm: v1.PodAffinityTerm{ - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"S1"}, - }, - }, - }, - TopologyKey: "region", - }, - }, - }, - }, - } - stayWithS2InRegion := &v1.Affinity{ - PodAffinity: &v1.PodAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ - { - Weight: 6, - PodAffinityTerm: v1.PodAffinityTerm{ - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"S2"}, - }, - }, - }, - TopologyKey: "region", - }, - }, - }, - }, - } - affinity3 := &v1.Affinity{ - PodAffinity: &v1.PodAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ - { - Weight: 8, - PodAffinityTerm: 
v1.PodAffinityTerm{ - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpNotIn, - Values: []string{"S1"}, - }, { - Key: "security", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"S2"}, - }, - }, - }, - TopologyKey: "region", - }, - }, { - Weight: 2, - PodAffinityTerm: v1.PodAffinityTerm{ - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpExists, - }, { - Key: "wrongkey", - Operator: metav1.LabelSelectorOpDoesNotExist, - }, - }, - }, - TopologyKey: "region", - }, - }, - }, - }, - } - hardAffinity := &v1.Affinity{ - PodAffinity: &v1.PodAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"S1", "value2"}, - }, - }, - }, - TopologyKey: "region", - }, { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpExists, - }, { - Key: "wrongkey", - Operator: metav1.LabelSelectorOpDoesNotExist, - }, - }, - }, - TopologyKey: "region", - }, - }, - }, - } - awayFromS1InAz := &v1.Affinity{ - PodAntiAffinity: &v1.PodAntiAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ - { - Weight: 5, - PodAffinityTerm: v1.PodAffinityTerm{ - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"S1"}, - }, - }, - }, - TopologyKey: "az", - }, - }, - }, - }, - } - // to stay away from security S2 in any az. 
- awayFromS2InAz := &v1.Affinity{ - PodAntiAffinity: &v1.PodAntiAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ - { - Weight: 5, - PodAffinityTerm: v1.PodAffinityTerm{ - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"S2"}, - }, - }, - }, - TopologyKey: "az", - }, - }, - }, - }, - } - // to stay with security S1 in same region, stay away from security S2 in any az. - stayWithS1InRegionAwayFromS2InAz := &v1.Affinity{ - PodAffinity: &v1.PodAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ - { - Weight: 8, - PodAffinityTerm: v1.PodAffinityTerm{ - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"S1"}, - }, - }, - }, - TopologyKey: "region", - }, - }, - }, - }, - PodAntiAffinity: &v1.PodAntiAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ - { - Weight: 5, - PodAffinityTerm: v1.PodAffinityTerm{ - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"S2"}, - }, - }, - }, - TopologyKey: "az", - }, - }, - }, - }, - } - - tests := []struct { - pod *v1.Pod - pods []*v1.Pod - nodes []*v1.Node - expectedList framework.NodeScoreList - name string - }{ - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", 
Score: 0}, {Name: "machine3", Score: 0}}, - name: "all machines are same priority as Affinity is nil", - }, - // the node(machine1) that have the label {"region": "China"} (match the topology key) and that have existing pods that match the labelSelector get high score - // the node(machine3) that don't have the label {"region": "whatever the value is"} (mismatch the topology key) but that have existing pods that match the labelSelector get low score - // the node(machine2) that have the label {"region": "China"} (match the topology key) but that have existing pods that mismatch the labelSelector get low score - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - {Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}}, - name: "Affinity: pod that matches topology key & pods in nodes will get high score comparing to others" + - "which doesn't match either pods in nodes or in topology key", - }, - // the node1(machine1) that have the label {"region": "China"} (match the topology key) and that have existing pods that match the labelSelector get high score - // the node2(machine2) that have the label {"region": "China"}, match the topology key and have the same label value with node1, get the same high score with node1 - 
// the node3(machine3) that have the label {"region": "India"}, match the topology key but have a different label value, don't have existing pods that match the labelSelector, - // get a low score. - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegion}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChinaAzAz1}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelRgIndia}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, - name: "All the nodes that have the same topology key & label value with one of them has an existing pod that match the affinity rules, have the same score", - }, - // there are 2 regions, say regionChina(machine1,machine3,machine4) and regionIndia(machine2,machine5), both regions have nodes that match the preference. - // But there are more nodes(actually more existing pods) in regionChina that match the preference than regionIndia. - // Then, nodes in regionChina get higher score than nodes in regionIndia, and all the nodes in regionChina should get a same score(high score), - // while all the nodes in regionIndia should get another same score(low score). 
- { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - {Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - {Spec: v1.PodSpec{NodeName: "machine4"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - {Spec: v1.PodSpec{NodeName: "machine5"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: labelRgIndia}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 50}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: framework.MaxNodeScore}, {Name: "machine5", Score: 50}}, - name: "Affinity: nodes in one region has more matching pods comparing to other reqion, so the region which has more macthes will get high score", - }, - // Test with the different operators and values for pod affinity scheduling preference, including some match failures. 
- { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: affinity3}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - {Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: 20}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, - name: "Affinity: different Label operators and values for pod affinity scheduling preference, including some match failures ", - }, - // Test the symmetry cases for affinity, the difference between affinity and symmetry is not the pod wants to run together with some existing pods, - // but the existing pods have the inter pod affinity preference while the pod to schedule satisfy the preference. 
- { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1", Affinity: stayWithS1InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, - name: "Affinity symmetry: considered only the preferredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry", - }, - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardAffinity}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardAffinity}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, - name: "Affinity symmetry: considered RequiredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry", - }, - - // The pod to schedule prefer to stay away from some existing pods at node level using the pod anti affinity. 
- // the nodes that have the label {"node": "bar"} (match the topology key) and that have existing pods that match the labelSelector get low score - // the nodes that don't have the label {"node": "whatever the value is"} (mismatch the topology key) but that have existing pods that match the labelSelector get high score - // the nodes that have the label {"node": "bar"} (match the topology key) but that have existing pods that mismatch the labelSelector get high score - // there are 2 nodes, say node1 and node2, both nodes have pods that match the labelSelector and have topology-key in node.Labels. - // But there are more pods on node1 that match the preference than node2. Then, node1 get a lower score than node2. - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelAzAz1}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}}, - name: "Anti Affinity: pod that doesnot match existing pods in node will get high score ", - }, - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelAzAz1}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", 
Labels: labelRgChina}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}}, - name: "Anti Affinity: pod that does not matches topology key & matches the pods in nodes will get higher score comparing to others ", - }, - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelAzAz1}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}}, - name: "Anti Affinity: one node has more matching pods comparing to other node, so the node which has more unmacthes will get high score", - }, - // Test the symmetry cases for anti affinity - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1", Affinity: awayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelAzAz1}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelAzAz2}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}}, - name: "Anti Affinity symmetry: the existing pods in node which has 
anti affinity match will get high score", - }, - // Test both affinity and anti-affinity - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelAzAz1}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}}, - name: "Affinity and Anti Affinity: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity", - }, - // Combined cases considering both affinity and anti-affinity, the pod to schedule and existing pods have the same labels (they are in the same RC/service), - // the pod prefer to run together with its brother pods in the same region, but wants to stay away from them at node level, - // so that all the pods of a RC/service can stay in a same region but trying to separate with each other - // machine-1,machine-3,machine-4 are in ChinaRegion others machin-2,machine-5 are in IndiaRegion - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: 
v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine4"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine5"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChinaAzAz1}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: labelRgIndia}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 40}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: framework.MaxNodeScore}, {Name: "machine5", Score: 40}}, - name: "Affinity and Anti Affinity: considering both affinity and anti-affinity, the pod to schedule and existing pods have the same labels", - }, - // Consider Affinity, Anti Affinity and symmetry together. 
- // for Affinity, the weights are: 8, 0, 0, 0 - // for Anti Affinity, the weights are: 0, -5, 0, 0 - // for Affinity symmetry, the weights are: 0, 0, 8, 0 - // for Anti Affinity symmetry, the weights are: 0, 0, 0, -5 - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, - {Spec: v1.PodSpec{NodeName: "machine3", Affinity: stayWithS1InRegionAwayFromS2InAz}}, - {Spec: v1.PodSpec{NodeName: "machine4", Affinity: awayFromS1InAz}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelAzAz1}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelRgIndia}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labelAzAz2}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: 0}}, - name: "Affinity and Anti Affinity and symmetry: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity & symmetry", - }, - // Cover https://github.com/kubernetes/kubernetes/issues/82796 which panics upon: - // 1. Some nodes in a topology don't have pods with affinity, but other nodes in the same topology have. - // 2. The incoming pod doesn't have affinity. 
- { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, - {Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS1InRegionAwayFromS2InAz}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}}, - }, - expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}}, - name: "Avoid panic when partial nodes in a topology don't have pods with affinity", - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - allNodes := append([]*v1.Node{}, test.nodes...) - snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes)) - - meta := &priorityMetadata{ - topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, v1.DefaultHardPodAffinitySymmetricWeight), - } - var gotList framework.NodeScoreList - for _, n := range test.nodes { - nodeName := n.Name - nodeScore, err := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName]) - if err != nil { - t.Error(err) - } - gotList = append(gotList, nodeScore) - } - - CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList) - if !reflect.DeepEqual(gotList, test.expectedList) { - t.Errorf("CalculateInterPodAffinityPriority() = %#v, want %#v", gotList, test.expectedList) - } - }) - } -} - -func TestHardPodAffinitySymmetricWeight(t *testing.T) { - podLabelServiceS1 := map[string]string{ - "service": "S1", - } - labelRgChina := map[string]string{ - "region": "China", - } - labelRgIndia := map[string]string{ - "region": "India", - } - labelAzAz1 := map[string]string{ - "az": "az1", - } - hardPodAffinity := &v1.Affinity{ - PodAffinity: &v1.PodAffinity{ 
- RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "service", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"S1"}, - }, - }, - }, - TopologyKey: "region", - }, - }, - }, - } - tests := []struct { - pod *v1.Pod - pods []*v1.Pod - nodes []*v1.Node - hardPodAffinityWeight int32 - expectedList framework.NodeScoreList - name string - }{ - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}}, - {Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, - }, - hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight, - expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, - name: "Hard Pod Affinity symmetry: hard pod affinity symmetry weights 1 by default, then nodes that match the hard pod affinity symmetry rules, get a high score", - }, - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}}, - pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}}, - {Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}}, - }, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, - {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, - }, - hardPodAffinityWeight: 0, - expectedList: 
[]framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}}, - name: "Hard Pod Affinity symmetry: hard pod affinity symmetry is closed(weights 0), then nodes that match the hard pod affinity symmetry rules, get same score with those not match", - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - allNodes := append([]*v1.Node{}, test.nodes...) - snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes)) - - meta := &priorityMetadata{ - topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, test.hardPodAffinityWeight), - } - var gotList framework.NodeScoreList - for _, n := range test.nodes { - nodeName := n.Name - nodeScore, err := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName]) - if err != nil { - t.Error(err) - } - gotList = append(gotList, nodeScore) - } - - CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList) - if !reflect.DeepEqual(gotList, test.expectedList) { - t.Errorf("CalculateInterPodAffinityPriority() = %#v, want %#v", gotList, test.expectedList) - } - }) - } -} - -func BenchmarkInterPodAffinityPriority(b *testing.B) { - tests := []struct { - name string - pod *v1.Pod - existingPodsNum int - allNodesNum int - prepFunc func(existingPodsNum, allNodesNum int) (existingPods []*v1.Pod, allNodes []*v1.Node) - }{ - { - name: "1000nodes/incoming pod without PodAffinity and existing pods without PodAffinity", - pod: st.MakePod().Name("p").Label("foo", "").Obj(), - existingPodsNum: 10000, - allNodesNum: 1000, - prepFunc: st.MakeNodesAndPods, - }, - { - name: "1000nodes/incoming pod with PodAffinity and existing pods without PodAffinity", - pod: st.MakePod().Name("p").Label("foo", "").PodAffinityExists("foo", "zone", st.PodAffinityWithPreferredReq).Obj(), - existingPodsNum: 10000, - allNodesNum: 1000, - prepFunc: st.MakeNodesAndPods, - }, - { - name: "1000nodes/incoming 
pod without PodAffinity and existing pods with PodAffinity", - pod: st.MakePod().Name("p").Label("foo", "").Obj(), - existingPodsNum: 10000, - allNodesNum: 1000, - prepFunc: st.MakeNodesAndPodsForPodAffinity, - }, - { - name: "1000nodes/incoming pod with PodAffinity and existing pods with PodAffinity", - pod: st.MakePod().Name("p").Label("foo", "").PodAffinityExists("foo", "zone", st.PodAffinityWithPreferredReq).Obj(), - existingPodsNum: 10000, - allNodesNum: 1000, - prepFunc: st.MakeNodesAndPodsForPodAffinity, - }, - } - - for _, test := range tests { - b.Run(test.name, func(b *testing.B) { - existingPods, allNodes := test.prepFunc(test.existingPodsNum, test.allNodesNum) - snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(existingPods, allNodes)) - - meta := &priorityMetadata{ - topologyScore: buildTopologyPairToScore(test.pod, snapshot, allNodes, v1.DefaultHardPodAffinitySymmetricWeight), - } - b.ResetTimer() - - for i := 0; i < b.N; i++ { - var gotList framework.NodeScoreList - for _, n := range allNodes { - nodeName := n.Name - nodeScore, _ := CalculateInterPodAffinityPriorityMap(test.pod, meta, snapshot.NodeInfoMap[nodeName]) - gotList = append(gotList, nodeScore) - } - CalculateInterPodAffinityPriorityReduce(test.pod, meta, snapshot, gotList) - } - }) - } -} diff --git a/pkg/scheduler/algorithm/priorities/metadata.go b/pkg/scheduler/algorithm/priorities/metadata.go index 7726c45972c..839db4d6bb4 100644 --- a/pkg/scheduler/algorithm/priorities/metadata.go +++ b/pkg/scheduler/algorithm/priorities/metadata.go @@ -64,7 +64,6 @@ type priorityMetadata struct { podFirstServiceSelector labels.Selector totalNumNodes int podTopologySpreadMap *podTopologySpreadMap - topologyScore topologyPairToScore } // PriorityMetadata is a MetadataProducer. Node info can be nil. 
@@ -99,7 +98,6 @@ func (pmf *MetadataFactory) PriorityMetadata( podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister), totalNumNodes: totalNumNodes, podTopologySpreadMap: tpSpreadMap, - topologyScore: buildTopologyPairToScore(pod, sharedLister, filteredNodes, pmf.hardPodAffinityWeight), } } diff --git a/pkg/scheduler/algorithmprovider/defaults/defaults_test.go b/pkg/scheduler/algorithmprovider/defaults/defaults_test.go index 8c45d3f53f3..3f705053386 100644 --- a/pkg/scheduler/algorithmprovider/defaults/defaults_test.go +++ b/pkg/scheduler/algorithmprovider/defaults/defaults_test.go @@ -124,6 +124,9 @@ func TestCompatibility(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 1}, {Name: "ImageLocality", Weight: 1}, @@ -161,6 +164,9 @@ func TestCompatibility(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 1}, {Name: "ImageLocality", Weight: 1}, @@ -198,6 +204,9 @@ func TestCompatibility(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 1}, {Name: "ImageLocality", Weight: 1}, diff --git a/pkg/scheduler/algorithmprovider/defaults/register_priorities.go b/pkg/scheduler/algorithmprovider/defaults/register_priorities.go index 8887206d041..4effab8c826 100644 --- a/pkg/scheduler/algorithmprovider/defaults/register_priorities.go +++ b/pkg/scheduler/algorithmprovider/defaults/register_priorities.go @@ -70,7 +70,7 @@ func init() { ) // pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.) 
// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods. - scheduler.RegisterPriorityMapReduceFunction(priorities.InterPodAffinityPriority, priorities.CalculateInterPodAffinityPriorityMap, priorities.CalculateInterPodAffinityPriorityReduce, 1) + scheduler.RegisterPriorityMapReduceFunction(priorities.InterPodAffinityPriority, nil, nil, 1) // Prioritize nodes by least requested utilization. scheduler.RegisterPriorityMapReduceFunction(priorities.LeastRequestedPriority, priorities.LeastRequestedPriorityMap, nil, 1) diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go index 3184441af2b..47b4239a066 100644 --- a/pkg/scheduler/apis/config/testing/compatibility_test.go +++ b/pkg/scheduler/apis/config/testing/compatibility_test.go @@ -313,6 +313,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "ImageLocality", Weight: 2}, @@ -383,6 +386,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "ImageLocality", Weight: 2}, @@ -464,6 +470,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "ImageLocality", Weight: 2}, @@ -556,6 +565,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: 
"ImageLocality", Weight: 2}, @@ -650,6 +662,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "ImageLocality", Weight: 2}, @@ -747,6 +762,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "ImageLocality", Weight: 2}, @@ -856,6 +874,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "ImageLocality", Weight: 2}, @@ -968,6 +989,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "ImageLocality", Weight: 2}, @@ -1080,6 +1104,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "ImageLocality", Weight: 2}, @@ -1196,6 +1223,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "ImageLocality", Weight: 2}, diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index e5953495d3c..9f1d6215fb1 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -44,6 +44,7 @@ import ( 
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/framework/plugins" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -113,11 +114,6 @@ type Configurator struct { configProducerArgs *plugins.ConfigProducerArgs } -// GetHardPodAffinitySymmetricWeight is exposed for testing. -func (c *Configurator) GetHardPodAffinitySymmetricWeight() int32 { - return c.hardPodAffinitySymmetricWeight -} - // Create creates a scheduler with the default algorithm provider. func (c *Configurator) Create() (*Scheduler, error) { return c.CreateFromProvider(schedulerapi.SchedulerDefaultProviderName) @@ -221,8 +217,12 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler, func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Scheduler, error) { klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys) - if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 { - return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight()) + if c.hardPodAffinitySymmetricWeight < 1 || c.hardPodAffinitySymmetricWeight > 100 { + return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.hardPodAffinitySymmetricWeight) + } + + c.configProducerArgs.InterPodAffinityArgs = &interpodaffinity.Args{ + HardPodAffinityWeight: c.hardPodAffinitySymmetricWeight, } predicateFuncs, pluginsForPredicates, pluginConfigForPredicates, err := c.getPredicateConfigs(predicateKeys) diff --git a/pkg/scheduler/factory_test.go 
b/pkg/scheduler/factory_test.go index 6098c49889d..44caa77bdd6 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -116,7 +116,7 @@ func TestCreateFromConfig(t *testing.T) { if err != nil { t.Fatalf("CreateFromConfig failed: %v", err) } - hpa := factory.GetHardPodAffinitySymmetricWeight() + hpa := factory.hardPodAffinitySymmetricWeight if hpa != v1.DefaultHardPodAffinitySymmetricWeight { t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", v1.DefaultHardPodAffinitySymmetricWeight, hpa) } @@ -205,7 +205,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) { t.Errorf("Invalid configuration: %v", err) } factory.CreateFromConfig(policy) - hpa := factory.GetHardPodAffinitySymmetricWeight() + hpa := factory.hardPodAffinitySymmetricWeight if hpa != 10 { t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", 10, hpa) } diff --git a/pkg/scheduler/framework/plugins/default_registry.go b/pkg/scheduler/framework/plugins/default_registry.go index 91a1e3e8eb4..d71fd9d0317 100644 --- a/pkg/scheduler/framework/plugins/default_registry.go +++ b/pkg/scheduler/framework/plugins/default_registry.go @@ -101,6 +101,8 @@ type ConfigProducerArgs struct { ServiceAffinityArgs *serviceaffinity.Args // NodeResourcesFitArgs is the args for the NodeResources fit filter. NodeResourcesFitArgs *noderesources.FitArgs + // InterPodAffinityArgs is the args for InterPodAffinity plugin + InterPodAffinityArgs *interpodaffinity.Args } // ConfigProducer produces a framework's configuration. 
@@ -254,7 +256,9 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { }) registry.RegisterPriority(priorities.InterPodAffinityPriority, func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { + plugins.PostFilter = appendToPluginSet(plugins.PostFilter, interpodaffinity.Name, nil) plugins.Score = appendToPluginSet(plugins.Score, interpodaffinity.Name, &args.Weight) + pluginConfig = append(pluginConfig, makePluginConfig(interpodaffinity.Name, args.InterPodAffinityArgs)) return }) registry.RegisterPriority(priorities.NodePreferAvoidPodsPriority, diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/BUILD b/pkg/scheduler/framework/plugins/interpodaffinity/BUILD index 558818e4029..e693c1be6ad 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/BUILD +++ b/pkg/scheduler/framework/plugins/interpodaffinity/BUILD @@ -7,13 +7,18 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/scheduler/algorithm/predicates:go_default_library", - "//pkg/scheduler/algorithm/priorities:go_default_library", + "//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) @@ -24,14 +29,11 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/scheduler/algorithm/predicates:go_default_library", - 
"//pkg/scheduler/algorithm/priorities:go_default_library", - "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", ], ) diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity.go b/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity.go index ae986dd57a4..6d31865e67b 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity.go @@ -19,36 +19,52 @@ package interpodaffinity import ( "context" "fmt" + "sync" "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" - "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" + priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) // InterPodAffinity is a plugin that checks inter pod affinity type InterPodAffinity struct { - snapshotSharedLister schedulerlisters.SharedLister - podAffinityChecker *predicates.PodAffinityChecker + sharedLister schedulerlisters.SharedLister + podAffinityChecker *predicates.PodAffinityChecker + 
hardPodAffinityWeight int32 + sync.Mutex +} + +// Args holds the args that are used to configure the plugin. +type Args struct { + HardPodAffinityWeight int32 `json:"hardPodAffinityWeight,omitempty"` } var _ framework.PreFilterPlugin = &InterPodAffinity{} var _ framework.FilterPlugin = &InterPodAffinity{} +var _ framework.PostFilterPlugin = &InterPodAffinity{} var _ framework.ScorePlugin = &InterPodAffinity{} const ( // Name is the name of the plugin used in the plugin registry and configurations. Name = "InterPodAffinity" - // preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data. + // preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data for Filtering. // Using the name of the plugin will likely help us avoid collisions with other plugins. preFilterStateKey = "PreFilter" + Name + + // postFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data for Scoring. + postFilterStateKey = "PostFilter" + Name ) // preFilterState computed at PreFilter and used at Filter. 
@@ -75,10 +91,10 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework var allNodes []*nodeinfo.NodeInfo var havePodsWithAffinityNodes []*nodeinfo.NodeInfo var err error - if allNodes, err = pl.snapshotSharedLister.NodeInfos().List(); err != nil { + if allNodes, err = pl.sharedLister.NodeInfos().List(); err != nil { return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos: %v", err)) } - if havePodsWithAffinityNodes, err = pl.snapshotSharedLister.NodeInfos().HavePodsWithAffinityList(); err != nil { + if havePodsWithAffinityNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList(); err != nil { return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos with pods with affinity: %v", err)) } if meta, err = predicates.GetPodAffinityMetadata(pod, allNodes, havePodsWithAffinityNodes); err != nil { @@ -143,25 +159,299 @@ func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.Cy return migration.PredicateResultToFrameworkStatus(reasons, err) } +// A "processed" representation of v1.WeightedAffinityTerm. +type weightedAffinityTerm struct { + namespaces sets.String + selector labels.Selector + weight int32 + topologyKey string +} + +// postFilterState computed at PostFilter and used at Score. +type postFilterState struct { + topologyScore map[string]map[string]int64 + affinityTerms []*weightedAffinityTerm + antiAffinityTerms []*weightedAffinityTerm +} + +// Clone implements the mandatory Clone interface. We don't really copy the data since +// there is no need for that. 
+func (s *postFilterState) Clone() framework.StateData { + return s +} + +func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32) (*weightedAffinityTerm, error) { + namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, term) + selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) + if err != nil { + return nil, err + } + return &weightedAffinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey, weight: weight}, nil +} + +func getProcessedTerms(pod *v1.Pod, terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) { + if terms == nil { + return nil, nil + } + + var processedTerms []*weightedAffinityTerm + for i := range terms { + p, err := newWeightedAffinityTerm(pod, &terms[i].PodAffinityTerm, terms[i].Weight) + if err != nil { + return nil, err + } + processedTerms = append(processedTerms, p) + } + return processedTerms, nil +} + +func (pl *InterPodAffinity) processTerm( + state *postFilterState, + term *weightedAffinityTerm, + podToCheck *v1.Pod, + fixedNode *v1.Node, + multiplier int, +) { + if len(fixedNode.Labels) == 0 { + return + } + + match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector) + tpValue, tpValueExist := fixedNode.Labels[term.topologyKey] + if match && tpValueExist { + pl.Lock() + if state.topologyScore[term.topologyKey] == nil { + state.topologyScore[term.topologyKey] = make(map[string]int64) + } + state.topologyScore[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier)) + pl.Unlock() + } + return +} + +func (pl *InterPodAffinity) processTerms(state *postFilterState, terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error { + for _, term := range terms { + pl.processTerm(state, term, podToCheck, fixedNode, multiplier) + } + return nil +} + +func (pl *InterPodAffinity) processExistingPod(state *postFilterState, existingPod *v1.Pod, existingPodNodeInfo 
*nodeinfo.NodeInfo, incomingPod *v1.Pod) error { + existingPodAffinity := existingPod.Spec.Affinity + existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil + existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil + existingPodNode := existingPodNodeInfo.Node() + + // For every soft pod affinity term of the incoming pod, if the existing pod matches the term, + // increment the score of every node in the cluster with the same topology-key + // value as that of the existing pod's node by the term's weight. + pl.processTerms(state, state.affinityTerms, existingPod, existingPodNode, 1) + + // For every soft pod anti-affinity term of the incoming pod, if the existing pod matches the term, + // decrement the score of every node in the cluster with the same topology-key + // value as that of the existing pod's node by the term's weight. + pl.processTerms(state, state.antiAffinityTerms, existingPod, existingPodNode, -1) + + if existingHasAffinityConstraints { + // For every hard pod affinity term of the existing pod, if the incoming pod matches the term, + // increment the score of every node in the cluster with the same topology-key + // value as that of the existing pod's node by the constant hardPodAffinityWeight. + if pl.hardPodAffinityWeight > 0 { + terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution + // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. + //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { + // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) + //} + for i := range terms { + term := &terms[i] + processedTerm, err := newWeightedAffinityTerm(existingPod, term, pl.hardPodAffinityWeight) + if err != nil { + return err + } + pl.processTerm(state, processedTerm, incomingPod, existingPodNode, 1) + } + } + // For every soft pod affinity term of the existing pod, if the incoming pod matches the term, + // increment the score of every node in the cluster with the same topology-key + // value as that of the existing pod's node by the term's weight. 
+ terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) + if err != nil { + klog.Error(err) + return nil + } + + pl.processTerms(state, terms, incomingPod, existingPodNode, 1) + } + if existingHasAntiAffinityConstraints { + // For every soft pod anti-affinity term of the existing pod, if the incoming pod matches the term, + // decrement the score of every node in the cluster with the same topology-key + // value as that of the existing pod's node by the term's weight. + terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution) + if err != nil { + return err + } + pl.processTerms(state, terms, incomingPod, existingPodNode, -1) + } + return nil +} + +// PostFilter builds and writes cycle state used by Score and NormalizeScore. +func (pl *InterPodAffinity) PostFilter( + pCtx context.Context, + cycleState *framework.CycleState, + pod *v1.Pod, + nodes []*v1.Node, + _ framework.NodeToStatusMap, +) *framework.Status { + if len(nodes) == 0 { + // No nodes to score. + return nil + } + + if pl.sharedLister == nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("BuildTopologyPairToScore with empty shared lister")) + } + + affinity := pod.Spec.Affinity + hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil + hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil + + // Unless the pod being scheduled has affinity terms, we only + // need to process nodes hosting pods with affinity. 
+ allNodes, err := pl.sharedLister.NodeInfos().HavePodsWithAffinityList() + if err != nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("get pods with affinity list error, err: %v", err)) + } + if hasAffinityConstraints || hasAntiAffinityConstraints { + allNodes, err = pl.sharedLister.NodeInfos().List() + if err != nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("get all nodes from shared lister error, err: %v", err)) + } + } + + var affinityTerms []*weightedAffinityTerm + var antiAffinityTerms []*weightedAffinityTerm + if hasAffinityConstraints { + if affinityTerms, err = getProcessedTerms(pod, affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil { + klog.Error(err) + return nil + } + } + if hasAntiAffinityConstraints { + if antiAffinityTerms, err = getProcessedTerms(pod, affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil { + klog.Error(err) + return nil + } + } + + state := &postFilterState{ + topologyScore: make(map[string]map[string]int64), + affinityTerms: affinityTerms, + antiAffinityTerms: antiAffinityTerms, + } + + errCh := schedutil.NewErrorChannel() + ctx, cancel := context.WithCancel(pCtx) + processNode := func(i int) { + nodeInfo := allNodes[i] + if nodeInfo.Node() == nil { + return + } + // Unless the pod being scheduled has affinity terms, we only + // need to process pods with affinity in the node. + podsToProcess := nodeInfo.PodsWithAffinity() + if hasAffinityConstraints || hasAntiAffinityConstraints { + // We need to process all the pods. 
+ podsToProcess = nodeInfo.Pods() + } + + for _, existingPod := range podsToProcess { + if err := pl.processExistingPod(state, existingPod, nodeInfo, pod); err != nil { + errCh.SendErrorWithCancel(err, cancel) + return + } + } + } + workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode) + if err := errCh.ReceiveError(); err != nil { + return framework.NewStatus(framework.Error, err.Error()) + } + + cycleState.Write(postFilterStateKey, state) + return nil +} + +func getPostFilterState(cycleState *framework.CycleState) (*postFilterState, error) { + c, err := cycleState.Read(postFilterStateKey) + if err != nil { + return nil, fmt.Errorf("Error reading %q from cycleState: %v", postFilterStateKey, err) + } + + s, ok := c.(*postFilterState) + if !ok { + return nil, fmt.Errorf("%+v convert to interpodaffinity.postFilterState error", c) + } + return s, nil +} + // Score invoked at the Score extension point. // The "score" returned in this function is the matching number of pods on the `nodeName`, // it is normalized later. 
-func (pl *InterPodAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - nodeInfo, err := pl.snapshotSharedLister.NodeInfos().Get(nodeName) +func (pl *InterPodAffinity) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName) + if err != nil || nodeInfo.Node() == nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil)) + } + node := nodeInfo.Node() + + s, err := getPostFilterState(cycleState) if err != nil { - return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + return 0, framework.NewStatus(framework.Error, err.Error()) + } + var score int64 + for tpKey, tpValues := range s.topologyScore { + if v, exist := node.Labels[tpKey]; exist { + score += tpValues[v] + } } - meta := migration.PriorityMetadata(state) - s, err := priorities.CalculateInterPodAffinityPriorityMap(pod, meta, nodeInfo) - return s.Score, migration.ErrorToFrameworkStatus(err) + return score, nil } -// NormalizeScore invoked after scoring all nodes. -func (pl *InterPodAffinity) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { - meta := migration.PriorityMetadata(state) - err := priorities.CalculateInterPodAffinityPriorityReduce(pod, meta, pl.snapshotSharedLister, scores) - return migration.ErrorToFrameworkStatus(err) +// NormalizeScore normalizes the score for each filteredNode. +// The basic rule is: the bigger the raw score (sum of matching term weights) is, the bigger the +// final normalized score will be. 
+func (pl *InterPodAffinity) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { + s, err := getPostFilterState(cycleState) + if err != nil { + return framework.NewStatus(framework.Error, err.Error()) + } + if len(s.topologyScore) == 0 { + return nil + } + + var maxCount, minCount int64 + for i := range scores { + score := scores[i].Score + if score > maxCount { + maxCount = score + } + if score < minCount { + minCount = score + } + } + + maxMinDiff := maxCount - minCount + for i := range scores { + fScore := float64(0) + if maxMinDiff > 0 { + fScore = float64(framework.MaxNodeScore) * (float64(scores[i].Score-minCount) / float64(maxMinDiff)) + } + + scores[i].Score = int64(fScore) + } + + return nil } // ScoreExtensions of the Score plugin. @@ -170,12 +460,19 @@ func (pl *InterPodAffinity) ScoreExtensions() framework.ScoreExtensions { } // New initializes a new plugin and returns it. -func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) { +func New(plArgs *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) { if h.SnapshotSharedLister() == nil { return nil, fmt.Errorf("SnapshotSharedlister is nil") } + + args := &Args{} + if err := framework.DecodeInto(plArgs, args); err != nil { + return nil, err + } + return &InterPodAffinity{ - snapshotSharedLister: h.SnapshotSharedLister(), - podAffinityChecker: predicates.NewPodAffinityChecker(h.SnapshotSharedLister()), + sharedLister: h.SnapshotSharedLister(), + podAffinityChecker: predicates.NewPodAffinityChecker(h.SnapshotSharedLister()), + hardPodAffinityWeight: args.HardPodAffinityWeight, }, nil } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go index cd2017671a4..129b06cbe8f 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go +++ 
b/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go @@ -18,16 +18,14 @@ package interpodaffinity import ( "context" + "fmt" "reflect" "testing" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/informers" - clientsetfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" - "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) @@ -57,7 +55,7 @@ func createPodWithAffinityTerms(namespace, nodeName string, labels map[string]st } -func TestSingleNode(t *testing.T) { +func TestRequiredAffinitySingleNode(t *testing.T) { podLabel := map[string]string{"service": "securityscan"} labels1 := map[string]string{ "region": "r1", @@ -783,8 +781,8 @@ func TestSingleNode(t *testing.T) { t.Run(test.name, func(t *testing.T) { snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, []*v1.Node{test.node})) p := &InterPodAffinity{ - snapshotSharedLister: snapshot, - podAffinityChecker: predicates.NewPodAffinityChecker(snapshot), + sharedLister: snapshot, + podAffinityChecker: predicates.NewPodAffinityChecker(snapshot), } state := framework.NewCycleState() preFilterStatus := p.PreFilter(context.Background(), state, test.pod) @@ -799,7 +797,7 @@ func TestSingleNode(t *testing.T) { } } -func TestMultipleNodes(t *testing.T) { +func TestRequiredAffinityMultipleNodes(t *testing.T) { podLabelA := map[string]string{ "foo": "bar", } @@ -1621,8 +1619,8 @@ func TestMultipleNodes(t *testing.T) { snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes)) for indexNode, node := range test.nodes { p := &InterPodAffinity{ - snapshotSharedLister: snapshot, - podAffinityChecker: 
predicates.NewPodAffinityChecker(snapshot), + sharedLister: snapshot, + podAffinityChecker: predicates.NewPodAffinityChecker(snapshot), } state := framework.NewCycleState() preFilterStatus := p.PreFilter(context.Background(), state, test.pod) @@ -1638,7 +1636,7 @@ func TestMultipleNodes(t *testing.T) { } } -func TestInterPodAffinityPriority(t *testing.T) { +func TestPreferredAffinity(t *testing.T) { labelRgChina := map[string]string{ "region": "China", } @@ -2127,35 +2125,27 @@ func TestInterPodAffinityPriority(t *testing.T) { t.Run(test.name, func(t *testing.T) { state := framework.NewCycleState() snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes)) - fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot)) + p := &InterPodAffinity{ + sharedLister: snapshot, + podAffinityChecker: predicates.NewPodAffinityChecker(snapshot), + hardPodAffinityWeight: 1, + } - client := clientsetfake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(client, 0) - - metaDataProducer := priorities.NewMetadataFactory( - informerFactory.Core().V1().Services().Lister(), - informerFactory.Core().V1().ReplicationControllers().Lister(), - informerFactory.Apps().V1().ReplicaSets().Lister(), - informerFactory.Apps().V1().StatefulSets().Lister(), - 1, - ) - - metaData := metaDataProducer(test.pod, test.nodes, snapshot) - - state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: metaData}) - - p, _ := New(nil, fh) + status := p.PostFilter(context.Background(), state, test.pod, test.nodes, nil) + if !status.IsSuccess() { + t.Errorf("unexpected error: %v", status) + } var gotList framework.NodeScoreList for _, n := range test.nodes { nodeName := n.ObjectMeta.Name - score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName) + score, status := p.Score(context.Background(), state, test.pod, nodeName) if !status.IsSuccess() { 
t.Errorf("unexpected error: %v", status) } gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score}) } - status := p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList) + status = p.ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } @@ -2168,7 +2158,7 @@ func TestInterPodAffinityPriority(t *testing.T) { } } -func TestHardPodAffinitySymmetricWeight(t *testing.T) { +func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) { podLabelServiceS1 := map[string]string{ "service": "S1", } @@ -2244,22 +2234,12 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) { snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes)) fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot)) - client := clientsetfake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(client, 0) - - metaDataProducer := priorities.NewMetadataFactory( - informerFactory.Core().V1().Services().Lister(), - informerFactory.Core().V1().ReplicationControllers().Lister(), - informerFactory.Apps().V1().ReplicaSets().Lister(), - informerFactory.Apps().V1().StatefulSets().Lister(), - test.hardPodAffinityWeight, - ) - - metaData := metaDataProducer(test.pod, test.nodes, snapshot) - - state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: metaData}) - - p, _ := New(nil, fh) + args := &runtime.Unknown{Raw: []byte(fmt.Sprintf(`{"hardPodAffinityWeight":%d}`, test.hardPodAffinityWeight))} + p, _ := New(args, fh) + status := p.(framework.PostFilterPlugin).PostFilter(context.Background(), state, test.pod, test.nodes, nil) + if !status.IsSuccess() { + t.Errorf("unexpected error: %v", status) + } var gotList framework.NodeScoreList for _, n := range test.nodes { nodeName := n.ObjectMeta.Name @@ -2270,7 +2250,7 
@@ func TestHardPodAffinitySymmetricWeight(t *testing.T) { gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score}) } - status := p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList) + status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } @@ -2282,7 +2262,7 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) { } } -func TestStateAddRemovePod(t *testing.T) { +func TestPreFilterStateAddRemovePod(t *testing.T) { var label1 = map[string]string{ "region": "r1", "zone": "z11", @@ -2511,8 +2491,8 @@ func TestStateAddRemovePod(t *testing.T) { snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(pods, test.nodes)) p := &InterPodAffinity{ - snapshotSharedLister: snapshot, - podAffinityChecker: predicates.NewPodAffinityChecker(snapshot), + sharedLister: snapshot, + podAffinityChecker: predicates.NewPodAffinityChecker(snapshot), } cycleState := framework.NewCycleState() preFilterStatus := p.PreFilter(context.Background(), cycleState, test.pendingPod) diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 21a67d1668d..986f00c430d 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -146,6 +146,9 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 1}, {Name: "ImageLocality", Weight: 1}, @@ -222,6 +225,9 @@ kind: Policy {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, }, + "PostFilterPlugin": { + {Name: "InterPodAffinity"}, + }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 1}, {Name: "ImageLocality", Weight: 1},