Merge pull request #88526 from alculquicondor/multiprofiles-test

Add unit and integration tests for running multiple scheduling profiles
Authored by Kubernetes Prow Robot on 2020-02-26 13:33:49 -08:00; committed by GitHub.
commit 1deac1e466
7 changed files with 419 additions and 60 deletions
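The feature under test: one kube-scheduler binary can run several scheduling profiles, and a pod opts into a profile through its spec.schedulerName; pods naming a scheduler that matches no profile are left alone. A minimal sketch of the pod side, using the PodWrapper helper this PR extends (see the wrappers.go changes below):

pod := st.MakePod().Name("pod1").UID("pod1").SchedulerName("custom-scheduler").Obj()
// Only picked up if some configured profile is named "custom-scheduler".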

----- File 1 of 7 -----

@@ -220,8 +220,8 @@ users:
}
// plugin config
- pluginconfigFile := filepath.Join(tmpDir, "plugin.yaml")
- if err := ioutil.WriteFile(pluginconfigFile, []byte(fmt.Sprintf(`
+ pluginConfigFile := filepath.Join(tmpDir, "plugin.yaml")
+ if err := ioutil.WriteFile(pluginConfigFile, []byte(fmt.Sprintf(`
apiVersion: kubescheduler.config.k8s.io/v1alpha2
kind: KubeSchedulerConfiguration
clientConnection:
@@ -244,6 +244,31 @@ profiles:
`, configKubeconfig)), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
// multiple profiles config
multiProfilesConfig := filepath.Join(tmpDir, "multi-profiles.yaml")
if err := ioutil.WriteFile(multiProfilesConfig, []byte(fmt.Sprintf(`
apiVersion: kubescheduler.config.k8s.io/v1alpha2
kind: KubeSchedulerConfiguration
clientConnection:
kubeconfig: "%s"
profiles:
- schedulerName: "foo-profile"
plugins:
reserve:
enabled:
- name: foo
- schedulerName: "bar-profile"
plugins:
preBind:
disabled:
- name: baz
pluginConfig:
- name: foo
`, configKubeconfig)), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
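Two fixture details worth noting: every config is written under a per-test temp dir so Options.ConfigFile can point at a real path, and the %s under clientConnection.kubeconfig is substituted by fmt.Sprintf with the shared configKubeconfig, which is why every case expects username "config". The pattern in isolation (a sketch; variable names are illustrative):

cfgFile := filepath.Join(tmpDir, "multi-profiles.yaml")
data := []byte(fmt.Sprintf(yamlTemplate, configKubeconfig)) // fill in the kubeconfig path
if err := ioutil.WriteFile(cfgFile, data, os.FileMode(0600)); err != nil {
	t.Fatal(err)
}
opts := &Options{ConfigFile: cfgFile}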
// v1alpha1 postfilter plugin config
postfilterPluginConfigFile := filepath.Join(tmpDir, "v1alpha1_postfilter_plugin.yaml")
if err := ioutil.WriteFile(postfilterPluginConfigFile, []byte(fmt.Sprintf(`
@@ -516,7 +541,7 @@ plugins:
{
name: "plugin config",
options: &Options{
- ConfigFile: pluginconfigFile,
+ ConfigFile: pluginConfigFile,
},
expectedUsername: "config",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
@@ -554,29 +579,84 @@ plugins:
Plugins: &kubeschedulerconfig.Plugins{
Reserve: &kubeschedulerconfig.PluginSet{
Enabled: []kubeschedulerconfig.Plugin{
- {
- Name: "foo",
- },
- {
- Name: "bar",
- },
+ {Name: "foo"},
+ {Name: "bar"},
},
Disabled: []kubeschedulerconfig.Plugin{
- {
- Name: "baz",
- },
+ {Name: "baz"},
},
},
PreBind: &kubeschedulerconfig.PluginSet{
Enabled: []kubeschedulerconfig.Plugin{
- {
- Name: "foo",
- },
+ {Name: "foo"},
},
Disabled: []kubeschedulerconfig.Plugin{
- {
- Name: "baz",
- },
+ {Name: "baz"},
},
},
},
PluginConfig: []kubeschedulerconfig.PluginConfig{
{
Name: "foo",
Args: runtime.Unknown{},
},
},
},
},
},
},
{
name: "multiple profiles",
options: &Options{
ConfigFile: multiProfilesConfig,
},
expectedUsername: "config",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource},
HealthzBindAddress: "0.0.0.0:10251",
MetricsBindAddress: "0.0.0.0:10251",
DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{
EnableProfiling: true,
EnableContentionProfiling: true,
},
LeaderElection: kubeschedulerconfig.KubeSchedulerLeaderElectionConfiguration{
LeaderElectionConfiguration: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
ResourceLock: "endpointsleases",
ResourceNamespace: "kube-system",
ResourceName: "kube-scheduler",
},
},
ClientConnection: componentbaseconfig.ClientConnectionConfiguration{
Kubeconfig: configKubeconfig,
QPS: 50,
Burst: 100,
ContentType: "application/vnd.kubernetes.protobuf",
},
PercentageOfNodesToScore: defaultPercentageOfNodesToScore,
BindTimeoutSeconds: defaultBindTimeoutSeconds,
PodInitialBackoffSeconds: defaultPodInitialBackoffSeconds,
PodMaxBackoffSeconds: defaultPodMaxBackoffSeconds,
Profiles: []kubeschedulerconfig.KubeSchedulerProfile{
{
SchedulerName: "foo-profile",
Plugins: &kubeschedulerconfig.Plugins{
Reserve: &kubeschedulerconfig.PluginSet{
Enabled: []kubeschedulerconfig.Plugin{
{Name: "foo"},
},
},
},
},
{
SchedulerName: "bar-profile",
Plugins: &kubeschedulerconfig.Plugins{
PreBind: &kubeschedulerconfig.PluginSet{
Disabled: []kubeschedulerconfig.Plugin{
{Name: "baz"},
},
},
},
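For readers mapping YAML to API types: the two profiles above can equivalently be built in Go with the WithProfiles option this PR exercises further down (a rough sketch, same names as the YAML):

WithProfiles(
	kubeschedulerconfig.KubeSchedulerProfile{
		SchedulerName: "foo-profile",
		Plugins: &kubeschedulerconfig.Plugins{
			Reserve: &kubeschedulerconfig.PluginSet{
				Enabled: []kubeschedulerconfig.Plugin{{Name: "foo"}},
			},
		},
	},
	kubeschedulerconfig.KubeSchedulerProfile{
		SchedulerName: "bar-profile",
		Plugins: &kubeschedulerconfig.Plugins{
			PreBind: &kubeschedulerconfig.PluginSet{
				Disabled: []kubeschedulerconfig.Plugin{{Name: "baz"}},
			},
		},
	},
)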

----- File 2 of 7 -----

@@ -24,6 +24,9 @@ import (
"os"
"path"
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"
@@ -46,7 +49,6 @@ import (
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
@@ -56,6 +58,7 @@ import (
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/profile"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@@ -148,49 +151,87 @@ func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *
}
func TestSchedulerCreation(t *testing.T) {
- client := clientsetfake.NewSimpleClientset()
- informerFactory := informers.NewSharedInformerFactory(client, 0)
- eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
- stopCh := make(chan struct{})
- defer close(stopCh)
- _, err := New(client,
- informerFactory,
- NewPodInformer(client, 0),
- profile.NewRecorderFactory(eventBroadcaster),
- stopCh,
- WithPodInitialBackoffSeconds(1),
- WithPodMaxBackoffSeconds(10),
- )
- if err != nil {
- t.Fatalf("Failed to create scheduler: %v", err)
+ invalidRegistry := map[string]framework.PluginFactory{
+ defaultbinder.Name: defaultbinder.New,
}
- // Test case for when a plugin name in frameworkOutOfTreeRegistry already exist in defaultRegistry.
- fakeFrameworkPluginName := ""
- for name := range frameworkplugins.NewInTreeRegistry() {
- fakeFrameworkPluginName = name
- break
+ validRegistry := map[string]framework.PluginFactory{
+ "Foo": defaultbinder.New,
}
- registryFake := map[string]framework.PluginFactory{
- fakeFrameworkPluginName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
- return nil, nil
+ cases := []struct {
+ name string
+ opts []Option
+ wantErr string
+ wantProfiles []string
+ }{
+ {
+ name: "default scheduler",
+ wantProfiles: []string{"default-scheduler"},
+ },
+ {
+ name: "valid out-of-tree registry",
+ opts: []Option{WithFrameworkOutOfTreeRegistry(validRegistry)},
+ wantProfiles: []string{"default-scheduler"},
+ },
+ {
+ name: "repeated plugin name in out-of-tree plugin",
+ opts: []Option{WithFrameworkOutOfTreeRegistry(invalidRegistry)},
+ wantProfiles: []string{"default-scheduler"},
+ wantErr: "a plugin named DefaultBinder already exists",
+ },
+ {
+ name: "multiple profiles",
+ opts: []Option{WithProfiles(
+ schedulerapi.KubeSchedulerProfile{SchedulerName: "foo"},
+ schedulerapi.KubeSchedulerProfile{SchedulerName: "bar"},
+ )},
+ wantProfiles: []string{"bar", "foo"},
+ },
+ {
+ name: "Repeated profiles",
+ opts: []Option{WithProfiles(
+ schedulerapi.KubeSchedulerProfile{SchedulerName: "foo"},
+ schedulerapi.KubeSchedulerProfile{SchedulerName: "bar"},
+ schedulerapi.KubeSchedulerProfile{SchedulerName: "foo"},
+ )},
+ wantErr: "duplicate profile with scheduler name \"foo\"",
},
}
- _, err = New(client,
- informerFactory,
- NewPodInformer(client, 0),
- profile.NewRecorderFactory(eventBroadcaster),
- stopCh,
- WithPodInitialBackoffSeconds(1),
- WithPodMaxBackoffSeconds(10),
- WithFrameworkOutOfTreeRegistry(registryFake),
- )
- if err == nil {
- t.Fatalf("Create scheduler should fail")
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ client := clientsetfake.NewSimpleClientset()
+ informerFactory := informers.NewSharedInformerFactory(client, 0)
+ eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
+ stopCh := make(chan struct{})
+ defer close(stopCh)
+ s, err := New(client,
+ informerFactory,
+ NewPodInformer(client, 0),
+ profile.NewRecorderFactory(eventBroadcaster),
+ stopCh,
+ tc.opts...,
+ )
+ if len(tc.wantErr) != 0 {
+ if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
+ t.Errorf("got error %q, want %q", err, tc.wantErr)
+ }
+ return
+ }
+ if err != nil {
+ t.Fatalf("Failed to create scheduler: %v", err)
+ }
+ profiles := make([]string, 0, len(s.Profiles))
+ for name := range s.Profiles {
+ profiles = append(profiles, name)
+ }
+ sort.Strings(profiles)
+ if diff := cmp.Diff(tc.wantProfiles, profiles); diff != "" {
+ t.Errorf("unexpected profiles (-want, +got):\n%s", diff)
+ }
+ })
}
}
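The rewrite folds two ad-hoc checks into a table-driven test. Note that s.Profiles is a map keyed by scheduler name, so the collected names are sorted before cmp.Diff — which is why the "multiple profiles" case wants []string{"bar", "foo"} in sorted rather than declaration order. The collision case in isolation (a sketch reusing the setup names from the test body):

// An out-of-tree plugin reusing an in-tree name makes New fail:
_, err := New(client, informerFactory, NewPodInformer(client, 0),
	profile.NewRecorderFactory(eventBroadcaster), stopCh,
	WithFrameworkOutOfTreeRegistry(map[string]framework.PluginFactory{
		defaultbinder.Name: defaultbinder.New, // "DefaultBinder" is already registered in-tree
	}),
)
// err.Error() contains "a plugin named DefaultBinder already exists"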
@@ -341,6 +382,153 @@ func TestSchedulerScheduleOne(t *testing.T) {
}
}
type fakeNodeSelectorArgs struct {
NodeName string `json:"nodeName"`
}
type fakeNodeSelector struct {
fakeNodeSelectorArgs
}
func newFakeNodeSelector(args *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
pl := &fakeNodeSelector{}
if err := framework.DecodeInto(args, &pl.fakeNodeSelectorArgs); err != nil {
return nil, err
}
return pl, nil
}
func (s *fakeNodeSelector) Name() string {
return "FakeNodeSelector"
}
func (s *fakeNodeSelector) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
if nodeInfo.Node().Name != s.NodeName {
return framework.NewStatus(framework.UnschedulableAndUnresolvable)
}
return nil
}
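fakeNodeSelector is close to the smallest useful Filter plugin: the factory decodes the raw JSON carried in PluginConfig.Args into a typed struct via framework.DecodeInto, and Filter rejects every node but the configured one as UnschedulableAndUnresolvable, so no amount of retrying moves a pod elsewhere. Constructed directly (a sketch; the FrameworkHandle is unused here, so nil suffices):

args := &runtime.Unknown{Raw: []byte(`{"nodeName":"machine2"}`)}
pl, err := newFakeNodeSelector(args, nil)
if err != nil {
	t.Fatal(err)
}
// pl.(*fakeNodeSelector).NodeName is now "machine2"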
func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
nodes := []runtime.Object{
st.MakeNode().Name("machine1").UID("machine1").Obj(),
st.MakeNode().Name("machine2").UID("machine2").Obj(),
st.MakeNode().Name("machine3").UID("machine3").Obj(),
}
pods := []*v1.Pod{
st.MakePod().Name("pod1").UID("pod1").SchedulerName("match-machine3").Obj(),
st.MakePod().Name("pod2").UID("pod2").SchedulerName("match-machine2").Obj(),
st.MakePod().Name("pod3").UID("pod3").SchedulerName("match-machine2").Obj(),
st.MakePod().Name("pod4").UID("pod4").SchedulerName("match-machine3").Obj(),
}
wantBindings := map[string]string{
"pod1": "machine3",
"pod2": "machine2",
"pod3": "machine2",
"pod4": "machine3",
}
wantControllers := map[string]string{
"pod1": "match-machine3",
"pod2": "match-machine2",
"pod3": "match-machine2",
"pod4": "match-machine3",
}
// Set up scheduler for the 3 nodes.
// We use a fake filter that only allows one particular node. We create two
// profiles, each with a different node in the filter configuration.
client := clientsetfake.NewSimpleClientset(nodes...)
broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory := informers.NewSharedInformerFactory(client, 0)
sched, err := New(client,
informerFactory,
informerFactory.Core().V1().Pods(),
profile.NewRecorderFactory(broadcaster),
ctx.Done(),
WithProfiles(
schedulerapi.KubeSchedulerProfile{SchedulerName: "match-machine2",
Plugins: &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{{Name: "FakeNodeSelector"}},
Disabled: []schedulerapi.Plugin{{Name: "*"}},
}},
PluginConfig: []schedulerapi.PluginConfig{
{Name: "FakeNodeSelector",
Args: runtime.Unknown{Raw: []byte(`{"nodeName":"machine2"}`)},
},
},
},
schedulerapi.KubeSchedulerProfile{
SchedulerName: "match-machine3",
Plugins: &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{{Name: "FakeNodeSelector"}},
Disabled: []schedulerapi.Plugin{{Name: "*"}},
}},
PluginConfig: []schedulerapi.PluginConfig{
{Name: "FakeNodeSelector",
Args: runtime.Unknown{Raw: []byte(`{"nodeName":"machine3"}`)},
},
},
},
),
WithFrameworkOutOfTreeRegistry(framework.Registry{
"FakeNodeSelector": newFakeNodeSelector,
}),
)
if err != nil {
t.Fatal(err)
}
// Capture the bindings and events' controllers.
var wg sync.WaitGroup
wg.Add(2 * len(pods))
bindings := make(map[string]string)
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
if action.GetSubresource() != "binding" {
return false, nil, nil
}
binding := action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
bindings[binding.Name] = binding.Target.Name
wg.Done()
return true, binding, nil
})
controllers := make(map[string]string)
stopFn := broadcaster.StartEventWatcher(func(obj runtime.Object) {
e, ok := obj.(*v1beta1.Event)
if !ok || e.Reason != "Scheduled" {
return
}
controllers[e.Regarding.Name] = e.ReportingController
wg.Done()
})
defer stopFn()
// Run scheduler.
informerFactory.Start(ctx.Done())
go sched.Run(ctx)
// Send pods to be scheduled.
for _, p := range pods {
_, err := client.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
}
wg.Wait()
// Verify correct bindings and reporting controllers.
if diff := cmp.Diff(wantBindings, bindings); diff != "" {
t.Errorf("pods were scheduled incorrectly (-want, +got):\n%s", diff)
}
if diff := cmp.Diff(wantControllers, controllers); diff != "" {
t.Errorf("events were reported with wrong controllers (-want, +got):\n%s", diff)
}
}
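A note on the synchronization: instead of polling, the test arms the WaitGroup with 2*len(pods) — one Done when the fake client intercepts a pod's binding subresource, one when the event watcher sees its "Scheduled" event — so wg.Wait() returns only once both maps are fully populated. Profile attribution then falls out of the events API: each profile reports under its own scheduler name, so

controllers[e.Regarding.Name] = e.ReportingController // e.g. "match-machine2"

records, per pod, which profile did the scheduling.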
func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
stop := make(chan struct{})
defer close(stop)

----- File 3 of 7 -----

@@ -15,6 +15,7 @@ go_library(
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
],
)

----- File 4 of 7 -----

@@ -21,6 +21,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
var zero int64
@@ -150,6 +151,18 @@ func (p *PodWrapper) Name(s string) *PodWrapper {
return p
}
// UID sets `s` as the UID of the inner pod.
func (p *PodWrapper) UID(s string) *PodWrapper {
p.SetUID(types.UID(s))
return p
}
// SchedulerName sets `s` as the scheduler name of the inner pod.
func (p *PodWrapper) SchedulerName(s string) *PodWrapper {
p.Spec.SchedulerName = s
return p
}
// Namespace sets `s` as the namespace of the inner pod.
func (p *PodWrapper) Namespace(s string) *PodWrapper {
p.SetNamespace(s)
@@ -363,6 +376,12 @@ func (n *NodeWrapper) Name(s string) *NodeWrapper {
return n
}
// UID sets `s` as the UID of the inner node.
func (n *NodeWrapper) UID(s string) *NodeWrapper {
n.SetUID(types.UID(s))
return n
}
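With these wrappers a fixture is a single chain, as used in the scheduler unit test above:

node := st.MakeNode().Name("machine1").UID("machine1").Obj()
pod := st.MakePod().Name("pod1").UID("pod1").SchedulerName("match-machine3").Obj()

The explicit UIDs supply metadata a real API server would set on its own; presumably the events and binding plumbing in the test reference it.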
// Label applies a {k,v} label pair to the inner node.
func (n *NodeWrapper) Label(k, v string) *NodeWrapper {
if n.Labels == nil {

----- File 5 of 7 -----

@@ -49,6 +49,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",

----- File 6 of 7 -----

@@ -25,12 +25,12 @@ import (
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
@@ -595,6 +595,76 @@ func TestMultipleSchedulers(t *testing.T) {
*/
}
func TestMultipleSchedulingProfiles(t *testing.T) {
testCtx := initTest(t, "multi-scheduler", scheduler.WithProfiles(
kubeschedulerconfig.KubeSchedulerProfile{
SchedulerName: "default-scheduler",
},
kubeschedulerconfig.KubeSchedulerProfile{
SchedulerName: "custom-scheduler",
},
))
defer cleanupTest(t, testCtx)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node-multi-scheduler-test-node"},
Spec: v1.NodeSpec{Unschedulable: false},
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
},
},
}
if _, err := testCtx.clientSet.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
evs, err := testCtx.clientSet.CoreV1().Events(testCtx.ns.Name).Watch(testCtx.ctx, metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
defer evs.Stop()
for _, pc := range []*pausePodConfig{
{Name: "foo", Namespace: testCtx.ns.Name},
{Name: "bar", Namespace: testCtx.ns.Name, SchedulerName: "unknown-scheduler"},
{Name: "baz", Namespace: testCtx.ns.Name, SchedulerName: "default-scheduler"},
{Name: "zet", Namespace: testCtx.ns.Name, SchedulerName: "custom-scheduler"},
} {
if _, err := createPausePod(testCtx.clientSet, initPausePod(testCtx.clientSet, pc)); err != nil {
t.Fatal(err)
}
}
wantProfiles := map[string]string{
"foo": "default-scheduler",
"baz": "default-scheduler",
"zet": "custom-scheduler",
}
gotProfiles := make(map[string]string)
if err := wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
var ev watch.Event
select {
case ev = <-evs.ResultChan():
case <-time.After(30 * time.Second):
return false, nil
}
e, ok := ev.Object.(*v1.Event)
if !ok || e.Reason != "Scheduled" {
return false, nil
}
gotProfiles[e.InvolvedObject.Name] = e.ReportingController
return len(gotProfiles) >= len(wantProfiles), nil
}); err != nil {
t.Errorf("waiting for scheduling events: %v", err)
}
if diff := cmp.Diff(wantProfiles, gotProfiles); diff != "" {
t.Errorf("pods scheduled by the wrong profile (-want, +got):\n%s", diff)
}
}
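Unlike the unit test, the integration test cannot intercept bindings, so it watches core v1 Events and reads ReportingController: with multiple profiles, each "Scheduled" event is reported under the name of the profile that bound the pod. Pod "bar" targets "unknown-scheduler" and is deliberately absent from wantProfiles — no profile claims it, so it should never yield a Scheduled event. A stricter assertion could make that explicit (hypothetical, not part of this PR):

if profile, ok := gotProfiles["bar"]; ok {
	t.Errorf("pod bar was scheduled by %q, but no profile should match it", profile)
}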
// This test will verify scheduler can work well regardless of whether kubelet is allocatable aware or not.
func TestAllocatable(t *testing.T) {
testCtx := initTest(t, "allocatable")

----- File 7 of 7 -----

@@ -242,8 +242,8 @@ func initDisruptionController(t *testing.T, testCtx *testContext) *disruption.Di
// initTest initializes a test environment and creates master and scheduler with default
// configuration.
- func initTest(t *testing.T, nsPrefix string) *testContext {
- return initTestScheduler(t, initTestMaster(t, nsPrefix, nil), true, nil)
+ func initTest(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testContext {
+ return initTestSchedulerWithOptions(t, initTestMaster(t, nsPrefix, nil), true, nil, time.Second, opts...)
}
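Making initTest variadic over scheduler.Option is what lets TestMultipleSchedulingProfiles above start a two-profile scheduler without a dedicated helper, while existing call sites stay valid with no options. Usage as it appears in that test:

testCtx := initTest(t, "multi-scheduler", scheduler.WithProfiles(
	kubeschedulerconfig.KubeSchedulerProfile{SchedulerName: "default-scheduler"},
	kubeschedulerconfig.KubeSchedulerProfile{SchedulerName: "custom-scheduler"},
))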
// initTestDisablePreemption initializes a test environment and creates master and scheduler with default