Merge pull request #84449 from ahg-g/ahg-prioritymeta

Priorities use SharedLister interface instead of NodeInfo Map
This commit is contained in:
Kubernetes Prow Robot 2019-10-29 17:21:03 -07:00 committed by GitHub
commit a8727f0f04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 169 additions and 155 deletions

View File

@ -86,6 +86,7 @@ go_test(
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -27,7 +27,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
// getExistingVolumeCountForNode gets the current number of volumes on node.
@ -401,17 +401,17 @@ func TestBalancedResourceAllocation(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
if len(test.pod.Spec.Volumes) > 0 {
maxVolumes := 5
for _, info := range nodeNameToInfo {
for _, info := range snapshot.NodeInfoMap {
info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods(), maxVolumes)
info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes)
}
}
function := priorityFunction(BalancedResourceAllocationMap, nil, nil)
list, err := function(test.pod, nodeNameToInfo, test.nodes)
list, err := function(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)

View File

@ -25,7 +25,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/klog"
@ -82,7 +82,7 @@ func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node)
// Note: Symmetry is not applicable. We only weigh how incomingPod matches existingPod.
// Whether existingPod matches incomingPod doesn't contribute to the final score.
// This is different from the Affinity API.
func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error) {
func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := make(framework.NodeScoreList, len(nodes))
// return if incoming pod doesn't have soft topology spread constraints.
constraints := getSoftTopologySpreadConstraints(pod)
@ -90,18 +90,18 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
return result, nil
}
allNodes, err := sharedLister.NodeInfos().List()
if err != nil {
return nil, err
}
t := newTopologySpreadConstraintsMap()
t.initialize(pod, nodes)
allNodeNames := make([]string, 0, len(nodeNameToInfo))
for name := range nodeNameToInfo {
allNodeNames = append(allNodeNames, name)
}
errCh := schedutil.NewErrorChannel()
ctx, cancel := context.WithCancel(context.Background())
processAllNode := func(i int) {
nodeInfo := nodeNameToInfo[allNodeNames[i]]
nodeInfo := allNodes[i]
node := nodeInfo.Node()
if node == nil {
return
@ -136,7 +136,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
atomic.AddInt32(t.topologyPairToPodCounts[pair], matchSum)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processAllNode)
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode)
if err := errCh.ReceiveError(); err != nil {
return nil, err
}

View File

@ -22,7 +22,7 @@ import (
v1 "k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
@ -434,9 +434,8 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
allNodes := append([]*v1.Node{}, tt.nodes...)
allNodes = append(allNodes, tt.failedNodes...)
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, allNodes)
got, _ := CalculateEvenPodsSpreadPriority(tt.pod, nodeNameToInfo, tt.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, allNodes)
got, _ := CalculateEvenPodsSpreadPriority(tt.pod, snapshot, tt.nodes)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("CalculateEvenPodsSpreadPriority() = %#v, want %#v", got, tt.want)
}
@ -484,10 +483,10 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(existingPods, allNodes)
snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes)
b.ResetTimer()
for i := 0; i < b.N; i++ {
CalculateEvenPodsSpreadPriority(tt.pod, nodeNameToInfo, filteredNodes)
CalculateEvenPodsSpreadPriority(tt.pod, snapshot, filteredNodes)
}
})
}

View File

