From 30b5faff53bce83c69bd8764491d0738a52fa1a4 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Sun, 25 Oct 2015 13:24:14 -0700 Subject: [PATCH] Pull apart plugin, PodSchedulers, Deleter and Binder --- contrib/mesos/pkg/scheduler/algorithm/fcfs.go | 2 +- .../mesos/pkg/scheduler/algorithm/types.go | 2 +- contrib/mesos/pkg/scheduler/api/doc.go | 19 +++ contrib/mesos/pkg/scheduler/api/mock.go | 98 ++++++++++++ contrib/mesos/pkg/scheduler/api/types.go | 45 ++++++ contrib/mesos/pkg/scheduler/errors/doc.go | 18 +++ contrib/mesos/pkg/scheduler/errors/errors.go | 26 ++++ .../pkg/scheduler/{ => operations}/binder.go | 38 +++-- .../pkg/scheduler/{ => operations}/deleter.go | 35 +++-- .../pkg/scheduler/operations/deleter_test.go | 147 ++++++++++++++++++ contrib/mesos/pkg/scheduler/operations/doc.go | 18 +++ contrib/mesos/pkg/scheduler/plugin.go | 93 ++++------- contrib/mesos/pkg/scheduler/plugin_test.go | 145 +---------------- .../{mock_test.go => scheduler_mock.go} | 98 ++++-------- 14 files changed, 480 insertions(+), 304 deletions(-) create mode 100644 contrib/mesos/pkg/scheduler/api/doc.go create mode 100644 contrib/mesos/pkg/scheduler/api/mock.go create mode 100644 contrib/mesos/pkg/scheduler/api/types.go create mode 100644 contrib/mesos/pkg/scheduler/errors/doc.go create mode 100644 contrib/mesos/pkg/scheduler/errors/errors.go rename contrib/mesos/pkg/scheduler/{ => operations}/binder.go (83%) rename contrib/mesos/pkg/scheduler/{ => operations}/deleter.go (78%) create mode 100644 contrib/mesos/pkg/scheduler/operations/deleter_test.go create mode 100644 contrib/mesos/pkg/scheduler/operations/doc.go rename contrib/mesos/pkg/scheduler/{mock_test.go => scheduler_mock.go} (72%) diff --git a/contrib/mesos/pkg/scheduler/algorithm/fcfs.go b/contrib/mesos/pkg/scheduler/algorithm/fcfs.go index f81a8e47513..77c4891acbf 100644 --- a/contrib/mesos/pkg/scheduler/algorithm/fcfs.go +++ b/contrib/mesos/pkg/scheduler/algorithm/fcfs.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package scheduler +package algorithm import ( "fmt" diff --git a/contrib/mesos/pkg/scheduler/algorithm/types.go b/contrib/mesos/pkg/scheduler/algorithm/types.go index 99f3a7e8b2c..74fc20bfc92 100644 --- a/contrib/mesos/pkg/scheduler/algorithm/types.go +++ b/contrib/mesos/pkg/scheduler/algorithm/types.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package scheduler +package algorithm import ( "errors" diff --git a/contrib/mesos/pkg/scheduler/api/doc.go b/contrib/mesos/pkg/scheduler/api/doc.go new file mode 100644 index 00000000000..0eb289149f5 --- /dev/null +++ b/contrib/mesos/pkg/scheduler/api/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package api contains an abstract scheduler interface, implemented by the +// scheduler plugin and consumed by the scheduler operations. +package api diff --git a/contrib/mesos/pkg/scheduler/api/mock.go b/contrib/mesos/pkg/scheduler/api/mock.go new file mode 100644 index 00000000000..9fc62372023 --- /dev/null +++ b/contrib/mesos/pkg/scheduler/api/mock.go @@ -0,0 +1,98 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/mock" + "k8s.io/kubernetes/contrib/mesos/pkg/offers" + malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/pkg/api" +) + +// @deprecated this is a placeholder for me to test the mock package +func TestNoSlavesYet(t *testing.T) { + obj := &MockScheduler{} + obj.On("SlaveHostNameFor", "foo").Return(nil) + obj.SlaveHostNameFor("foo") + obj.AssertExpectations(t) +} + +// MockScheduler implements SchedulerApi +type MockScheduler struct { + sync.RWMutex + mock.Mock +} + +func (m *MockScheduler) SlaveHostNameFor(id string) (hostName string) { + args := m.Called(id) + x := args.Get(0) + if x != nil { + hostName = x.(string) + } + return +} + +func (m *MockScheduler) Algorithm() (f malgorithm.PodScheduler) { + args := m.Called() + x := args.Get(0) + if x != nil { + f = x.(malgorithm.PodScheduler) + } + return +} + +func (m *MockScheduler) CreatePodTask(ctx api.Context, pod *api.Pod) (task *podtask.T, err error) { + args := m.Called(ctx, pod) + x := args.Get(0) + if x != nil { + task = x.(*podtask.T) + } + err = args.Error(1) + return +} + +func (m *MockScheduler) Offers() (f offers.Registry) { + args := m.Called() + x := args.Get(0) + if x != nil { + f = x.(offers.Registry) + } + return +} + +func (m *MockScheduler) Tasks() (f podtask.Registry) { + args := m.Called() + x := args.Get(0) + if x != nil { + f = x.(podtask.Registry) + } + return +} + +func (m *MockScheduler) KillTask(taskId string) error { + args := m.Called(taskId) + return args.Error(0) +} + +func (m *MockScheduler) LaunchTask(task *podtask.T) error { + args := m.Called(task) + return args.Error(0) +} diff --git a/contrib/mesos/pkg/scheduler/api/types.go b/contrib/mesos/pkg/scheduler/api/types.go new file mode 100644 index 00000000000..05a92aac8fd --- /dev/null +++ b/contrib/mesos/pkg/scheduler/api/types.go @@ -0,0 +1,45 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "sync" + + "k8s.io/kubernetes/contrib/mesos/pkg/offers" + malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/pkg/api" +) + +// scheduler abstraction to allow for easier unit testing +type SchedulerApi interface { + sync.Locker // synchronize scheduler plugin operations + + malgorithm.SlaveIndex + Algorithm() malgorithm.PodScheduler + Offers() offers.Registry + Tasks() podtask.Registry + + // driver calls + + KillTask(taskId string) error + LaunchTask(*podtask.T) error + + // convenience + + CreatePodTask(api.Context, *api.Pod) (*podtask.T, error) +} diff --git a/contrib/mesos/pkg/scheduler/errors/doc.go b/contrib/mesos/pkg/scheduler/errors/doc.go new file mode 100644 index 00000000000..14a6aab8355 --- /dev/null +++ b/contrib/mesos/pkg/scheduler/errors/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package errors contains all scheduler wide used errors +package errors diff --git a/contrib/mesos/pkg/scheduler/errors/errors.go b/contrib/mesos/pkg/scheduler/errors/errors.go new file mode 100644 index 00000000000..8823ad84d63 --- /dev/null +++ b/contrib/mesos/pkg/scheduler/errors/errors.go @@ -0,0 +1,26 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package errors + +import ( + "errors" +) + +var ( + NoSuchPodErr = errors.New("No such pod exists") + NoSuchTaskErr = errors.New("No such task exists") +) diff --git a/contrib/mesos/pkg/scheduler/binder.go b/contrib/mesos/pkg/scheduler/operations/binder.go similarity index 83% rename from contrib/mesos/pkg/scheduler/binder.go rename to contrib/mesos/pkg/scheduler/operations/binder.go index 7ec4d30fb23..c94a04482e2 100644 --- a/contrib/mesos/pkg/scheduler/binder.go +++ b/contrib/mesos/pkg/scheduler/operations/binder.go @@ -14,24 +14,32 @@ See the License for the specific language governing permissions and limitations under the License. */ -package scheduler +package operations import ( "fmt" "strconv" log "github.com/golang/glog" + schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api" + merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/pkg/api" ) -type binder struct { - api schedulerInterface +type Binder struct { + api schedapi.SchedulerApi +} + +func NewBinder(api schedapi.SchedulerApi) *Binder { + return &Binder{ + api: api, + } } // implements binding.Registry, launches the pod-associated-task in mesos -func (b *binder) Bind(binding *api.Binding) error { +func (b *Binder) Bind(binding *api.Binding) error { ctx := api.WithNamespace(api.NewContext(), binding.Namespace) @@ -44,21 +52,21 @@ func (b *binder) Bind(binding *api.Binding) error { b.api.Lock() defer b.api.Unlock() - switch task, state := b.api.tasks().ForPod(podKey); state { + switch task, state := b.api.Tasks().ForPod(podKey); state { case podtask.StatePending: return b.bind(ctx, binding, task) default: // in this case it's likely that the pod has been deleted between Schedule // and Bind calls log.Infof("No pending task for pod %s", podKey) - return noSuchPodErr //TODO(jdef) this error is somewhat misleading since the task could be running?! + return merrors.NoSuchPodErr //TODO(jdef) this error is somewhat misleading since the task could be running?! } } -func (b *binder) rollback(task *podtask.T, err error) error { +func (b *Binder) rollback(task *podtask.T, err error) error { task.Offer.Release() task.Reset() - if err2 := b.api.tasks().Update(task); err2 != nil { + if err2 := b.api.Tasks().Update(task); err2 != nil { log.Errorf("failed to update pod task: %v", err2) } return err @@ -70,7 +78,7 @@ func (b *binder) rollback(task *podtask.T, err error) error { // kubernetes executor on the slave will finally do the binding. This is different from the // upstream scheduler in the sense that the upstream scheduler does the binding and the // kubelet will notice that and launches the pod. -func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) { +func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) { // sanity check: ensure that the task hasAcceptedOffer(), it's possible that between // Schedule() and now that the offer for this task was rescinded or invalidated. // ((we should never see this here)) @@ -80,7 +88,7 @@ func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e // By this time, there is a chance that the slave is disconnected. offerId := task.GetOfferId() - if offer, ok := b.api.offers().Get(offerId); !ok || offer.HasExpired() { + if offer, ok := b.api.Offers().Get(offerId); !ok || offer.HasExpired() { // already rescinded or timed out or otherwise invalidated return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID)) } @@ -88,10 +96,10 @@ func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil { log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB", task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory) - if err = b.api.launchTask(task); err == nil { - b.api.offers().Invalidate(offerId) + if err = b.api.LaunchTask(task); err == nil { + b.api.Offers().Invalidate(offerId) task.Set(podtask.Launched) - if err = b.api.tasks().Update(task); err != nil { + if err = b.api.Tasks().Update(task); err != nil { // this should only happen if the task has been removed or has changed status, // which SHOULD NOT HAPPEN as long as we're synchronizing correctly log.Errorf("failed to update task w/ Launched status: %v", err) @@ -103,7 +111,7 @@ func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e } //TODO(jdef) unit test this, ensure that task's copy of api.Pod is not modified -func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error { +func (b *Binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error { pod := task.Pod // we make an effort here to avoid making changes to the task's copy of the pod, since @@ -142,4 +150,4 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod } task.Spec.Data = data return nil -} \ No newline at end of file +} diff --git a/contrib/mesos/pkg/scheduler/deleter.go b/contrib/mesos/pkg/scheduler/operations/deleter.go similarity index 78% rename from contrib/mesos/pkg/scheduler/deleter.go rename to contrib/mesos/pkg/scheduler/operations/deleter.go index 0badee9d7a4..eb5e954fc7b 100644 --- a/contrib/mesos/pkg/scheduler/deleter.go +++ b/contrib/mesos/pkg/scheduler/operations/deleter.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package scheduler +package operations import ( "time" @@ -22,25 +22,34 @@ import ( log "github.com/golang/glog" "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api" + merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/pkg/api" ) -type deleter struct { - api schedulerInterface +type Deleter struct { + api schedapi.SchedulerApi qr *queuer.Queuer } +func NewDeleter(api schedapi.SchedulerApi, qr *queuer.Queuer) *Deleter { + return &Deleter{ + api: api, + qr: qr, + } +} + // currently monitors for "pod deleted" events, upon which handle() // is invoked. -func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) { +func (k *Deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) { go runtime.Until(func() { for { entry := <-updates pod := entry.Value().(*queuer.Pod) if entry.Is(queue.DELETE_EVENT) { - if err := k.deleteOne(pod); err != nil { + if err := k.DeleteOne(pod); err != nil { log.Error(err) } } else if !entry.Is(queue.POP_EVENT) { @@ -50,7 +59,7 @@ func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) { }, 1*time.Second, done) } -func (k *deleter) deleteOne(pod *queuer.Pod) error { +func (k *Deleter) DeleteOne(pod *queuer.Pod) error { ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace) podKey, err := podtask.MakePodKey(ctx, pod.Name) if err != nil { @@ -72,10 +81,10 @@ func (k *deleter) deleteOne(pod *queuer.Pod) error { // will abort Bind()ing k.qr.Dequeue(pod.GetUID()) - switch task, state := k.api.tasks().ForPod(podKey); state { + switch task, state := k.api.Tasks().ForPod(podKey); state { case podtask.StateUnknown: log.V(2).Infof("Could not resolve pod '%s' to task id", podKey) - return noSuchPodErr + return merrors.NoSuchPodErr // determine if the task has already been launched to mesos, if not then // cleanup is easier (unregister) since there's no state to sync @@ -87,11 +96,11 @@ func (k *deleter) deleteOne(pod *queuer.Pod) error { task.Reset() task.Set(podtask.Deleted) //TODO(jdef) probably want better handling here - if err := k.api.tasks().Update(task); err != nil { + if err := k.api.Tasks().Update(task); err != nil { return err } } - k.api.tasks().Unregister(task) + k.api.Tasks().Unregister(task) return nil } fallthrough @@ -99,13 +108,13 @@ func (k *deleter) deleteOne(pod *queuer.Pod) error { case podtask.StateRunning: // signal to watchers that the related pod is going down task.Set(podtask.Deleted) - if err := k.api.tasks().Update(task); err != nil { + if err := k.api.Tasks().Update(task); err != nil { log.Errorf("failed to update task w/ Deleted status: %v", err) } - return k.api.killTask(task.ID) + return k.api.KillTask(task.ID) default: log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID) - return noSuchTaskErr + return merrors.NoSuchTaskErr } } diff --git a/contrib/mesos/pkg/scheduler/operations/deleter_test.go b/contrib/mesos/pkg/scheduler/operations/deleter_test.go new file mode 100644 index 00000000000..d3bb81df909 --- /dev/null +++ b/contrib/mesos/pkg/scheduler/operations/deleter_test.go @@ -0,0 +1,147 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operations + +import ( + "testing" + + "k8s.io/kubernetes/pkg/api" + + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/contrib/mesos/pkg/queue" + schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api" + merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" +) + +func TestDeleteOne_NonexistentPod(t *testing.T) { + assert := assert.New(t) + obj := &schedapi.MockScheduler{} + reg := podtask.NewInMemoryRegistry() + obj.On("Tasks").Return(reg) + + qr := queuer.New(nil) + assert.Equal(0, len(qr.PodQueue.List())) + d := NewDeleter(obj, qr) + pod := &queuer.Pod{Pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }}} + err := d.DeleteOne(pod) + assert.Equal(err, merrors.NoSuchPodErr) + obj.AssertExpectations(t) +} + +func TestDeleteOne_PendingPod(t *testing.T) { + assert := assert.New(t) + obj := &schedapi.MockScheduler{} + reg := podtask.NewInMemoryRegistry() + obj.On("Tasks").Return(reg) + + pod := &queuer.Pod{Pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + UID: "foo0", + Namespace: api.NamespaceDefault, + }}} + _, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})) + if err != nil { + t.Fatalf("failed to create task: %v", err) + } + + // preconditions + qr := queuer.New(nil) + qr.PodQueue.Add(pod, queue.ReplaceExisting) + assert.Equal(1, len(qr.PodQueue.List())) + _, found := qr.PodQueue.Get("default/foo") + assert.True(found) + + // exec & post conditions + d := NewDeleter(obj, qr) + err = d.DeleteOne(pod) + assert.Nil(err) + _, found = qr.PodQueue.Get("foo0") + assert.False(found) + assert.Equal(0, len(qr.PodQueue.List())) + obj.AssertExpectations(t) +} + +func TestDeleteOne_Running(t *testing.T) { + assert := assert.New(t) + obj := &schedapi.MockScheduler{} + reg := podtask.NewInMemoryRegistry() + obj.On("Tasks").Return(reg) + + pod := &queuer.Pod{Pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + UID: "foo0", + Namespace: api.NamespaceDefault, + }}} + task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + task.Set(podtask.Launched) + err = reg.Update(task) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // preconditions + qr := queuer.New(nil) + qr.PodQueue.Add(pod, queue.ReplaceExisting) + assert.Equal(1, len(qr.PodQueue.List())) + _, found := qr.PodQueue.Get("default/foo") + assert.True(found) + + obj.On("KillTask", task.ID).Return(nil) + + // exec & post conditions + d := NewDeleter(obj, qr) + err = d.DeleteOne(pod) + assert.Nil(err) + _, found = qr.PodQueue.Get("foo0") + assert.False(found) + assert.Equal(0, len(qr.PodQueue.List())) + obj.AssertExpectations(t) +} + +func TestDeleteOne_badPodNaming(t *testing.T) { + assert := assert.New(t) + obj := &schedapi.MockScheduler{} + pod := &queuer.Pod{Pod: &api.Pod{}} + d := NewDeleter(obj, queuer.New(nil)) + + err := d.DeleteOne(pod) + assert.NotNil(err) + + pod.Pod.ObjectMeta.Name = "foo" + err = d.DeleteOne(pod) + assert.NotNil(err) + + pod.Pod.ObjectMeta.Name = "" + pod.Pod.ObjectMeta.Namespace = "bar" + err = d.DeleteOne(pod) + assert.NotNil(err) + + obj.AssertExpectations(t) +} diff --git a/contrib/mesos/pkg/scheduler/operations/doc.go b/contrib/mesos/pkg/scheduler/operations/doc.go new file mode 100644 index 00000000000..8ca6dbba645 --- /dev/null +++ b/contrib/mesos/pkg/scheduler/operations/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package operations implements independent aspects of the scheduler +package operations diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index 8556d1558c3..07e8f426220 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -17,7 +17,6 @@ limitations under the License. package scheduler import ( - "errors" "fmt" "net/http" "sync" @@ -31,6 +30,9 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" + schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api" + merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/pkg/api" @@ -51,48 +53,24 @@ const ( Scheduled = "Scheduled" ) -var ( - noSuchPodErr = errors.New("No such pod exists") - noSuchTaskErr = errors.New("No such task exists") -) - -// scheduler abstraction to allow for easier unit testing -type schedulerInterface interface { - sync.Locker // synchronize scheduler plugin operations - - malgorithm.SlaveIndex - algorithm() malgorithm.PodScheduler - offers() offers.Registry - tasks() podtask.Registry - - // driver calls - - killTask(taskId string) error - launchTask(*podtask.T) error - - // convenience - - createPodTask(api.Context, *api.Pod) (*podtask.T, error) -} - type k8smScheduler struct { sync.Mutex internal *KubernetesMesosScheduler } -func (k *k8smScheduler) algorithm() malgorithm.PodScheduler { +func (k *k8smScheduler) Algorithm() malgorithm.PodScheduler { return k.internal } -func (k *k8smScheduler) offers() offers.Registry { +func (k *k8smScheduler) Offers() offers.Registry { return k.internal.offers } -func (k *k8smScheduler) tasks() podtask.Registry { +func (k *k8smScheduler) Tasks() podtask.Registry { return k.internal.taskRegistry } -func (k *k8smScheduler) createPodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) { +func (k *k8smScheduler) CreatePodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) { return podtask.New(ctx, "", *pod, k.internal.executor) } @@ -100,13 +78,13 @@ func (k *k8smScheduler) SlaveHostNameFor(id string) string { return k.internal.slaveHostNames.HostName(id) } -func (k *k8smScheduler) killTask(taskId string) error { +func (k *k8smScheduler) KillTask(taskId string) error { killTaskId := mutil.NewTaskID(taskId) _, err := k.internal.driver.KillTask(killTaskId) return err } -func (k *k8smScheduler) launchTask(task *podtask.T) error { +func (k *k8smScheduler) LaunchTask(task *podtask.T) error { // assume caller is holding scheduler lock taskList := []*mesos.TaskInfo{task.BuildTaskInfo()} offerIds := []*mesos.OfferID{task.Offer.Details().Id} @@ -116,7 +94,7 @@ func (k *k8smScheduler) launchTask(task *podtask.T) error { } type kubeScheduler struct { - api schedulerInterface + api schedapi.SchedulerApi podUpdates queue.FIFO } @@ -135,7 +113,7 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str k.api.Lock() defer k.api.Unlock() - switch task, state := k.api.tasks().ForPod(podKey); state { + switch task, state := k.api.Tasks().ForPod(podKey); state { case podtask.StateUnknown: // There's a bit of a potential race here, a pod could have been yielded() and // then before we get *here* it could be deleted. @@ -143,14 +121,14 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str podName, err := cache.MetaNamespaceKeyFunc(pod) if err != nil { log.Warningf("aborting Schedule, unable to understand pod object %+v", pod) - return "", noSuchPodErr + return "", merrors.NoSuchPodErr } if deleted := k.podUpdates.Poll(podName, queue.DELETE_EVENT); deleted { // avoid scheduling a pod that's been deleted between yieldPod() and Schedule() log.Infof("aborting Schedule, pod has been deleted %+v", pod) - return "", noSuchPodErr + return "", merrors.NoSuchPodErr } - return k.doSchedule(k.api.tasks().Register(k.api.createPodTask(ctx, pod))) + return k.doSchedule(k.api.Tasks().Register(k.api.CreatePodTask(ctx, pod))) //TODO(jdef) it's possible that the pod state has diverged from what //we knew previously, we should probably update the task.Pod state here @@ -180,19 +158,19 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) { if task.HasAcceptedOffer() { // verify that the offer is still on the table offerId := task.GetOfferId() - if offer, ok := k.api.offers().Get(offerId); ok && !offer.HasExpired() { + if offer, ok := k.api.Offers().Get(offerId); ok && !offer.HasExpired() { // skip tasks that have already have assigned offers offer = task.Offer } else { task.Offer.Release() task.Reset() - if err = k.api.tasks().Update(task); err != nil { + if err = k.api.Tasks().Update(task); err != nil { return "", err } } } if err == nil && offer == nil { - offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task) + offer, err = k.api.Algorithm().SchedulePod(k.api.Offers(), k.api, task) } if err != nil { return "", err @@ -205,7 +183,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) { if slaveHostName := k.api.SlaveHostNameFor(slaveId); slaveHostName == "" { // not much sense in Release()ing the offer here since its owner died offer.Release() - k.api.offers().Invalidate(details.Id.GetValue()) + k.api.Offers().Invalidate(details.Id.GetValue()) return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID) } else { if task.Offer != nil && task.Offer != offer { @@ -213,9 +191,9 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) { } task.Offer = offer - k.api.algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here? + k.api.Algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here? - if err := k.api.tasks().Update(task); err != nil { + if err := k.api.Tasks().Update(task); err != nil { offer.Release() return "", err } @@ -224,7 +202,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) { } type errorHandler struct { - api schedulerInterface + api schedapi.SchedulerApi backoff *backoff.Backoff qr *queuer.Queuer } @@ -232,7 +210,7 @@ type errorHandler struct { // implementation of scheduling plugin's Error func; see plugin/pkg/scheduler func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) { - if schedulingErr == noSuchPodErr { + if schedulingErr == merrors.NoSuchPodErr { log.V(2).Infof("Not rescheduling non-existent pod %v", pod.Name) return } @@ -252,7 +230,7 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) k.api.Lock() defer k.api.Unlock() - switch task, state := k.api.tasks().ForPod(podKey); state { + switch task, state := k.api.Tasks().ForPod(podKey); state { case podtask.StateUnknown: // if we don't have a mapping here any more then someone deleted the pod log.V(2).Infof("Could not resolve pod to task, aborting pod reschdule: %s", podKey) @@ -266,16 +244,16 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) breakoutEarly := queue.BreakChan(nil) if schedulingErr == malgorithm.NoSuitableOffersErr { log.V(3).Infof("adding backoff breakout handler for pod %v", podKey) - breakoutEarly = queue.BreakChan(k.api.offers().Listen(podKey, func(offer *mesos.Offer) bool { + breakoutEarly = queue.BreakChan(k.api.Offers().Listen(podKey, func(offer *mesos.Offer) bool { k.api.Lock() defer k.api.Unlock() - switch task, state := k.api.tasks().Get(task.ID); state { + switch task, state := k.api.Tasks().Get(task.ID); state { case podtask.StatePending: // Assess fitness of pod with the current offer. The scheduler normally // "backs off" when it can't find an offer that matches up with a pod. // The backoff period for a pod can terminate sooner if an offer becomes // available that matches up. - return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer, nil) + return !task.Has(podtask.Launched) && k.api.Algorithm().FitPredicate()(task, offer, nil) default: // no point in continuing to check for matching offers return true @@ -310,10 +288,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu // an ordering (vs interleaving) of operations that's easier to reason about. kapi := &k8smScheduler{internal: k} q := queuer.New(podUpdates) - podDeleter := &deleter{ - api: kapi, - qr: q, - } + podDeleter := operations.NewDeleter(kapi, q) eh := &errorHandler{ api: kapi, backoff: backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration), @@ -337,7 +312,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu api: kapi, podUpdates: podUpdates, }, - Binder: &binder{api: kapi}, + Binder: operations.NewBinder(kapi), NextPod: q.Yield, Error: eh.handleSchedulingError, Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), @@ -352,10 +327,10 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu type PluginConfig struct { *plugin.Config - api schedulerInterface + api schedapi.SchedulerApi client *client.Client qr *queuer.Queuer - deleter *deleter + deleter *operations.Deleter starting chan struct{} // startup latch } @@ -372,10 +347,10 @@ func NewPlugin(c *PluginConfig) PluginInterface { type schedulingPlugin struct { config *plugin.Config - api schedulerInterface + api schedapi.SchedulerApi client *client.Client qr *queuer.Queuer - deleter *deleter + deleter *operations.Deleter starting chan struct{} } @@ -440,7 +415,7 @@ func (s *schedulingPlugin) reconcileTask(t *podtask.T) { if err != nil { if apierrors.IsNotFound(err) { // attempt to delete - if err = s.deleter.deleteOne(&queuer.Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr { + if err = s.deleter.DeleteOne(&queuer.Pod{Pod: &t.Pod}); err != nil && err != merrors.NoSuchPodErr && err != merrors.NoSuchTaskErr { log.Errorf("failed to delete pod: %v: %v", t.Pod.Name, err) } } else { @@ -467,7 +442,7 @@ func (s *schedulingPlugin) reconcileTask(t *podtask.T) { s.api.Lock() defer s.api.Unlock() - if _, state := s.api.tasks().ForPod(podKey); state != podtask.StateUnknown { + if _, state := s.api.Tasks().ForPod(podKey); state != podtask.StateUnknown { //TODO(jdef) reconcile the task log.Errorf("task already registered for pod %v", pod.Name) return diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index 3ec1aa762cb..d07a2efcb70 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -41,13 +41,11 @@ import ( "github.com/stretchr/testify/mock" assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" - "k8s.io/kubernetes/contrib/mesos/pkg/queue" malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" "k8s.io/kubernetes/pkg/util" ) @@ -401,19 +399,6 @@ func (a *EventAssertions) EventWithReason(observer *EventObserver, reason string }, msgAndArgs...) } -type joinableDriver struct { - MockSchedulerDriver - joinFunc func() (mesos.Status, error) -} - -// Join invokes joinFunc if it has been set, otherwise blocks forever -func (m *joinableDriver) Join() (mesos.Status, error) { - if m.joinFunc != nil { - return m.joinFunc() - } - select {} -} - // Create mesos.TaskStatus for a given task func newTaskStatusForTask(task *mesos.TaskInfo, state mesos.TaskState) *mesos.TaskStatus { healthy := state == mesos.TaskState_TASK_RUNNING @@ -824,7 +809,7 @@ func TestPlugin_LifeCycle(t *testing.T) { podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name) assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { - t, _ := lt.plugin.api.tasks().ForPod(podKey) + t, _ := lt.plugin.api.Tasks().ForPod(podKey) return t == nil }) @@ -847,131 +832,3 @@ func TestPlugin_LifeCycle(t *testing.T) { time.Sleep(time.Second / 2) failPodFromExecutor(launchedTask.taskInfo) } - -func TestDeleteOne_NonexistentPod(t *testing.T) { - assert := assert.New(t) - obj := &MockScheduler{} - reg := podtask.NewInMemoryRegistry() - obj.On("tasks").Return(reg) - - qr := queuer.New(nil) - assert.Equal(0, len(qr.PodQueue.List())) - d := &deleter{ - api: obj, - qr: qr, - } - pod := &queuer.Pod{Pod: &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: api.NamespaceDefault, - }}} - err := d.deleteOne(pod) - assert.Equal(err, noSuchPodErr) - obj.AssertExpectations(t) -} - -func TestDeleteOne_PendingPod(t *testing.T) { - assert := assert.New(t) - obj := &MockScheduler{} - reg := podtask.NewInMemoryRegistry() - obj.On("tasks").Return(reg) - - pod := &queuer.Pod{Pod: &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - UID: "foo0", - Namespace: api.NamespaceDefault, - }}} - _, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})) - if err != nil { - t.Fatalf("failed to create task: %v", err) - } - - // preconditions - qr := queuer.New(nil) - qr.PodQueue.Add(pod, queue.ReplaceExisting) - assert.Equal(1, len(qr.PodQueue.List())) - _, found := qr.PodQueue.Get("default/foo") - assert.True(found) - - // exec & post conditions - d := &deleter{ - api: obj, - qr: qr, - } - err = d.deleteOne(pod) - assert.Nil(err) - _, found = qr.PodQueue.Get("foo0") - assert.False(found) - assert.Equal(0, len(qr.PodQueue.List())) - obj.AssertExpectations(t) -} - -func TestDeleteOne_Running(t *testing.T) { - assert := assert.New(t) - obj := &MockScheduler{} - reg := podtask.NewInMemoryRegistry() - obj.On("tasks").Return(reg) - - pod := &queuer.Pod{Pod: &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - UID: "foo0", - Namespace: api.NamespaceDefault, - }}} - task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - task.Set(podtask.Launched) - err = reg.Update(task) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // preconditions - qr := queuer.New(nil) - qr.PodQueue.Add(pod, queue.ReplaceExisting) - assert.Equal(1, len(qr.PodQueue.List())) - _, found := qr.PodQueue.Get("default/foo") - assert.True(found) - - obj.On("killTask", task.ID).Return(nil) - - // exec & post conditions - d := &deleter{ - api: obj, - qr: qr, - } - err = d.deleteOne(pod) - assert.Nil(err) - _, found = qr.PodQueue.Get("foo0") - assert.False(found) - assert.Equal(0, len(qr.PodQueue.List())) - obj.AssertExpectations(t) -} - -func TestDeleteOne_badPodNaming(t *testing.T) { - assert := assert.New(t) - obj := &MockScheduler{} - pod := &queuer.Pod{Pod: &api.Pod{}} - d := &deleter{ - api: obj, - qr: queuer.New(nil), - } - - err := d.deleteOne(pod) - assert.NotNil(err) - - pod.Pod.ObjectMeta.Name = "foo" - err = d.deleteOne(pod) - assert.NotNil(err) - - pod.Pod.ObjectMeta.Name = "" - pod.Pod.ObjectMeta.Namespace = "bar" - err = d.deleteOne(pod) - assert.NotNil(err) - - obj.AssertExpectations(t) -} diff --git a/contrib/mesos/pkg/scheduler/mock_test.go b/contrib/mesos/pkg/scheduler/scheduler_mock.go similarity index 72% rename from contrib/mesos/pkg/scheduler/mock_test.go rename to contrib/mesos/pkg/scheduler/scheduler_mock.go index 93a7ed90549..f0b7e6dea43 100644 --- a/contrib/mesos/pkg/scheduler/mock_test.go +++ b/contrib/mesos/pkg/scheduler/scheduler_mock.go @@ -17,81 +17,10 @@ limitations under the License. package scheduler import ( - "sync" - "testing" - mesos "github.com/mesos/mesos-go/mesosproto" "github.com/stretchr/testify/mock" - "k8s.io/kubernetes/contrib/mesos/pkg/offers" - malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - "k8s.io/kubernetes/pkg/api" ) -// implements SchedulerInterface -type MockScheduler struct { - sync.RWMutex - mock.Mock -} - -func (m *MockScheduler) SlaveHostNameFor(id string) (hostName string) { - args := m.Called(id) - x := args.Get(0) - if x != nil { - hostName = x.(string) - } - return -} -func (m *MockScheduler) algorithm() (f malgorithm.PodScheduler) { - args := m.Called() - x := args.Get(0) - if x != nil { - f = x.(malgorithm.PodScheduler) - } - return -} -func (m *MockScheduler) createPodTask(ctx api.Context, pod *api.Pod) (task *podtask.T, err error) { - args := m.Called(ctx, pod) - x := args.Get(0) - if x != nil { - task = x.(*podtask.T) - } - err = args.Error(1) - return -} -func (m *MockScheduler) offers() (f offers.Registry) { - args := m.Called() - x := args.Get(0) - if x != nil { - f = x.(offers.Registry) - } - return -} -func (m *MockScheduler) tasks() (f podtask.Registry) { - args := m.Called() - x := args.Get(0) - if x != nil { - f = x.(podtask.Registry) - } - return -} -func (m *MockScheduler) killTask(taskId string) error { - args := m.Called(taskId) - return args.Error(0) -} -func (m *MockScheduler) launchTask(task *podtask.T) error { - args := m.Called(task) - return args.Error(0) -} - -// @deprecated this is a placeholder for me to test the mock package -func TestNoSlavesYet(t *testing.T) { - obj := &MockScheduler{} - obj.On("SlaveHostNameFor", "foo").Return(nil) - obj.SlaveHostNameFor("foo") - obj.AssertExpectations(t) -} - /*----------------------------------------------------------------------------- | | this really belongs in the mesos-go package, but that's being updated soon @@ -147,57 +76,84 @@ func (m *MockSchedulerDriver) Init() error { args := m.Called() return args.Error(0) } + func (m *MockSchedulerDriver) Start() (mesos.Status, error) { args := m.Called() return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) Stop(b bool) (mesos.Status, error) { args := m.Called(b) return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) Abort() (mesos.Status, error) { args := m.Called() return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) Join() (mesos.Status, error) { args := m.Called() return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) Run() (mesos.Status, error) { args := m.Called() return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) RequestResources(r []*mesos.Request) (mesos.Status, error) { args := m.Called(r) return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) ReconcileTasks(statuses []*mesos.TaskStatus) (mesos.Status, error) { args := m.Called(statuses) return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) LaunchTasks(offerIds []*mesos.OfferID, ti []*mesos.TaskInfo, f *mesos.Filters) (mesos.Status, error) { args := m.Called(offerIds, ti, f) return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) KillTask(tid *mesos.TaskID) (mesos.Status, error) { args := m.Called(tid) return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) DeclineOffer(oid *mesos.OfferID, f *mesos.Filters) (mesos.Status, error) { args := m.Called(oid, f) return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) ReviveOffers() (mesos.Status, error) { args := m.Called() return status(args, 0), args.Error(0) } + func (m *MockSchedulerDriver) SendFrameworkMessage(eid *mesos.ExecutorID, sid *mesos.SlaveID, s string) (mesos.Status, error) { args := m.Called(eid, sid, s) return status(args, 0), args.Error(1) } + func (m *MockSchedulerDriver) Destroy() { m.Called() } + func (m *MockSchedulerDriver) Wait() { m.Called() } + +type joinableDriver struct { + MockSchedulerDriver + joinFunc func() (mesos.Status, error) +} + +// Join invokes joinFunc if it has been set, otherwise blocks forever +func (m *joinableDriver) Join() (mesos.Status, error) { + if m.joinFunc != nil { + return m.joinFunc() + } + select {} +}