mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
Refactor scheduler's framework permit API
This commit is contained in:
parent
534051acec
commit
38b7668bb3
@ -542,10 +542,10 @@ func (f *framework) RunPermitPlugins(
|
|||||||
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
|
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
defer func() { recordExtensionPointDuration(startTime, permit, status) }()
|
defer func() { recordExtensionPointDuration(startTime, permit, status) }()
|
||||||
timeout := maxTimeout
|
pluginsWaitTime := make(map[string]time.Duration)
|
||||||
statusCode := Success
|
statusCode := Success
|
||||||
for _, pl := range f.permitPlugins {
|
for _, pl := range f.permitPlugins {
|
||||||
status, d := pl.Permit(ctx, state, pod, nodeName)
|
status, timeout := pl.Permit(ctx, state, pod, nodeName)
|
||||||
if !status.IsSuccess() {
|
if !status.IsSuccess() {
|
||||||
if status.IsUnschedulable() {
|
if status.IsUnschedulable() {
|
||||||
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
|
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
|
||||||
@ -553,10 +553,11 @@ func (f *framework) RunPermitPlugins(
|
|||||||
return NewStatus(status.Code(), msg)
|
return NewStatus(status.Code(), msg)
|
||||||
}
|
}
|
||||||
if status.Code() == Wait {
|
if status.Code() == Wait {
|
||||||
// Use the minimum timeout duration.
|
// Not allowed to be greater than maxTimeout.
|
||||||
if timeout > d {
|
if timeout > maxTimeout {
|
||||||
timeout = d
|
timeout = maxTimeout
|
||||||
}
|
}
|
||||||
|
pluginsWaitTime[pl.Name()] = timeout
|
||||||
statusCode = Wait
|
statusCode = Wait
|
||||||
} else {
|
} else {
|
||||||
msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
|
msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
|
||||||
@ -569,27 +570,20 @@ func (f *framework) RunPermitPlugins(
|
|||||||
// We now wait for the minimum duration if at least one plugin asked to
|
// We now wait for the minimum duration if at least one plugin asked to
|
||||||
// wait (and no plugin rejected the pod)
|
// wait (and no plugin rejected the pod)
|
||||||
if statusCode == Wait {
|
if statusCode == Wait {
|
||||||
w := newWaitingPod(pod)
|
w := newWaitingPod(pod, pluginsWaitTime)
|
||||||
f.waitingPods.add(w)
|
f.waitingPods.add(w)
|
||||||
defer f.waitingPods.remove(pod.UID)
|
defer f.waitingPods.remove(pod.UID)
|
||||||
timer := time.NewTimer(timeout)
|
klog.V(4).Infof("waiting for pod %q at permit", pod.Name)
|
||||||
klog.V(4).Infof("waiting for %v for pod %q at permit", timeout, pod.Name)
|
s := <-w.s
|
||||||
select {
|
if !s.IsSuccess() {
|
||||||
case <-timer.C:
|
if s.IsUnschedulable() {
|
||||||
msg := fmt.Sprintf("pod %q rejected due to timeout after waiting %v at permit", pod.Name, timeout)
|
msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message())
|
||||||
klog.V(4).Infof(msg)
|
klog.V(4).Infof(msg)
|
||||||
return NewStatus(Unschedulable, msg)
|
return NewStatus(s.Code(), msg)
|
||||||
case s := <-w.s:
|
|
||||||
if !s.IsSuccess() {
|
|
||||||
if s.IsUnschedulable() {
|
|
||||||
msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message())
|
|
||||||
klog.V(4).Infof(msg)
|
|
||||||
return NewStatus(s.Code(), msg)
|
|
||||||
}
|
|
||||||
msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message())
|
|
||||||
klog.Error(msg)
|
|
||||||
return NewStatus(Error, msg)
|
|
||||||
}
|
}
|
||||||
|
msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message())
|
||||||
|
klog.Error(msg)
|
||||||
|
return NewStatus(Error, msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,10 +150,14 @@ func NewStatus(code Code, msg string) *Status {
|
|||||||
type WaitingPod interface {
|
type WaitingPod interface {
|
||||||
// GetPod returns a reference to the waiting pod.
|
// GetPod returns a reference to the waiting pod.
|
||||||
GetPod() *v1.Pod
|
GetPod() *v1.Pod
|
||||||
// Allow the waiting pod to be scheduled. Returns true if the allow signal was
|
// GetPendingPlugins returns a list of pending permit plugin's name.
|
||||||
// successfully delivered, false otherwise.
|
GetPendingPlugins() []string
|
||||||
Allow() bool
|
// Allow declares the waiting pod is allowed to be scheduled by plugin pluginName.
|
||||||
// Reject declares the waiting pod unschedulable. Returns true if the allow signal
|
// If this is the last remaining plugin to allow, then a success signal is delivered
|
||||||
|
// to unblock the pod.
|
||||||
|
// Returns true if the allow signal was successfully dealt with, false otherwise.
|
||||||
|
Allow(pluginName string) bool
|
||||||
|
// Reject declares the waiting pod unschedulable. Returns true if the reject signal
|
||||||
// was successfully delivered, false otherwise.
|
// was successfully delivered, false otherwise.
|
||||||
Reject(msg string) bool
|
Reject(msg string) bool
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,9 @@ limitations under the License.
|
|||||||
package v1alpha1
|
package v1alpha1
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
@ -69,16 +71,30 @@ func (m *waitingPodsMap) iterate(callback func(WaitingPod)) {
|
|||||||
|
|
||||||
// waitingPod represents a pod waiting in the permit phase.
|
// waitingPod represents a pod waiting in the permit phase.
|
||||||
type waitingPod struct {
|
type waitingPod struct {
|
||||||
pod *v1.Pod
|
pod *v1.Pod
|
||||||
s chan *Status
|
pendingPlugins map[string]*time.Timer
|
||||||
|
s chan *Status
|
||||||
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// newWaitingPod returns a new waitingPod instance.
|
// newWaitingPod returns a new waitingPod instance.
|
||||||
func newWaitingPod(pod *v1.Pod) *waitingPod {
|
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
|
||||||
return &waitingPod{
|
wp := &waitingPod{
|
||||||
pod: pod,
|
pod: pod,
|
||||||
s: make(chan *Status),
|
s: make(chan *Status),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
|
||||||
|
for k, v := range pluginsMaxWaitTime {
|
||||||
|
plugin, waitTime := k, v
|
||||||
|
wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
|
||||||
|
msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
|
||||||
|
waitTime, plugin)
|
||||||
|
wp.Reject(msg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return wp
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPod returns a reference to the waiting pod.
|
// GetPod returns a reference to the waiting pod.
|
||||||
@ -86,9 +102,35 @@ func (w *waitingPod) GetPod() *v1.Pod {
|
|||||||
return w.pod
|
return w.pod
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allow the waiting pod to be scheduled. Returns true if the allow signal was
|
// GetPendingPlugins returns a list of pending permit plugin's name.
|
||||||
// successfully delivered, false otherwise.
|
func (w *waitingPod) GetPendingPlugins() []string {
|
||||||
func (w *waitingPod) Allow() bool {
|
w.mu.RLock()
|
||||||
|
defer w.mu.RUnlock()
|
||||||
|
plugins := make([]string, 0, len(w.pendingPlugins))
|
||||||
|
for p := range w.pendingPlugins {
|
||||||
|
plugins = append(plugins, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
return plugins
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allow declares the waiting pod is allowed to be scheduled by plugin pluginName.
|
||||||
|
// If this is the last remaining plugin to allow, then a success signal is delivered
|
||||||
|
// to unblock the pod.
|
||||||
|
// Returns true if the allow signal was successfully dealt with, false otherwise.
|
||||||
|
func (w *waitingPod) Allow(pluginName string) bool {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
if timer, exist := w.pendingPlugins[pluginName]; exist {
|
||||||
|
timer.Stop()
|
||||||
|
delete(w.pendingPlugins, pluginName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only signal success status after all plugins have allowed
|
||||||
|
if len(w.pendingPlugins) != 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case w.s <- NewStatus(Success, ""):
|
case w.s <- NewStatus(Success, ""):
|
||||||
return true
|
return true
|
||||||
@ -97,9 +139,15 @@ func (w *waitingPod) Allow() bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reject declares the waiting pod unschedulable. Returns true if the allow signal
|
// Reject declares the waiting pod unschedulable. Returns true if the reject signal
|
||||||
// was successfully delivered, false otherwise.
|
// was successfully delivered, false otherwise.
|
||||||
func (w *waitingPod) Reject(msg string) bool {
|
func (w *waitingPod) Reject(msg string) bool {
|
||||||
|
w.mu.RLock()
|
||||||
|
defer w.mu.RUnlock()
|
||||||
|
for _, timer := range w.pendingPlugins {
|
||||||
|
timer.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case w.s <- NewStatus(Unschedulable, msg):
|
case w.s <- NewStatus(Unschedulable, msg):
|
||||||
return true
|
return true
|
||||||
|
@ -93,6 +93,7 @@ type UnreservePlugin struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type PermitPlugin struct {
|
type PermitPlugin struct {
|
||||||
|
name string
|
||||||
numPermitCalled int
|
numPermitCalled int
|
||||||
failPermit bool
|
failPermit bool
|
||||||
rejectPermit bool
|
rejectPermit bool
|
||||||
@ -381,7 +382,7 @@ func (up *UnreservePlugin) reset() {
|
|||||||
|
|
||||||
// Name returns name of the plugin.
|
// Name returns name of the plugin.
|
||||||
func (pp *PermitPlugin) Name() string {
|
func (pp *PermitPlugin) Name() string {
|
||||||
return permitPluginName
|
return pp.name
|
||||||
}
|
}
|
||||||
|
|
||||||
// Permit implements the permit test plugin.
|
// Permit implements the permit test plugin.
|
||||||
@ -416,13 +417,18 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState,
|
|||||||
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
|
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
|
||||||
}
|
}
|
||||||
if pp.waitAndAllowPermit {
|
if pp.waitAndAllowPermit {
|
||||||
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow() })
|
pp.allowAllPods()
|
||||||
return nil, 0
|
return nil, 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, 0
|
return nil, 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// allowAllPods allows all waiting pods.
|
||||||
|
func (pp *PermitPlugin) allowAllPods() {
|
||||||
|
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow(pp.name) })
|
||||||
|
}
|
||||||
|
|
||||||
// reset used to reset permit plugin.
|
// reset used to reset permit plugin.
|
||||||
func (pp *PermitPlugin) reset() {
|
func (pp *PermitPlugin) reset() {
|
||||||
pp.numPermitCalled = 0
|
pp.numPermitCalled = 0
|
||||||
@ -684,17 +690,10 @@ func TestPrebindPlugin(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Set reserve prebind config for testing
|
|
||||||
preBindPluginConfig := []schedulerconfig.PluginConfig{
|
|
||||||
{
|
|
||||||
Name: preBindPluginName,
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
// Create the master and the scheduler with the test plugin set.
|
// Create the master and the scheduler with the test plugin set.
|
||||||
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "prebind-plugin", nil), 2,
|
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "prebind-plugin", nil), 2,
|
||||||
scheduler.WithFrameworkPlugins(plugins),
|
scheduler.WithFrameworkPlugins(plugins),
|
||||||
scheduler.WithFrameworkPluginConfig(preBindPluginConfig),
|
|
||||||
scheduler.WithFrameworkDefaultRegistry(registry))
|
scheduler.WithFrameworkDefaultRegistry(registry))
|
||||||
defer cleanupTest(t, context)
|
defer cleanupTest(t, context)
|
||||||
|
|
||||||
@ -774,22 +773,10 @@ func TestUnreservePlugin(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Set unreserve and prebind plugin config for testing
|
|
||||||
pluginConfig := []schedulerconfig.PluginConfig{
|
|
||||||
{
|
|
||||||
Name: unreservePluginName,
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: preBindPluginName,
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the master and the scheduler with the test plugin set.
|
// Create the master and the scheduler with the test plugin set.
|
||||||
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "unreserve-plugin", nil), 2,
|
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "unreserve-plugin", nil), 2,
|
||||||
scheduler.WithFrameworkPlugins(plugins),
|
scheduler.WithFrameworkPlugins(plugins),
|
||||||
scheduler.WithFrameworkPluginConfig(pluginConfig),
|
|
||||||
scheduler.WithFrameworkDefaultRegistry(registry))
|
scheduler.WithFrameworkDefaultRegistry(registry))
|
||||||
defer cleanupTest(t, context)
|
defer cleanupTest(t, context)
|
||||||
|
|
||||||
@ -876,30 +863,10 @@ func TestBindPlugin(t *testing.T) {
|
|||||||
Enabled: []schedulerconfig.Plugin{{Name: postBindPlugin.Name()}},
|
Enabled: []schedulerconfig.Plugin{{Name: postBindPlugin.Name()}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Set reserve and bind config for testing
|
|
||||||
pluginConfig := []schedulerconfig.PluginConfig{
|
|
||||||
{
|
|
||||||
Name: unreservePlugin.Name(),
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: bindPlugin1.Name(),
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: bindPlugin2.Name(),
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: postBindPlugin.Name(),
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the master and the scheduler with the test plugin set.
|
// Create the master and the scheduler with the test plugin set.
|
||||||
context := initTestSchedulerWithOptions(t, testContext, false, nil, time.Second,
|
context := initTestSchedulerWithOptions(t, testContext, false, nil, time.Second,
|
||||||
scheduler.WithFrameworkPlugins(plugins),
|
scheduler.WithFrameworkPlugins(plugins),
|
||||||
scheduler.WithFrameworkPluginConfig(pluginConfig),
|
|
||||||
scheduler.WithFrameworkDefaultRegistry(registry),
|
scheduler.WithFrameworkDefaultRegistry(registry),
|
||||||
scheduler.WithFrameworkConfigProducerRegistry(nil))
|
scheduler.WithFrameworkConfigProducerRegistry(nil))
|
||||||
defer cleanupTest(t, context)
|
defer cleanupTest(t, context)
|
||||||
@ -1057,22 +1024,10 @@ func TestPostBindPlugin(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Set reserve prebind and postbind config for testing
|
|
||||||
pluginConfig := []schedulerconfig.PluginConfig{
|
|
||||||
{
|
|
||||||
Name: preBindPluginName,
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: postBindPluginName,
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the master and the scheduler with the test plugin set.
|
// Create the master and the scheduler with the test plugin set.
|
||||||
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "postbind-plugin", nil), 2,
|
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "postbind-plugin", nil), 2,
|
||||||
scheduler.WithFrameworkPlugins(plugins),
|
scheduler.WithFrameworkPlugins(plugins),
|
||||||
scheduler.WithFrameworkPluginConfig(pluginConfig),
|
|
||||||
scheduler.WithFrameworkDefaultRegistry(registry))
|
scheduler.WithFrameworkDefaultRegistry(registry))
|
||||||
defer cleanupTest(t, context)
|
defer cleanupTest(t, context)
|
||||||
|
|
||||||
@ -1123,7 +1078,7 @@ func TestPostBindPlugin(t *testing.T) {
|
|||||||
// TestPermitPlugin tests invocation of permit plugins.
|
// TestPermitPlugin tests invocation of permit plugins.
|
||||||
func TestPermitPlugin(t *testing.T) {
|
func TestPermitPlugin(t *testing.T) {
|
||||||
// Create a plugin registry for testing. Register only a permit plugin.
|
// Create a plugin registry for testing. Register only a permit plugin.
|
||||||
perPlugin := &PermitPlugin{}
|
perPlugin := &PermitPlugin{name: permitPluginName}
|
||||||
registry := framework.Registry{permitPluginName: newPermitPlugin(perPlugin)}
|
registry := framework.Registry{permitPluginName: newPermitPlugin(perPlugin)}
|
||||||
|
|
||||||
// Setup initial permit plugin for testing.
|
// Setup initial permit plugin for testing.
|
||||||
@ -1136,18 +1091,10 @@ func TestPermitPlugin(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Set permit plugin config for testing
|
|
||||||
pluginConfig := []schedulerconfig.PluginConfig{
|
|
||||||
{
|
|
||||||
Name: permitPluginName,
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the master and the scheduler with the test plugin set.
|
// Create the master and the scheduler with the test plugin set.
|
||||||
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
|
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
|
||||||
scheduler.WithFrameworkPlugins(plugins),
|
scheduler.WithFrameworkPlugins(plugins),
|
||||||
scheduler.WithFrameworkPluginConfig(pluginConfig),
|
|
||||||
scheduler.WithFrameworkDefaultRegistry(registry))
|
scheduler.WithFrameworkDefaultRegistry(registry))
|
||||||
defer cleanupTest(t, context)
|
defer cleanupTest(t, context)
|
||||||
|
|
||||||
@ -1226,10 +1173,82 @@ func TestPermitPlugin(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestMultiplePermitPlugins tests multiple permit plugins returning wait for a same pod.
|
||||||
|
func TestMultiplePermitPlugins(t *testing.T) {
|
||||||
|
// Create a plugin registry for testing.
|
||||||
|
perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
|
||||||
|
perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
|
||||||
|
registry := framework.Registry{
|
||||||
|
perPlugin1.Name(): newPermitPlugin(perPlugin1),
|
||||||
|
perPlugin2.Name(): newPermitPlugin(perPlugin2),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup initial permit plugins for testing.
|
||||||
|
plugins := &schedulerconfig.Plugins{
|
||||||
|
Permit: &schedulerconfig.PluginSet{
|
||||||
|
Enabled: []schedulerconfig.Plugin{
|
||||||
|
{
|
||||||
|
Name: perPlugin1.Name(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: perPlugin2.Name(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the master and the scheduler with the test plugin set.
|
||||||
|
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "multi-permit-plugin", nil), 2,
|
||||||
|
scheduler.WithFrameworkPlugins(plugins),
|
||||||
|
scheduler.WithFrameworkDefaultRegistry(registry))
|
||||||
|
defer cleanupTest(t, context)
|
||||||
|
|
||||||
|
// Both permit plugins will return Wait for permitting
|
||||||
|
perPlugin1.timeoutPermit = true
|
||||||
|
perPlugin2.timeoutPermit = true
|
||||||
|
|
||||||
|
// Create a test pod.
|
||||||
|
podName := "test-pod"
|
||||||
|
pod, err := createPausePod(context.clientSet,
|
||||||
|
initPausePod(context.clientSet, &pausePodConfig{Name: podName, Namespace: context.ns.Name}))
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error while creating a test pod: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var waitingPod framework.WaitingPod
|
||||||
|
// Wait until the test pod is actually waiting.
|
||||||
|
wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
|
||||||
|
waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID)
|
||||||
|
return waitingPod != nil, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// Check the number of pending permits
|
||||||
|
if l := len(waitingPod.GetPendingPlugins()); l != 2 {
|
||||||
|
t.Errorf("Expected the number of pending plugins is 2, but got %d", l)
|
||||||
|
}
|
||||||
|
|
||||||
|
perPlugin1.allowAllPods()
|
||||||
|
// Check the number of pending permits
|
||||||
|
if l := len(waitingPod.GetPendingPlugins()); l != 1 {
|
||||||
|
t.Errorf("Expected the number of pending plugins is 1, but got %d", l)
|
||||||
|
}
|
||||||
|
|
||||||
|
perPlugin2.allowAllPods()
|
||||||
|
if err = waitForPodToSchedule(context.clientSet, pod); err != nil {
|
||||||
|
t.Errorf("Expected the pod to be scheduled. error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if perPlugin1.numPermitCalled == 0 || perPlugin2.numPermitCalled == 0 {
|
||||||
|
t.Errorf("Expected the permit plugin to be called.")
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanupPods(context.clientSet, t, []*v1.Pod{pod})
|
||||||
|
}
|
||||||
|
|
||||||
// TestCoSchedulingWithPermitPlugin tests invocation of permit plugins.
|
// TestCoSchedulingWithPermitPlugin tests invocation of permit plugins.
|
||||||
func TestCoSchedulingWithPermitPlugin(t *testing.T) {
|
func TestCoSchedulingWithPermitPlugin(t *testing.T) {
|
||||||
// Create a plugin registry for testing. Register only a permit plugin.
|
// Create a plugin registry for testing. Register only a permit plugin.
|
||||||
permitPlugin := &PermitPlugin{}
|
permitPlugin := &PermitPlugin{name: permitPluginName}
|
||||||
registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)}
|
registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)}
|
||||||
|
|
||||||
// Setup initial permit plugin for testing.
|
// Setup initial permit plugin for testing.
|
||||||
@ -1242,18 +1261,10 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Set permit plugin config for testing
|
|
||||||
pluginConfig := []schedulerconfig.PluginConfig{
|
|
||||||
{
|
|
||||||
Name: permitPluginName,
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the master and the scheduler with the test plugin set.
|
// Create the master and the scheduler with the test plugin set.
|
||||||
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
|
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
|
||||||
scheduler.WithFrameworkPlugins(plugins),
|
scheduler.WithFrameworkPlugins(plugins),
|
||||||
scheduler.WithFrameworkPluginConfig(pluginConfig),
|
|
||||||
scheduler.WithFrameworkDefaultRegistry(registry))
|
scheduler.WithFrameworkDefaultRegistry(registry))
|
||||||
defer cleanupTest(t, context)
|
defer cleanupTest(t, context)
|
||||||
|
|
||||||
@ -1433,18 +1444,10 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Set permit plugin config for testing
|
|
||||||
pluginConfig := []schedulerconfig.PluginConfig{
|
|
||||||
{
|
|
||||||
Name: permitPluginName,
|
|
||||||
Args: runtime.Unknown{},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the master and the scheduler with the test plugin set.
|
// Create the master and the scheduler with the test plugin set.
|
||||||
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,
|
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,
|
||||||
scheduler.WithFrameworkPlugins(plugins),
|
scheduler.WithFrameworkPlugins(plugins),
|
||||||
scheduler.WithFrameworkPluginConfig(pluginConfig),
|
|
||||||
scheduler.WithFrameworkDefaultRegistry(registry))
|
scheduler.WithFrameworkDefaultRegistry(registry))
|
||||||
defer cleanupTest(t, context)
|
defer cleanupTest(t, context)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user