diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 8bab08576e0..aa22f0f0b6f 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -542,10 +542,10 @@ func (f *framework) RunPermitPlugins( ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, permit, status) }() - timeout := maxTimeout + pluginsWaitTime := make(map[string]time.Duration) statusCode := Success for _, pl := range f.permitPlugins { - status, d := pl.Permit(ctx, state, pod, nodeName) + status, timeout := pl.Permit(ctx, state, pod, nodeName) if !status.IsSuccess() { if status.IsUnschedulable() { msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message()) @@ -553,10 +553,11 @@ func (f *framework) RunPermitPlugins( return NewStatus(status.Code(), msg) } if status.Code() == Wait { - // Use the minimum timeout duration. - if timeout > d { - timeout = d + // Not allowed to be greater than maxTimeout. + if timeout > maxTimeout { + timeout = maxTimeout } + pluginsWaitTime[pl.Name()] = timeout statusCode = Wait } else { msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) @@ -569,27 +570,20 @@ func (f *framework) RunPermitPlugins( // We now wait for the minimum duration if at least one plugin asked to // wait (and no plugin rejected the pod) if statusCode == Wait { - w := newWaitingPod(pod) + w := newWaitingPod(pod, pluginsWaitTime) f.waitingPods.add(w) defer f.waitingPods.remove(pod.UID) - timer := time.NewTimer(timeout) - klog.V(4).Infof("waiting for %v for pod %q at permit", timeout, pod.Name) - select { - case <-timer.C: - msg := fmt.Sprintf("pod %q rejected due to timeout after waiting %v at permit", pod.Name, timeout) - klog.V(4).Infof(msg) - return NewStatus(Unschedulable, msg) - case s := <-w.s: - if !s.IsSuccess() { - if s.IsUnschedulable() { - msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message()) - klog.V(4).Infof(msg) - return NewStatus(s.Code(), msg) - } - msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message()) - klog.Error(msg) - return NewStatus(Error, msg) + klog.V(4).Infof("waiting for pod %q at permit", pod.Name) + s := <-w.s + if !s.IsSuccess() { + if s.IsUnschedulable() { + msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message()) + klog.V(4).Infof(msg) + return NewStatus(s.Code(), msg) } + msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message()) + klog.Error(msg) + return NewStatus(Error, msg) } } diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index d198afd742a..68bb8ae754b 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -150,10 +150,14 @@ func NewStatus(code Code, msg string) *Status { type WaitingPod interface { // GetPod returns a reference to the waiting pod. GetPod() *v1.Pod - // Allow the waiting pod to be scheduled. Returns true if the allow signal was - // successfully delivered, false otherwise. - Allow() bool - // Reject declares the waiting pod unschedulable. Returns true if the allow signal + // GetPendingPlugins returns a list of pending permit plugin's name. + GetPendingPlugins() []string + // Allow declares the waiting pod is allowed to be scheduled by plugin pluginName. + // If this is the last remaining plugin to allow, then a success signal is delivered + // to unblock the pod. + // Returns true if the allow signal was successfully dealt with, false otherwise. + Allow(pluginName string) bool + // Reject declares the waiting pod unschedulable. Returns true if the reject signal // was successfully delivered, false otherwise. Reject(msg string) bool } diff --git a/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go b/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go index 842eff5e538..ff727f92482 100644 --- a/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go +++ b/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go @@ -17,7 +17,9 @@ limitations under the License. package v1alpha1 import ( + "fmt" "sync" + "time" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -69,16 +71,30 @@ func (m *waitingPodsMap) iterate(callback func(WaitingPod)) { // waitingPod represents a pod waiting in the permit phase. type waitingPod struct { - pod *v1.Pod - s chan *Status + pod *v1.Pod + pendingPlugins map[string]*time.Timer + s chan *Status + mu sync.RWMutex } // newWaitingPod returns a new waitingPod instance. -func newWaitingPod(pod *v1.Pod) *waitingPod { - return &waitingPod{ +func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod { + wp := &waitingPod{ pod: pod, s: make(chan *Status), } + + wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime)) + for k, v := range pluginsMaxWaitTime { + plugin, waitTime := k, v + wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() { + msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v", + waitTime, plugin) + wp.Reject(msg) + }) + } + + return wp } // GetPod returns a reference to the waiting pod. @@ -86,9 +102,35 @@ func (w *waitingPod) GetPod() *v1.Pod { return w.pod } -// Allow the waiting pod to be scheduled. Returns true if the allow signal was -// successfully delivered, false otherwise. -func (w *waitingPod) Allow() bool { +// GetPendingPlugins returns a list of pending permit plugin's name. +func (w *waitingPod) GetPendingPlugins() []string { + w.mu.RLock() + defer w.mu.RUnlock() + plugins := make([]string, 0, len(w.pendingPlugins)) + for p := range w.pendingPlugins { + plugins = append(plugins, p) + } + + return plugins +} + +// Allow declares the waiting pod is allowed to be scheduled by plugin pluginName. +// If this is the last remaining plugin to allow, then a success signal is delivered +// to unblock the pod. +// Returns true if the allow signal was successfully dealt with, false otherwise. +func (w *waitingPod) Allow(pluginName string) bool { + w.mu.Lock() + defer w.mu.Unlock() + if timer, exist := w.pendingPlugins[pluginName]; exist { + timer.Stop() + delete(w.pendingPlugins, pluginName) + } + + // Only signal success status after all plugins have allowed + if len(w.pendingPlugins) != 0 { + return true + } + select { case w.s <- NewStatus(Success, ""): return true @@ -97,9 +139,15 @@ func (w *waitingPod) Allow() bool { } } -// Reject declares the waiting pod unschedulable. Returns true if the allow signal +// Reject declares the waiting pod unschedulable. Returns true if the reject signal // was successfully delivered, false otherwise. func (w *waitingPod) Reject(msg string) bool { + w.mu.RLock() + defer w.mu.RUnlock() + for _, timer := range w.pendingPlugins { + timer.Stop() + } + select { case w.s <- NewStatus(Unschedulable, msg): return true diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 86e090e95d5..5d28870a8de 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -93,6 +93,7 @@ type UnreservePlugin struct { } type PermitPlugin struct { + name string numPermitCalled int failPermit bool rejectPermit bool @@ -381,7 +382,7 @@ func (up *UnreservePlugin) reset() { // Name returns name of the plugin. func (pp *PermitPlugin) Name() string { - return permitPluginName + return pp.name } // Permit implements the permit test plugin. @@ -416,13 +417,18 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0 } if pp.waitAndAllowPermit { - pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow() }) + pp.allowAllPods() return nil, 0 } } return nil, 0 } +// allowAllPods allows all waiting pods. +func (pp *PermitPlugin) allowAllPods() { + pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow(pp.name) }) +} + // reset used to reset permit plugin. func (pp *PermitPlugin) reset() { pp.numPermitCalled = 0 @@ -684,17 +690,10 @@ func TestPrebindPlugin(t *testing.T) { }, }, } - // Set reserve prebind config for testing - preBindPluginConfig := []schedulerconfig.PluginConfig{ - { - Name: preBindPluginName, - Args: runtime.Unknown{}, - }, - } + // Create the master and the scheduler with the test plugin set. context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "prebind-plugin", nil), 2, scheduler.WithFrameworkPlugins(plugins), - scheduler.WithFrameworkPluginConfig(preBindPluginConfig), scheduler.WithFrameworkDefaultRegistry(registry)) defer cleanupTest(t, context) @@ -774,22 +773,10 @@ func TestUnreservePlugin(t *testing.T) { }, }, } - // Set unreserve and prebind plugin config for testing - pluginConfig := []schedulerconfig.PluginConfig{ - { - Name: unreservePluginName, - Args: runtime.Unknown{}, - }, - { - Name: preBindPluginName, - Args: runtime.Unknown{}, - }, - } // Create the master and the scheduler with the test plugin set. context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "unreserve-plugin", nil), 2, scheduler.WithFrameworkPlugins(plugins), - scheduler.WithFrameworkPluginConfig(pluginConfig), scheduler.WithFrameworkDefaultRegistry(registry)) defer cleanupTest(t, context) @@ -876,30 +863,10 @@ func TestBindPlugin(t *testing.T) { Enabled: []schedulerconfig.Plugin{{Name: postBindPlugin.Name()}}, }, } - // Set reserve and bind config for testing - pluginConfig := []schedulerconfig.PluginConfig{ - { - Name: unreservePlugin.Name(), - Args: runtime.Unknown{}, - }, - { - Name: bindPlugin1.Name(), - Args: runtime.Unknown{}, - }, - { - Name: bindPlugin2.Name(), - Args: runtime.Unknown{}, - }, - { - Name: postBindPlugin.Name(), - Args: runtime.Unknown{}, - }, - } // Create the master and the scheduler with the test plugin set. context := initTestSchedulerWithOptions(t, testContext, false, nil, time.Second, scheduler.WithFrameworkPlugins(plugins), - scheduler.WithFrameworkPluginConfig(pluginConfig), scheduler.WithFrameworkDefaultRegistry(registry), scheduler.WithFrameworkConfigProducerRegistry(nil)) defer cleanupTest(t, context) @@ -1057,22 +1024,10 @@ func TestPostBindPlugin(t *testing.T) { }, }, } - // Set reserve prebind and postbind config for testing - pluginConfig := []schedulerconfig.PluginConfig{ - { - Name: preBindPluginName, - Args: runtime.Unknown{}, - }, - { - Name: postBindPluginName, - Args: runtime.Unknown{}, - }, - } // Create the master and the scheduler with the test plugin set. context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "postbind-plugin", nil), 2, scheduler.WithFrameworkPlugins(plugins), - scheduler.WithFrameworkPluginConfig(pluginConfig), scheduler.WithFrameworkDefaultRegistry(registry)) defer cleanupTest(t, context) @@ -1123,7 +1078,7 @@ func TestPostBindPlugin(t *testing.T) { // TestPermitPlugin tests invocation of permit plugins. func TestPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin. - perPlugin := &PermitPlugin{} + perPlugin := &PermitPlugin{name: permitPluginName} registry := framework.Registry{permitPluginName: newPermitPlugin(perPlugin)} // Setup initial permit plugin for testing. @@ -1136,18 +1091,10 @@ func TestPermitPlugin(t *testing.T) { }, }, } - // Set permit plugin config for testing - pluginConfig := []schedulerconfig.PluginConfig{ - { - Name: permitPluginName, - Args: runtime.Unknown{}, - }, - } // Create the master and the scheduler with the test plugin set. context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2, scheduler.WithFrameworkPlugins(plugins), - scheduler.WithFrameworkPluginConfig(pluginConfig), scheduler.WithFrameworkDefaultRegistry(registry)) defer cleanupTest(t, context) @@ -1226,10 +1173,82 @@ func TestPermitPlugin(t *testing.T) { } } +// TestMultiplePermitPlugins tests multiple permit plugins returning wait for a same pod. +func TestMultiplePermitPlugins(t *testing.T) { + // Create a plugin registry for testing. + perPlugin1 := &PermitPlugin{name: "permit-plugin-1"} + perPlugin2 := &PermitPlugin{name: "permit-plugin-2"} + registry := framework.Registry{ + perPlugin1.Name(): newPermitPlugin(perPlugin1), + perPlugin2.Name(): newPermitPlugin(perPlugin2), + } + + // Setup initial permit plugins for testing. + plugins := &schedulerconfig.Plugins{ + Permit: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: perPlugin1.Name(), + }, + { + Name: perPlugin2.Name(), + }, + }, + }, + } + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "multi-permit-plugin", nil), 2, + scheduler.WithFrameworkPlugins(plugins), + scheduler.WithFrameworkDefaultRegistry(registry)) + defer cleanupTest(t, context) + + // Both permit plugins will return Wait for permitting + perPlugin1.timeoutPermit = true + perPlugin2.timeoutPermit = true + + // Create a test pod. + podName := "test-pod" + pod, err := createPausePod(context.clientSet, + initPausePod(context.clientSet, &pausePodConfig{Name: podName, Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + var waitingPod framework.WaitingPod + // Wait until the test pod is actually waiting. + wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { + waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID) + return waitingPod != nil, nil + }) + + // Check the number of pending permits + if l := len(waitingPod.GetPendingPlugins()); l != 2 { + t.Errorf("Expected the number of pending plugins is 2, but got %d", l) + } + + perPlugin1.allowAllPods() + // Check the number of pending permits + if l := len(waitingPod.GetPendingPlugins()); l != 1 { + t.Errorf("Expected the number of pending plugins is 1, but got %d", l) + } + + perPlugin2.allowAllPods() + if err = waitForPodToSchedule(context.clientSet, pod); err != nil { + t.Errorf("Expected the pod to be scheduled. error: %v", err) + } + + if perPlugin1.numPermitCalled == 0 || perPlugin2.numPermitCalled == 0 { + t.Errorf("Expected the permit plugin to be called.") + } + + cleanupPods(context.clientSet, t, []*v1.Pod{pod}) +} + // TestCoSchedulingWithPermitPlugin tests invocation of permit plugins. func TestCoSchedulingWithPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin. - permitPlugin := &PermitPlugin{} + permitPlugin := &PermitPlugin{name: permitPluginName} registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)} // Setup initial permit plugin for testing. @@ -1242,18 +1261,10 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) { }, }, } - // Set permit plugin config for testing - pluginConfig := []schedulerconfig.PluginConfig{ - { - Name: permitPluginName, - Args: runtime.Unknown{}, - }, - } // Create the master and the scheduler with the test plugin set. context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2, scheduler.WithFrameworkPlugins(plugins), - scheduler.WithFrameworkPluginConfig(pluginConfig), scheduler.WithFrameworkDefaultRegistry(registry)) defer cleanupTest(t, context) @@ -1433,18 +1444,10 @@ func TestPreemptWithPermitPlugin(t *testing.T) { }, }, } - // Set permit plugin config for testing - pluginConfig := []schedulerconfig.PluginConfig{ - { - Name: permitPluginName, - Args: runtime.Unknown{}, - }, - } // Create the master and the scheduler with the test plugin set. context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0, scheduler.WithFrameworkPlugins(plugins), - scheduler.WithFrameworkPluginConfig(pluginConfig), scheduler.WithFrameworkDefaultRegistry(registry)) defer cleanupTest(t, context)