From 83dce763ecee9f603011755bbcad5a9a5ebc9a67 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Tue, 5 Nov 2019 22:04:44 +0800 Subject: [PATCH] add event to remove and reject waitingpods --- pkg/scheduler/eventhandlers.go | 1 + pkg/scheduler/framework/v1alpha1/BUILD | 1 + pkg/scheduler/framework/v1alpha1/framework.go | 8 +++ .../framework/v1alpha1/framework_test.go | 52 +++++++++++++++++++ pkg/scheduler/framework/v1alpha1/interface.go | 3 ++ .../internal/queue/scheduling_queue_test.go | 3 ++ 6 files changed, 68 insertions(+) diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 345002392ed..b5e4acdde19 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -204,6 +204,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { // Volume binder only wants to keep unassigned pods sched.VolumeBinder.DeletePodBindings(pod) } + sched.Framework.RejectWaitingPod(pod.UID) } func (sched *Scheduler) addPodToCache(obj interface{}) { diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 583ea46a676..6d73ee63561 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -62,6 +62,7 @@ go_test( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/github.com/prometheus/client_model/go:go_default_library", ], diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 425be9b9750..97493a5b6f2 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -627,6 +627,14 @@ func (f *framework) GetWaitingPod(uid types.UID) WaitingPod { return f.waitingPods.get(uid) } +// RejectWaitingPod rejects a WaitingPod given its UID. +func (f *framework) RejectWaitingPod(uid types.UID) { + waitingPod := f.waitingPods.get(uid) + if waitingPod != nil { + waitingPod.Reject("removed") + } +} + // HasFilterPlugins returns true if at least one filter plugin is defined. func (f *framework) HasFilterPlugins() bool { return len(f.filterPlugins) > 0 diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index 7ff56a931f7..f4cca310733 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -29,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/metrics" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -43,6 +44,7 @@ const ( preFilterWithExtensionsPluginName = "prefilter-with-extensions-plugin" duplicatePluginName = "duplicate-plugin" testPlugin = "test-plugin" + permitPlugin = "permit-plugin" ) // TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface. @@ -249,6 +251,18 @@ func newDuplicatePlugin(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) { return &TestDuplicatePlugin{}, nil } +// TestPermitPlugin only implements PermitPlugin interface. +type TestPermitPlugin struct { + PreFilterCalled int +} + +func (pp *TestPermitPlugin) Name() string { + return permitPlugin +} +func (pp *TestPermitPlugin) Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration) { + return NewStatus(Wait, ""), time.Duration(10 * time.Second) +} + var registry Registry = func() Registry { r := make(Registry) r.Register(scoreWithNormalizePlugin1, newScoreWithNormalizePlugin1) @@ -773,6 +787,44 @@ func TestPermitWaitingMetric(t *testing.T) { } } +func TestRejectWaitingPod(t *testing.T) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + UID: types.UID("pod"), + }, + } + + testPermitPlugin := &TestPermitPlugin{} + r := make(Registry) + r.Register(permitPlugin, + func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) { + return testPermitPlugin, nil + }) + plugins := &config.Plugins{ + Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}}, + } + + f, err := NewFramework(r, plugins, emptyArgs) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + + go func() { + for { + waitingPod := f.GetWaitingPod(pod.UID) + if waitingPod != nil { + break + } + } + f.RejectWaitingPod(pod.UID) + }() + permitStatus := f.RunPermitPlugins(context.Background(), nil, pod, "") + if permitStatus.message != "pod \"pod\" rejected while waiting at permit: removed" { + t.Fatalf("RejectWaitingPod failed, permitStatus: %v", permitStatus) + } +} + func buildScoreConfigDefaultWeights(ps ...string) *config.Plugins { return buildScoreConfigWithWeights(defaultWeights, ps...) } diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index 1bad76fcbe5..dacaf04e453 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -471,6 +471,9 @@ type FrameworkHandle interface { // GetWaitingPod returns a waiting pod given its UID. GetWaitingPod(uid types.UID) WaitingPod + // RejectWaitingPod rejects a waiting pod given its UID. + RejectWaitingPod(uid types.UID) + // ClientSet returns a kubernetes clientSet. ClientSet() clientset.Interface diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 54e94e45690..f5dd01604a9 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -243,6 +243,9 @@ func (*fakeFramework) GetWaitingPod(uid types.UID) framework.WaitingPod { return nil } +func (*fakeFramework) RejectWaitingPod(uid types.UID) { +} + func (*fakeFramework) ClientSet() clientset.Interface { return nil }