diff --git a/pkg/scheduler/algorithm/predicates/BUILD b/pkg/scheduler/algorithm/predicates/BUILD index 43c5093c6cc..b1c5dec4cf0 100644 --- a/pkg/scheduler/algorithm/predicates/BUILD +++ b/pkg/scheduler/algorithm/predicates/BUILD @@ -63,6 +63,7 @@ go_test( "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/listers/fake:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/scheduler/testing:go_default_library", "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go index dadd476871e..d2f7abf2727 100644 --- a/pkg/scheduler/algorithm/predicates/metadata.go +++ b/pkg/scheduler/algorithm/predicates/metadata.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" + schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -42,7 +43,7 @@ type PredicateMetadata interface { } // PredicateMetadataProducer is a function that computes predicate metadata for a given pod. -type PredicateMetadataProducer func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata +type PredicateMetadataProducer func(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) PredicateMetadata // AntiAffinityTerm's topology key value used in predicate metadata type topologyPair struct { @@ -314,7 +315,7 @@ func RegisterPredicateMetadataProducer(predicateName string, precomp predicateMe } // EmptyPredicateMetadataProducer returns a no-op MetadataProducer type. -func EmptyPredicateMetadataProducer(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata { +func EmptyPredicateMetadataProducer(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) PredicateMetadata { return nil } @@ -330,20 +331,31 @@ func RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtende } // GetPredicateMetadata returns the predicateMetadata which will be used by various predicates. -func GetPredicateMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata { +func GetPredicateMetadata(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) PredicateMetadata { // If we cannot compute metadata, just return nil if pod == nil { return nil } + + var allNodes []*schedulernodeinfo.NodeInfo + if sharedLister != nil { + n, err := sharedLister.NodeInfos().List() + if err != nil { + klog.Errorf("failed to list NodeInfos: %v", err) + return nil + } + allNodes = n + } + // evenPodsSpreadMetadata represents how existing pods match "pod" // on its spread constraints - evenPodsSpreadMetadata, err := getEvenPodsSpreadMetadata(pod, nodeNameToInfoMap) + evenPodsSpreadMetadata, err := getEvenPodsSpreadMetadata(pod, allNodes) if err != nil { klog.Errorf("Error calculating spreadConstraintsMap: %v", err) return nil } - podAffinityMetadata, err := getPodAffinityMetadata(pod, nodeNameToInfoMap) + podAffinityMetadata, err := getPodAffinityMetadata(pod, allNodes) if err != nil { klog.Errorf("Error calculating podAffinityMetadata: %v", err) return nil @@ -375,15 +387,15 @@ func getPodFitsResourcesMetedata(pod *v1.Pod) *podFitsResourcesMetadata { } } -func getPodAffinityMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*schedulernodeinfo.NodeInfo) (*podAffinityMetadata, error) { +func getPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*podAffinityMetadata, error) { // existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity - existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, nodeNameToInfoMap) + existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, allNodes) if err != nil { return nil, err } // incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity // incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity - incomingPodAffinityMap, incomingPodAntiAffinityMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(pod, nodeNameToInfoMap) + incomingPodAffinityMap, incomingPodAntiAffinityMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(pod, allNodes) if err != nil { return nil, err } @@ -395,7 +407,7 @@ func getPodAffinityMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*scheduler }, nil } -func getEvenPodsSpreadMetadata(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (*evenPodsSpreadMetadata, error) { +func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*evenPodsSpreadMetadata, error) { // We have feature gating in APIServer to strip the spec // so don't need to re-check feature gate, just check length of constraints. constraints := getHardTopologySpreadConstraints(pod) @@ -403,11 +415,6 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, nodeInfoMap map[string]*schedulernod return nil, nil } - allNodeNames := make([]string, 0, len(nodeInfoMap)) - for name := range nodeInfoMap { - allNodeNames = append(allNodeNames, name) - } - errCh := schedutil.NewErrorChannel() var lock sync.Mutex @@ -426,10 +433,10 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, nodeInfoMap map[string]*schedulernod ctx, cancel := context.WithCancel(context.Background()) processNode := func(i int) { - nodeInfo := nodeInfoMap[allNodeNames[i]] + nodeInfo := allNodes[i] node := nodeInfo.Node() if node == nil { - klog.Errorf("node %q not found", allNodeNames[i]) + klog.Error("node not found") return } // In accordance to design, if NodeAffinity or NodeSelector is defined, @@ -462,7 +469,7 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, nodeInfoMap map[string]*schedulernod addTopologyPairMatchNum(pair, matchTotal) } } - workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode) + workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode) if err := errCh.ReceiveError(); err != nil { return nil, err @@ -726,12 +733,7 @@ func podMatchesAnyAffinityTermProperties(pod *v1.Pod, properties []*affinityTerm // getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node: // (1) Whether it has PodAntiAffinity // (2) Whether any AffinityTerm matches the incoming pod -func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (*topologyPairsMaps, error) { - allNodeNames := make([]string, 0, len(nodeInfoMap)) - for name := range nodeInfoMap { - allNodeNames = append(allNodeNames, name) - } - +func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*topologyPairsMaps, error) { errCh := schedutil.NewErrorChannel() var lock sync.Mutex topologyMaps := newTopologyPairsMaps() @@ -745,10 +747,10 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s ctx, cancel := context.WithCancel(context.Background()) processNode := func(i int) { - nodeInfo := nodeInfoMap[allNodeNames[i]] + nodeInfo := allNodes[i] node := nodeInfo.Node() if node == nil { - klog.Errorf("node %q not found", allNodeNames[i]) + klog.Error("node not found") return } for _, existingPod := range nodeInfo.PodsWithAffinity() { @@ -760,7 +762,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s appendTopologyPairsMaps(existingPodTopologyMaps) } } - workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode) + workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode) if err := errCh.ReceiveError(); err != nil { return nil, err @@ -773,19 +775,13 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s // It returns a topologyPairsMaps that are checked later by the affinity // predicate. With this topologyPairsMaps available, the affinity predicate does not // need to check all the pods in the cluster. -func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (topologyPairsAffinityPodsMaps *topologyPairsMaps, topologyPairsAntiAffinityPodsMaps *topologyPairsMaps, err error) { +func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (topologyPairsAffinityPodsMaps *topologyPairsMaps, topologyPairsAntiAffinityPodsMaps *topologyPairsMaps, err error) { affinity := pod.Spec.Affinity if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) { return newTopologyPairsMaps(), newTopologyPairsMaps(), nil } - allNodeNames := make([]string, 0, len(nodeInfoMap)) - for name := range nodeInfoMap { - allNodeNames = append(allNodeNames, name) - } - errCh := schedutil.NewErrorChannel() - var lock sync.Mutex topologyPairsAffinityPodsMaps = newTopologyPairsMaps() topologyPairsAntiAffinityPodsMaps = newTopologyPairsMaps() @@ -811,10 +807,10 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s ctx, cancel := context.WithCancel(context.Background()) processNode := func(i int) { - nodeInfo := nodeInfoMap[allNodeNames[i]] + nodeInfo := allNodes[i] node := nodeInfo.Node() if node == nil { - klog.Errorf("node %q not found", allNodeNames[i]) + klog.Error("node not found") return } nodeTopologyPairsAffinityPodsMaps := newTopologyPairsMaps() @@ -850,7 +846,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps) } } - workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode) + workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode) if err := errCh.ReceiveError(); err != nil { return nil, nil, err diff --git a/pkg/scheduler/algorithm/predicates/metadata_test.go b/pkg/scheduler/algorithm/predicates/metadata_test.go index c703b26b994..495a327b21f 100644 --- a/pkg/scheduler/algorithm/predicates/metadata_test.go +++ b/pkg/scheduler/algorithm/predicates/metadata_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/labels" fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -353,12 +354,12 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) { t.Run(test.name, func(t *testing.T) { allPodLister := fakelisters.PodLister(append(test.existingPods, test.addedPod)) // getMeta creates predicate meta data given the list of pods. - getMeta := func(lister fakelisters.PodLister) (*predicateMetadata, map[string]*schedulernodeinfo.NodeInfo) { - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(lister, test.nodes) - _, precompute := NewServiceAffinityPredicate(fakelisters.NewNodeInfoLister(test.nodes), lister, fakelisters.ServiceLister(test.services), nil) + getMeta := func(pods []*v1.Pod) (*predicateMetadata, map[string]*schedulernodeinfo.NodeInfo) { + s := nodeinfosnapshot.NewSnapshot(pods, test.nodes) + _, precompute := NewServiceAffinityPredicate(s.NodeInfos(), s.Pods(), fakelisters.ServiceLister(test.services), nil) RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", precompute) - meta := GetPredicateMetadata(test.pendingPod, nodeInfoMap) - return meta.(*predicateMetadata), nodeInfoMap + meta := GetPredicateMetadata(test.pendingPod, s) + return meta.(*predicateMetadata), s.NodeInfoMap } // allPodsMeta is meta data produced when all pods, including test.addedPod @@ -784,9 +785,9 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes) - - gotAffinityPodsMaps, gotAntiAffinityPodsMaps, err := getTPMapMatchingIncomingAffinityAntiAffinity(tt.pod, nodeInfoMap) + s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) + l, _ := s.NodeInfos().List() + gotAffinityPodsMaps, gotAntiAffinityPodsMaps, err := getTPMapMatchingIncomingAffinityAntiAffinity(tt.pod, l) if (err != nil) != tt.wantErr { t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() error = %v, wantErr %v", err, tt.wantErr) return @@ -1180,8 +1181,9 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes) - got, _ := getEvenPodsSpreadMetadata(tt.pod, nodeInfoMap) + s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) + l, _ := s.NodeInfos().List() + got, _ := getEvenPodsSpreadMetadata(tt.pod, l) got.sortCriticalPaths() if !reflect.DeepEqual(got, tt.want) { t.Errorf("getEvenPodsSpreadMetadata() = %v, want %v", *got, *tt.want) @@ -1447,9 +1449,9 @@ func TestPodSpreadCache_addPod(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes) - evenPodsSpreadMetadata, _ := getEvenPodsSpreadMetadata(tt.preemptor, nodeInfoMap) - + s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) + l, _ := s.NodeInfos().List() + evenPodsSpreadMetadata, _ := getEvenPodsSpreadMetadata(tt.preemptor, l) evenPodsSpreadMetadata.addPod(tt.addedPod, tt.preemptor, tt.nodes[tt.nodeIdx]) evenPodsSpreadMetadata.sortCriticalPaths() if !reflect.DeepEqual(evenPodsSpreadMetadata, tt.want) { @@ -1625,8 +1627,9 @@ func TestPodSpreadCache_removePod(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes) - evenPodsSpreadMetadata, _ := getEvenPodsSpreadMetadata(tt.preemptor, nodeInfoMap) + s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) + l, _ := s.NodeInfos().List() + evenPodsSpreadMetadata, _ := getEvenPodsSpreadMetadata(tt.preemptor, l) var deletedPod *v1.Pod if tt.deletedPodIdx < len(tt.existingPods) && tt.deletedPodIdx >= 0 { @@ -1683,10 +1686,11 @@ func BenchmarkTestGetTPMapMatchingSpreadConstraints(b *testing.B) { for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { existingPods, allNodes, _ := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum) - nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(existingPods, allNodes) + s := nodeinfosnapshot.NewSnapshot(existingPods, allNodes) + l, _ := s.NodeInfos().List() b.ResetTimer() for i := 0; i < b.N; i++ { - getEvenPodsSpreadMetadata(tt.pod, nodeNameToInfo) + getEvenPodsSpreadMetadata(tt.pod, l) } }) } diff --git a/pkg/scheduler/algorithm/predicates/predicates_test.go b/pkg/scheduler/algorithm/predicates/predicates_test.go index fa617e26f81..935cf45c7ae 100644 --- a/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -36,6 +36,7 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -1854,19 +1855,18 @@ func TestServiceAffinity(t *testing.T) { testIt := func(skipPrecompute bool) { t.Run(fmt.Sprintf("%v/skipPrecompute/%v", test.name, skipPrecompute), func(t *testing.T) { nodes := []*v1.Node{&node1, &node2, &node3, &node4, &node5} - nodeInfo := schedulernodeinfo.NewNodeInfo() - nodeInfo.SetNode(test.node) - nodeInfoMap := map[string]*schedulernodeinfo.NodeInfo{test.node.Name: nodeInfo} + s := nodeinfosnapshot.NewSnapshot(test.pods, nodes) + // Reimplementing the logic that the scheduler implements: Any time it makes a predicate, it registers any precomputations. - predicate, precompute := NewServiceAffinityPredicate(fakelisters.NewNodeInfoLister(nodes), fakelisters.PodLister(test.pods), fakelisters.ServiceLister(test.services), test.labels) + predicate, precompute := NewServiceAffinityPredicate(s.NodeInfos(), s.Pods(), fakelisters.ServiceLister(test.services), test.labels) // Register a precomputation or Rewrite the precomputation to a no-op, depending on the state we want to test. RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", func(pm *predicateMetadata) { if !skipPrecompute { precompute(pm) } }) - if pmeta, ok := (GetPredicateMetadata(test.pod, nodeInfoMap)).(*predicateMetadata); ok { - fits, reasons, err := predicate(test.pod, pmeta, nodeInfo) + if pmeta, ok := (GetPredicateMetadata(test.pod, s)).(*predicateMetadata); ok { + fits, reasons, err := predicate(test.pod, pmeta, s.NodeInfoMap[test.node.Name]) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2922,22 +2922,13 @@ func TestInterPodAffinity(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - node := test.node - var podsOnNode []*v1.Pod - for _, pod := range test.pods { - if pod.Spec.NodeName == node.Name { - podsOnNode = append(podsOnNode, pod) - } - } + s := nodeinfosnapshot.NewSnapshot(test.pods, []*v1.Node{test.node}) fit := PodAffinityChecker{ - nodeInfoLister: fakelisters.NewNodeInfoLister([]*v1.Node{node}), + nodeInfoLister: s.NodeInfos(), podLister: fakelisters.PodLister(test.pods), } - nodeInfo := schedulernodeinfo.NewNodeInfo(podsOnNode...) - nodeInfo.SetNode(test.node) - nodeInfoMap := map[string]*schedulernodeinfo.NodeInfo{test.node.Name: nodeInfo} - fits, reasons, _ := fit.InterPodAffinityMatches(test.pod, GetPredicateMetadata(test.pod, nodeInfoMap), nodeInfo) + fits, reasons, _ := fit.InterPodAffinityMatches(test.pod, GetPredicateMetadata(test.pod, s), s.NodeInfoMap[test.node.Name]) if !fits && !reflect.DeepEqual(reasons, test.expectFailureReasons) { t.Errorf("unexpected failure reasons: %v, want: %v", reasons, test.expectFailureReasons) } @@ -4024,48 +4015,32 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) { }, } - selectorExpectedFailureReasons := []PredicateFailureReason{ErrNodeSelectorNotMatch} - for indexTest, test := range tests { t.Run(test.name, func(t *testing.T) { - nodeInfoMap := make(map[string]*schedulernodeinfo.NodeInfo) - for i, node := range test.nodes { - var podsOnNode []*v1.Pod - for _, pod := range test.pods { - if pod.Spec.NodeName == node.Name { - podsOnNode = append(podsOnNode, pod) - } - } - - nodeInfo := schedulernodeinfo.NewNodeInfo(podsOnNode...) - nodeInfo.SetNode(test.nodes[i]) - nodeInfoMap[node.Name] = nodeInfo - } - + snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes) for indexNode, node := range test.nodes { testFit := PodAffinityChecker{ - nodeInfoLister: fakelisters.NewNodeInfoLister(test.nodes), - podLister: fakelisters.PodLister(test.pods), + nodeInfoLister: snapshot.NodeInfos(), + podLister: snapshot.Pods(), } var meta PredicateMetadata if !test.nometa { - meta = GetPredicateMetadata(test.pod, nodeInfoMap) + meta = GetPredicateMetadata(test.pod, snapshot) } - fits, reasons, _ := testFit.InterPodAffinityMatches(test.pod, meta, nodeInfoMap[node.Name]) + fits, reasons, _ := testFit.InterPodAffinityMatches(test.pod, meta, snapshot.NodeInfoMap[node.Name]) if !fits && !reflect.DeepEqual(reasons, test.nodesExpectAffinityFailureReasons[indexNode]) { t.Errorf("index: %d unexpected failure reasons: %v expect: %v", indexTest, reasons, test.nodesExpectAffinityFailureReasons[indexNode]) } affinity := test.pod.Spec.Affinity if affinity != nil && affinity.NodeAffinity != nil { - nodeInfo := schedulernodeinfo.NewNodeInfo() - nodeInfo.SetNode(node) - nodeInfoMap := map[string]*schedulernodeinfo.NodeInfo{node.Name: nodeInfo} - fits2, reasons, err := PodMatchNodeSelector(test.pod, GetPredicateMetadata(test.pod, nodeInfoMap), nodeInfo) + s := nodeinfosnapshot.NewSnapshot(nil, []*v1.Node{node}) + fits2, reasons, err := PodMatchNodeSelector(test.pod, GetPredicateMetadata(test.pod, s), s.NodeInfoMap[node.Name]) if err != nil { t.Errorf("unexpected error: %v", err) } + selectorExpectedFailureReasons := []PredicateFailureReason{ErrNodeSelectorNotMatch} if !fits2 && !reflect.DeepEqual(reasons, selectorExpectedFailureReasons) { t.Errorf("unexpected failure reasons: %v, want: %v", reasons, selectorExpectedFailureReasons) } @@ -4981,10 +4956,10 @@ func TestEvenPodsSpreadPredicate_SingleConstraint(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes) - meta := GetPredicateMetadata(tt.pod, nodeInfoMap) + s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) + meta := GetPredicateMetadata(tt.pod, s) for _, node := range tt.nodes { - fits, _, _ := EvenPodsSpreadPredicate(tt.pod, meta, nodeInfoMap[node.Name]) + fits, _, _ := EvenPodsSpreadPredicate(tt.pod, meta, s.NodeInfoMap[node.Name]) if fits != tt.fits[node.Name] { t.Errorf("[%s]: expected %v got %v", node.Name, tt.fits[node.Name], fits) } @@ -5174,10 +5149,10 @@ func TestEvenPodsSpreadPredicate_MultipleConstraints(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes) - meta := GetPredicateMetadata(tt.pod, nodeInfoMap) + s := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) + meta := GetPredicateMetadata(tt.pod, s) for _, node := range tt.nodes { - fits, _, _ := EvenPodsSpreadPredicate(tt.pod, meta, nodeInfoMap[node.Name]) + fits, _, _ := EvenPodsSpreadPredicate(tt.pod, meta, s.NodeInfoMap[node.Name]) if fits != tt.fits[node.Name] { t.Errorf("[%s]: expected %v got %v", node.Name, tt.fits[node.Name], fits) } diff --git a/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/pkg/scheduler/algorithm/priorities/interpod_affinity.go index 5f94eaab466..0be20e5e5d5 100644 --- a/pkg/scheduler/algorithm/priorities/interpod_affinity.go +++ b/pkg/scheduler/algorithm/priorities/interpod_affinity.go @@ -109,9 +109,9 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node // pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node. pm := newPodAffinityPriorityMap(nodes) - allNodeNames := make([]string, 0, len(nodeNameToInfo)) - for name := range nodeNameToInfo { - allNodeNames = append(allNodeNames, name) + allNodes, err := ipa.nodeInfoLister.List() + if err != nil { + return nil, err } // convert the topology key based weights to the node name based weights @@ -186,7 +186,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node errCh := schedutil.NewErrorChannel() ctx, cancel := context.WithCancel(context.Background()) processNode := func(i int) { - nodeInfo := nodeNameToInfo[allNodeNames[i]] + nodeInfo := allNodes[i] if nodeInfo.Node() != nil { if hasAffinityConstraints || hasAntiAffinityConstraints { // We need to process all the pods. @@ -208,7 +208,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node } } } - workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode) + workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode) if err := errCh.ReceiveError(); err != nil { return nil, err } diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index e0c2c4a48cd..389be23fe6a 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -61,6 +61,7 @@ go_test( "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/listers/fake:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 52fd1f17b11..019d7f90689 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -483,7 +483,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor ctx, cancel := context.WithCancel(ctx) // We can use the same metadata producer for all nodes. - meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap) + meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot) state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) checkNode := func(i int) { @@ -1033,7 +1033,7 @@ func (g *genericScheduler) selectNodesForPreemption( var resultLock sync.Mutex // We can use the same metadata producer for all nodes. - meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap) + meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot) checkNode := func(i int) { nodeName := potentialNodes[i].Name if g.nodeInfoSnapshot.NodeInfoMap[nodeName] == nil { diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index d39e701427e..62a56c7413f 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -49,6 +49,7 @@ import ( internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) var ( @@ -1424,14 +1425,14 @@ func TestSelectNodesForPreemption(t *testing.T) { p := fakelisters.PodLister(test.pods) test.predicates[algorithmpredicates.MatchInterPodAffinityPred] = algorithmpredicates.NewPodAffinityPredicate(n, p) } - nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, nodes) + + g.nodeInfoSnapshot = nodeinfosnapshot.NewSnapshot(test.pods, nodes) // newnode simulate a case that a new node is added to the cluster, but nodeNameToInfo // doesn't have it yet. newnode := makeNode("newnode", 1000*5, priorityutil.DefaultMemoryRequest*5) newnode.ObjectMeta.Labels = map[string]string{"hostname": "newnode"} nodes = append(nodes, newnode) state := framework.NewCycleState() - g.nodeInfoSnapshot.NodeInfoMap = nodeNameToInfo nodeToPods, err := g.selectNodesForPreemption(context.Background(), state, test.pod, nodes, nil) if err != nil { t.Error(err) diff --git a/pkg/scheduler/framework/plugins/imagelocality/BUILD b/pkg/scheduler/framework/plugins/imagelocality/BUILD index fc81c5c5644..1b70d7113c4 100644 --- a/pkg/scheduler/framework/plugins/imagelocality/BUILD +++ b/pkg/scheduler/framework/plugins/imagelocality/BUILD @@ -23,6 +23,7 @@ go_test( "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/util/parsers:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/scheduler/framework/plugins/imagelocality/image_locality.go b/pkg/scheduler/framework/plugins/imagelocality/image_locality.go index 7f90a79e39a..f8555c528ad 100644 --- a/pkg/scheduler/framework/plugins/imagelocality/image_locality.go +++ b/pkg/scheduler/framework/plugins/imagelocality/image_locality.go @@ -27,8 +27,6 @@ import ( framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" ) -var mb int64 = 1024 * 1024 - // ImageLocality is a score plugin that favors nodes that already have requested pod container's images. type ImageLocality struct { handle framework.FrameworkHandle @@ -46,9 +44,9 @@ func (pl *ImageLocality) Name() string { // Score invoked at the score extension point. func (pl *ImageLocality) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName] - if !exist { - return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName)) + nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } meta := migration.PriorityMetadata(state) diff --git a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go index 2e57341952d..3a8b2986350 100644 --- a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go +++ b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go @@ -29,9 +29,12 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" "k8s.io/kubernetes/pkg/util/parsers" ) +var mb int64 = 1024 * 1024 + func TestImageLocalityPriority(t *testing.T) { test40250 := v1.PodSpec{ Containers: []v1.Container{ @@ -205,9 +208,7 @@ func TestImageLocalityPriority(t *testing.T) { state := framework.NewCycleState() state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta}) - fh, _ := framework.NewFramework(nil, nil, nil) - snapshot := fh.NodeInfoSnapshot() - snapshot.NodeInfoMap = schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes) + fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(nodeinfosnapshot.NewSnapshot(nil, test.nodes))) p, _ := New(nil, fh) var gotList framework.NodeScoreList diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go index 2133ba74e92..6092698be07 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go @@ -733,7 +733,7 @@ func TestSingleNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { snapshot := nodeinfosnapshot.NewSnapshot(test.pods, []*v1.Node{test.node}) - meta := predicates.GetPredicateMetadata(test.pod, snapshot.NodeInfoMap) + meta := predicates.GetPredicateMetadata(test.pod, snapshot) state := framework.NewCycleState() state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) @@ -1434,7 +1434,7 @@ func TestMultipleNodes(t *testing.T) { t.Run(test.name, func(t *testing.T) { snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes) for indexNode, node := range test.nodes { - meta := predicates.GetPredicateMetadata(test.pod, snapshot.NodeInfoMap) + meta := predicates.GetPredicateMetadata(test.pod, snapshot) state := framework.NewCycleState() state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/BUILD b/pkg/scheduler/framework/plugins/nodeaffinity/BUILD index 9e507c4b7fd..54031928207 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/BUILD +++ b/pkg/scheduler/framework/plugins/nodeaffinity/BUILD @@ -39,6 +39,7 @@ go_test( "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", ], diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go index 1bd599503ee..e43999e1342 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go @@ -53,9 +53,9 @@ func (pl *NodeAffinity) Filter(ctx context.Context, state *framework.CycleState, // Score invoked at the Score extension point. func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName] - if !exist { - return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName)) + nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } meta := migration.PriorityMetadata(state) diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go index a588083c6f3..d995d5124df 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go @@ -27,6 +27,7 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) // TODO: Add test case for RequiredDuringSchedulingRequiredDuringExecution after it's implemented. @@ -849,10 +850,7 @@ func TestNodeAffinityPriority(t *testing.T) { t.Run(test.name, func(t *testing.T) { state := framework.NewCycleState() - fh, _ := framework.NewFramework(nil, nil, nil) - snapshot := fh.NodeInfoSnapshot() - snapshot.NodeInfoMap = schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes) - + fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(nodeinfosnapshot.NewSnapshot(nil, test.nodes))) p, _ := New(nil, fh) var gotList framework.NodeScoreList for _, n := range test.nodes { diff --git a/pkg/scheduler/framework/plugins/nodepreferavoidpods/BUILD b/pkg/scheduler/framework/plugins/nodepreferavoidpods/BUILD index 9db862008f9..ee60c6b6728 100644 --- a/pkg/scheduler/framework/plugins/nodepreferavoidpods/BUILD +++ b/pkg/scheduler/framework/plugins/nodepreferavoidpods/BUILD @@ -20,7 +20,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/scheduler/framework/v1alpha1:go_default_library", - "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", ], diff --git a/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods.go b/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods.go index af17e48bbf5..ce62bb34b4e 100644 --- a/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods.go +++ b/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods.go @@ -45,9 +45,9 @@ func (pl *NodePreferAvoidPods) Name() string { // Score invoked at the score extension point. func (pl *NodePreferAvoidPods) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName] - if !exist { - return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName)) + nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } meta := migration.PriorityMetadata(state) diff --git a/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods_test.go b/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods_test.go index 16c36b0c7a7..436b56ede35 100644 --- a/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods_test.go +++ b/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods_test.go @@ -24,7 +24,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) func TestNodePreferAvoidPods(t *testing.T) { @@ -143,11 +143,7 @@ func TestNodePreferAvoidPods(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { state := framework.NewCycleState() - - fh, _ := framework.NewFramework(nil, nil, nil) - snapshot := fh.NodeInfoSnapshot() - snapshot.NodeInfoMap = schedulernodeinfo.CreateNodeNameToInfoMap(nil, test.nodes) - + fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(nodeinfosnapshot.NewSnapshot(nil, test.nodes))) p, _ := New(nil, fh) var gotList framework.NodeScoreList for _, n := range test.nodes { diff --git a/pkg/scheduler/framework/plugins/noderesources/BUILD b/pkg/scheduler/framework/plugins/noderesources/BUILD index 665acbbbc0c..aa2751836fb 100644 --- a/pkg/scheduler/framework/plugins/noderesources/BUILD +++ b/pkg/scheduler/framework/plugins/noderesources/BUILD @@ -54,6 +54,7 @@ go_test( "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go index 2ee23724f36..e0984f2835c 100644 --- a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go +++ b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go @@ -45,10 +45,11 @@ func (ba *BalancedAllocation) Name() string { // Score invoked at the score extension point. func (ba *BalancedAllocation) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - nodeInfo, exist := ba.handle.NodeInfoSnapshot().NodeInfoMap[nodeName] - if !exist { - return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName)) + nodeInfo, err := ba.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } + // BalancedResourceAllocationMap does not use priority metadata, hence we pass nil here s, err := priorities.BalancedResourceAllocationMap(pod, nil, nodeInfo) return s.Score, migration.ErrorToFrameworkStatus(err) diff --git a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go index f8ea9197ff5..80ea3c11dea 100644 --- a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go @@ -28,7 +28,7 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) // getExistingVolumeCountForNode gets the current number of volumes on node. @@ -379,18 +379,16 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes) + snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes) if len(test.pod.Spec.Volumes) > 0 { maxVolumes := 5 - for _, info := range nodeNameToInfo { + nodeInfoList, _ := snapshot.NodeInfos().List() + for _, info := range nodeInfoList { info.TransientInfo.TransNodeInfo.AllocatableVolumesCount = getExistingVolumeCountForNode(info.Pods(), maxVolumes) info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes) } } - fh, _ := framework.NewFramework(nil, nil, nil) - snapshot := fh.NodeInfoSnapshot() - snapshot.NodeInfoMap = nodeNameToInfo - + fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot)) p, _ := NewBalancedAllocation(nil, fh) for i := range test.nodes { diff --git a/pkg/scheduler/framework/plugins/noderesources/least_allocated.go b/pkg/scheduler/framework/plugins/noderesources/least_allocated.go index b1c59923ece..6c8b7d7b195 100644 --- a/pkg/scheduler/framework/plugins/noderesources/least_allocated.go +++ b/pkg/scheduler/framework/plugins/noderesources/least_allocated.go @@ -44,10 +44,11 @@ func (la *LeastAllocated) Name() string { // Score invoked at the score extension point. func (la *LeastAllocated) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - nodeInfo, exist := la.handle.NodeInfoSnapshot().NodeInfoMap[nodeName] - if !exist { - return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName)) + nodeInfo, err := la.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } + // LeastRequestedPriorityMap does not use priority metadata, hence we pass nil here s, err := priorities.LeastRequestedPriorityMap(pod, nil, nodeInfo) return s.Score, migration.ErrorToFrameworkStatus(err) diff --git a/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go b/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go index e6728081cd6..b30660cd340 100644 --- a/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go @@ -25,7 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) func TestNodeResourcesLeastAllocated(t *testing.T) { @@ -233,11 +233,8 @@ func TestNodeResourcesLeastAllocated(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes) - fh, _ := framework.NewFramework(nil, nil, nil) - snapshot := fh.NodeInfoSnapshot() - snapshot.NodeInfoMap = nodeNameToInfo - + snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes) + fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot)) p, _ := NewLeastAllocated(nil, fh) for i := range test.nodes { hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name) diff --git a/pkg/scheduler/framework/plugins/noderesources/most_allocated.go b/pkg/scheduler/framework/plugins/noderesources/most_allocated.go index 00d1b11b60d..b4ef4d337c6 100644 --- a/pkg/scheduler/framework/plugins/noderesources/most_allocated.go +++ b/pkg/scheduler/framework/plugins/noderesources/most_allocated.go @@ -44,10 +44,11 @@ func (ma *MostAllocated) Name() string { // Score invoked at the Score extension point. func (ma *MostAllocated) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - nodeInfo, exist := ma.handle.NodeInfoSnapshot().NodeInfoMap[nodeName] - if !exist { - return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName)) + nodeInfo, err := ma.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } + // MostRequestedPriorityMap does not use priority metadata, hence we pass nil here s, err := priorities.MostRequestedPriorityMap(pod, nil, nodeInfo) return s.Score, migration.ErrorToFrameworkStatus(err) diff --git a/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go b/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go index d803ffda71b..fe69e95c1c4 100644 --- a/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go @@ -25,7 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) func TestNodeResourcesMostAllocated(t *testing.T) { @@ -196,11 +196,8 @@ func TestNodeResourcesMostAllocated(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes) - fh, _ := framework.NewFramework(nil, nil, nil) - snapshot := fh.NodeInfoSnapshot() - snapshot.NodeInfoMap = nodeNameToInfo - + snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes) + fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot)) p, _ := NewMostAllocated(nil, fh) for i := range test.nodes { hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name) diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/BUILD b/pkg/scheduler/framework/plugins/podtopologyspread/BUILD index 5f58cae2436..8a4edce1609 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/BUILD +++ b/pkg/scheduler/framework/plugins/podtopologyspread/BUILD @@ -23,7 +23,7 @@ go_test( "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", - "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/scheduler/testing:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", ], diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread_test.go index 3283639493a..6c4663d8f98 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread_test.go @@ -24,7 +24,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -267,13 +267,14 @@ func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes) - meta := predicates.GetPredicateMetadata(tt.pod, nodeInfoMap) + snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) + meta := predicates.GetPredicateMetadata(tt.pod, snapshot) state := v1alpha1.NewCycleState() state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) plugin, _ := New(nil, nil) for _, node := range tt.nodes { - status := plugin.(*PodTopologySpread).Filter(context.Background(), state, tt.pod, nodeInfoMap[node.Name]) + nodeInfo, _ := snapshot.NodeInfos().Get(node.Name) + status := plugin.(*PodTopologySpread).Filter(context.Background(), state, tt.pod, nodeInfo) if status.IsSuccess() != tt.fits[node.Name] { t.Errorf("[%s]: expected %v got %v", node.Name, tt.fits[node.Name], status.IsSuccess()) } @@ -463,13 +464,14 @@ func TestPodTopologySpreadFilter_MultipleConstraints(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(tt.existingPods, tt.nodes) - meta := predicates.GetPredicateMetadata(tt.pod, nodeInfoMap) + snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) + meta := predicates.GetPredicateMetadata(tt.pod, snapshot) state := v1alpha1.NewCycleState() state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) plugin, _ := New(nil, nil) for _, node := range tt.nodes { - status := plugin.(*PodTopologySpread).Filter(context.Background(), state, tt.pod, nodeInfoMap[node.Name]) + nodeInfo, _ := snapshot.NodeInfos().Get(node.Name) + status := plugin.(*PodTopologySpread).Filter(context.Background(), state, tt.pod, nodeInfo) if status.IsSuccess() != tt.fits[node.Name] { t.Errorf("[%s]: expected %v got %v", node.Name, tt.fits[node.Name], status.IsSuccess()) } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go index 569980edfad..6ce6f96a957 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -54,9 +54,9 @@ func (pl *TaintToleration) Filter(ctx context.Context, state *framework.CycleSta // Score invoked at the Score extension point. func (pl *TaintToleration) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName] - if !exist { - return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName)) + nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } meta := migration.PriorityMetadata(state) s, err := priorities.ComputeTaintTolerationPriorityMap(pod, meta, nodeInfo)