Add 'reserve' and 'prebind' plugin tests

This commit is contained in:
Bobby (Babak) Salamat 2018-11-08 18:55:19 -08:00
parent e60f510e38
commit 679d88c1a0
3 changed files with 298 additions and 3 deletions

View File

@ -0,0 +1,269 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// StatefulMultipointExample is an example plugin that is executed at multiple extension points.
// This plugin is stateful. It receives arguments at initialization (NewMultipointPlugin)
// and changes its state when it is executed.
type TesterPlugin struct {
numReserveCalled int
numPrebindCalled int
failReserve bool
failPrebind bool
rejectPrebind bool
}
var _ = plugins.ReservePlugin(&TesterPlugin{})
var _ = plugins.PrebindPlugin(&TesterPlugin{})
// Name returns name of the plugin.
func (tp *TesterPlugin) Name() string {
return "tester-plugin"
}
// Reserve is a test function that returns an error or nil, depending on the
// value of "failReserve".
func (tp *TesterPlugin) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error {
tp.numReserveCalled++
if tp.failReserve {
return fmt.Errorf("injecting failure for pod %v", pod.Name)
}
return nil
}
// Prebind is a test function that returns (true, nil) or errors for testing.
func (tp *TesterPlugin) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) {
var err error = nil
tp.numPrebindCalled++
if tp.failPrebind {
err = fmt.Errorf("injecting failure for pod %v", pod.Name)
}
if tp.rejectPrebind {
return false, err
}
return true, err
}
// TestPluginSet is a plugin set used for testing purposes.
type TestPluginSet struct {
data *plugins.PluginData
reservePlugins []plugins.ReservePlugin
prebindPlugins []plugins.PrebindPlugin
}
var _ = plugins.PluginSet(&TestPluginSet{})
// ReservePlugins returns a slice of default reserve plugins.
func (r *TestPluginSet) ReservePlugins() []plugins.ReservePlugin {
return r.reservePlugins
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r *TestPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return r.prebindPlugins
}
// Data returns a pointer to PluginData.
func (r *TestPluginSet) Data() *plugins.PluginData {
return r.data
}
// TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) {
// Create a plugin set for testing. Register only a reserve plugin.
testerPlugin := &TesterPlugin{}
testPluginSet := &TestPluginSet{
data: &plugins.PluginData{
Ctx: plugins.NewPluginContext(),
},
reservePlugins: []plugins.ReservePlugin{testerPlugin},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "reserve-plugin", nil),
false, nil, testPluginSet, false, true, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
for _, fail := range []bool{false, true} {
testerPlugin.failReserve = fail
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Didn't expected the pod to be scheduled. error: %v", err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
}
if testerPlugin.numReserveCalled == 0 {
t.Errorf("Expected the reserve plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestPrebindPlugin tests invocation of prebind plugins.
func TestPrebindPlugin(t *testing.T) {
// Create a plugin set for testing. Register only a prebind plugin.
testerPlugin := &TesterPlugin{}
testPluginSet := &TestPluginSet{
data: &plugins.PluginData{
Ctx: plugins.NewPluginContext(),
},
prebindPlugins: []plugins.PrebindPlugin{testerPlugin},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "prebind-plugin", nil),
false, nil, testPluginSet, false, true, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
tests := []struct {
fail bool
reject bool
}{
{
fail: false,
reject: false,
},
{
fail: true,
reject: false,
},
{
fail: false,
reject: true,
},
{
fail: true,
reject: true,
},
}
for i, test := range tests {
testerPlugin.failPrebind = test.fail
testerPlugin.rejectPrebind = test.reject
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if test.fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err)
}
} else {
if test.reject {
if err = waitForPodUnschedulable(cs, pod); err != nil {
t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
}
}
}
if testerPlugin.numPrebindCalled == 0 {
t.Errorf("Expected the prebind plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestContextCleanup tests that data inserted in the pluginContext is removed
// after a scheduling cycle is over.
func TestContextCleanup(t *testing.T) {
// Create a plugin set for testing.
testerPlugin := &TesterPlugin{}
testPluginSet := &TestPluginSet{
data: &plugins.PluginData{
Ctx: plugins.NewPluginContext(),
},
reservePlugins: []plugins.ReservePlugin{testerPlugin},
prebindPlugins: []plugins.PrebindPlugin{testerPlugin},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "plugin-context-cleanup", nil),
false, nil, testPluginSet, false, true, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
// Insert something in the plugin context.
testPluginSet.Data().Ctx.Write("test", "foo")
// Create and schedule a best effort pod.
pod, err := runPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating or scheduling a test pod: %v", err)
}
// Make sure the data inserted in the plugin context is removed.
_, err = testPluginSet.Data().Ctx.Read("test")
if err == nil || err.Error() != plugins.NotFound {
t.Errorf("Expected the plugin context to be cleaned up after a scheduling cycle. error: %v", err)
}
cleanupPods(cs, t, []*v1.Pod{pod})
}

View File

@ -51,6 +51,7 @@ import (
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/test/integration/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -148,7 +149,7 @@ func initTestScheduler(
) *TestContext {
// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
// feature gate is enabled at the same time.
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, false, true, time.Second)
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, nil, false, true, time.Second)
}
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@ -158,6 +159,7 @@ func initTestSchedulerWithOptions(
context *TestContext,
setPodInformer bool,
policy *schedulerapi.Policy,
pluginSet plugins.PluginSet,
disablePreemption bool,
disableEquivalenceCache bool,
resyncPeriod time.Duration,
@ -205,6 +207,11 @@ func initTestSchedulerWithOptions(
controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced)
}
// Set pluginSet if provided. DefaultPluginSet is used if this is not specified.
if pluginSet != nil {
context.schedulerConfig.PluginSet = pluginSet
}
eventBroadcaster := record.NewBroadcaster()
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
@ -257,7 +264,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
// configuration but with pod preemption disabled.
func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
return initTestSchedulerWithOptions(
t, initTestMaster(t, nsPrefix, nil), true, nil, true, true, time.Second)
t, initTestMaster(t, nsPrefix, nil), true, nil, nil, true, true, time.Second)
}
// cleanupTest deletes the scheduler and the test namespace. It should be called
@ -605,6 +612,25 @@ func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.
}
}
// podSchedulingError returns a condition function that returns true if the given pod
// gets unschedulable status for reasons other than "Unschedulable". The scheduler
// records such reasons in case of error.
func podSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return false, nil
}
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse &&
cond.Reason != v1.PodReasonUnschedulable, nil
}
}
// waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns
// an error if it does not scheduled within the given timeout.
func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {

View File

@ -890,7 +890,7 @@ func TestRescheduleProvisioning(t *testing.T) {
}
func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int, disableEquivalenceCache bool) *testConfig {
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, false, disableEquivalenceCache, resyncPeriod)
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil, false, disableEquivalenceCache, resyncPeriod)
clientset := context.clientSet
ns := context.ns.Name