Merge pull request #84389 from ahg-g/ahg-interpod

Predicates use SharedLister interface instead of NodeInfo Map
Kubernetes Prow Robot 2019-10-29 13:06:53 -07:00 committed by GitHub
commit 5f900f6332
28 changed files with 147 additions and 176 deletions
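The change threads a scheduler-wide "shared lister" (a snapshot handle exposing pod and node-info listers) through the predicate metadata producers and score plugins, replacing the raw map[string]*NodeInfo parameter. As a rough orientation before the diffs, here is a minimal, self-contained sketch of the shape the code now programs against; the type and method names below are local stand-ins for this example only, with just the List/Get usage taken from the calls visible in the diff.

```go
package main

import "fmt"

// nodeInfo stands in for schedulernodeinfo.NodeInfo; only what this sketch needs.
type nodeInfo struct{ name string }

// nodeInfoLister mirrors the two calls the diff relies on:
// List() for whole-cluster iteration and Get() for a single-node lookup.
type nodeInfoLister interface {
    List() ([]*nodeInfo, error)
    Get(nodeName string) (*nodeInfo, error)
}

// sharedLister is the handle predicates now receive instead of a
// map[string]*NodeInfo (the real interface also exposes a pod lister).
type sharedLister interface {
    NodeInfos() nodeInfoLister
}

// fakeSnapshot is a toy snapshot backing both lister methods.
type fakeSnapshot struct{ nodes map[string]*nodeInfo }

func (s *fakeSnapshot) NodeInfos() nodeInfoLister { return s }

func (s *fakeSnapshot) List() ([]*nodeInfo, error) {
    out := make([]*nodeInfo, 0, len(s.nodes))
    for _, n := range s.nodes {
        out = append(out, n)
    }
    return out, nil
}

func (s *fakeSnapshot) Get(name string) (*nodeInfo, error) {
    n, ok := s.nodes[name]
    if !ok {
        return nil, fmt.Errorf("nodeinfo not found for node %q", name)
    }
    return n, nil
}

func main() {
    var sl sharedLister = &fakeSnapshot{nodes: map[string]*nodeInfo{"node-1": {name: "node-1"}}}
    allNodes, _ := sl.NodeInfos().List()
    fmt.Println("nodes in snapshot:", len(allNodes))
}
```

Listing the snapshot once into a slice lets the parallel per-node loops index by position, while Get keeps the by-name lookup that individual score plugins need.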


@@ -63,6 +63,7 @@ go_test(
         "//pkg/scheduler/api:go_default_library",
         "//pkg/scheduler/listers/fake:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
+        "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//pkg/scheduler/testing:go_default_library",
         "//pkg/volume/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",


@@ -30,6 +30,7 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/util/workqueue"
     priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
+    schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
     schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 )
@@ -42,7 +43,7 @@ type PredicateMetadata interface {
 }
 // PredicateMetadataProducer is a function that computes predicate metadata for a given pod.
-type PredicateMetadataProducer func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata
+type PredicateMetadataProducer func(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) PredicateMetadata
 // AntiAffinityTerm's topology key value used in predicate metadata
 type topologyPair struct {
@@ -314,7 +315,7 @@ func RegisterPredicateMetadataProducer(predicateName string, precomp predicateMe
 }
 // EmptyPredicateMetadataProducer returns a no-op MetadataProducer type.
-func EmptyPredicateMetadataProducer(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata {
+func EmptyPredicateMetadataProducer(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) PredicateMetadata {
     return nil
 }
@@ -330,20 +331,31 @@ func RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtende
 }
 // GetPredicateMetadata returns the predicateMetadata which will be used by various predicates.
-func GetPredicateMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata {
+func GetPredicateMetadata(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) PredicateMetadata {
     // If we cannot compute metadata, just return nil
     if pod == nil {
         return nil
     }
+    var allNodes []*schedulernodeinfo.NodeInfo
+    if sharedLister != nil {
+        n, err := sharedLister.NodeInfos().List()
+        if err != nil {
+            klog.Errorf("failed to list NodeInfos: %v", err)
+            return nil
+        }
+        allNodes = n
+    }
     // evenPodsSpreadMetadata represents how existing pods match "pod"
     // on its spread constraints
-    evenPodsSpreadMetadata, err := getEvenPodsSpreadMetadata(pod, nodeNameToInfoMap)
+    evenPodsSpreadMetadata, err := getEvenPodsSpreadMetadata(pod, allNodes)
     if err != nil {
         klog.Errorf("Error calculating spreadConstraintsMap: %v", err)
         return nil
     }
-    podAffinityMetadata, err := getPodAffinityMetadata(pod, nodeNameToInfoMap)
+    podAffinityMetadata, err := getPodAffinityMetadata(pod, allNodes)
     if err != nil {
         klog.Errorf("Error calculating podAffinityMetadata: %v", err)
         return nil
@@ -375,15 +387,15 @@ func getPodFitsResourcesMetedata(pod *v1.Pod) *podFitsResourcesMetadata {
     }
 }
-func getPodAffinityMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*schedulernodeinfo.NodeInfo) (*podAffinityMetadata, error) {
+func getPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*podAffinityMetadata, error) {
     // existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
-    existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, nodeNameToInfoMap)
+    existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, allNodes)
     if err != nil {
         return nil, err
     }
     // incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity
     // incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity
-    incomingPodAffinityMap, incomingPodAntiAffinityMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(pod, nodeNameToInfoMap)
+    incomingPodAffinityMap, incomingPodAntiAffinityMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(pod, allNodes)
     if err != nil {
         return nil, err
     }
@@ -395,7 +407,7 @@ func getPodAffinityMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*scheduler
     }, nil
 }
-func getEvenPodsSpreadMetadata(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (*evenPodsSpreadMetadata, error) {
+func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*evenPodsSpreadMetadata, error) {
     // We have feature gating in APIServer to strip the spec
     // so don't need to re-check feature gate, just check length of constraints.
     constraints := getHardTopologySpreadConstraints(pod)
@@ -403,11 +415,6 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, nodeInfoMap map[string]*schedulernod
         return nil, nil
     }
-    allNodeNames := make([]string, 0, len(nodeInfoMap))
-    for name := range nodeInfoMap {
-        allNodeNames = append(allNodeNames, name)
-    }
     errCh := schedutil.NewErrorChannel()
     var lock sync.Mutex
@@ -426,10 +433,10 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, nodeInfoMap map[string]*schedulernod
     ctx, cancel := context.WithCancel(context.Background())
     processNode := func(i int) {
-        nodeInfo := nodeInfoMap[allNodeNames[i]]
+        nodeInfo := allNodes[i]
         node := nodeInfo.Node()
         if node == nil {
-            klog.Errorf("node %q not found", allNodeNames[i])
+            klog.Error("node not found")
             return
         }
         // In accordance to design, if NodeAffinity or NodeSelector is defined,
@@ -462,7 +469,7 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, nodeInfoMap map[string]*schedulernod
             addTopologyPairMatchNum(pair, matchTotal)
         }
     }
-    workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
+    workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
     if err := errCh.ReceiveError(); err != nil {
         return nil, err
@@ -726,12 +733,7 @@ func podMatchesAnyAffinityTermProperties(pod *v1.Pod, properties []*affinityTerm
 // getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
 // (1) Whether it has PodAntiAffinity
 // (2) Whether any AffinityTerm matches the incoming pod
-func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (*topologyPairsMaps, error) {
-    allNodeNames := make([]string, 0, len(nodeInfoMap))
-    for name := range nodeInfoMap {
-        allNodeNames = append(allNodeNames, name)
-    }
+func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*topologyPairsMaps, error) {
     errCh := schedutil.NewErrorChannel()
     var lock sync.Mutex
     topologyMaps := newTopologyPairsMaps()
@@ -745,10 +747,10 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s
     ctx, cancel := context.WithCancel(context.Background())
     processNode := func(i int) {
-        nodeInfo := nodeInfoMap[allNodeNames[i]]
+        nodeInfo := allNodes[i]
         node := nodeInfo.Node()
         if node == nil {
-            klog.Errorf("node %q not found", allNodeNames[i])
+            klog.Error("node not found")
             return
         }
         for _, existingPod := range nodeInfo.PodsWithAffinity() {
@@ -760,7 +762,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s
             appendTopologyPairsMaps(existingPodTopologyMaps)
         }
     }
-    workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
+    workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
     if err := errCh.ReceiveError(); err != nil {
         return nil, err
@@ -773,19 +775,13 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s
 // It returns a topologyPairsMaps that are checked later by the affinity
 // predicate. With this topologyPairsMaps available, the affinity predicate does not
 // need to check all the pods in the cluster.
-func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (topologyPairsAffinityPodsMaps *topologyPairsMaps, topologyPairsAntiAffinityPodsMaps *topologyPairsMaps, err error) {
+func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (topologyPairsAffinityPodsMaps *topologyPairsMaps, topologyPairsAntiAffinityPodsMaps *topologyPairsMaps, err error) {
     affinity := pod.Spec.Affinity
     if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
         return newTopologyPairsMaps(), newTopologyPairsMaps(), nil
     }
-    allNodeNames := make([]string, 0, len(nodeInfoMap))
-    for name := range nodeInfoMap {
-        allNodeNames = append(allNodeNames, name)
-    }
     errCh := schedutil.NewErrorChannel()
     var lock sync.Mutex
     topologyPairsAffinityPodsMaps = newTopologyPairsMaps()
     topologyPairsAntiAffinityPodsMaps = newTopologyPairsMaps()
@@ -811,10 +807,10 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
     ctx, cancel := context.WithCancel(context.Background())
     processNode := func(i int) {
-        nodeInfo := nodeInfoMap[allNodeNames[i]]
+        nodeInfo := allNodes[i]
         node := nodeInfo.Node()
         if node == nil {
-            klog.Errorf("node %q not found", allNodeNames[i])
+            klog.Error("node not found")
             return
         }
         nodeTopologyPairsAffinityPodsMaps := newTopologyPairsMaps()
@@ -850,7 +846,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
             appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps)
         }
     }
-    workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
+    workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
     if err := errCh.ReceiveError(); err != nil {
         return nil, nil, err
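With the node infos materialized as a slice via NodeInfos().List(), the parallel loops above index allNodes directly instead of first copying the map keys into an allNodeNames slice. A hedged, stand-alone sketch of that iteration pattern follows; the string slice stands in for []*schedulernodeinfo.NodeInfo, while workqueue.ParallelizeUntil is the same client-go helper the diff calls.

```go
package main

import (
    "context"
    "fmt"
    "sync/atomic"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // Stand-in for the []*NodeInfo slice returned by sharedLister.NodeInfos().List().
    allNodes := []string{"node-1", "node-2", "node-3"}

    var processed int64
    processNode := func(i int) {
        // Index directly into the listed slice; no name -> NodeInfo map lookup.
        _ = allNodes[i]
        atomic.AddInt64(&processed, 1)
    }
    // 16 workers over len(allNodes) pieces, mirroring the calls in the diff.
    workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)

    fmt.Println("processed", processed, "nodes")
}
```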


@@ -27,6 +27,7 @@ import (
     "k8s.io/apimachinery/pkg/labels"
     fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
     st "k8s.io/kubernetes/pkg/scheduler/testing"
 )
@@ -353,12 +354,12 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
         t.Run(test.name, func(t *testing.T) {
             allPodLister := fakelisters.PodLister(append(test.existingPods, test.addedPod))
             // getMeta creates predicate meta data given the list of pods.
-            getMeta := func(lister fakelisters.PodLister) (*predicateMetadata, map[string]*schedulernodeinfo.NodeInfo) {
-                nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(lister, test.nodes)
-                _, precompute := NewServiceAffinityPredicate(fakelisters.NewNodeInfoLister(test.nodes), lister, fakelisters.ServiceLister(test.services), nil)
+            getMeta := func(pods []*v1.Pod) (*predicateMetadata, map[string]*schedulernodeinfo.NodeInfo) {
+                s := nodeinfosnapshot.NewSnapshot(pods, test.nodes)
+                _, precompute := NewServiceAffinityPredicate(s.NodeInfos(), s.Pods(), fakelisters.ServiceLister(test.services), nil)
                 RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", precompute)
-                meta := GetPredicateMetadata(test.pendingPod, nodeInfoMap)
-                return meta.(*predicateMetadata), nodeInfoMap
+                meta := GetPredicateMetadata(test.pendingPod, s)
+                return meta.(*predicateMetadata), s.NodeInfoMap
             }
             // allPodsMeta is meta data produced when all pods, including test.addedPod
@@ -784,9 +785,9 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes)
-            gotAffinityPodsMaps, gotAntiAffinityPodsMaps, err := getTPMapMatchingIncomingAffinityAntiAffinity(tt.pod, nodeInfoMap)
+            s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes)
+            l, _ := s.NodeInfos().List()
+            gotAffinityPodsMaps, gotAntiAffinityPodsMaps, err := getTPMapMatchingIncomingAffinityAntiAffinity(tt.pod, l)
             if (err != nil) != tt.wantErr {
                 t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() error = %v, wantErr %v", err, tt.wantErr)
                 return
@@ -1180,8 +1181,9 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes)
-            got, _ := getEvenPodsSpreadMetadata(tt.pod, nodeInfoMap)
+            s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes)
+            l, _ := s.NodeInfos().List()
+            got, _ := getEvenPodsSpreadMetadata(tt.pod, l)
             got.sortCriticalPaths()
             if !reflect.DeepEqual(got, tt.want) {
                 t.Errorf("getEvenPodsSpreadMetadata() = %v, want %v", *got, *tt.want)
@@ -1447,9 +1449,9 @@ func TestPodSpreadCache_addPod(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes)
-            evenPodsSpreadMetadata, _ := getEvenPodsSpreadMetadata(tt.preemptor, nodeInfoMap)
+            s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes)
+            l, _ := s.NodeInfos().List()
+            evenPodsSpreadMetadata, _ := getEvenPodsSpreadMetadata(tt.preemptor, l)
             evenPodsSpreadMetadata.addPod(tt.addedPod, tt.preemptor, tt.nodes[tt.nodeIdx])
             evenPodsSpreadMetadata.sortCriticalPaths()
             if !reflect.DeepEqual(evenPodsSpreadMetadata, tt.want) {
@@ -1625,8 +1627,9 @@ func TestPodSpreadCache_removePod(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes)
-            evenPodsSpreadMetadata, _ := getEvenPodsSpreadMetadata(tt.preemptor, nodeInfoMap)
+            s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes)
+            l, _ := s.NodeInfos().List()
+            evenPodsSpreadMetadata, _ := getEvenPodsSpreadMetadata(tt.preemptor, l)
             var deletedPod *v1.Pod
             if tt.deletedPodIdx < len(tt.existingPods) && tt.deletedPodIdx >= 0 {
@@ -1683,10 +1686,11 @@ func BenchmarkTestGetTPMapMatchingSpreadConstraints(b *testing.B) {
     for _, tt := range tests {
         b.Run(tt.name, func(b *testing.B) {
             existingPods, allNodes, _ := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
-            nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(existingPods, allNodes)
+            s := nodeinfosnapshot.NewSnapshot(existingPods, allNodes)
+            l, _ := s.NodeInfos().List()
            b.ResetTimer()
             for i := 0; i < b.N; i++ {
-                getEvenPodsSpreadMetadata(tt.pod, nodeNameToInfo)
+                getEvenPodsSpreadMetadata(tt.pod, l)
             }
         })
     }
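The test updates above all follow one recipe: build a nodeinfo snapshot from the fixture pods and nodes, then either pass the snapshot itself to GetPredicateMetadata or list it into a slice for the lower-level helpers. A rough sketch of that recipe, assuming it lives inside the Kubernetes repo where these internal packages are importable; only NewSnapshot, NodeInfos().List(), and NodeInfos().Get() are taken from the diff.

```go
package example

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)

func TestSnapshotSetupSketch(t *testing.T) {
	nodes := []*v1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}
	pods := []*v1.Pod{{
		ObjectMeta: metav1.ObjectMeta{Name: "p1"},
		Spec:       v1.PodSpec{NodeName: "node-1"},
	}}

	// Build the snapshot the same way the updated tests do.
	s := nodeinfosnapshot.NewSnapshot(pods, nodes)

	// Whole-cluster view, as used by the metadata producers.
	allNodes, err := s.NodeInfos().List()
	if err != nil || len(allNodes) != 1 {
		t.Fatalf("unexpected listing: %v, err: %v", allNodes, err)
	}

	// Per-node lookup, as used by predicates and score plugins.
	if _, err := s.NodeInfos().Get("node-1"); err != nil {
		t.Fatalf("unexpected Get error: %v", err)
	}
}
```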


@@ -36,6 +36,7 @@ import (
     schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
     fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
     st "k8s.io/kubernetes/pkg/scheduler/testing"
 )
@@ -1854,19 +1855,18 @@ func TestServiceAffinity(t *testing.T) {
     testIt := func(skipPrecompute bool) {
         t.Run(fmt.Sprintf("%v/skipPrecompute/%v", test.name, skipPrecompute), func(t *testing.T) {
             nodes := []*v1.Node{&node1, &node2, &node3, &node4, &node5}
-            nodeInfo := schedulernodeinfo.NewNodeInfo()
-            nodeInfo.SetNode(test.node)
-            nodeInfoMap := map[string]*schedulernodeinfo.NodeInfo{test.node.Name: nodeInfo}
+            s := nodeinfosnapshot.NewSnapshot(test.pods, nodes)
             // Reimplementing the logic that the scheduler implements: Any time it makes a predicate, it registers any precomputations.
-            predicate, precompute := NewServiceAffinityPredicate(fakelisters.NewNodeInfoLister(nodes), fakelisters.PodLister(test.pods), fakelisters.ServiceLister(test.services), test.labels)
+            predicate, precompute := NewServiceAffinityPredicate(s.NodeInfos(), s.Pods(), fakelisters.ServiceLister(test.services), test.labels)
             // Register a precomputation or Rewrite the precomputation to a no-op, depending on the state we want to test.
             RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", func(pm *predicateMetadata) {
                 if !skipPrecompute {
                     precompute(pm)
                 }
             })
-            if pmeta, ok := (GetPredicateMetadata(test.pod, nodeInfoMap)).(*predicateMetadata); ok {
-                fits, reasons, err := predicate(test.pod, pmeta, nodeInfo)
+            if pmeta, ok := (GetPredicateMetadata(test.pod, s)).(*predicateMetadata); ok {
+                fits, reasons, err := predicate(test.pod, pmeta, s.NodeInfoMap[test.node.Name])
                 if err != nil {
                     t.Errorf("unexpected error: %v", err)
                 }
@@ -2922,22 +2922,13 @@ func TestInterPodAffinity(t *testing.T) {
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
-            node := test.node
-            var podsOnNode []*v1.Pod
-            for _, pod := range test.pods {
-                if pod.Spec.NodeName == node.Name {
-                    podsOnNode = append(podsOnNode, pod)
-                }
-            }
+            s := nodeinfosnapshot.NewSnapshot(test.pods, []*v1.Node{test.node})
             fit := PodAffinityChecker{
-                nodeInfoLister: fakelisters.NewNodeInfoLister([]*v1.Node{node}),
+                nodeInfoLister: s.NodeInfos(),
                 podLister:      fakelisters.PodLister(test.pods),
             }
-            nodeInfo := schedulernodeinfo.NewNodeInfo(podsOnNode...)
-            nodeInfo.SetNode(test.node)
-            nodeInfoMap := map[string]*schedulernodeinfo.NodeInfo{test.node.Name: nodeInfo}
-            fits, reasons, _ := fit.InterPodAffinityMatches(test.pod, GetPredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
+            fits, reasons, _ := fit.InterPodAffinityMatches(test.pod, GetPredicateMetadata(test.pod, s), s.NodeInfoMap[test.node.Name])
             if !fits && !reflect.DeepEqual(reasons, test.expectFailureReasons) {
                 t.Errorf("unexpected failure reasons: %v, want: %v", reasons, test.expectFailureReasons)
             }
@@ -4024,48 +4015,32 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
         },
     }
-    selectorExpectedFailureReasons := []PredicateFailureReason{ErrNodeSelectorNotMatch}
     for indexTest, test := range tests {
         t.Run(test.name, func(t *testing.T) {
-            nodeInfoMap := make(map[string]*schedulernodeinfo.NodeInfo)
-            for i, node := range test.nodes {
-                var podsOnNode []*v1.Pod
-                for _, pod := range test.pods {
-                    if pod.Spec.NodeName == node.Name {
-                        podsOnNode = append(podsOnNode, pod)
-                    }
-                }
-                nodeInfo := schedulernodeinfo.NewNodeInfo(podsOnNode...)
-                nodeInfo.SetNode(test.nodes[i])
-                nodeInfoMap[node.Name] = nodeInfo
-            }
+            snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
             for indexNode, node := range test.nodes {
                 testFit := PodAffinityChecker{
-                    nodeInfoLister: fakelisters.NewNodeInfoLister(test.nodes),
-                    podLister:      fakelisters.PodLister(test.pods),
+                    nodeInfoLister: snapshot.NodeInfos(),
+                    podLister:      snapshot.Pods(),
                 }
                 var meta PredicateMetadata
                 if !test.nometa {
-                    meta = GetPredicateMetadata(test.pod, nodeInfoMap)
+                    meta = GetPredicateMetadata(test.pod, snapshot)
                 }
-                fits, reasons, _ := testFit.InterPodAffinityMatches(test.pod, meta, nodeInfoMap[node.Name])
+                fits, reasons, _ := testFit.InterPodAffinityMatches(test.pod, meta, snapshot.NodeInfoMap[node.Name])
                 if !fits && !reflect.DeepEqual(reasons, test.nodesExpectAffinityFailureReasons[indexNode]) {
                     t.Errorf("index: %d unexpected failure reasons: %v expect: %v", indexTest, reasons, test.nodesExpectAffinityFailureReasons[indexNode])
                 }
                 affinity := test.pod.Spec.Affinity
                 if affinity != nil && affinity.NodeAffinity != nil {
-                    nodeInfo := schedulernodeinfo.NewNodeInfo()
-                    nodeInfo.SetNode(node)
-                    nodeInfoMap := map[string]*schedulernodeinfo.NodeInfo{node.Name: nodeInfo}
-                    fits2, reasons, err := PodMatchNodeSelector(test.pod, GetPredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
+                    s := nodeinfosnapshot.NewSnapshot(nil, []*v1.Node{node})
+                    fits2, reasons, err := PodMatchNodeSelector(test.pod, GetPredicateMetadata(test.pod, s), s.NodeInfoMap[node.Name])
                     if err != nil {
                         t.Errorf("unexpected error: %v", err)
                     }
+                    selectorExpectedFailureReasons := []PredicateFailureReason{ErrNodeSelectorNotMatch}
                     if !fits2 && !reflect.DeepEqual(reasons, selectorExpectedFailureReasons) {
                         t.Errorf("unexpected failure reasons: %v, want: %v", reasons, selectorExpectedFailureReasons)
                     }
@@ -4981,10 +4956,10 @@ func TestEvenPodsSpreadPredicate_SingleConstraint(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes)
-            meta := GetPredicateMetadata(tt.pod, nodeInfoMap)
+            s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes)
+            meta := GetPredicateMetadata(tt.pod, s)
             for _, node := range tt.nodes {
-                fits, _, _ := EvenPodsSpreadPredicate(tt.pod, meta, nodeInfoMap[node.Name])
+                fits, _, _ := EvenPodsSpreadPredicate(tt.pod, meta, s.NodeInfoMap[node.Name])
                 if fits != tt.fits[node.Name] {
                     t.Errorf("[%s]: expected %v got %v", node.Name, tt.fits[node.Name], fits)
                 }
@@ -5174,10 +5149,10 @@ func TestEvenPodsSpreadPredicate_MultipleConstraints(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes)
-            meta := GetPredicateMetadata(tt.pod, nodeInfoMap)
+            s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes)
+            meta := GetPredicateMetadata(tt.pod, s)
             for _, node := range tt.nodes {
-                fits, _, _ := EvenPodsSpreadPredicate(tt.pod, meta, nodeInfoMap[node.Name])
+                fits, _, _ := EvenPodsSpreadPredicate(tt.pod, meta, s.NodeInfoMap[node.Name])
                 if fits != tt.fits[node.Name] {
                     t.Errorf("[%s]: expected %v got %v", node.Name, tt.fits[node.Name], fits)
                 }


@@ -109,9 +109,9 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
     // pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node.
     pm := newPodAffinityPriorityMap(nodes)
-    allNodeNames := make([]string, 0, len(nodeNameToInfo))
-    for name := range nodeNameToInfo {
-        allNodeNames = append(allNodeNames, name)
+    allNodes, err := ipa.nodeInfoLister.List()
+    if err != nil {
+        return nil, err
     }
     // convert the topology key based weights to the node name based weights
@@ -186,7 +186,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
     errCh := schedutil.NewErrorChannel()
     ctx, cancel := context.WithCancel(context.Background())
     processNode := func(i int) {
-        nodeInfo := nodeNameToInfo[allNodeNames[i]]
+        nodeInfo := allNodes[i]
         if nodeInfo.Node() != nil {
             if hasAffinityConstraints || hasAntiAffinityConstraints {
                 // We need to process all the pods.
@@ -208,7 +208,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
             }
         }
     }
-    workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
+    workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
     if err := errCh.ReceiveError(); err != nil {
         return nil, err
     }


@@ -61,6 +61,7 @@ go_test(
         "//pkg/scheduler/internal/queue:go_default_library",
         "//pkg/scheduler/listers/fake:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
+        "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//pkg/scheduler/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",


@@ -483,7 +483,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
     ctx, cancel := context.WithCancel(ctx)
     // We can use the same metadata producer for all nodes.
-    meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
+    meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot)
     state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
     checkNode := func(i int) {
@@ -1033,7 +1033,7 @@ func (g *genericScheduler) selectNodesForPreemption(
     var resultLock sync.Mutex
     // We can use the same metadata producer for all nodes.
-    meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
+    meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot)
     checkNode := func(i int) {
         nodeName := potentialNodes[i].Name
         if g.nodeInfoSnapshot.NodeInfoMap[nodeName] == nil {


@@ -49,6 +49,7 @@ import (
     internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
     fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )
 var (
@@ -1424,14 +1425,14 @@ func TestSelectNodesForPreemption(t *testing.T) {
                 p := fakelisters.PodLister(test.pods)
                 test.predicates[algorithmpredicates.MatchInterPodAffinityPred] = algorithmpredicates.NewPodAffinityPredicate(n, p)
             }
-            nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, nodes)
+            g.nodeInfoSnapshot = nodeinfosnapshot.NewSnapshot(test.pods, nodes)
             // newnode simulate a case that a new node is added to the cluster, but nodeNameToInfo
             // doesn't have it yet.
             newnode := makeNode("newnode", 1000*5, priorityutil.DefaultMemoryRequest*5)
             newnode.ObjectMeta.Labels = map[string]string{"hostname": "newnode"}
             nodes = append(nodes, newnode)
             state := framework.NewCycleState()
-            g.nodeInfoSnapshot.NodeInfoMap = nodeNameToInfo
             nodeToPods, err := g.selectNodesForPreemption(context.Background(), state, test.pod, nodes, nil)
             if err != nil {
                 t.Error(err)


@@ -23,6 +23,7 @@ go_test(
         "//pkg/scheduler/framework/plugins/migration:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
+        "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//pkg/util/parsers:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",


@@ -27,8 +27,6 @@ import (
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 )
-var mb int64 = 1024 * 1024
 // ImageLocality is a score plugin that favors nodes that already have requested pod container's images.
 type ImageLocality struct {
     handle framework.FrameworkHandle
@@ -46,9 +44,9 @@ func (pl *ImageLocality) Name() string {
 // Score invoked at the score extension point.
 func (pl *ImageLocality) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
-    nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName]
-    if !exist {
-        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName))
+    nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+    if err != nil {
+        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
     }
     meta := migration.PriorityMetadata(state)
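Every Score plugin touched by this PR replaces its map lookup with the same idiom: resolve the NodeInfo through the framework handle's snapshot shared lister and turn a lookup failure into a framework error Status. A distilled sketch of that idiom in isolation; the plugin type here is a placeholder, while the Get call and Status construction mirror the diff.

```go
package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// scoreSketch is a placeholder plugin showing only the node lookup idiom;
// the real plugins go on to score the returned nodeInfo.
type scoreSketch struct {
	handle framework.FrameworkHandle
}

func (s *scoreSketch) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		// Lookup failures surface as a framework error Status rather than a silent zero score.
		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
	}
	_ = nodeInfo // real plugins hand nodeInfo to a priority map function here
	return 0, nil
}
```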


@@ -29,9 +29,12 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
     "k8s.io/kubernetes/pkg/util/parsers"
 )
+var mb int64 = 1024 * 1024
 func TestImageLocalityPriority(t *testing.T) {
     test40250 := v1.PodSpec{
         Containers: []v1.Container{
@@ -205,9 +208,7 @@ func TestImageLocalityPriority(t *testing.T) {
             state := framework.NewCycleState()
             state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta})
-            fh, _ := framework.NewFramework(nil, nil, nil)
-            snapshot := fh.NodeInfoSnapshot()
-            snapshot.NodeInfoMap = schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
+            fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(nodeinfosnapshot.NewSnapshot(nil, test.nodes)))
             p, _ := New(nil, fh)
             var gotList framework.NodeScoreList


@@ -733,7 +733,7 @@ func TestSingleNode(t *testing.T) {
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
             snapshot := nodeinfosnapshot.NewSnapshot(test.pods, []*v1.Node{test.node})
-            meta := predicates.GetPredicateMetadata(test.pod, snapshot.NodeInfoMap)
+            meta := predicates.GetPredicateMetadata(test.pod, snapshot)
             state := framework.NewCycleState()
             state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
@@ -1434,7 +1434,7 @@ func TestMultipleNodes(t *testing.T) {
         t.Run(test.name, func(t *testing.T) {
             snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
             for indexNode, node := range test.nodes {
-                meta := predicates.GetPredicateMetadata(test.pod, snapshot.NodeInfoMap)
+                meta := predicates.GetPredicateMetadata(test.pod, snapshot)
                 state := framework.NewCycleState()
                 state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})


@@ -39,6 +39,7 @@ go_test(
         "//pkg/scheduler/api:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
+        "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
     ],


@@ -53,9 +53,9 @@ func (pl *NodeAffinity) Filter(ctx context.Context, state *framework.CycleState,
 // Score invoked at the Score extension point.
 func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
-    nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName]
-    if !exist {
-        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName))
+    nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+    if err != nil {
+        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
     }
     meta := migration.PriorityMetadata(state)


@@ -27,6 +27,7 @@ import (
     schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )
 // TODO: Add test case for RequiredDuringSchedulingRequiredDuringExecution after it's implemented.
@@ -849,10 +850,7 @@ func TestNodeAffinityPriority(t *testing.T) {
         t.Run(test.name, func(t *testing.T) {
             state := framework.NewCycleState()
-            fh, _ := framework.NewFramework(nil, nil, nil)
-            snapshot := fh.NodeInfoSnapshot()
-            snapshot.NodeInfoMap = schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
+            fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(nodeinfosnapshot.NewSnapshot(nil, test.nodes)))
             p, _ := New(nil, fh)
             var gotList framework.NodeScoreList
             for _, n := range test.nodes {


@@ -20,7 +20,7 @@ go_test(
     embed = [":go_default_library"],
     deps = [
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
-        "//pkg/scheduler/nodeinfo:go_default_library",
+        "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
     ],


@@ -45,9 +45,9 @@ func (pl *NodePreferAvoidPods) Name() string {
 // Score invoked at the score extension point.
 func (pl *NodePreferAvoidPods) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
-    nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName]
-    if !exist {
-        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName))
+    nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+    if err != nil {
+        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
     }
     meta := migration.PriorityMetadata(state)


@@ -24,7 +24,7 @@ import (
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
-    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )
 func TestNodePreferAvoidPods(t *testing.T) {
@@ -143,11 +143,7 @@ func TestNodePreferAvoidPods(t *testing.T) {
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
             state := framework.NewCycleState()
-            fh, _ := framework.NewFramework(nil, nil, nil)
-            snapshot := fh.NodeInfoSnapshot()
-            snapshot.NodeInfoMap = schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes)
+            fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(nodeinfosnapshot.NewSnapshot(nil, test.nodes)))
             p, _ := New(nil, fh)
             var gotList framework.NodeScoreList
             for _, n := range test.nodes {


@@ -54,6 +54,7 @@ go_test(
         "//pkg/scheduler/framework/plugins/migration:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
+        "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",


@@ -45,10 +45,11 @@ func (ba *BalancedAllocation) Name() string {
 // Score invoked at the score extension point.
 func (ba *BalancedAllocation) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
-    nodeInfo, exist := ba.handle.NodeInfoSnapshot().NodeInfoMap[nodeName]
-    if !exist {
-        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName))
+    nodeInfo, err := ba.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+    if err != nil {
+        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
     }
     // BalancedResourceAllocationMap does not use priority metadata, hence we pass nil here
     s, err := priorities.BalancedResourceAllocationMap(pod, nil, nodeInfo)
     return s.Score, migration.ErrorToFrameworkStatus(err)


@@ -28,7 +28,7 @@ import (
     featuregatetesting "k8s.io/component-base/featuregate/testing"
     "k8s.io/kubernetes/pkg/features"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
-    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )
 // getExistingVolumeCountForNode gets the current number of volumes on node.
@@ -379,18 +379,16 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
-            nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
+            snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
             if len(test.pod.Spec.Volumes) > 0 {
                 maxVolumes := 5
-                for _, info := range nodeNameToInfo {
+                nodeInfoList, _ := snapshot.NodeInfos().List()
+                for _, info := range nodeInfoList {
                     info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods(), maxVolumes)
                     info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes)
                 }
             }
-            fh, _ := framework.NewFramework(nil, nil, nil)
-            snapshot := fh.NodeInfoSnapshot()
-            snapshot.NodeInfoMap = nodeNameToInfo
+            fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
             p, _ := NewBalancedAllocation(nil, fh)
             for i := range test.nodes {


@@ -44,10 +44,11 @@ func (la *LeastAllocated) Name() string {
 // Score invoked at the score extension point.
 func (la *LeastAllocated) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
-    nodeInfo, exist := la.handle.NodeInfoSnapshot().NodeInfoMap[nodeName]
-    if !exist {
-        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName))
+    nodeInfo, err := la.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+    if err != nil {
+        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
     }
     // LeastRequestedPriorityMap does not use priority metadata, hence we pass nil here
     s, err := priorities.LeastRequestedPriorityMap(pod, nil, nodeInfo)
     return s.Score, migration.ErrorToFrameworkStatus(err)


@@ -25,7 +25,7 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
-    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )
 func TestNodeResourcesLeastAllocated(t *testing.T) {
@@ -233,11 +233,8 @@ func TestNodeResourcesLeastAllocated(t *testing.T) {
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
-            nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
-            fh, _ := framework.NewFramework(nil, nil, nil)
-            snapshot := fh.NodeInfoSnapshot()
-            snapshot.NodeInfoMap = nodeNameToInfo
+            snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
+            fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
             p, _ := NewLeastAllocated(nil, fh)
             for i := range test.nodes {
                 hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)


@@ -44,10 +44,11 @@ func (ma *MostAllocated) Name() string {
 // Score invoked at the Score extension point.
 func (ma *MostAllocated) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
-    nodeInfo, exist := ma.handle.NodeInfoSnapshot().NodeInfoMap[nodeName]
-    if !exist {
-        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName))
+    nodeInfo, err := ma.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+    if err != nil {
+        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
     }
     // MostRequestedPriorityMap does not use priority metadata, hence we pass nil here
     s, err := priorities.MostRequestedPriorityMap(pod, nil, nodeInfo)
     return s.Score, migration.ErrorToFrameworkStatus(err)


@@ -25,7 +25,7 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
-    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )
 func TestNodeResourcesMostAllocated(t *testing.T) {
@@ -196,11 +196,8 @@ func TestNodeResourcesMostAllocated(t *testing.T) {
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
-            nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
-            fh, _ := framework.NewFramework(nil, nil, nil)
-            snapshot := fh.NodeInfoSnapshot()
-            snapshot.NodeInfoMap = nodeNameToInfo
+            snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
+            fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
             p, _ := NewMostAllocated(nil, fh)
             for i := range test.nodes {
                 hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)


@@ -23,7 +23,7 @@ go_test(
         "//pkg/scheduler/algorithm/predicates:go_default_library",
         "//pkg/scheduler/framework/plugins/migration:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
-        "//pkg/scheduler/nodeinfo:go_default_library",
+        "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//pkg/scheduler/testing:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
     ],


@@ -24,7 +24,7 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
     "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
-    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
     st "k8s.io/kubernetes/pkg/scheduler/testing"
 )
@@ -267,13 +267,14 @@ func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes)
-            meta := predicates.GetPredicateMetadata(tt.pod, nodeInfoMap)
+            snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes)
+            meta := predicates.GetPredicateMetadata(tt.pod, snapshot)
             state := v1alpha1.NewCycleState()
             state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
             plugin, _ := New(nil, nil)
             for _, node := range tt.nodes {
-                status := plugin.(*PodTopologySpread).Filter(context.Background(), state, tt.pod, nodeInfoMap[node.Name])
+                nodeInfo, _ := snapshot.NodeInfos().Get(node.Name)
+                status := plugin.(*PodTopologySpread).Filter(context.Background(), state, tt.pod, nodeInfo)
                 if status.IsSuccess() != tt.fits[node.Name] {
                     t.Errorf("[%s]: expected %v got %v", node.Name, tt.fits[node.Name], status.IsSuccess())
                 }
@@ -463,13 +464,14 @@ func TestPodTopologySpreadFilter_MultipleConstraints(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes)
-            meta := predicates.GetPredicateMetadata(tt.pod, nodeInfoMap)
+            snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes)
+            meta := predicates.GetPredicateMetadata(tt.pod, snapshot)
             state := v1alpha1.NewCycleState()
             state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
             plugin, _ := New(nil, nil)
             for _, node := range tt.nodes {
-                status := plugin.(*PodTopologySpread).Filter(context.Background(), state, tt.pod, nodeInfoMap[node.Name])
+                nodeInfo, _ := snapshot.NodeInfos().Get(node.Name)
+                status := plugin.(*PodTopologySpread).Filter(context.Background(), state, tt.pod, nodeInfo)
                 if status.IsSuccess() != tt.fits[node.Name] {
                     t.Errorf("[%s]: expected %v got %v", node.Name, tt.fits[node.Name], status.IsSuccess())
                 }


@@ -54,9 +54,9 @@ func (pl *TaintToleration) Filter(ctx context.Context, state *framework.CycleSta
 // Score invoked at the Score extension point.
 func (pl *TaintToleration) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
-    nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName]
-    if !exist {
-        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName))
+    nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+    if err != nil {
+        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
     }
     meta := migration.PriorityMetadata(state)
     s, err := priorities.ComputeTaintTolerationPriorityMap(pod, meta, nodeInfo)