mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-13 22:05:59 +00:00
Merge pull request #119769 from Huang-Wei/bug/prefilter-preemption
Fix a bug that PostFilter plugin may don't function if previous PreFilter plugins return Skip
This commit is contained in:
commit
57212647e9
@ -632,12 +632,13 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
|
|||||||
// If a non-success status is returned, then the scheduling cycle is aborted.
|
// If a non-success status is returned, then the scheduling cycle is aborted.
|
||||||
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
|
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
skipPlugins := sets.New[string]()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
state.SkipFilterPlugins = skipPlugins
|
||||||
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
|
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
|
||||||
}()
|
}()
|
||||||
var result *framework.PreFilterResult
|
var result *framework.PreFilterResult
|
||||||
var pluginsWithNodes []string
|
var pluginsWithNodes []string
|
||||||
skipPlugins := sets.New[string]()
|
|
||||||
logger := klog.FromContext(ctx)
|
logger := klog.FromContext(ctx)
|
||||||
logger = klog.LoggerWithName(logger, "PreFilter")
|
logger = klog.LoggerWithName(logger, "PreFilter")
|
||||||
// TODO(knelasevero): Remove duplicated keys from log entry calls
|
// TODO(knelasevero): Remove duplicated keys from log entry calls
|
||||||
@ -671,7 +672,6 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
|
|||||||
return nil, framework.NewStatus(framework.Unschedulable, msg)
|
return nil, framework.NewStatus(framework.Unschedulable, msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
state.SkipFilterPlugins = skipPlugins
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1509,7 +1509,7 @@ func TestRunPreFilterPlugins(t *testing.T) {
|
|||||||
inj: injectedResult{PreFilterStatus: int(framework.Error)},
|
inj: injectedResult{PreFilterStatus: int(framework.Error)},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
wantPreFilterResult: nil,
|
wantSkippedPlugins: sets.New("skip"),
|
||||||
wantStatusCode: framework.Error,
|
wantStatusCode: framework.Error,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -99,9 +99,16 @@ func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {
|
|||||||
|
|
||||||
const tokenFilterName = "token-filter"
|
const tokenFilterName = "token-filter"
|
||||||
|
|
||||||
|
// tokenFilter is a fake plugin that implements PreFilter and Filter.
|
||||||
|
// `Token` simulates the allowed pods number a cluster can host.
|
||||||
|
// If `EnablePreFilter` is set to false or `Token` is positive, PreFilter passes; otherwise returns Unschedulable
|
||||||
|
// For each Filter() call, `Token` is decreased by one. When `Token` is positive, Filter passes; otherwise return
|
||||||
|
// Unschedulable or UnschedulableAndUnresolvable (when `Unresolvable` is set to true)
|
||||||
|
// AddPod()/RemovePod() adds/removes one token to the cluster to simulate the dryrun preemption
|
||||||
type tokenFilter struct {
|
type tokenFilter struct {
|
||||||
Tokens int
|
Tokens int
|
||||||
Unresolvable bool
|
Unresolvable bool
|
||||||
|
EnablePreFilter bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name returns name of the plugin.
|
// Name returns name of the plugin.
|
||||||
@ -123,7 +130,10 @@ func (fp *tokenFilter) Filter(ctx context.Context, state *framework.CycleState,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
|
func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
|
||||||
|
if !fp.EnablePreFilter || fp.Tokens > 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, framework.NewStatus(framework.Unschedulable)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
|
func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
|
||||||
@ -194,6 +204,7 @@ func TestPreemption(t *testing.T) {
|
|||||||
existingPods []*v1.Pod
|
existingPods []*v1.Pod
|
||||||
pod *v1.Pod
|
pod *v1.Pod
|
||||||
initTokens int
|
initTokens int
|
||||||
|
enablePreFilter bool
|
||||||
unresolvable bool
|
unresolvable bool
|
||||||
preemptedPodIndexes map[int]struct{}
|
preemptedPodIndexes map[int]struct{}
|
||||||
enablePodDisruptionConditions bool
|
enablePodDisruptionConditions bool
|
||||||
@ -274,6 +285,36 @@ func TestPreemption(t *testing.T) {
|
|||||||
}),
|
}),
|
||||||
preemptedPodIndexes: map[int]struct{}{0: {}},
|
preemptedPodIndexes: map[int]struct{}{0: {}},
|
||||||
},
|
},
|
||||||
|
// This is identical with previous subtest except for setting enablePreFilter to true.
|
||||||
|
// With this fake plugin returning Unschedulable in PreFilter, it's able to exercise the path
|
||||||
|
// that in-tree plugins return Skip in PreFilter and their AddPod/RemovePod functions are also
|
||||||
|
// skipped properly upon preemption.
|
||||||
|
{
|
||||||
|
name: "basic pod preemption with preFilter",
|
||||||
|
initTokens: 1,
|
||||||
|
enablePreFilter: true,
|
||||||
|
existingPods: []*v1.Pod{
|
||||||
|
initPausePod(&testutils.PausePodConfig{
|
||||||
|
Name: "victim-pod",
|
||||||
|
Namespace: testCtx.NS.Name,
|
||||||
|
Priority: &lowPriority,
|
||||||
|
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
|
||||||
|
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
pod: initPausePod(&testutils.PausePodConfig{
|
||||||
|
Name: "preemptor-pod",
|
||||||
|
Namespace: testCtx.NS.Name,
|
||||||
|
Priority: &highPriority,
|
||||||
|
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
|
||||||
|
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
preemptedPodIndexes: map[int]struct{}{0: {}},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
// same as the previous test, but the filter is unresolvable.
|
// same as the previous test, but the filter is unresolvable.
|
||||||
name: "basic pod preemption with unresolvable filter",
|
name: "basic pod preemption with unresolvable filter",
|
||||||
@ -445,6 +486,7 @@ func TestPreemption(t *testing.T) {
|
|||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
|
||||||
filter.Tokens = test.initTokens
|
filter.Tokens = test.initTokens
|
||||||
|
filter.EnablePreFilter = test.enablePreFilter
|
||||||
filter.Unresolvable = test.unresolvable
|
filter.Unresolvable = test.unresolvable
|
||||||
pods := make([]*v1.Pod, len(test.existingPods))
|
pods := make([]*v1.Pod, len(test.existingPods))
|
||||||
// Create and run existingPods.
|
// Create and run existingPods.
|
||||||
|
Loading…
Reference in New Issue
Block a user