@ -25,7 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/util/parsers"
)
@ -184,8 +184,8 @@ func TestImageLocalityPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := priorityFunction(ImageLocalityPriorityMap, nil, &priorityMetadata{totalNumNodes: len(test.nodes)})(test.pod, nodeNameToInfo, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
list, err := priorityFunction(ImageLocalityPriorityMap, nil, &priorityMetadata{totalNumNodes: len(test.nodes)})(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -26,7 +26,6 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/klog"
@ -34,14 +33,12 @@ import (
// InterPodAffinity contains information to calculate inter pod affinity.
type InterPodAffinity struct {
nodeInfoLister schedulerlisters.NodeInfoLister
hardPodAffinityWeight int32
}
// NewInterPodAffinityPriority creates an InterPodAffinity.
func NewInterPodAffinityPriority(nodeInfoLister schedulerlisters.NodeInfoLister, hardPodAffinityWeight int32) PriorityFunction {
func NewInterPodAffinityPriority(hardPodAffinityWeight int32) PriorityFunction {
interPodAffinity := &InterPodAffinity{
nodeInfoLister: nodeInfoLister,
hardPodAffinityWeight: hardPodAffinityWeight,
}
return interPodAffinity.CalculateInterPodAffinityPriority
@ -102,14 +99,14 @@ func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm
// that node; the node(s) with the highest sum are the most preferred.
// Symmetry need to be considered for preferredDuringSchedulingIgnoredDuringExecution from podAffinity & podAntiAffinity,
// symmetry need to be considered for hard requirements from podAffinity
func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error) {
func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
affinity := pod.Spec.Affinity
hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
// pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node.
pm := newPodAffinityPriorityMap(nodes)
allNodes, err := ipa.nodeInfoLister.List()
allNodes, err := sharedLister.NodeInfos().List()
if err != nil {
return nil, err
}
@ -118,7 +115,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
var maxCount, minCount int64
processPod := func(existingPod *v1.Pod) error {
existingPodNodeInfo, err := ipa.nodeInfoLister.Get(existingPod.Spec.NodeName)
existingPodNodeInfo, err := sharedLister.NodeInfos().Get(existingPod.Spec.NodeName)
if err != nil {
klog.Errorf("Node not found, %v", existingPod.Spec.NodeName)
return nil

View File

@ -516,10 +516,9 @@ func TestInterPodAffinityPriority(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
interPodAffinity := InterPodAffinity{
nodeInfoLister: snapshot.NodeInfos(),
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
}
list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, snapshot.NodeInfoMap, test.nodes)
list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -604,10 +603,9 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
ipa := InterPodAffinity{
nodeInfoLister: snapshot.NodeInfos(),
hardPodAffinityWeight: test.hardPodAffinityWeight,
}
list, err := ipa.CalculateInterPodAffinityPriority(test.pod, snapshot.NodeInfoMap, test.nodes)
list, err := ipa.CalculateInterPodAffinityPriority(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -661,12 +659,11 @@ func BenchmarkInterPodAffinityPriority(b *testing.B) {
existingPods, allNodes := tt.prepFunc(tt.existingPodsNum, tt.allNodesNum)
snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes)
interPodAffinity := InterPodAffinity{
nodeInfoLister: snapshot.NodeInfos(),
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
interPodAffinity.CalculateInterPodAffinityPriority(tt.pod, snapshot.NodeInfoMap, allNodes)
interPodAffinity.CalculateInterPodAffinityPriority(tt.pod, snapshot, allNodes)
}
})
}

View File

@ -24,7 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func TestLeastRequested(t *testing.T) {
@ -253,8 +253,8 @@ func TestLeastRequested(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := priorityFunction(LeastRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
list, err := priorityFunction(LeastRequestedPriorityMap, nil, nil)(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -61,11 +62,17 @@ type priorityMetadata struct {
}
// PriorityMetadata is a PriorityMetadataProducer. Node info can be nil.
func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) interface{} {
func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) interface{} {
// If we cannot compute metadata, just return nil
if pod == nil {
return nil
}
totalNumNodes := 0
if sharedLister != nil {
if l, err := sharedLister.NodeInfos().List(); err == nil {
totalNumNodes = len(l)
}
}
return &priorityMetadata{
podLimits: getResourceLimits(pod),
podTolerations: getAllTolerationPreferNoSchedule(pod.Spec.Tolerations),
@ -73,7 +80,7 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo
podSelectors: getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
controllerRef: metav1.GetControllerOf(pod),
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
totalNumNodes: len(nodeNameToInfo),
totalNumNodes: totalNumNodes,
}
}

View File

@ -24,7 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func TestMostRequested(t *testing.T) {
@ -210,8 +210,8 @@ func TestMostRequested(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := priorityFunction(MostRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
list, err := priorityFunction(MostRequestedPriorityMap, nil, nil)(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -23,7 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func TestNodeAffinityPriority(t *testing.T) {
@ -167,9 +167,9 @@ func TestNodeAffinityPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce, nil)
list, err := nap(test.pod, nodeNameToInfo, test.nodes)
list, err := nap(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -23,7 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func TestNewNodeLabelPriority(t *testing.T) {
@ -107,12 +107,12 @@ func TestNewNodeLabelPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
labelPrioritizer := &NodeLabelPrioritizer{
label: test.label,
presence: test.presence,
}
list, err := priorityFunction(labelPrioritizer.CalculateNodeLabelPriorityMap, nil, nil)(nil, nodeNameToInfo, test.nodes)
list, err := priorityFunction(labelPrioritizer.CalculateNodeLabelPriorityMap, nil, nil)(nil, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -23,7 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func TestNodePreferAvoidPriority(t *testing.T) {
@ -141,8 +141,8 @@ func TestNodePreferAvoidPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil, nil)(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -19,7 +19,7 @@ package priorities
import (
"k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
)
// NormalizeReduce generates a PriorityReduceFunction that can normalize the result
@ -29,7 +29,7 @@ func NormalizeReduce(maxPriority int64, reverse bool) PriorityReduceFunction {
return func(
_ *v1.Pod,
_ interface{},
_ map[string]*schedulernodeinfo.NodeInfo,
_ schedulerlisters.SharedLister,
result framework.NodeScoreList) error {
var maxCount int64

View File

@ -25,7 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func TestCreatingFunctionShapeErrorsIfEmptyPoints(t *testing.T) {
@ -240,8 +240,8 @@ func TestRequestedToCapacityRatio(t *testing.T) {
newPod := buildResourcesPod("", test.requested)
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(scheduledPods, nodes)
list, err := priorityFunction(RequestedToCapacityRatioResourceAllocationPriorityDefault().PriorityMap, nil, nil)(newPod, nodeNameToInfo, nodes)
snapshot := nodeinfosnapshot.NewSnapshot(scheduledPods, nodes)
list, err := priorityFunction(RequestedToCapacityRatioResourceAllocationPriorityDefault().PriorityMap, nil, nil)(newPod, snapshot, nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -386,11 +386,11 @@ func TestResourceBinPackingSingleExtended(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
functionShape, _ := NewFunctionShape([]FunctionShapePoint{{0, 0}, {100, 10}})
resourceToWeightMap := ResourceToWeightMap{v1.ResourceName("intel.com/foo"): 1}
prior := RequestedToCapacityRatioResourceAllocationPriority(functionShape, resourceToWeightMap)
list, err := priorityFunction(prior.PriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
list, err := priorityFunction(prior.PriorityMap, nil, nil)(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -611,11 +611,11 @@ func TestResourceBinPackingMultipleExtended(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
functionShape, _ := NewFunctionShape([]FunctionShapePoint{{0, 0}, {100, 10}})
resourceToWeightMap := ResourceToWeightMap{v1.ResourceName("intel.com/foo"): 3, v1.ResourceName("intel.com/bar"): 5}
prior := RequestedToCapacityRatioResourceAllocationPriority(functionShape, resourceToWeightMap)
list, err := priorityFunction(prior.PriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
list, err := priorityFunction(prior.PriorityMap, nil, nil)(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -23,7 +23,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func TestResourceLimitsPriority(t *testing.T) {
@ -138,7 +138,7 @@ func TestResourceLimitsPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
metadata := &priorityMetadata{
podLimits: getResourceLimits(test.pod),
}
@ -151,7 +151,7 @@ func TestResourceLimitsPriority(t *testing.T) {
function = priorityFunction(ResourceLimitsPriorityMap, nil, nil)
}
list, err := function(test.pod, nodeNameToInfo, test.nodes)
list, err := function(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)

View File

@ -98,7 +98,7 @@ func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{
// based on the number of existing matching pods on the node
// where zone information is included on the nodes, it favors nodes
// in zones with fewer existing matching pods.
func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, result framework.NodeScoreList) error {
func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
countsByZone := make(map[string]int64, 10)
maxCountByZone := int64(0)
maxCountByNodeName := int64(0)
@ -107,7 +107,11 @@ func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interfa
if result[i].Score > maxCountByNodeName {
maxCountByNodeName = result[i].Score
}
zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Name].Node())
nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
if err != nil {
return err
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
if zoneID == "" {
continue
}
@ -134,7 +138,12 @@ func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interfa
}
// If there is zone information present, incorporate it
if haveZones {
zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Name].Node())
nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
if err != nil {
return err
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
if zoneID != "" {
zoneScore := MaxNodeScoreFloat64
if maxCountByZone > 0 {
@ -240,7 +249,7 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta
// CalculateAntiAffinityPriorityReduce computes each node score with the same value for a particular label.
// The label to be considered is provided to the struct (ServiceAntiAffinity).
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, result framework.NodeScoreList) error {
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
var numServicePods int64
var label string
podCounts := map[string]int64{}
@ -249,10 +258,15 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityReduce(pod *v1.Pod, m
for _, hostPriority := range result {
numServicePods += hostPriority.Score
if !labels.Set(nodeNameToInfo[hostPriority.Name].Node().Labels).Has(s.label) {
nodeInfo, err := sharedLister.NodeInfos().Get(hostPriority.Name)
if err != nil {
return err
}
if !labels.Set(nodeInfo.Node().Labels).Has(s.label) {
continue
}
label = labels.Set(nodeNameToInfo[hostPriority.Name].Node().Labels).Get(s.label)
label = labels.Set(nodeInfo.Node().Labels).Get(s.label)
labelNodesStatus[hostPriority.Name] = label
podCounts[label] += hostPriority.Score
}

View File

@ -25,7 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func controllerRef(kind, name, uid string) []metav1.OwnerReference {
@ -337,7 +337,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, makeNodeList(test.nodes))
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, makeNodeList(test.nodes))
selectorSpread := SelectorSpread{
serviceLister: fakelisters.ServiceLister(test.services),
controllerLister: fakelisters.ControllerLister(test.rcs),
@ -350,10 +350,10 @@ func TestSelectorSpreadPriority(t *testing.T) {
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss))
metaData := metaDataProducer(test.pod, nodeNameToInfo)
metaData := metaDataProducer(test.pod, snapshot)
ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData)
list, err := ttp(test.pod, nodeNameToInfo, makeNodeList(test.nodes))
list, err := ttp(test.pod, snapshot, makeNodeList(test.nodes))
if err != nil {
t.Errorf("unexpected error: %v \n", err)
}
@ -573,7 +573,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, makeLabeledNodeList(labeledNodes))
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, makeLabeledNodeList(labeledNodes))
selectorSpread := SelectorSpread{
serviceLister: fakelisters.ServiceLister(test.services),
controllerLister: fakelisters.ControllerLister(test.rcs),
@ -586,9 +586,9 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss))
metaData := metaDataProducer(test.pod, nodeNameToInfo)
metaData := metaDataProducer(test.pod, snapshot)
ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData)
list, err := ttp(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes))
list, err := ttp(test.pod, snapshot, makeLabeledNodeList(labeledNodes))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -765,17 +765,17 @@ func TestZoneSpreadPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, makeLabeledNodeList(test.nodes))
zoneSpread := ServiceAntiAffinity{podLister: fakelisters.PodLister(test.pods), serviceLister: fakelisters.ServiceLister(test.services), label: "zone"}
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, makeLabeledNodeList(test.nodes))
zoneSpread := ServiceAntiAffinity{podLister: snapshot.Pods(), serviceLister: fakelisters.ServiceLister(test.services), label: "zone"}
metaDataProducer := NewPriorityMetadataFactory(
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(rcs),
fakelisters.ReplicaSetLister(rss),
fakelisters.StatefulSetLister(sss))
metaData := metaDataProducer(test.pod, nodeNameToInfo)
metaData := metaDataProducer(test.pod, snapshot)
ttp := priorityFunction(zoneSpread.CalculateAntiAffinityPriorityMap, zoneSpread.CalculateAntiAffinityPriorityReduce, metaData)
list, err := ttp(test.pod, nodeNameToInfo, makeLabeledNodeList(test.nodes))
list, err := ttp(test.pod, snapshot, makeLabeledNodeList(test.nodes))
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -23,7 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func nodeWithTaints(nodeName string, taints []v1.Taint) *v1.Node {
@ -227,9 +227,9 @@ func TestTaintAndToleration(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce, nil)
list, err := ttp(test.pod, nodeNameToInfo, test.nodes)
list, err := ttp(test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
)
func makeNode(node string, milliCPU, memory int64) *v1.Node {
@ -59,17 +59,21 @@ func makeNodeWithExtendedResource(node string, milliCPU, memory int64, extendedR
}
func priorityFunction(mapFn PriorityMapFunction, reduceFn PriorityReduceFunction, metaData interface{}) PriorityFunction {
return func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error) {
return func(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := make(framework.NodeScoreList, 0, len(nodes))
for i := range nodes {
hostResult, err := mapFn(pod, metaData, nodeNameToInfo[nodes[i].Name])
nodeInfo, err := sharedLister.NodeInfos().Get(nodes[i].Name)
if err != nil {
return nil, err
}
hostResult, err := mapFn(pod, metaData, nodeInfo)
if err != nil {
return nil, err
}
result = append(result, hostResult)
}
if reduceFn != nil {
if err := reduceFn(pod, metaData, nodeNameToInfo, result); err != nil {
if err := reduceFn(pod, metaData, sharedLister, result); err != nil {
return nil, err
}
}

View File

@ -19,6 +19,7 @@ package priorities
import (
"k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -31,16 +32,16 @@ type PriorityMapFunction func(pod *v1.Pod, meta interface{}, nodeInfo *scheduler
// final scores for all nodes.
// TODO: Figure out the exact API of this method.
// TODO: Change interface{} to a specific type.
type PriorityReduceFunction func(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, result framework.NodeScoreList) error
type PriorityReduceFunction func(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error
// PriorityMetadataProducer is a function that computes metadata for a given pod. This
// is now used for only for priority functions. For predicates please use PredicateMetadataProducer.
type PriorityMetadataProducer func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) interface{}
type PriorityMetadataProducer func(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) interface{}
// PriorityFunction is a function that computes scores for all nodes.
// DEPRECATED
// Use Map-Reduce pattern for priority functions.
type PriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error)
type PriorityFunction func(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error)
// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
@ -54,6 +55,6 @@ type PriorityConfig struct {
}
// EmptyPriorityMetadataProducer returns a no-op PriorityMetadataProducer type.
func EmptyPriorityMetadataProducer(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) interface{} {
func EmptyPriorityMetadataProducer(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) interface{} {
return nil
}

View File

@ -22,20 +22,18 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
// EmptyPriorityMetadataProducer should return a no-op PriorityMetadataProducer type.
func TestEmptyPriorityMetadataProducer(t *testing.T) {
fakePod := new(v1.Pod)
fakePod := st.MakePod().Name("p1").Node("node2").Obj()
fakeLabelSelector := labels.SelectorFromSet(labels.Set{"foo": "bar"})
nodeNameToInfo := map[string]*schedulernodeinfo.NodeInfo{
"2": schedulernodeinfo.NewNodeInfo(fakePod),
"1": schedulernodeinfo.NewNodeInfo(),
}
snapshot := nodeinfosnapshot.NewSnapshot([]*v1.Pod{fakePod}, []*v1.Node{st.MakeNode().Name("node1").Obj(), st.MakeNode().Name("node-a").Obj()})
// Test EmptyPriorityMetadataProducer
metadata := EmptyPriorityMetadataProducer(fakePod, nodeNameToInfo)
metadata := EmptyPriorityMetadataProducer(fakePod, snapshot)
if metadata != nil {
t.Errorf("failed to produce empty metadata: got %v, expected nil", metadata)
}

View File

@ -70,7 +70,7 @@ func init() {
priorities.InterPodAffinityPriority,
scheduler.PriorityConfigFactory{
Function: func(args scheduler.PluginFactoryArgs) priorities.PriorityFunction {
return priorities.NewInterPodAffinityPriority(args.NodeInfoLister, args.HardPodAffinitySymmetricWeight)
return priorities.NewInterPodAffinityPriority(args.HardPodAffinitySymmetricWeight)
},
Weight: 1,
},

View File

@ -59,6 +59,7 @@ go_test(
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",

View File

@ -40,6 +40,7 @@ import (
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -106,7 +107,7 @@ func machine2PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.Node
return &result, nil
}
func machine2Prioritizer(_ *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error) {
func machine2Prioritizer(_ *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := []framework.NodeScore{}
for _, node := range nodes {
score := 10

View File

@ -238,8 +238,8 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
}, nil
}
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
priorityList, err := PrioritizeNodes(ctx, pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, state)
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot)
priorityList, err := PrioritizeNodes(ctx, pod, g.nodeInfoSnapshot, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, state)
if err != nil {
return result, err
}
@ -704,7 +704,7 @@ func (g *genericScheduler) podFitsOnNode(
func PrioritizeNodes(
ctx context.Context,
pod *v1.Pod,
nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
snapshot *nodeinfosnapshot.Snapshot,
meta interface{},
priorityConfigs []priorities.PriorityConfig,
nodes []*v1.Node,
@ -716,7 +716,7 @@ func PrioritizeNodes(
if len(priorityConfigs) == 0 && len(extenders) == 0 && !fwk.HasScorePlugins() {
result := make(framework.NodeScoreList, 0, len(nodes))
for i := range nodes {
hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
hostPriority, err := EqualPriorityMap(pod, meta, snapshot.NodeInfoMap[nodes[i].Name])
if err != nil {
return nil, err
}
@ -750,7 +750,7 @@ func PrioritizeNodes(
wg.Done()
}()
var err error
results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
results[index], err = priorityConfigs[index].Function(pod, snapshot, nodes)
if err != nil {
appendError(err)
}
@ -761,7 +761,7 @@ func PrioritizeNodes(
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := nodeNameToInfo[nodes[index].Name]
nodeInfo := snapshot.NodeInfoMap[nodes[index].Name]
for i := range priorityConfigs {
if priorityConfigs[i].Function != nil {
continue
@ -787,7 +787,7 @@ func PrioritizeNodes(
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
wg.Done()
}()
if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
if err := priorityConfigs[index].Reduce(pod, meta, snapshot, results[index]); err != nil {
appendError(err)
}
if klog.V(10) {
@ -825,7 +825,7 @@ func PrioritizeNodes(
}
if len(extenders) != 0 && nodes != nil {
combinedScores := make(map[string]int64, len(nodeNameToInfo))
combinedScores := make(map[string]int64, len(snapshot.NodeInfoList))
for i := range extenders {
if !extenders[i].IsInterested(pod) {
continue

View File

@ -47,6 +47,7 @@ import (
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
@ -83,7 +84,7 @@ func hasNoPodsPredicate(pod *v1.Pod, meta algorithmpredicates.PredicateMetadata,
return false, []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
func numericPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error) {
func numericPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := []framework.NodeScore{}
for _, node := range nodes {
score, err := strconv.Atoi(node.Name)
@ -98,11 +99,11 @@ func numericPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.N
return result, nil
}
func reverseNumericPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error) {
func reverseNumericPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
var maxScore float64
minScore := math.MaxFloat64
reverseResult := []framework.NodeScore{}
result, err := numericPriority(pod, nodeNameToInfo, nodes)
result, err := numericPriority(pod, sharedLister, nodes)
if err != nil {
return nil, err
}
@ -132,7 +133,7 @@ func falseMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo
return framework.NodeScore{}, errPrioritize
}
func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, result framework.NodeScoreList) error {
func getNodeReducePriority(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
for _, host := range result {
if host.Name == "" {
return fmt.Errorf("unexpected empty host name")
@ -998,7 +999,7 @@ func TestZeroRequest(t *testing.T) {
pc := priorities.PriorityConfig{Map: selectorSpreadPriorityMap, Reduce: selectorSpreadPriorityReduce, Weight: 1}
priorityConfigs = append(priorityConfigs, pc)
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
metaDataProducer := priorities.NewPriorityMetadataFactory(
informerFactory.Core().V1().Services().Lister(),
@ -1007,11 +1008,11 @@ func TestZeroRequest(t *testing.T) {
informerFactory.Apps().V1().StatefulSets().Lister(),
)
metaData := metaDataProducer(test.pod, nodeNameToInfo)
metaData := metaDataProducer(test.pod, snapshot)
list, err := PrioritizeNodes(
context.Background(),
test.pod, nodeNameToInfo, metaData, priorityConfigs,
test.pod, snapshot, metaData, priorityConfigs,
test.nodes, []algorithm.SchedulerExtender{}, emptyFramework, framework.NewCycleState())
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -1646,21 +1647,22 @@ func TestPickOneNodeForPreemption(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
g := &genericScheduler{
framework: emptyFramework,
predicates: test.predicates,
predicateMetaProducer: algorithmpredicates.GetPredicateMetadata,
}
assignDefaultStartTime(test.pods)
g.nodeInfoSnapshot = g.framework.NodeInfoSnapshot()
nodes := []*v1.Node{}
for _, n := range test.nodes {
nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCPURequest*5, priorityutil.DefaultMemoryRequest*5))
}
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, nodes)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil, []schedulerconfig.PluginConfig{}, framework.WithNodeInfoSnapshot(snapshot))
g := &genericScheduler{
framework: fwk,
predicates: test.predicates,
predicateMetaProducer: algorithmpredicates.GetPredicateMetadata,
nodeInfoSnapshot: snapshot,
}
assignDefaultStartTime(test.pods)
state := framework.NewCycleState()
g.nodeInfoSnapshot.NodeInfoMap = nodeNameToInfo
candidateNodes, _ := g.selectNodesForPreemption(context.Background(), state, test.pod, nodes, nil)
node := pickOneNodeForPreemption(candidateNodes)
found := false

View File

@ -51,6 +51,7 @@ import (
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -281,7 +282,7 @@ func PredicateFunc(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *sch
return true, nil, nil
}
func PriorityFunc(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error) {
func PriorityFunc(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
return []framework.NodeScore{}, nil
}

View File

@ -22,7 +22,6 @@ go_test(
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -28,7 +28,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/util/parsers"
)
@ -201,14 +200,13 @@ func TestImageLocalityPriority(t *testing.T) {
informerFactory.Apps().V1().StatefulSets().Lister(),
)
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
meta := metaDataProducer(test.pod, nodeNameToInfo)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
meta := metaDataProducer(test.pod, snapshot)
state := framework.NewCycleState()
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta})
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(nodeinfosnapshot.NewSnapshot(nil, test.nodes)))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
p, _ := New(nil, fh)
var gotList framework.NodeScoreList

View File

@ -66,7 +66,7 @@ func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState,
// NormalizeScore invoked after scoring all nodes.
func (pl *NodeAffinity) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
// Note that CalculateNodeAffinityPriorityReduce doesn't use priority metadata, hence passing nil here.
err := priorities.CalculateNodeAffinityPriorityReduce(pod, nil, pl.handle.NodeInfoSnapshot().NodeInfoMap, scores)
err := priorities.CalculateNodeAffinityPriorityReduce(pod, nil, pl.handle.SnapshotSharedLister(), scores)
return migration.ErrorToFrameworkStatus(err)
}

View File

@ -38,6 +38,7 @@ go_test(
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],

View File

@ -66,7 +66,7 @@ func (pl *TaintToleration) Score(ctx context.Context, state *framework.CycleStat
// NormalizeScore invoked after scoring all nodes.
func (pl *TaintToleration) NormalizeScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
// Note that ComputeTaintTolerationPriorityReduce doesn't use priority metadata, hence passing nil here.
err := priorities.ComputeTaintTolerationPriorityReduce(pod, nil, pl.handle.NodeInfoSnapshot().NodeInfoMap, scores)
err := priorities.ComputeTaintTolerationPriorityReduce(pod, nil, pl.handle.SnapshotSharedLister(), scores)
return migration.ErrorToFrameworkStatus(err)
}

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func nodeWithTaints(nodeName string, taints []v1.Taint) *v1.Node {
@ -229,10 +230,8 @@ func TestTaintTolerationScore(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState()
fh, _ := framework.NewFramework(nil, nil, nil)
snapshot := fh.NodeInfoSnapshot()
snapshot.NodeInfoMap = schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
p, _ := New(nil, fh)
var gotList framework.NodeScoreList

View File

@ -612,10 +612,7 @@ func (f *framework) SnapshotSharedLister() schedulerlisters.SharedLister {
return f.nodeInfoSnapshot
}
// NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until a
// pod finishes "Reserve". There is no guarantee that the information remains
// unchanged after "Reserve".
// NodeInfoSnapshot returns the NodeInfo Snapshot handler.
func (f *framework) NodeInfoSnapshot() *nodeinfosnapshot.Snapshot {
return f.nodeInfoSnapshot
}

View File

@ -450,6 +450,9 @@ type Framework interface {
// ListPlugins returns a map of extension point name to list of configured Plugins.
ListPlugins() map[string][]config.Plugin
// NodeInfoSnapshot return the NodeInfo.Snapshot handler.
NodeInfoSnapshot() *nodeinfosnapshot.Snapshot
}
// FrameworkHandle provides data and some tools that plugins can use. It is
@ -465,15 +468,6 @@ type FrameworkHandle interface {
// cache instead.
SnapshotSharedLister() schedulerlisters.SharedLister
// NodeInfoSnapshot return the latest NodeInfo snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until
// a pod finishes "Reserve" point. There is no guarantee that the information
// remains unchanged in the binding phase of scheduling, so plugins in the binding
// cycle(permit/pre-bind/bind/post-bind/un-reserve plugin) should not use it,
// otherwise a concurrent read/write error might occur, they should use scheduler
// cache instead.
NodeInfoSnapshot() *nodeinfosnapshot.Snapshot
// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
IterateOverWaitingPods(callback func(WaitingPod))

View File

@ -53,6 +53,7 @@ import (
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -142,7 +143,7 @@ func PredicateOne(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *sche
return true, nil, nil
}
func PriorityOne(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (framework.NodeScoreList, error) {
func PriorityOne(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
return []framework.NodeScore{}, nil
}

View File

@ -34,6 +34,7 @@ go_test(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/extender/v1:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//plugin/pkg/admission/defaulttolerationseconds:go_default_library",

View File

@ -43,6 +43,7 @@ import (
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/test/integration/framework"
)
@ -62,11 +63,11 @@ func PredicateTwo(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *sche
return true, nil, nil
}
func PriorityOne(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerframework.NodeScoreList, error) {
func PriorityOne(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (schedulerframework.NodeScoreList, error) {
return []schedulerframework.NodeScore{}, nil
}
func PriorityTwo(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerframework.NodeScoreList, error) {
func PriorityTwo(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (schedulerframework.NodeScoreList, error) {
return []schedulerframework.NodeScore{}, nil
}