From 5871b501accfd20722f621fa570d03c65d82dfa7 Mon Sep 17 00:00:00 2001 From: Michelle Au Date: Wed, 8 Nov 2017 13:09:58 -0800 Subject: [PATCH] Add assume/bind volume functions to scheduler --- plugin/pkg/scheduler/scheduler.go | 144 ++++++++++++++++- plugin/pkg/scheduler/scheduler_test.go | 215 ++++++++++++++++++++++++- 2 files changed, 351 insertions(+), 8 deletions(-) diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 2f63ad5312b..b69aacc8eaf 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -17,17 +17,20 @@ limitations under the License. package scheduler import ( + "fmt" "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" - "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/core" "k8s.io/kubernetes/plugin/pkg/scheduler/metrics" @@ -35,6 +38,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/util" "github.com/golang/glog" + "k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder" ) // Binder knows how to write a binding. @@ -131,7 +135,8 @@ type Config struct { // Close this to shut down the scheduler. StopEverything chan struct{} - VolumeBinder persistentvolume.SchedulerVolumeBinder + // VolumeBinder handles PVC/PV binding for the pod. + VolumeBinder *volumebinder.VolumeBinder } // NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented. @@ -167,6 +172,10 @@ func (sched *Scheduler) Run() { return } + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything) + } + go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything) } @@ -243,6 +252,114 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e return nodeName, err } +// assumeAndBindVolumes will update the volume cache and then asynchronously bind volumes if required. +// +// If volume binding is required, then the bind volumes routine will update the pod to send it back through +// the scheduler. +// +// Otherwise, return nil error and continue to assume the pod. +// +// This function modifies assumed if volume binding is required. 
+func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error {
+	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
+		allBound, bindingRequired, err := sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
+		if err != nil {
+			sched.config.Error(assumed, err)
+			sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
+			sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
+				Type:    v1.PodScheduled,
+				Status:  v1.ConditionFalse,
+				Reason:  "SchedulerError",
+				Message: err.Error(),
+			})
+			return err
+		}
+		if !allBound {
+			err = fmt.Errorf("Volume binding started, waiting for completion")
+			if bindingRequired {
+				if sched.config.Ecache != nil {
+					invalidPredicates := sets.NewString(predicates.CheckVolumeBinding)
+					sched.config.Ecache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
+				}
+
+				// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue
+				sched.config.VolumeBinder.BindQueue.Add(assumed)
+			} else {
+				// We are just waiting for PV controller to finish binding, put it back in the
+				// scheduler queue
+				sched.config.Error(assumed, err)
+				sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "FailedScheduling", "%v", err)
+				sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
+					Type:   v1.PodScheduled,
+					Status: v1.ConditionFalse,
+					Reason: "VolumeBindingWaiting",
+				})
+			}
+			return err
+		}
+	}
+	return nil
+}
+
+// bindVolumesWorker() processes pods queued in assumeAndBindVolumes() and tries to
+// make the API update for volume binding.
+// This function runs forever until the volume BindQueue is closed.
+func (sched *Scheduler) bindVolumesWorker() {
+	workFunc := func() bool {
+		keyObj, quit := sched.config.VolumeBinder.BindQueue.Get()
+		if quit {
+			return true
+		}
+		defer sched.config.VolumeBinder.BindQueue.Done(keyObj)
+
+		assumed, ok := keyObj.(*v1.Pod)
+		if !ok {
+			glog.V(4).Infof("Object is not a *v1.Pod")
+			return false
+		}
+
+		// TODO: add metrics
+		var reason string
+		var eventType string
+
+		glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
+
+		// The Pod is always sent back to the scheduler afterwards.
+		err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
+		if err != nil {
+			glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
+			reason = "VolumeBindingFailed"
+			eventType = v1.EventTypeWarning
+		} else {
+			glog.V(4).Infof("Successfully bound volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
+			reason = "VolumeBindingWaiting"
+			eventType = v1.EventTypeNormal
+			err = fmt.Errorf("Volume binding started, waiting for completion")
+		}
+
+		// Always fail scheduling regardless of binding success.
+		// The Pod needs to be sent back through the scheduler to:
+		// * Retry volume binding if it fails.
+		// * Retry volume binding if dynamic provisioning fails.
+		// * Bind the Pod to the Node once all volumes are bound.
+		sched.config.Error(assumed, err)
+		sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
+		sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
+			Type:   v1.PodScheduled,
+			Status: v1.ConditionFalse,
+			Reason: reason,
+		})
+		return false
+	}
+
+	for {
+		if quit := workFunc(); quit {
+			glog.V(4).Infof("bindVolumesWorker shutting down")
+			break
+		}
+	}
+}
+
 // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`. func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { @@ -337,15 +454,32 @@ func (sched *Scheduler) scheduleOne() { // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet. // This allows us to keep scheduling without waiting on binding to occur. - assumedPod := *pod + assumedPod := pod.DeepCopy() + + // Assume volumes first before assuming the pod. + // + // If no volumes need binding, then nil is returned, and continue to assume the pod. + // + // Otherwise, error is returned and volume binding is started asynchronously for all of the pod's volumes. + // scheduleOne() returns immediately on error, so that it doesn't continue to assume the pod. + // + // After the asynchronous volume binding updates are made, it will send the pod back through the scheduler for + // subsequent passes until all volumes are fully bound. + // + // This function modifies 'assumedPod' if volume binding is required. + err = sched.assumeAndBindVolumes(assumedPod, suggestedHost) + if err != nil { + return + } + // assume modifies `assumedPod` by setting NodeName=suggestedHost - err = sched.assume(&assumedPod, suggestedHost) + err = sched.assume(assumedPod, suggestedHost) if err != nil { return } // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). go func() { - err := sched.bind(&assumedPod, &v1.Binding{ + err := sched.bind(assumedPod, &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID}, Target: v1.ObjectReference{ Kind: "Node", diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 3ef8f84700a..e08397723f7 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -29,15 +29,18 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/legacyscheme" + "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/plugin/pkg/scheduler/core" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing" "k8s.io/kubernetes/plugin/pkg/scheduler/util" + "k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder" ) type fakeBinder struct { @@ -420,7 +423,7 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) { func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) { - scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap) + scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil) queuedPodStore.Add(pod) // queuedPodStore: [foo:8080] @@ -495,7 +498,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { predicates.NewInsufficientResourceError(v1.ResourceMemory, 500, 0, 100), } } - scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap) + scheduler, _, errChan := 
setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil) queuedPodStore.Add(podWithTooBigResourceRequests) scheduler.scheduleOne() @@ -519,7 +522,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. -func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { algo := core.NewGenericScheduler( scache, nil, @@ -553,6 +556,10 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache. }, } + if recorder != nil { + configurator.Config.Recorder = recorder + } + sched, _ := NewFromConfigurator(configurator, nil...) return sched, bindingChan, errChan @@ -600,3 +607,205 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc return sched, bindingChan } + +func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster record.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) { + testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}} + nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&testNode}) + queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) + queuedPodStore.Add(podWithID("foo", "")) + scache := schedulercache.New(10*time.Minute, stop) + scache.AddNode(&testNode) + + predicateMap := map[string]algorithm.FitPredicate{ + "VolumeBindingChecker": predicates.NewVolumeBindingPredicate(fakeVolumeBinder), + } + + recorder := broadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}) + s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, recorder) + s.config.VolumeBinder = fakeVolumeBinder + return s, bindingChan, errChan +} + +// This is a workaround because golint complains that errors cannot +// end with punctuation. However, the real predicate error message does +// end with a period. 
+func makePredicateError(failReason string) error { + s := fmt.Sprintf("0/1 nodes are available: %v.", failReason) + return fmt.Errorf(s) +} + +func TestSchedulerWithVolumeBinding(t *testing.T) { + findErr := fmt.Errorf("find err") + assumeErr := fmt.Errorf("assume err") + bindErr := fmt.Errorf("bind err") + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(t.Logf).Stop() + + // This can be small because we wait for pod to finish scheduling first + chanTimeout := 2 * time.Second + + utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true") + defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false") + + table := map[string]struct { + expectError error + expectPodBind *v1.Binding + expectAssumeCalled bool + expectBindCalled bool + eventReason string + volumeBinderConfig *persistentvolume.FakeVolumeBinderConfig + }{ + "all-bound": { + volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ + AllBound: true, + FindUnboundSatsified: true, + FindBoundSatsified: true, + }, + expectAssumeCalled: true, + expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}}, + eventReason: "Scheduled", + }, + "bound,invalid-pv-affinity": { + volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ + AllBound: true, + FindUnboundSatsified: true, + FindBoundSatsified: false, + }, + eventReason: "FailedScheduling", + expectError: makePredicateError("1 VolumeNodeAffinityConflict"), + }, + "unbound,no-matches": { + volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ + FindUnboundSatsified: false, + FindBoundSatsified: true, + }, + eventReason: "FailedScheduling", + expectError: makePredicateError("1 VolumeBindingNoMatch"), + }, + "bound-and-unbound-unsatisfied": { + volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ + FindUnboundSatsified: false, + FindBoundSatsified: false, + }, + eventReason: "FailedScheduling", + expectError: makePredicateError("1 VolumeBindingNoMatch, 1 VolumeNodeAffinityConflict"), + }, + "unbound,found-matches": { + volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ + FindUnboundSatsified: true, + FindBoundSatsified: true, + AssumeBindingRequired: true, + }, + expectAssumeCalled: true, + expectBindCalled: true, + eventReason: "FailedScheduling", + expectError: fmt.Errorf("Volume binding started, waiting for completion"), + }, + "unbound,found-matches,already-bound": { + volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ + FindUnboundSatsified: true, + FindBoundSatsified: true, + AssumeBindingRequired: false, + }, + expectAssumeCalled: true, + expectBindCalled: false, + eventReason: "FailedScheduling", + expectError: fmt.Errorf("Volume binding started, waiting for completion"), + }, + "predicate-error": { + volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ + FindErr: findErr, + }, + eventReason: "FailedScheduling", + expectError: findErr, + }, + "assume-error": { + volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ + FindUnboundSatsified: true, + FindBoundSatsified: true, + AssumeErr: assumeErr, + }, + expectAssumeCalled: true, + eventReason: "FailedScheduling", + expectError: assumeErr, + }, + "bind-error": { + volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ + FindUnboundSatsified: true, + FindBoundSatsified: true, + AssumeBindingRequired: true, + BindErr: bindErr, + }, + expectAssumeCalled: true, + expectBindCalled: true, + eventReason: "FailedScheduling", + expectError: bindErr, + }, + } + + 
for name, item := range table { + stop := make(chan struct{}) + fakeVolumeBinder := volumebinder.NewFakeVolumeBinder(item.volumeBinderConfig) + internalBinder, ok := fakeVolumeBinder.Binder.(*persistentvolume.FakeVolumeBinder) + if !ok { + t.Fatalf("Failed to get fake volume binder") + } + s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster) + + eventChan := make(chan struct{}) + events := eventBroadcaster.StartEventWatcher(func(e *v1.Event) { + if e, a := item.eventReason, e.Reason; e != a { + t.Errorf("%v: expected %v, got %v", name, e, a) + } + close(eventChan) + }) + + go fakeVolumeBinder.Run(s.bindVolumesWorker, stop) + + s.scheduleOne() + + // Wait for pod to succeed or fail scheduling + select { + case <-eventChan: + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("%v: scheduling timeout after %v", name, wait.ForeverTestTimeout) + } + + events.Stop() + + // Wait for scheduling to return an error + select { + case err := <-errChan: + if item.expectError == nil || !reflect.DeepEqual(item.expectError.Error(), err.Error()) { + t.Errorf("%v: \n err \nWANT=%+v,\nGOT=%+v", name, item.expectError, err) + } + case <-time.After(chanTimeout): + if item.expectError != nil { + t.Errorf("%v: did not receive error after %v", name, chanTimeout) + } + } + + // Wait for pod to succeed binding + select { + case b := <-bindingChan: + if !reflect.DeepEqual(item.expectPodBind, b) { + t.Errorf("%v: \n err \nWANT=%+v,\nGOT=%+v", name, item.expectPodBind, b) + } + case <-time.After(chanTimeout): + if item.expectPodBind != nil { + t.Errorf("%v: did not receive pod binding after %v", name, chanTimeout) + } + } + + if item.expectAssumeCalled != internalBinder.AssumeCalled { + t.Errorf("%v: expectedAssumeCall %v", name, item.expectAssumeCalled) + } + + if item.expectBindCalled != internalBinder.BindCalled { + t.Errorf("%v: expectedBindCall %v", name, item.expectBindCalled) + } + + close(stop) + } +}
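
For review purposes, the control flow in assumeAndBindVolumes reduces to three outcomes of AssumePodVolumes (allBound, bindingRequired, err handled separately as a scheduling error). The standalone Go sketch below only restates that decision table in runnable form; the function name volumeStep and its returned strings are illustrative and do not exist in this patch.

package main

import "fmt"

// volumeStep mirrors the branching in assumeAndBindVolumes for the two booleans
// returned by AssumePodVolumes. The real code reacts by updating the pod
// condition, emitting a FailedScheduling event, and (when bindingRequired)
// adding the pod to BindQueue; the strings here are illustrative summaries.
func volumeStep(allBound, bindingRequired bool) string {
	switch {
	case allBound:
		return "all PVCs bound: continue, assume the pod and bind it to the node"
	case bindingRequired:
		return "binding required: invalidate CheckVolumeBinding in the equivalence cache, queue the pod on BindQueue, fail this pass"
	default:
		return "waiting on the PV controller: fail this pass with reason VolumeBindingWaiting so the pod is retried"
	}
}

func main() {
	cases := []struct{ allBound, bindingRequired bool }{
		{true, false},
		{false, true},
		{false, false},
	}
	for _, c := range cases {
		fmt.Printf("allBound=%v bindingRequired=%v -> %s\n", c.allBound, c.bindingRequired, volumeStep(c.allBound, c.bindingRequired))
	}
}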
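A second sketch models the asynchronous hand-off that the patch adds: the scheduling path enqueues the pod instead of blocking on volume API updates, and a worker shaped like bindVolumesWorker drains the queue and hands the pod back for another scheduling pass. It assumes k8s.io/client-go/util/workqueue is available, which appears to be what backs VolumeBinder.BindQueue given the Get/Done/Add usage above; the names bindQueue and podKey are illustrative only.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// bindQueue stands in for VolumeBinder.BindQueue; the scheduling path adds
	// pods whose volumes still need binding instead of doing the API writes inline.
	bindQueue := workqueue.New()

	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			item, shutdown := bindQueue.Get()
			if shutdown {
				return
			}
			podKey := item.(string)
			// The real worker calls Binder.BindPodVolumes here and then reports a
			// scheduling "error" either way, so the pod re-enters the scheduler
			// queue and is retried until all of its volumes are bound.
			fmt.Printf("bound volumes for %s; pod goes back through the scheduler\n", podKey)
			bindQueue.Done(item)
		}
	}()

	bindQueue.Add("default/foo") // stand-in for the assumed pod

	time.Sleep(100 * time.Millisecond)
	bindQueue.ShutDown()
	<-done
}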