scheduler/volumebinding: move all volume binding logic into VolumeBinding plugin

This commit is contained in:
Yecheng Fu
2020-05-11 13:21:43 +08:00
parent ba3bf32300
commit c14b749521
17 changed files with 602 additions and 121 deletions

View File

@@ -41,12 +41,14 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
clienttesting "k8s.io/client-go/testing"
clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
@@ -348,7 +350,6 @@ func TestSchedulerScheduleOne(t *testing.T) {
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName),
},
},
VolumeBinder: scheduling.NewFakeVolumeBinder(&scheduling.FakeVolumeBinderConfig{AllBound: true}),
}
called := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@@ -671,7 +672,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
informerFactory informers.SharedInformerFactory, stop chan struct{}, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...)
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@@ -756,7 +757,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions(noderesources.FitName, noderesources.NewFit, "Filter", "PreFilter"),
}
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...)
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@@ -783,11 +784,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, volumeBinder scheduling.SchedulerVolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
if volumeBinder == nil {
// Create default volume binder if it didn't set.
volumeBinder = scheduling.NewFakeVolumeBinder(&scheduling.FakeVolumeBinderConfig{AllBound: true})
}
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
bindingChan := make(chan *v1.Binding, 1)
client := clientsetfake.NewSimpleClientset()
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
@@ -799,7 +796,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
return true, b, nil
})
fwk, _ := st.NewFramework(fns, framework.WithClientSet(client), framework.WithVolumeBinder(volumeBinder))
fwk, _ := st.NewFramework(fns, framework.WithClientSet(client))
prof := &profile.Profile{
Framework: fwk,
Recorder: &events.FakeRecorder{},
@@ -835,7 +832,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
Profiles: profiles,
podConditionUpdater: fakePodConditionUpdater{},
podPreemptor: fakePodPreemptor{},
VolumeBinder: volumeBinder,
}
return sched, bindingChan, errChan
@@ -858,12 +854,13 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolume
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterFilterPlugin(volumebinding.Name, volumebinding.New),
st.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.FrameworkHandle) (framework.Plugin, error) {
return &volumebinding.VolumeBinding{Binder: volumeBinder}, nil
}, "Filter", "Reserve", "Unreserve", "PreBind", "PostBind"),
}
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, volumeBinder, fns...)
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
s.VolumeBinder = volumeBinder
return s, bindingChan, errChan
}
@@ -952,7 +949,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
},
expectAssumeCalled: true,
eventReason: "FailedScheduling",
expectError: assumeErr,
expectError: fmt.Errorf("error while running %q reserve plugin for pod %q: %v", volumebinding.Name, "foo", assumeErr),
},
{
name: "bind error",
@@ -962,7 +959,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
expectAssumeCalled: true,
expectBindCalled: true,
eventReason: "FailedScheduling",
expectError: bindErr,
expectError: fmt.Errorf("error while running %q prebind plugin for pod %q: %v", volumebinding.Name, "foo", bindErr),
},
}
@@ -1200,3 +1197,122 @@ func TestSchedulerBinding(t *testing.T) {
})
}
}
// TestInjectingPluginConfigForVolumeBinding tests injecting
// KubeSchedulerConfiguration.BindTimeoutSeconds as args for VolumeBinding if
// no plugin args is configured for it.
// TODO remove when KubeSchedulerConfiguration.BindTimeoutSeconds is eliminated
func TestInjectingPluginConfigForVolumeBinding(t *testing.T) {
defaultPluginConfigs := []config.PluginConfig{
{
Name: "VolumeBinding",
Args: &config.VolumeBindingArgs{
BindTimeoutSeconds: 600,
},
},
}
tests := []struct {
name string
opts []Option
wantPluginConfig []config.PluginConfig
}{
{
name: "default with provider",
wantPluginConfig: defaultPluginConfigs,
},
{
name: "default with policy",
opts: []Option{
WithAlgorithmSource(schedulerapi.SchedulerAlgorithmSource{
Policy: &config.SchedulerPolicySource{},
}),
},
wantPluginConfig: defaultPluginConfigs,
},
{
name: "customize BindTimeoutSeconds with provider",
opts: []Option{
WithBindTimeoutSeconds(100),
},
wantPluginConfig: []config.PluginConfig{
{
Name: "VolumeBinding",
Args: &config.VolumeBindingArgs{
BindTimeoutSeconds: 100,
},
},
},
},
{
name: "customize BindTimeoutSeconds with policy",
opts: []Option{
WithAlgorithmSource(schedulerapi.SchedulerAlgorithmSource{
Policy: &config.SchedulerPolicySource{},
}),
WithBindTimeoutSeconds(100),
},
wantPluginConfig: []config.PluginConfig{
{
Name: "VolumeBinding",
Args: &config.VolumeBindingArgs{
BindTimeoutSeconds: 100,
},
},
},
},
{
name: "PluginConfig is preferred",
opts: []Option{
WithBindTimeoutSeconds(100),
WithProfiles(config.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
PluginConfig: []config.PluginConfig{
{
Name: "VolumeBinding",
Args: &config.VolumeBindingArgs{
BindTimeoutSeconds: 200,
},
},
},
}),
},
wantPluginConfig: []config.PluginConfig{
{
Name: "VolumeBinding",
Args: &config.VolumeBindingArgs{
BindTimeoutSeconds: 200,
},
},
},
},
}
for _, tt := range tests {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}))
opts := append(tt.opts, WithBuildFrameworkCapturer(func(p config.KubeSchedulerProfile) {
if p.SchedulerName != v1.DefaultSchedulerName {
t.Errorf("unexpected scheduler name (want %q, got %q)", v1.DefaultSchedulerName, p.SchedulerName)
}
if diff := cmp.Diff(tt.wantPluginConfig, p.PluginConfig); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}
}))
_, err := New(
client,
informerFactory,
informerFactory.Core().V1().Pods(),
recorderFactory,
make(chan struct{}),
opts...,
)
if err != nil {
t.Fatalf("Error constructing: %v", err)
}
}
}