Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-23 03:41:45 +00:00
Merge pull request #122251 from olderTaoist/unschedulable-plugin
register as unschedulable plugins those PreFilter plugins whose PreFilterResult filters out some nodes
This commit is contained in: commit b6899c5e08
@@ -26,13 +26,14 @@ import (

 	"github.com/stretchr/testify/assert"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2/ktesting"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 )

 type frameworkContract interface {
-	RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status)
+	RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status, sets.Set[string])
 	RunFilterPlugins(context.Context, *framework.CycleState, *v1.Pod, *framework.NodeInfo) *framework.Status
 }
@@ -589,7 +589,10 @@ type Framework interface {
 	// cycle is aborted.
 	// It also returns a PreFilterResult, which may influence what or how many nodes to
 	// evaluate downstream.
-	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
+	// The third returned value is the set of PreFilter plugins that rejected some or all Nodes with a PreFilterResult.
+	// Note that it doesn't contain any plugin that rejects this Pod with a non-success status,
+	// rather than with a PreFilterResult.
+	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status, sets.Set[string])

 	// RunPostFilterPlugins runs the set of configured PostFilter plugins.
 	// PostFilter plugins can either be informational, in which case should be configured
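For orientation, here is an editor's sketch (not part of the patch) of how a caller consumes the new three-value signature. The function name runPreFilterAndInspect and the log message are illustrative; framework, v1, and klog are the real packages used above.

	package sketch

	import (
		"context"

		v1 "k8s.io/api/core/v1"
		"k8s.io/klog/v2"
		"k8s.io/kubernetes/pkg/scheduler/framework"
	)

	func runPreFilterAndInspect(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) error {
		preRes, status, unschedulablePlugins := fwk.RunPreFilterPlugins(ctx, state, pod)
		if !status.IsSuccess() {
			// unschedulablePlugins may be nil on this path: a plugin that rejects the
			// Pod with a non-success status is not tracked via PreFilterResult.
			return status.AsError()
		}
		// Each entry names a PreFilter plugin whose PreFilterResult filtered out Nodes.
		for plugin := range unschedulablePlugins {
			klog.FromContext(ctx).V(4).Info("PreFilterResult narrowed the node set", "plugin", plugin)
		}
		_ = preRes // when non-nil, preRes.NodeNames restricts the Nodes evaluated downstream
		return nil
	}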
@@ -383,7 +383,7 @@ func TestPostFilter(t *testing.T) {

 			state := framework.NewCycleState()
 			// Ensure <state> is populated.
-			if _, status := f.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
+			if _, status, _ := f.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
 				t.Errorf("Unexpected PreFilter Status: %v", status)
 			}
@@ -1141,7 +1141,7 @@ func TestDryRunPreemption(t *testing.T) {
 			for cycle, pod := range tt.testPods {
 				state := framework.NewCycleState()
 				// Some tests rely on PreFilter plugin to compute its CycleState.
-				if _, status := fwk.RunPreFilterPlugins(ctx, state, pod); !status.IsSuccess() {
+				if _, status, _ := fwk.RunPreFilterPlugins(ctx, state, pod); !status.IsSuccess() {
 					t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status)
 				}
 				pe := preemption.Evaluator{
@@ -1371,7 +1371,7 @@ func TestSelectBestCandidate(t *testing.T) {

 			state := framework.NewCycleState()
 			// Some tests rely on PreFilter plugin to compute its CycleState.
-			if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
+			if _, status, _ := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
 				t.Errorf("Unexpected PreFilter Status: %v", status)
 			}
 			nodeInfos, err := snapshot.NodeInfos().List()
@@ -1784,7 +1784,7 @@ func TestPreempt(t *testing.T) {

 			state := framework.NewCycleState()
 			// Some tests rely on PreFilter plugin to compute its CycleState.
-			if _, s := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() {
+			if _, s, _ := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() {
 				t.Errorf("Unexpected preFilterStatus: %v", s)
 			}
 			// Call preempt and check the expected results.
@@ -456,7 +456,7 @@ func TestSelectCandidate(t *testing.T) {

 			state := framework.NewCycleState()
 			// Some tests rely on PreFilter plugin to compute its CycleState.
-			if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
+			if _, status, _ := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
 				t.Errorf("Unexpected PreFilter Status: %v", status)
 			}
 			nodeInfos, err := snapshot.NodeInfos().List()
@@ -695,7 +695,7 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
 // When it returns Skip status, returned PreFilterResult and other fields in status are just ignored,
 // and coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle.
 // If a non-success status is returned, then the scheduling cycle is aborted.
-func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
+func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status, _ sets.Set[string]) {
 	startTime := time.Now()
 	skipPlugins := sets.New[string]()
 	defer func() {
@@ -703,7 +703,7 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
 		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
 	}()
 	var result *framework.PreFilterResult
-	var pluginsWithNodes []string
+	pluginsWithNodes := sets.New[string]()
 	logger := klog.FromContext(ctx)
 	verboseLogs := logger.V(4).Enabled()
 	if verboseLogs {
@@ -726,7 +726,7 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
 		if s.Code() == framework.UnschedulableAndUnresolvable {
 			// In this case, the preemption shouldn't happen in this scheduling cycle.
 			// So, no need to execute all PreFilter.
-			return nil, s
+			return nil, s, nil
 		}
 		if s.Code() == framework.Unschedulable {
 			// In this case, the preemption should happen later in this scheduling cycle.
@@ -735,23 +735,23 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
 				returnStatus = s
 				continue
 			}
-			return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithPlugin(pl.Name())
+			return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithPlugin(pl.Name()), nil
 		}
 		if !r.AllNodes() {
-			pluginsWithNodes = append(pluginsWithNodes, pl.Name())
+			pluginsWithNodes.Insert(pl.Name())
 		}
 		result = result.Merge(r)
 		if !result.AllNodes() && len(result.NodeNames) == 0 {
-			msg := fmt.Sprintf("node(s) didn't satisfy plugin(s) %v simultaneously", pluginsWithNodes)
+			msg := fmt.Sprintf("node(s) didn't satisfy plugin(s) %v simultaneously", sets.List(pluginsWithNodes))
 			if len(pluginsWithNodes) == 1 {
-				msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", pluginsWithNodes[0])
+				msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", sets.List(pluginsWithNodes)[0])
 			}

 			// When PreFilterResult filters out Nodes, the framework considers Nodes that are filtered out as getting "UnschedulableAndUnresolvable".
-			return result, framework.NewStatus(framework.UnschedulableAndUnresolvable, msg)
+			return result, framework.NewStatus(framework.UnschedulableAndUnresolvable, msg), pluginsWithNodes
 		}
 	}
-	return result, returnStatus
+	return result, returnStatus, pluginsWithNodes
 }

 func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
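The slice-to-set swap above deduplicates plugin names and, through sets.List, produces sorted output for deterministic messages. A small self-contained sketch of the semantics being relied on (standalone program, not from the patch):

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/util/sets"
	)

	func main() {
		pluginsWithNodes := sets.New[string]()
		pluginsWithNodes.Insert("FakePreFilter3")
		pluginsWithNodes.Insert("FakePreFilter2")
		pluginsWithNodes.Insert("FakePreFilter2") // duplicate inserts are no-ops

		fmt.Println(sets.List(pluginsWithNodes)) // [FakePreFilter2 FakePreFilter3] — sorted
		fmt.Println(len(pluginsWithNodes))       // 2; len works because Set is a map under the hood
	}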
@@ -1531,7 +1531,6 @@ func TestPreFilterPlugins(t *testing.T) {
 		t.Fatalf("Failed to create framework for testing: %v", err)
 	}
 	state := framework.NewCycleState()
-
 	f.RunPreFilterPlugins(ctx, state, nil)
 	f.RunPreFilterExtensionAddPod(ctx, state, nil, nil, nil)
 	f.RunPreFilterExtensionRemovePod(ctx, state, nil, nil, nil)
@@ -1721,7 +1720,7 @@ func TestRunPreFilterPlugins(t *testing.T) {
 			}

 			state := framework.NewCycleState()
-			result, status := f.RunPreFilterPlugins(ctx, state, nil)
+			result, status, _ := f.RunPreFilterPlugins(ctx, state, nil)
 			if d := cmp.Diff(result, tt.wantPreFilterResult); d != "" {
 				t.Errorf("wrong status. got: %v, want: %v, diff: %s", result, tt.wantPreFilterResult, d)
 			}
@@ -461,7 +461,8 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
 		return nil, diagnosis, err
 	}
 	// Run "prefilter" plugins.
-	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
+	preRes, s, unscheduledPlugins := fwk.RunPreFilterPlugins(ctx, state, pod)
+	diagnosis.UnschedulablePlugins = unscheduledPlugins
 	if !s.IsSuccess() {
 		if !s.IsRejected() {
 			return nil, diagnosis, s.AsError()
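Why the new assignment matters: Diagnosis.UnschedulablePlugins feeds the scheduling queue, which consults only the QueueingHints of plugins that previously rejected the Pod when deciding whether a cluster event should move the Pod back to activeQ. A deliberately simplified sketch of that gate, with hypothetical names (podMayBeSchedulable, hints) rather than the real queue internals:

	package sketch

	import "k8s.io/apimachinery/pkg/util/sets"

	// podMayBeSchedulable is a toy version of the requeueing gate: an event
	// requeues the Pod only if some plugin that rejected it signals "Queue".
	func podMayBeSchedulable(rejectorPlugins sets.Set[string], hints map[string]func() bool) bool {
		for plugin := range rejectorPlugins {
			if hint, ok := hints[plugin]; ok && hint() {
				return true // requeue to activeQ
			}
		}
		// Before this patch, a plugin that rejected Nodes only via PreFilterResult
		// never appeared in rejectorPlugins, so its hint was never consulted and
		// the Pod could stay stuck in unschedulablePods (issue #122018).
		return false
	}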
@@ -2283,7 +2283,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
 					"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
 					"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
 				},
-				UnschedulablePlugins: sets.Set[string]{},
+				UnschedulablePlugins: sets.New("FakePreFilter2", "FakePreFilter3"),
 				PreFilterMsg: "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously",
 			},
 		},
@@ -2311,7 +2311,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
 				NodeToStatusMap: framework.NodeToStatusMap{
 					"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin FakePreFilter2"),
 				},
-				UnschedulablePlugins: sets.Set[string]{},
+				UnschedulablePlugins: sets.New("FakePreFilter2"),
 				PreFilterMsg: "node(s) didn't satisfy plugin FakePreFilter2",
 			},
 		},
@@ -2339,7 +2339,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
 				NodeToStatusMap: framework.NodeToStatusMap{
 					"node2": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-prefilter").WithPlugin("FakeFilter"),
 				},
-				UnschedulablePlugins: sets.New("FakeFilter"),
+				UnschedulablePlugins: sets.New("FakePreFilter", "FakeFilter"),
 				PreFilterMsg: "",
 			},
 		},
@@ -2445,6 +2445,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
 				NumAllNodes: 2,
 				Diagnosis: framework.Diagnosis{
 					NodeToStatusMap: framework.NodeToStatusMap{},
+					UnschedulablePlugins: sets.New("FakePreFilter"),
 				},
 			},
 		},
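These fixture updates encode the behavior change: PreFilter plugins that trim Nodes via PreFilterResult now appear in Diagnosis.UnschedulablePlugins alongside Filter rejectors. A tiny illustrative test of the order-insensitive set semantics the fixtures rely on (editor's sketch, not from the patch):

	package sketch

	import (
		"testing"

		"k8s.io/apimachinery/pkg/util/sets"
	)

	func TestUnschedulablePluginsAreOrderInsensitive(t *testing.T) {
		got := sets.New("FakeFilter", "FakePreFilter")
		want := sets.New("FakePreFilter", "FakeFilter") // insertion order doesn't matter
		if !got.Equal(want) {
			t.Errorf("unexpected UnschedulablePlugins: %v", sets.List(got))
		}
	}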
@@ -188,7 +188,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
 	tests := []struct {
 		name string
 		// initialNode is the Node to be created at first.
-		initialNode *v1.Node
+		initialNodes []*v1.Node
 		// initialPod is the Pod to be created at first if it's not empty.
 		initialPod *v1.Pod
 		// pods are the list of Pods to be created.
@@ -203,7 +203,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
 	}{
 		{
 			name: "Pod without a required toleration to a node isn't requeued to activeQ",
-			initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(),
+			initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj()},
 			pods: []*v1.Pod{
 				// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin.
 				// (TaintToleration plugin is evaluated before NodeResourcesFit plugin.)
@@ -225,7 +225,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
 		},
 		{
 			name: "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready",
-			initialNode: st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
+			initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()},
 			initialPod: st.MakePod().Label("anti", "anti").Name("pod1").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Node("fake-node").Obj(),
 			pods: []*v1.Pod{
 				// - Pod2 will be rejected by the PodAffinity plugin.
@@ -255,7 +255,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
 		},
 		{
 			name: "Pod updated with toleration requeued to activeQ",
-			initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "taint-key", Effect: v1.TaintEffectNoSchedule}}).Obj(),
+			initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "taint-key", Effect: v1.TaintEffectNoSchedule}}).Obj()},
 			pods: []*v1.Pod{
 				// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin.
 				st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Obj(),
@@ -273,7 +273,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
 		},
 		{
 			name: "Pod got resource scaled down requeued to activeQ",
-			initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
+			initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()},
 			pods: []*v1.Pod{
 				// - Pod1 requests a large amount of CPU and will be rejected by the NodeResourcesFit plugin.
 				st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
@@ -291,7 +291,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
 		},
 		{
 			name: "Updating pod condition doesn't retry scheduling if the Pod was rejected by TaintToleration",
-			initialNode: st.MakeNode().Name("fake-node").Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(),
+			initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj()},
 			pods: []*v1.Pod{
 				// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin.
 				st.MakePod().Name("pod1").Container("image").Obj(),
@@ -327,6 +327,41 @@ func TestCoreResourceEnqueue(t *testing.T) {
 			// because QHint of TaintToleration would decide to ignore a Pod update.
 			enableSchedulingQueueHint: []bool{true},
 		},
+		{
+			// This test case makes sure that PreFilter plugins returning a PreFilterResult are also inserted into pInfo.UnschedulablePlugins,
+			// meaning they're taken into consideration during the requeueing flow of the queue.
+			// https://github.com/kubernetes/kubernetes/issues/122018
+			name: "Pod rejected by the PreFilter of NodeAffinity plugin and Filter of NodeResourcesFit is requeued based on both plugins",
+			initialNodes: []*v1.Node{
+				st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
+				st.MakeNode().Name("fake-node2").Label("node", "fake-node2").Label("zone", "zone1").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
+			},
+			pods: []*v1.Pod{
+				// - Pod1 will be rejected by the NodeAffinity plugin and NodeResourcesFit plugin.
+				st.MakePod().Label("unscheduled", "plugins").Name("pod1").NodeAffinityIn("metadata.name", []string{"fake-node"}).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
+			},
+			triggerFn: func(testCtx *testutils.TestContext) error {
+				// Trigger a NodeDeleted event.
+				// It will not requeue pod1 to activeQ,
+				// because neither NodeAffinity nor NodeResourcesFit registers a Node/delete event.
+				err := testCtx.ClientSet.CoreV1().Nodes().Delete(testCtx.Ctx, "fake-node", metav1.DeleteOptions{})
+				if err != nil {
+					return fmt.Errorf("failed to delete fake-node node")
+				}
+
+				// Trigger a NodeCreated event.
+				// It will requeue pod1 to activeQ, because the QHint of NodeAffinity returns Queue.
+				// That proves the PreFilterResult returned by the PreFilter plugin takes effect,
+				// because the QHint of NodeResourcesFit actually returns QueueSkip for this event.
+				node := st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()
+				if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}); err != nil {
+					return fmt.Errorf("failed to create a new node: %w", err)
+				}
+
+				return nil
+			},
+			wantRequeuedPods: sets.New("pod1"),
+		},
 	}

 	for _, tt := range tests {
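The trigger above leans on QueueingHints: NodeAffinity's hint returns Queue for the Node-created event, while NodeResourcesFit's returns QueueSkip, so the requeue only happens because NodeAffinity is now recorded as a rejector. A simplified sketch of a hint with this shape; the signature follows the scheduler framework's QueueingHintFn, and nodeMatchesRequiredAffinity is a hypothetical stand-in for the real affinity check:

	package sketch

	import (
		"fmt"

		v1 "k8s.io/api/core/v1"
		"k8s.io/klog/v2"
		"k8s.io/kubernetes/pkg/scheduler/framework"
	)

	func isSchedulableAfterNodeCreated(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
		node, ok := newObj.(*v1.Node)
		if !ok {
			return framework.Queue, fmt.Errorf("unexpected object type %T", newObj)
		}
		// Requeue only when the new Node could satisfy the Pod's required node
		// affinity; otherwise leave the Pod parked in unschedulablePods.
		if nodeMatchesRequiredAffinity(pod, node) {
			return framework.Queue, nil
		}
		return framework.QueueSkip, nil
	}

	// nodeMatchesRequiredAffinity stands in for the real affinity check.
	func nodeMatchesRequiredAffinity(pod *v1.Pod, node *v1.Node) bool {
		_, _ = pod, node
		return true // placeholder
	}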
@@ -350,9 +385,11 @@ func TestCoreResourceEnqueue(t *testing.T) {
 			defer testCtx.Scheduler.SchedulingQueue.Close()

 			cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
-			// Create initialNode.
-			if _, err := cs.CoreV1().Nodes().Create(ctx, tt.initialNode, metav1.CreateOptions{}); err != nil {
-				t.Fatalf("Failed to create an initial Node %q: %v", tt.initialNode.Name, err)
+			// Create initial Nodes.
+			for _, node := range tt.initialNodes {
+				if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
+					t.Fatalf("Failed to create an initial Node %q: %v", node.Name, err)
+				}
 			}

 			if tt.initialPod != nil {