Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-08-12 13:31:52 +00:00
Merge pull request #117834 from NoicFank/cleanup-scheduler-node-must-not-nil-in-snapshot

Clean up the useless nil checks on nodeInfo.Node() taken from the snapshot in the in-tree plugins.

Commit: c7c41d27b4
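Every hunk below follows the same pattern: the defensive nil check on nodeInfo.Node() is deleted, because plugins in a scheduling cycle only receive NodeInfo objects taken from the cycle's snapshot, and that snapshot is built exclusively from Node objects that exist, so Node() cannot return nil there. The following is a minimal before/after sketch of the shape of the change; examplePlugin is a hypothetical plugin used only for illustration (it is not part of this commit), while the framework types and helpers are the ones referenced throughout the diff.

// nilcheck_sketch.go: illustrative only; examplePlugin is hypothetical and not part of this commit.
package exampleplugin

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

type examplePlugin struct{}

func (pl *examplePlugin) Name() string { return "ExamplePlugin" }

// Before this commit: each Filter re-checked the Node pointer before using it.
func (pl *examplePlugin) filterWithNilCheck(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }
    _ = node.Name // plugin-specific checks against node would go here
    return nil
}

// After this commit: NodeInfo comes from the scheduling-cycle snapshot, which
// guarantees a non-nil Node, so the check is dropped and Node() is used directly.
func (pl *examplePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    _ = node.Name // plugin-specific checks against node would go here
    return nil
}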
@@ -23,7 +23,6 @@ import (

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

@@ -158,10 +157,7 @@ func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, p
    processNode := func(i int) {
        nodeInfo := nodes[i]
        node := nodeInfo.Node()
        if node == nil {
            klog.ErrorS(nil, "Node not found")
            return
        }

        topoMap := make(topologyToMatchedTermCount)
        for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity {
            topoMap.updateWithAntiAffinityTerms(existingPod.RequiredAntiAffinityTerms, pod, nsLabels, node, 1)
@@ -197,10 +193,7 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Co
    processNode := func(i int) {
        nodeInfo := allNodes[i]
        node := nodeInfo.Node()
        if node == nil {
            klog.ErrorS(nil, "Node not found")
            return
        }

        affinity := make(topologyToMatchedTermCount)
        antiAffinity := make(topologyToMatchedTermCount)
        for _, existingPod := range nodeInfo.Pods {
@@ -369,9 +362,6 @@ func satisfyPodAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) boo
// Filter invoked at the filter extension point.
// It checks if a pod can be scheduled on the specified node with pod affinity/anti-affinity configuration.
func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    if nodeInfo.Node() == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }

    state, err := getPreFilterState(cycleState)
    if err != nil {
@@ -190,9 +190,7 @@ func (pl *InterPodAffinity) PreScore(
    index := int32(-1)
    processNode := func(i int) {
        nodeInfo := allNodes[i]
        if nodeInfo.Node() == nil {
            return
        }

        // Unless the pod being scheduled has preferred affinity terms, we only
        // need to process pods with affinity in the node.
        podsToProcess := nodeInfo.PodsWithAffinity
@@ -149,9 +149,7 @@ func (pl *NodeAffinity) PreFilterExtensions() framework.PreFilterExtensions {
// the plugin's added affinity.
func (pl *NodeAffinity) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }

    if pl.addedNodeSelector != nil && !pl.addedNodeSelector.Match(node) {
        return framework.NewStatus(framework.UnschedulableAndUnresolvable, errReasonEnforced)
    }
@@ -54,9 +54,7 @@ func (pl *NodeName) Name() string {

// Filter invoked at the filter extension point.
func (pl *NodeName) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    if nodeInfo.Node() == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }

    if !Fits(pod, nodeInfo) {
        return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)
    }
@@ -52,9 +52,7 @@ func (r *resourceAllocationScorer) score(
    podRequests []int64) (int64, *framework.Status) {
    logger := klog.FromContext(ctx)
    node := nodeInfo.Node()
    if node == nil {
        return 0, framework.NewStatus(framework.Error, "node not found")
    }

    // resources not set, nothing scheduled,
    if len(r.resources) == 0 {
        return 0, framework.NewStatus(framework.Error, "resources not found")
@@ -60,9 +60,7 @@ func (pl *NodeUnschedulable) Name() string {
// Filter invoked at the filter extension point.
func (pl *NodeUnschedulable) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonUnknownCondition)
    }

    // If pod tolerate unschedulable taint, it's also tolerate `node.Spec.Unschedulable`.
    podToleratesUnschedulable := v1helper.TolerationsTolerateTaint(pod.Spec.Tolerations, &v1.Taint{
        Key: v1.TaintNodeUnschedulable,
@@ -109,9 +109,6 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
    }

    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }

    // If CSINode doesn't exist, the predicate may read the limits from Node object
    csiNode, err := pl.csiNodeLister.Get(node.Name)
@@ -249,9 +249,6 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
    }

    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }

    var csiNode *storage.CSINode
    var err error
@@ -275,14 +275,9 @@ func (pl *PodTopologySpread) calPreFilterState(ctx context.Context, pod *v1.Pod)

    tpCountsByNode := make([]map[topologyPair]int, len(allNodes))
    requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
    logger := klog.FromContext(ctx)
    processNode := func(i int) {
        nodeInfo := allNodes[i]
        node := nodeInfo.Node()
        if node == nil {
            logger.Error(nil, "Node not found")
            return
        }

        if !pl.enableNodeInclusionPolicyInPodTopologySpread {
            // spreading is applied to nodes that pass those filters.
@@ -339,9 +334,6 @@ func (pl *PodTopologySpread) calPreFilterState(ctx context.Context, pod *v1.Pod)
// Filter invoked at the filter extension point.
func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    if node == nil {
        return framework.AsStatus(fmt.Errorf("node not found"))
    }

    s, err := getPreFilterState(cycleState)
    if err != nil {
@@ -149,9 +149,6 @@ func (pl *PodTopologySpread) PreScore(
    processAllNode := func(i int) {
        nodeInfo := allNodes[i]
        node := nodeInfo.Node()
        if node == nil {
            return
        }

        if !pl.enableNodeInclusionPolicyInPodTopologySpread {
            // `node` should satisfy incoming pod's NodeSelector/NodeAffinity
@@ -63,9 +63,6 @@ func (pl *TaintToleration) EventsToRegister() []framework.ClusterEvent {
// Filter invoked at the filter extension point.
func (pl *TaintToleration) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    if node == nil {
        return framework.AsStatus(fmt.Errorf("invalid nodeInfo"))
    }

    taint, isUntolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, pod.Spec.Tolerations, helper.DoNotScheduleTaintsFilterFunc())
    if !isUntolerated {
@@ -233,9 +233,6 @@ func getStateData(cs *framework.CycleState) (*stateData, error) {
// PVCs can be matched with an available and node-compatible PV.
func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }

    state, err := getStateData(cs)
    if err != nil {
@@ -933,7 +933,7 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
// 3) augmented nodeInfo.
func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
    if fh == nil || nodeInfo.Node() == nil {
    if fh == nil {
        // This may happen only in tests.
        return false, state, nodeInfo, nil
    }
@@ -21,6 +21,7 @@ import (
    "errors"
    "fmt"
    "math"
    "math/rand"
    "reflect"
    "regexp"
    "sort"
@@ -296,6 +297,44 @@ func newFakeNodeSelector(args runtime.Object, _ framework.Handle) (framework.Plu
    return pl, nil
}

const (
    fakeSpecifiedNodeNameAnnotation = "fake-specified-node-name"
)

// fakeNodeSelectorDependOnPodAnnotation schedules pod to the specified one node from pod.Annotations[fakeSpecifiedNodeNameAnnotation].
type fakeNodeSelectorDependOnPodAnnotation struct{}

func (f *fakeNodeSelectorDependOnPodAnnotation) Name() string {
    return "FakeNodeSelectorDependOnPodAnnotation"
}

// Filter selects the specified one node and rejects other non-specified nodes.
func (f *fakeNodeSelectorDependOnPodAnnotation) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    resolveNodeNameFromPodAnnotation := func(pod *v1.Pod) (string, error) {
        if pod == nil {
            return "", fmt.Errorf("empty pod")
        }
        nodeName, ok := pod.Annotations[fakeSpecifiedNodeNameAnnotation]
        if !ok {
            return "", fmt.Errorf("no specified node name on pod %s/%s annotation", pod.Namespace, pod.Name)
        }
        return nodeName, nil
    }

    nodeName, err := resolveNodeNameFromPodAnnotation(pod)
    if err != nil {
        return framework.AsStatus(err)
    }
    if nodeInfo.Node().Name != nodeName {
        return framework.NewStatus(framework.UnschedulableAndUnresolvable)
    }
    return nil
}

func newFakeNodeSelectorDependOnPodAnnotation(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
    return &fakeNodeSelectorDependOnPodAnnotation{}, nil
}

type TestPlugin struct {
    name string
}
@@ -448,6 +487,136 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
    }
}

// TestSchedulerGuaranteeNonNilNodeInSchedulingCycle is for detecting potential panic on nil Node when iterating Nodes.
func TestSchedulerGuaranteeNonNilNodeInSchedulingCycle(t *testing.T) {
    random := rand.New(rand.NewSource(time.Now().UnixNano()))
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var (
        initialNodeNumber = 1000
        initialPodNumber = 500
        waitSchedulingPodNumber = 200
        deleteNodeNumberPerRound = 20
        createPodNumberPerRound = 50

        fakeSchedulerName = "fake-scheduler"
        fakeNamespace = "fake-namespace"

        initialNodes []runtime.Object
        initialPods []runtime.Object
    )

    for i := 0; i < initialNodeNumber; i++ {
        nodeName := fmt.Sprintf("node%d", i)
        initialNodes = append(initialNodes, st.MakeNode().Name(nodeName).UID(nodeName).Obj())
    }
    // Randomly scatter initial pods onto nodes.
    for i := 0; i < initialPodNumber; i++ {
        podName := fmt.Sprintf("scheduled-pod%d", i)
        assignedNodeName := fmt.Sprintf("node%d", random.Intn(initialNodeNumber))
        initialPods = append(initialPods, st.MakePod().Name(podName).UID(podName).Node(assignedNodeName).Obj())
    }

    objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: fakeNamespace}}}
    objs = append(objs, initialNodes...)
    objs = append(objs, initialPods...)
    client := clientsetfake.NewSimpleClientset(objs...)
    broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})

    informerFactory := informers.NewSharedInformerFactory(client, 0)
    sched, err := New(
        client,
        informerFactory,
        nil,
        profile.NewRecorderFactory(broadcaster),
        ctx.Done(),
        WithProfiles(
            schedulerapi.KubeSchedulerProfile{SchedulerName: fakeSchedulerName,
                Plugins: &schedulerapi.Plugins{
                    Filter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "FakeNodeSelectorDependOnPodAnnotation"}}},
                    QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
                    Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
                },
            },
        ),
        WithFrameworkOutOfTreeRegistry(frameworkruntime.Registry{
            "FakeNodeSelectorDependOnPodAnnotation": newFakeNodeSelectorDependOnPodAnnotation,
        }),
    )
    if err != nil {
        t.Fatal(err)
    }

    // Run scheduler.
    informerFactory.Start(ctx.Done())
    informerFactory.WaitForCacheSync(ctx.Done())
    go sched.Run(ctx)

    var deleteNodeIndex int
    deleteNodesOneRound := func() {
        for i := 0; i < deleteNodeNumberPerRound; i++ {
            if deleteNodeIndex >= initialNodeNumber {
                // all initial nodes are already deleted
                return
            }
            deleteNodeName := fmt.Sprintf("node%d", deleteNodeIndex)
            if err := client.CoreV1().Nodes().Delete(ctx, deleteNodeName, metav1.DeleteOptions{}); err != nil {
                t.Fatal(err)
            }
            deleteNodeIndex++
        }
    }
    var createPodIndex int
    createPodsOneRound := func() {
        if createPodIndex > waitSchedulingPodNumber {
            return
        }
        for i := 0; i < createPodNumberPerRound; i++ {
            podName := fmt.Sprintf("pod%d", createPodIndex)
            // Note: the node(specifiedNodeName) may already be deleted, which leads pod scheduled failed.
            specifiedNodeName := fmt.Sprintf("node%d", random.Intn(initialNodeNumber))

            waitSchedulingPod := st.MakePod().Namespace(fakeNamespace).Name(podName).UID(podName).Annotation(fakeSpecifiedNodeNameAnnotation, specifiedNodeName).SchedulerName(fakeSchedulerName).Obj()
            if _, err := client.CoreV1().Pods(fakeNamespace).Create(ctx, waitSchedulingPod, metav1.CreateOptions{}); err != nil {
                t.Fatal(err)
            }
            createPodIndex++
        }
    }

    // Following we start 2 goroutines asynchronously to detect potential racing issues:
    // 1) One is responsible for deleting several nodes in each round;
    // 2) Another is creating several pods in each round to trigger scheduling;
    // Those two goroutines will stop until ctx.Done() is called, which means all waiting pods are scheduled at least once.
    go wait.Until(deleteNodesOneRound, 10*time.Millisecond, ctx.Done())
    go wait.Until(createPodsOneRound, 9*time.Millisecond, ctx.Done())

    // Capture the events to wait all pods to be scheduled at least once.
    allWaitSchedulingPods := sets.NewString()
    for i := 0; i < waitSchedulingPodNumber; i++ {
        allWaitSchedulingPods.Insert(fmt.Sprintf("pod%d", i))
    }
    var wg sync.WaitGroup
    wg.Add(waitSchedulingPodNumber)
    stopFn, err := broadcaster.StartEventWatcher(func(obj runtime.Object) {
        e, ok := obj.(*eventsv1.Event)
        if !ok || (e.Reason != "Scheduled" && e.Reason != "FailedScheduling") {
            return
        }
        if allWaitSchedulingPods.Has(e.Regarding.Name) {
            wg.Done()
            allWaitSchedulingPods.Delete(e.Regarding.Name)
        }
    })
    if err != nil {
        t.Fatal(err)
    }
    defer stopFn()

    wg.Wait()
}

func TestSchedulerScheduleOne(t *testing.T) {
    testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
    client := clientsetfake.NewSimpleClientset(&testNode)
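To exercise the new regression test on its own, something like the following invocation should work; the package path is an assumption based on the surrounding tests (TestSchedulerScheduleOne lives in the scheduler package), and the -race flag is worth adding because the test deliberately deletes nodes while pods are being scheduled:

    go test -race -run TestSchedulerGuaranteeNonNilNodeInSchedulingCycle ./pkg/scheduler/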