From ed26fcf5b87228ebaf5e870505a9bd5682c9ff35 Mon Sep 17 00:00:00 2001
From: dingzhu lurong <1015542478@qq.com>
Date: Sat, 6 May 2023 10:59:23 +0800
Subject: [PATCH] cleanup: remove useless nil checks on nodeInfo.Node() from
 the snapshot in in-tree plugins

NodeInfo objects handed to in-tree plugins during a scheduling cycle come
from the scheduler snapshot, where Node() is never nil, so these defensive
checks are dead code. A new test,
TestSchedulerGuaranteeNonNilNodeInSchedulingCycle, deletes nodes concurrently
with pod creation to guard against regressions.

---
 .../plugins/interpodaffinity/filtering.go     |  14 +-
 .../plugins/interpodaffinity/scoring.go       |   4 +-
 .../plugins/nodeaffinity/node_affinity.go     |   4 +-
 .../framework/plugins/nodename/node_name.go   |   4 +-
 .../noderesources/resource_allocation.go      |   4 +-
 .../nodeunschedulable/node_unschedulable.go   |   4 +-
 .../framework/plugins/nodevolumelimits/csi.go |   3 -
 .../plugins/nodevolumelimits/non_csi.go       |   3 -
 .../plugins/podtopologyspread/filtering.go    |   8 -
 .../plugins/podtopologyspread/scoring.go      |   3 -
 .../tainttoleration/taint_toleration.go       |   3 -
 .../plugins/volumebinding/volume_binding.go   |   3 -
 pkg/scheduler/framework/runtime/framework.go  |   2 +-
 pkg/scheduler/schedule_one_test.go            | 169 ++++++++++++++++++
 14 files changed, 177 insertions(+), 51 deletions(-)

diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go
index 02a7c792693..b5a83e81048 100644
--- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go
+++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go
@@ -23,7 +23,6 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
@@ -158,10 +157,7 @@ func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, p
 	processNode := func(i int) {
 		nodeInfo := nodes[i]
 		node := nodeInfo.Node()
-		if node == nil {
-			klog.ErrorS(nil, "Node not found")
-			return
-		}
+
 		topoMap := make(topologyToMatchedTermCount)
 		for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity {
 			topoMap.updateWithAntiAffinityTerms(existingPod.RequiredAntiAffinityTerms, pod, nsLabels, node, 1)
@@ -197,10 +193,7 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Co
 	processNode := func(i int) {
 		nodeInfo := allNodes[i]
 		node := nodeInfo.Node()
-		if node == nil {
-			klog.ErrorS(nil, "Node not found")
-			return
-		}
+
 		affinity := make(topologyToMatchedTermCount)
 		antiAffinity := make(topologyToMatchedTermCount)
 		for _, existingPod := range nodeInfo.Pods {
@@ -369,9 +362,6 @@ func satisfyPodAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) boo
 // Filter invoked at the filter extension point.
 // It checks if a pod can be scheduled on the specified node with pod affinity/anti-affinity configuration.
 func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
-	if nodeInfo.Node() == nil {
-		return framework.NewStatus(framework.Error, "node not found")
-	}
 
 	state, err := getPreFilterState(cycleState)
 	if err != nil {
diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go
index fd4cc24327d..90a18344355 100644
--- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go
+++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go
@@ -192,9 +192,7 @@ func (pl *InterPodAffinity) PreScore(
 	index := int32(-1)
 	processNode := func(i int) {
 		nodeInfo := allNodes[i]
-		if nodeInfo.Node() == nil {
-			return
-		}
+
 		// Unless the pod being scheduled has preferred affinity terms, we only
 		// need to process pods with affinity in the node.
 		podsToProcess := nodeInfo.PodsWithAffinity
diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go
index 6cf555c7a37..871fd924ea4 100644
--- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go
+++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go
@@ -149,9 +149,7 @@ func (pl *NodeAffinity) PreFilterExtensions() framework.PreFilterExtensions {
 // the plugin's added affinity.
 func (pl *NodeAffinity) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
 	node := nodeInfo.Node()
-	if node == nil {
-		return framework.NewStatus(framework.Error, "node not found")
-	}
+
 	if pl.addedNodeSelector != nil && !pl.addedNodeSelector.Match(node) {
 		return framework.NewStatus(framework.UnschedulableAndUnresolvable, errReasonEnforced)
 	}
diff --git a/pkg/scheduler/framework/plugins/nodename/node_name.go b/pkg/scheduler/framework/plugins/nodename/node_name.go
index e02134f793c..1f0a0f388df 100644
--- a/pkg/scheduler/framework/plugins/nodename/node_name.go
+++ b/pkg/scheduler/framework/plugins/nodename/node_name.go
@@ -54,9 +54,7 @@ func (pl *NodeName) Name() string {
 
 // Filter invoked at the filter extension point.
 func (pl *NodeName) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
-	if nodeInfo.Node() == nil {
-		return framework.NewStatus(framework.Error, "node not found")
-	}
+
 	if !Fits(pod, nodeInfo) {
 		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)
 	}
diff --git a/pkg/scheduler/framework/plugins/noderesources/resource_allocation.go b/pkg/scheduler/framework/plugins/noderesources/resource_allocation.go
index 535aacf9ecc..863647bd0af 100644
--- a/pkg/scheduler/framework/plugins/noderesources/resource_allocation.go
+++ b/pkg/scheduler/framework/plugins/noderesources/resource_allocation.go
@@ -52,9 +52,7 @@ func (r *resourceAllocationScorer) score(
 	podRequests []int64) (int64, *framework.Status) {
 	logger := klog.FromContext(ctx)
 	node := nodeInfo.Node()
-	if node == nil {
-		return 0, framework.NewStatus(framework.Error, "node not found")
-	}
+
 	// resources not set, nothing scheduled,
 	if len(r.resources) == 0 {
 		return 0, framework.NewStatus(framework.Error, "resources not found")
diff --git a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go
index 47231837e5c..47844847ff8 100644
--- a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go
+++ b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go
@@ -60,9 +60,7 @@ func (pl *NodeUnschedulable) Name() string {
 // Filter invoked at the filter extension point.
 func (pl *NodeUnschedulable) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
 	node := nodeInfo.Node()
-	if node == nil {
-		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonUnknownCondition)
-	}
+
 	// If pod tolerate unschedulable taint, it's also tolerate `node.Spec.Unschedulable`.
 	podToleratesUnschedulable := v1helper.TolerationsTolerateTaint(pod.Spec.Tolerations, &v1.Taint{
 		Key:    v1.TaintNodeUnschedulable,
diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go
index e9a021061e6..8d345d602ee 100644
--- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go
+++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go
@@ -109,9 +109,6 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 	}
 
 	node := nodeInfo.Node()
-	if node == nil {
-		return framework.NewStatus(framework.Error, "node not found")
-	}
 
 	// If CSINode doesn't exist, the predicate may read the limits from Node object
 	csiNode, err := pl.csiNodeLister.Get(node.Name)
diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go
index 5363f7db691..03a012eb0f3 100644
--- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go
+++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go
@@ -249,9 +249,6 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
 	}
 
 	node := nodeInfo.Node()
-	if node == nil {
-		return framework.NewStatus(framework.Error, "node not found")
-	}
 
 	var csiNode *storage.CSINode
 	var err error
diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go b/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
index a3b11b9fd4c..68cdc2199fb 100644
--- a/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
+++ b/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
@@ -275,14 +275,9 @@ func (pl *PodTopologySpread) calPreFilterState(ctx context.Context, pod *v1.Pod)
 
 	tpCountsByNode := make([]map[topologyPair]int, len(allNodes))
 	requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
-	logger := klog.FromContext(ctx)
 	processNode := func(i int) {
 		nodeInfo := allNodes[i]
 		node := nodeInfo.Node()
-		if node == nil {
-			logger.Error(nil, "Node not found")
-			return
-		}
 
 		if !pl.enableNodeInclusionPolicyInPodTopologySpread {
 			// spreading is applied to nodes that pass those filters.
@@ -339,9 +334,6 @@ func (pl *PodTopologySpread) calPreFilterState(ctx context.Context, pod *v1.Pod)
 // Filter invoked at the filter extension point.
 func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
 	node := nodeInfo.Node()
-	if node == nil {
-		return framework.AsStatus(fmt.Errorf("node not found"))
-	}
 
 	s, err := getPreFilterState(cycleState)
 	if err != nil {
diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go
index 9dee1ac8d80..bf5e8fe7d05 100644
--- a/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go
+++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go
@@ -149,9 +149,6 @@ func (pl *PodTopologySpread) PreScore(
 	processAllNode := func(i int) {
 		nodeInfo := allNodes[i]
 		node := nodeInfo.Node()
-		if node == nil {
-			return
-		}
 
 		if !pl.enableNodeInclusionPolicyInPodTopologySpread {
 			// `node` should satisfy incoming pod's NodeSelector/NodeAffinity
diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go
index 4611a98158c..a990754e741 100644
--- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go
+++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go
@@ -63,9 +63,6 @@ func (pl *TaintToleration) EventsToRegister() []framework.ClusterEvent {
 // Filter invoked at the filter extension point.
 func (pl *TaintToleration) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
 	node := nodeInfo.Node()
-	if node == nil {
-		return framework.AsStatus(fmt.Errorf("invalid nodeInfo"))
-	}
 
 	taint, isUntolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, pod.Spec.Tolerations, helper.DoNotScheduleTaintsFilterFunc())
 	if !isUntolerated {
diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
index 3d251ba4a55..2209e9309bb 100644
--- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
+++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
@@ -233,9 +233,6 @@ func getStateData(cs *framework.CycleState) (*stateData, error) {
 // PVCs can be matched with an available and node-compatible PV.
 func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
 	node := nodeInfo.Node()
-	if node == nil {
-		return framework.NewStatus(framework.Error, "node not found")
-	}
 
 	state, err := getStateData(cs)
 	if err != nil {
diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go
index 9fcb35fabc4..c6d5edc9e44 100644
--- a/pkg/scheduler/framework/runtime/framework.go
+++ b/pkg/scheduler/framework/runtime/framework.go
@@ -873,7 +873,7 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
 // to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
 // 3) augmented nodeInfo.
 func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
-	if fh == nil || nodeInfo.Node() == nil {
+	if fh == nil {
 		// This may happen only in tests.
 		return false, state, nodeInfo, nil
 	}
diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go
index fd925059c38..4de3aa51120 100644
--- a/pkg/scheduler/schedule_one_test.go
+++ b/pkg/scheduler/schedule_one_test.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"math/rand"
 	"reflect"
 	"regexp"
 	"sort"
@@ -296,6 +297,44 @@ func newFakeNodeSelector(args runtime.Object, _ framework.Handle) (framework.Plu
 	return pl, nil
 }
 
+const (
+	fakeSpecifiedNodeNameAnnotation = "fake-specified-node-name"
+)
+
+// fakeNodeSelectorDependOnPodAnnotation schedules a pod onto the node named in pod.Annotations[fakeSpecifiedNodeNameAnnotation].
+type fakeNodeSelectorDependOnPodAnnotation struct{}
+
+func (f *fakeNodeSelectorDependOnPodAnnotation) Name() string {
+	return "FakeNodeSelectorDependOnPodAnnotation"
+}
+
+// Filter admits only the node named in the pod's annotation and rejects all other nodes.
+func (f *fakeNodeSelectorDependOnPodAnnotation) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
+	resolveNodeNameFromPodAnnotation := func(pod *v1.Pod) (string, error) {
+		if pod == nil {
+			return "", fmt.Errorf("empty pod")
+		}
+		nodeName, ok := pod.Annotations[fakeSpecifiedNodeNameAnnotation]
+		if !ok {
+			return "", fmt.Errorf("no specified node name on pod %s/%s annotation", pod.Namespace, pod.Name)
+		}
+		return nodeName, nil
+	}
+
+	nodeName, err := resolveNodeNameFromPodAnnotation(pod)
+	if err != nil {
+		return framework.AsStatus(err)
+	}
+	if nodeInfo.Node().Name != nodeName {
+		return framework.NewStatus(framework.UnschedulableAndUnresolvable)
+	}
+	return nil
+}
+
+func newFakeNodeSelectorDependOnPodAnnotation(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
+	return &fakeNodeSelectorDependOnPodAnnotation{}, nil
+}
+
 type TestPlugin struct {
 	name string
 }
@@ -448,6 +487,136 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
 	}
 }
 
+// TestSchedulerGuaranteeNonNilNodeInSchedulingCycle guards against a potential panic on a nil Node when iterating over Nodes during a scheduling cycle.
+func TestSchedulerGuaranteeNonNilNodeInSchedulingCycle(t *testing.T) {
+	random := rand.New(rand.NewSource(time.Now().UnixNano()))
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var (
+		initialNodeNumber        = 1000
+		initialPodNumber         = 500
+		waitSchedulingPodNumber  = 200
+		deleteNodeNumberPerRound = 20
+		createPodNumberPerRound  = 50
+
+		fakeSchedulerName = "fake-scheduler"
+		fakeNamespace     = "fake-namespace"
+
+		initialNodes []runtime.Object
+		initialPods  []runtime.Object
+	)
+
+	for i := 0; i < initialNodeNumber; i++ {
+		nodeName := fmt.Sprintf("node%d", i)
+		initialNodes = append(initialNodes, st.MakeNode().Name(nodeName).UID(nodeName).Obj())
+	}
+	// Randomly scatter initial pods onto nodes.
+	for i := 0; i < initialPodNumber; i++ {
+		podName := fmt.Sprintf("scheduled-pod%d", i)
+		assignedNodeName := fmt.Sprintf("node%d", random.Intn(initialNodeNumber))
+		initialPods = append(initialPods, st.MakePod().Name(podName).UID(podName).Node(assignedNodeName).Obj())
+	}
+
+	objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: fakeNamespace}}}
+	objs = append(objs, initialNodes...)
+	objs = append(objs, initialPods...)
+	client := clientsetfake.NewSimpleClientset(objs...)
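+	// The events broadcaster below wires the scheduler's event recorder to the
+	// fake clientset; the event watcher registered near the end of this test
+	// counts each pod's "Scheduled" or "FailedScheduling" event, letting the
+	// test block until every pending pod has gone through a scheduling cycle.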
+	broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
+
+	informerFactory := informers.NewSharedInformerFactory(client, 0)
+	sched, err := New(
+		client,
+		informerFactory,
+		nil,
+		profile.NewRecorderFactory(broadcaster),
+		ctx.Done(),
+		WithProfiles(
+			schedulerapi.KubeSchedulerProfile{SchedulerName: fakeSchedulerName,
+				Plugins: &schedulerapi.Plugins{
+					Filter:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "FakeNodeSelectorDependOnPodAnnotation"}}},
+					QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
+					Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
+				},
+			},
+		),
+		WithFrameworkOutOfTreeRegistry(frameworkruntime.Registry{
+			"FakeNodeSelectorDependOnPodAnnotation": newFakeNodeSelectorDependOnPodAnnotation,
+		}),
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Run scheduler.
+	informerFactory.Start(ctx.Done())
+	informerFactory.WaitForCacheSync(ctx.Done())
+	go sched.Run(ctx)
+
+	var deleteNodeIndex int
+	deleteNodesOneRound := func() {
+		for i := 0; i < deleteNodeNumberPerRound; i++ {
+			if deleteNodeIndex >= initialNodeNumber {
+				// all initial nodes have already been deleted
+				return
+			}
+			deleteNodeName := fmt.Sprintf("node%d", deleteNodeIndex)
+			if err := client.CoreV1().Nodes().Delete(ctx, deleteNodeName, metav1.DeleteOptions{}); err != nil {
+				t.Fatal(err)
+			}
+			deleteNodeIndex++
+		}
+	}
+	var createPodIndex int
+	createPodsOneRound := func() {
+		if createPodIndex > waitSchedulingPodNumber {
+			return
+		}
+		for i := 0; i < createPodNumberPerRound; i++ {
+			podName := fmt.Sprintf("pod%d", createPodIndex)
+			// Note: the node (specifiedNodeName) may already have been deleted, in which case the pod will fail to schedule.
+			specifiedNodeName := fmt.Sprintf("node%d", random.Intn(initialNodeNumber))

+			waitSchedulingPod := st.MakePod().Namespace(fakeNamespace).Name(podName).UID(podName).Annotation(fakeSpecifiedNodeNameAnnotation, specifiedNodeName).SchedulerName(fakeSchedulerName).Obj()
+			if _, err := client.CoreV1().Pods(fakeNamespace).Create(ctx, waitSchedulingPod, metav1.CreateOptions{}); err != nil {
+				t.Fatal(err)
+			}
+			createPodIndex++
+		}
+	}
+
+	// Below we start two goroutines asynchronously to surface potential races:
+	// 1) one deletes a batch of nodes in each round;
+	// 2) the other creates a batch of pods in each round to trigger scheduling.
+	// Both goroutines keep running until ctx is cancelled, which happens once every waiting pod has been scheduled (or has failed scheduling) at least once.
+	go wait.Until(deleteNodesOneRound, 10*time.Millisecond, ctx.Done())
+	go wait.Until(createPodsOneRound, 9*time.Millisecond, ctx.Done())
+
+	// Watch scheduling events so the test can wait until every pod has been scheduled (or has failed scheduling) at least once.
+	allWaitSchedulingPods := sets.NewString()
+	for i := 0; i < waitSchedulingPodNumber; i++ {
+		allWaitSchedulingPods.Insert(fmt.Sprintf("pod%d", i))
+	}
+	var wg sync.WaitGroup
+	wg.Add(waitSchedulingPodNumber)
+	stopFn, err := broadcaster.StartEventWatcher(func(obj runtime.Object) {
+		e, ok := obj.(*eventsv1.Event)
+		if !ok || (e.Reason != "Scheduled" && e.Reason != "FailedScheduling") {
+			return
+		}
+		if allWaitSchedulingPods.Has(e.Regarding.Name) {
+			wg.Done()
+			allWaitSchedulingPods.Delete(e.Regarding.Name)
+		}
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer stopFn()
+
+	wg.Wait()
+}
+
 func TestSchedulerScheduleOne(t *testing.T) {
 	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
 	client := clientsetfake.NewSimpleClientset(&testNode)