mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 10:19:50 +00:00
Pull apart plugin, PodSchedulers, Deleter and Binder
This commit is contained in:
parent
b9538dd70c
commit
30b5faff53
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scheduler
|
||||
package algorithm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scheduler
|
||||
package algorithm
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
19
contrib/mesos/pkg/scheduler/api/doc.go
Normal file
19
contrib/mesos/pkg/scheduler/api/doc.go
Normal file
@ -0,0 +1,19 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package api contains an abstract scheduler interface, implemented by the
|
||||
// scheduler plugin and consumed by the scheduler operations.
|
||||
package api
|
98
contrib/mesos/pkg/scheduler/api/mock.go
Normal file
98
contrib/mesos/pkg/scheduler/api/mock.go
Normal file
@ -0,0 +1,98 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
|
||||
malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
// @deprecated this is a placeholder for me to test the mock package
|
||||
func TestNoSlavesYet(t *testing.T) {
|
||||
obj := &MockScheduler{}
|
||||
obj.On("SlaveHostNameFor", "foo").Return(nil)
|
||||
obj.SlaveHostNameFor("foo")
|
||||
obj.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// MockScheduler implements SchedulerApi
|
||||
type MockScheduler struct {
|
||||
sync.RWMutex
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockScheduler) SlaveHostNameFor(id string) (hostName string) {
|
||||
args := m.Called(id)
|
||||
x := args.Get(0)
|
||||
if x != nil {
|
||||
hostName = x.(string)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *MockScheduler) Algorithm() (f malgorithm.PodScheduler) {
|
||||
args := m.Called()
|
||||
x := args.Get(0)
|
||||
if x != nil {
|
||||
f = x.(malgorithm.PodScheduler)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *MockScheduler) CreatePodTask(ctx api.Context, pod *api.Pod) (task *podtask.T, err error) {
|
||||
args := m.Called(ctx, pod)
|
||||
x := args.Get(0)
|
||||
if x != nil {
|
||||
task = x.(*podtask.T)
|
||||
}
|
||||
err = args.Error(1)
|
||||
return
|
||||
}
|
||||
|
||||
func (m *MockScheduler) Offers() (f offers.Registry) {
|
||||
args := m.Called()
|
||||
x := args.Get(0)
|
||||
if x != nil {
|
||||
f = x.(offers.Registry)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *MockScheduler) Tasks() (f podtask.Registry) {
|
||||
args := m.Called()
|
||||
x := args.Get(0)
|
||||
if x != nil {
|
||||
f = x.(podtask.Registry)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *MockScheduler) KillTask(taskId string) error {
|
||||
args := m.Called(taskId)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockScheduler) LaunchTask(task *podtask.T) error {
|
||||
args := m.Called(task)
|
||||
return args.Error(0)
|
||||
}
|
45
contrib/mesos/pkg/scheduler/api/types.go
Normal file
45
contrib/mesos/pkg/scheduler/api/types.go
Normal file
@ -0,0 +1,45 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
|
||||
malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
// scheduler abstraction to allow for easier unit testing
|
||||
type SchedulerApi interface {
|
||||
sync.Locker // synchronize scheduler plugin operations
|
||||
|
||||
malgorithm.SlaveIndex
|
||||
Algorithm() malgorithm.PodScheduler
|
||||
Offers() offers.Registry
|
||||
Tasks() podtask.Registry
|
||||
|
||||
// driver calls
|
||||
|
||||
KillTask(taskId string) error
|
||||
LaunchTask(*podtask.T) error
|
||||
|
||||
// convenience
|
||||
|
||||
CreatePodTask(api.Context, *api.Pod) (*podtask.T, error)
|
||||
}
|
18
contrib/mesos/pkg/scheduler/errors/doc.go
Normal file
18
contrib/mesos/pkg/scheduler/errors/doc.go
Normal file
@ -0,0 +1,18 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package errors contains all scheduler wide used errors
|
||||
package errors
|
26
contrib/mesos/pkg/scheduler/errors/errors.go
Normal file
26
contrib/mesos/pkg/scheduler/errors/errors.go
Normal file
@ -0,0 +1,26 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package errors
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
NoSuchPodErr = errors.New("No such pod exists")
|
||||
NoSuchTaskErr = errors.New("No such task exists")
|
||||
)
|
@ -14,24 +14,32 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scheduler
|
||||
package operations
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
log "github.com/golang/glog"
|
||||
schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api"
|
||||
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
|
||||
annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
type binder struct {
|
||||
api schedulerInterface
|
||||
type Binder struct {
|
||||
api schedapi.SchedulerApi
|
||||
}
|
||||
|
||||
func NewBinder(api schedapi.SchedulerApi) *Binder {
|
||||
return &Binder{
|
||||
api: api,
|
||||
}
|
||||
}
|
||||
|
||||
// implements binding.Registry, launches the pod-associated-task in mesos
|
||||
func (b *binder) Bind(binding *api.Binding) error {
|
||||
func (b *Binder) Bind(binding *api.Binding) error {
|
||||
|
||||
ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
|
||||
|
||||
@ -44,21 +52,21 @@ func (b *binder) Bind(binding *api.Binding) error {
|
||||
b.api.Lock()
|
||||
defer b.api.Unlock()
|
||||
|
||||
switch task, state := b.api.tasks().ForPod(podKey); state {
|
||||
switch task, state := b.api.Tasks().ForPod(podKey); state {
|
||||
case podtask.StatePending:
|
||||
return b.bind(ctx, binding, task)
|
||||
default:
|
||||
// in this case it's likely that the pod has been deleted between Schedule
|
||||
// and Bind calls
|
||||
log.Infof("No pending task for pod %s", podKey)
|
||||
return noSuchPodErr //TODO(jdef) this error is somewhat misleading since the task could be running?!
|
||||
return merrors.NoSuchPodErr //TODO(jdef) this error is somewhat misleading since the task could be running?!
|
||||
}
|
||||
}
|
||||
|
||||
func (b *binder) rollback(task *podtask.T, err error) error {
|
||||
func (b *Binder) rollback(task *podtask.T, err error) error {
|
||||
task.Offer.Release()
|
||||
task.Reset()
|
||||
if err2 := b.api.tasks().Update(task); err2 != nil {
|
||||
if err2 := b.api.Tasks().Update(task); err2 != nil {
|
||||
log.Errorf("failed to update pod task: %v", err2)
|
||||
}
|
||||
return err
|
||||
@ -70,7 +78,7 @@ func (b *binder) rollback(task *podtask.T, err error) error {
|
||||
// kubernetes executor on the slave will finally do the binding. This is different from the
|
||||
// upstream scheduler in the sense that the upstream scheduler does the binding and the
|
||||
// kubelet will notice that and launches the pod.
|
||||
func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) {
|
||||
func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) {
|
||||
// sanity check: ensure that the task hasAcceptedOffer(), it's possible that between
|
||||
// Schedule() and now that the offer for this task was rescinded or invalidated.
|
||||
// ((we should never see this here))
|
||||
@ -80,7 +88,7 @@ func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
|
||||
|
||||
// By this time, there is a chance that the slave is disconnected.
|
||||
offerId := task.GetOfferId()
|
||||
if offer, ok := b.api.offers().Get(offerId); !ok || offer.HasExpired() {
|
||||
if offer, ok := b.api.Offers().Get(offerId); !ok || offer.HasExpired() {
|
||||
// already rescinded or timed out or otherwise invalidated
|
||||
return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID))
|
||||
}
|
||||
@ -88,10 +96,10 @@ func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
|
||||
if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
|
||||
log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
|
||||
task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
|
||||
if err = b.api.launchTask(task); err == nil {
|
||||
b.api.offers().Invalidate(offerId)
|
||||
if err = b.api.LaunchTask(task); err == nil {
|
||||
b.api.Offers().Invalidate(offerId)
|
||||
task.Set(podtask.Launched)
|
||||
if err = b.api.tasks().Update(task); err != nil {
|
||||
if err = b.api.Tasks().Update(task); err != nil {
|
||||
// this should only happen if the task has been removed or has changed status,
|
||||
// which SHOULD NOT HAPPEN as long as we're synchronizing correctly
|
||||
log.Errorf("failed to update task w/ Launched status: %v", err)
|
||||
@ -103,7 +111,7 @@ func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
|
||||
}
|
||||
|
||||
//TODO(jdef) unit test this, ensure that task's copy of api.Pod is not modified
|
||||
func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error {
|
||||
func (b *Binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error {
|
||||
pod := task.Pod
|
||||
|
||||
// we make an effort here to avoid making changes to the task's copy of the pod, since
|
||||
@ -142,4 +150,4 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
|
||||
}
|
||||
task.Spec.Data = data
|
||||
return nil
|
||||
}
|
||||
}
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scheduler
|
||||
package operations
|
||||
|
||||
import (
|
||||
"time"
|
||||
@ -22,25 +22,34 @@ import (
|
||||
log "github.com/golang/glog"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
|
||||
schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api"
|
||||
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
type deleter struct {
|
||||
api schedulerInterface
|
||||
type Deleter struct {
|
||||
api schedapi.SchedulerApi
|
||||
qr *queuer.Queuer
|
||||
}
|
||||
|
||||
func NewDeleter(api schedapi.SchedulerApi, qr *queuer.Queuer) *Deleter {
|
||||
return &Deleter{
|
||||
api: api,
|
||||
qr: qr,
|
||||
}
|
||||
}
|
||||
|
||||
// currently monitors for "pod deleted" events, upon which handle()
|
||||
// is invoked.
|
||||
func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
|
||||
func (k *Deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
|
||||
go runtime.Until(func() {
|
||||
for {
|
||||
entry := <-updates
|
||||
pod := entry.Value().(*queuer.Pod)
|
||||
if entry.Is(queue.DELETE_EVENT) {
|
||||
if err := k.deleteOne(pod); err != nil {
|
||||
if err := k.DeleteOne(pod); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
} else if !entry.Is(queue.POP_EVENT) {
|
||||
@ -50,7 +59,7 @@ func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
|
||||
}, 1*time.Second, done)
|
||||
}
|
||||
|
||||
func (k *deleter) deleteOne(pod *queuer.Pod) error {
|
||||
func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
|
||||
ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
|
||||
podKey, err := podtask.MakePodKey(ctx, pod.Name)
|
||||
if err != nil {
|
||||
@ -72,10 +81,10 @@ func (k *deleter) deleteOne(pod *queuer.Pod) error {
|
||||
// will abort Bind()ing
|
||||
k.qr.Dequeue(pod.GetUID())
|
||||
|
||||
switch task, state := k.api.tasks().ForPod(podKey); state {
|
||||
switch task, state := k.api.Tasks().ForPod(podKey); state {
|
||||
case podtask.StateUnknown:
|
||||
log.V(2).Infof("Could not resolve pod '%s' to task id", podKey)
|
||||
return noSuchPodErr
|
||||
return merrors.NoSuchPodErr
|
||||
|
||||
// determine if the task has already been launched to mesos, if not then
|
||||
// cleanup is easier (unregister) since there's no state to sync
|
||||
@ -87,11 +96,11 @@ func (k *deleter) deleteOne(pod *queuer.Pod) error {
|
||||
task.Reset()
|
||||
task.Set(podtask.Deleted)
|
||||
//TODO(jdef) probably want better handling here
|
||||
if err := k.api.tasks().Update(task); err != nil {
|
||||
if err := k.api.Tasks().Update(task); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
k.api.tasks().Unregister(task)
|
||||
k.api.Tasks().Unregister(task)
|
||||
return nil
|
||||
}
|
||||
fallthrough
|
||||
@ -99,13 +108,13 @@ func (k *deleter) deleteOne(pod *queuer.Pod) error {
|
||||
case podtask.StateRunning:
|
||||
// signal to watchers that the related pod is going down
|
||||
task.Set(podtask.Deleted)
|
||||
if err := k.api.tasks().Update(task); err != nil {
|
||||
if err := k.api.Tasks().Update(task); err != nil {
|
||||
log.Errorf("failed to update task w/ Deleted status: %v", err)
|
||||
}
|
||||
return k.api.killTask(task.ID)
|
||||
return k.api.KillTask(task.ID)
|
||||
|
||||
default:
|
||||
log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID)
|
||||
return noSuchTaskErr
|
||||
return merrors.NoSuchTaskErr
|
||||
}
|
||||
}
|
147
contrib/mesos/pkg/scheduler/operations/deleter_test.go
Normal file
147
contrib/mesos/pkg/scheduler/operations/deleter_test.go
Normal file
@ -0,0 +1,147 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package operations
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
|
||||
mesos "github.com/mesos/mesos-go/mesosproto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
|
||||
schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api"
|
||||
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
|
||||
)
|
||||
|
||||
func TestDeleteOne_NonexistentPod(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
obj := &schedapi.MockScheduler{}
|
||||
reg := podtask.NewInMemoryRegistry()
|
||||
obj.On("Tasks").Return(reg)
|
||||
|
||||
qr := queuer.New(nil)
|
||||
assert.Equal(0, len(qr.PodQueue.List()))
|
||||
d := NewDeleter(obj, qr)
|
||||
pod := &queuer.Pod{Pod: &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: api.NamespaceDefault,
|
||||
}}}
|
||||
err := d.DeleteOne(pod)
|
||||
assert.Equal(err, merrors.NoSuchPodErr)
|
||||
obj.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestDeleteOne_PendingPod(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
obj := &schedapi.MockScheduler{}
|
||||
reg := podtask.NewInMemoryRegistry()
|
||||
obj.On("Tasks").Return(reg)
|
||||
|
||||
pod := &queuer.Pod{Pod: &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
UID: "foo0",
|
||||
Namespace: api.NamespaceDefault,
|
||||
}}}
|
||||
_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create task: %v", err)
|
||||
}
|
||||
|
||||
// preconditions
|
||||
qr := queuer.New(nil)
|
||||
qr.PodQueue.Add(pod, queue.ReplaceExisting)
|
||||
assert.Equal(1, len(qr.PodQueue.List()))
|
||||
_, found := qr.PodQueue.Get("default/foo")
|
||||
assert.True(found)
|
||||
|
||||
// exec & post conditions
|
||||
d := NewDeleter(obj, qr)
|
||||
err = d.DeleteOne(pod)
|
||||
assert.Nil(err)
|
||||
_, found = qr.PodQueue.Get("foo0")
|
||||
assert.False(found)
|
||||
assert.Equal(0, len(qr.PodQueue.List()))
|
||||
obj.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestDeleteOne_Running(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
obj := &schedapi.MockScheduler{}
|
||||
reg := podtask.NewInMemoryRegistry()
|
||||
obj.On("Tasks").Return(reg)
|
||||
|
||||
pod := &queuer.Pod{Pod: &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
UID: "foo0",
|
||||
Namespace: api.NamespaceDefault,
|
||||
}}}
|
||||
task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
task.Set(podtask.Launched)
|
||||
err = reg.Update(task)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// preconditions
|
||||
qr := queuer.New(nil)
|
||||
qr.PodQueue.Add(pod, queue.ReplaceExisting)
|
||||
assert.Equal(1, len(qr.PodQueue.List()))
|
||||
_, found := qr.PodQueue.Get("default/foo")
|
||||
assert.True(found)
|
||||
|
||||
obj.On("KillTask", task.ID).Return(nil)
|
||||
|
||||
// exec & post conditions
|
||||
d := NewDeleter(obj, qr)
|
||||
err = d.DeleteOne(pod)
|
||||
assert.Nil(err)
|
||||
_, found = qr.PodQueue.Get("foo0")
|
||||
assert.False(found)
|
||||
assert.Equal(0, len(qr.PodQueue.List()))
|
||||
obj.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestDeleteOne_badPodNaming(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
obj := &schedapi.MockScheduler{}
|
||||
pod := &queuer.Pod{Pod: &api.Pod{}}
|
||||
d := NewDeleter(obj, queuer.New(nil))
|
||||
|
||||
err := d.DeleteOne(pod)
|
||||
assert.NotNil(err)
|
||||
|
||||
pod.Pod.ObjectMeta.Name = "foo"
|
||||
err = d.DeleteOne(pod)
|
||||
assert.NotNil(err)
|
||||
|
||||
pod.Pod.ObjectMeta.Name = ""
|
||||
pod.Pod.ObjectMeta.Namespace = "bar"
|
||||
err = d.DeleteOne(pod)
|
||||
assert.NotNil(err)
|
||||
|
||||
obj.AssertExpectations(t)
|
||||
}
|
18
contrib/mesos/pkg/scheduler/operations/doc.go
Normal file
18
contrib/mesos/pkg/scheduler/operations/doc.go
Normal file
@ -0,0 +1,18 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package operations implements independent aspects of the scheduler
|
||||
package operations
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
@ -31,6 +30,9 @@ import (
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
|
||||
malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm"
|
||||
schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api"
|
||||
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
@ -51,48 +53,24 @@ const (
|
||||
Scheduled = "Scheduled"
|
||||
)
|
||||
|
||||
var (
|
||||
noSuchPodErr = errors.New("No such pod exists")
|
||||
noSuchTaskErr = errors.New("No such task exists")
|
||||
)
|
||||
|
||||
// scheduler abstraction to allow for easier unit testing
|
||||
type schedulerInterface interface {
|
||||
sync.Locker // synchronize scheduler plugin operations
|
||||
|
||||
malgorithm.SlaveIndex
|
||||
algorithm() malgorithm.PodScheduler
|
||||
offers() offers.Registry
|
||||
tasks() podtask.Registry
|
||||
|
||||
// driver calls
|
||||
|
||||
killTask(taskId string) error
|
||||
launchTask(*podtask.T) error
|
||||
|
||||
// convenience
|
||||
|
||||
createPodTask(api.Context, *api.Pod) (*podtask.T, error)
|
||||
}
|
||||
|
||||
type k8smScheduler struct {
|
||||
sync.Mutex
|
||||
internal *KubernetesMesosScheduler
|
||||
}
|
||||
|
||||
func (k *k8smScheduler) algorithm() malgorithm.PodScheduler {
|
||||
func (k *k8smScheduler) Algorithm() malgorithm.PodScheduler {
|
||||
return k.internal
|
||||
}
|
||||
|
||||
func (k *k8smScheduler) offers() offers.Registry {
|
||||
func (k *k8smScheduler) Offers() offers.Registry {
|
||||
return k.internal.offers
|
||||
}
|
||||
|
||||
func (k *k8smScheduler) tasks() podtask.Registry {
|
||||
func (k *k8smScheduler) Tasks() podtask.Registry {
|
||||
return k.internal.taskRegistry
|
||||
}
|
||||
|
||||
func (k *k8smScheduler) createPodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) {
|
||||
func (k *k8smScheduler) CreatePodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) {
|
||||
return podtask.New(ctx, "", *pod, k.internal.executor)
|
||||
}
|
||||
|
||||
@ -100,13 +78,13 @@ func (k *k8smScheduler) SlaveHostNameFor(id string) string {
|
||||
return k.internal.slaveHostNames.HostName(id)
|
||||
}
|
||||
|
||||
func (k *k8smScheduler) killTask(taskId string) error {
|
||||
func (k *k8smScheduler) KillTask(taskId string) error {
|
||||
killTaskId := mutil.NewTaskID(taskId)
|
||||
_, err := k.internal.driver.KillTask(killTaskId)
|
||||
return err
|
||||
}
|
||||
|
||||
func (k *k8smScheduler) launchTask(task *podtask.T) error {
|
||||
func (k *k8smScheduler) LaunchTask(task *podtask.T) error {
|
||||
// assume caller is holding scheduler lock
|
||||
taskList := []*mesos.TaskInfo{task.BuildTaskInfo()}
|
||||
offerIds := []*mesos.OfferID{task.Offer.Details().Id}
|
||||
@ -116,7 +94,7 @@ func (k *k8smScheduler) launchTask(task *podtask.T) error {
|
||||
}
|
||||
|
||||
type kubeScheduler struct {
|
||||
api schedulerInterface
|
||||
api schedapi.SchedulerApi
|
||||
podUpdates queue.FIFO
|
||||
}
|
||||
|
||||
@ -135,7 +113,7 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
|
||||
k.api.Lock()
|
||||
defer k.api.Unlock()
|
||||
|
||||
switch task, state := k.api.tasks().ForPod(podKey); state {
|
||||
switch task, state := k.api.Tasks().ForPod(podKey); state {
|
||||
case podtask.StateUnknown:
|
||||
// There's a bit of a potential race here, a pod could have been yielded() and
|
||||
// then before we get *here* it could be deleted.
|
||||
@ -143,14 +121,14 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
|
||||
podName, err := cache.MetaNamespaceKeyFunc(pod)
|
||||
if err != nil {
|
||||
log.Warningf("aborting Schedule, unable to understand pod object %+v", pod)
|
||||
return "", noSuchPodErr
|
||||
return "", merrors.NoSuchPodErr
|
||||
}
|
||||
if deleted := k.podUpdates.Poll(podName, queue.DELETE_EVENT); deleted {
|
||||
// avoid scheduling a pod that's been deleted between yieldPod() and Schedule()
|
||||
log.Infof("aborting Schedule, pod has been deleted %+v", pod)
|
||||
return "", noSuchPodErr
|
||||
return "", merrors.NoSuchPodErr
|
||||
}
|
||||
return k.doSchedule(k.api.tasks().Register(k.api.createPodTask(ctx, pod)))
|
||||
return k.doSchedule(k.api.Tasks().Register(k.api.CreatePodTask(ctx, pod)))
|
||||
|
||||
//TODO(jdef) it's possible that the pod state has diverged from what
|
||||
//we knew previously, we should probably update the task.Pod state here
|
||||
@ -180,19 +158,19 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
|
||||
if task.HasAcceptedOffer() {
|
||||
// verify that the offer is still on the table
|
||||
offerId := task.GetOfferId()
|
||||
if offer, ok := k.api.offers().Get(offerId); ok && !offer.HasExpired() {
|
||||
if offer, ok := k.api.Offers().Get(offerId); ok && !offer.HasExpired() {
|
||||
// skip tasks that have already have assigned offers
|
||||
offer = task.Offer
|
||||
} else {
|
||||
task.Offer.Release()
|
||||
task.Reset()
|
||||
if err = k.api.tasks().Update(task); err != nil {
|
||||
if err = k.api.Tasks().Update(task); err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
}
|
||||
if err == nil && offer == nil {
|
||||
offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task)
|
||||
offer, err = k.api.Algorithm().SchedulePod(k.api.Offers(), k.api, task)
|
||||
}
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -205,7 +183,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
|
||||
if slaveHostName := k.api.SlaveHostNameFor(slaveId); slaveHostName == "" {
|
||||
// not much sense in Release()ing the offer here since its owner died
|
||||
offer.Release()
|
||||
k.api.offers().Invalidate(details.Id.GetValue())
|
||||
k.api.Offers().Invalidate(details.Id.GetValue())
|
||||
return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
|
||||
} else {
|
||||
if task.Offer != nil && task.Offer != offer {
|
||||
@ -213,9 +191,9 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
|
||||
}
|
||||
|
||||
task.Offer = offer
|
||||
k.api.algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
|
||||
k.api.Algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
|
||||
|
||||
if err := k.api.tasks().Update(task); err != nil {
|
||||
if err := k.api.Tasks().Update(task); err != nil {
|
||||
offer.Release()
|
||||
return "", err
|
||||
}
|
||||
@ -224,7 +202,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
|
||||
}
|
||||
|
||||
type errorHandler struct {
|
||||
api schedulerInterface
|
||||
api schedapi.SchedulerApi
|
||||
backoff *backoff.Backoff
|
||||
qr *queuer.Queuer
|
||||
}
|
||||
@ -232,7 +210,7 @@ type errorHandler struct {
|
||||
// implementation of scheduling plugin's Error func; see plugin/pkg/scheduler
|
||||
func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) {
|
||||
|
||||
if schedulingErr == noSuchPodErr {
|
||||
if schedulingErr == merrors.NoSuchPodErr {
|
||||
log.V(2).Infof("Not rescheduling non-existent pod %v", pod.Name)
|
||||
return
|
||||
}
|
||||
@ -252,7 +230,7 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
|
||||
k.api.Lock()
|
||||
defer k.api.Unlock()
|
||||
|
||||
switch task, state := k.api.tasks().ForPod(podKey); state {
|
||||
switch task, state := k.api.Tasks().ForPod(podKey); state {
|
||||
case podtask.StateUnknown:
|
||||
// if we don't have a mapping here any more then someone deleted the pod
|
||||
log.V(2).Infof("Could not resolve pod to task, aborting pod reschdule: %s", podKey)
|
||||
@ -266,16 +244,16 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
|
||||
breakoutEarly := queue.BreakChan(nil)
|
||||
if schedulingErr == malgorithm.NoSuitableOffersErr {
|
||||
log.V(3).Infof("adding backoff breakout handler for pod %v", podKey)
|
||||
breakoutEarly = queue.BreakChan(k.api.offers().Listen(podKey, func(offer *mesos.Offer) bool {
|
||||
breakoutEarly = queue.BreakChan(k.api.Offers().Listen(podKey, func(offer *mesos.Offer) bool {
|
||||
k.api.Lock()
|
||||
defer k.api.Unlock()
|
||||
switch task, state := k.api.tasks().Get(task.ID); state {
|
||||
switch task, state := k.api.Tasks().Get(task.ID); state {
|
||||
case podtask.StatePending:
|
||||
// Assess fitness of pod with the current offer. The scheduler normally
|
||||
// "backs off" when it can't find an offer that matches up with a pod.
|
||||
// The backoff period for a pod can terminate sooner if an offer becomes
|
||||
// available that matches up.
|
||||
return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer, nil)
|
||||
return !task.Has(podtask.Launched) && k.api.Algorithm().FitPredicate()(task, offer, nil)
|
||||
default:
|
||||
// no point in continuing to check for matching offers
|
||||
return true
|
||||
@ -310,10 +288,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
|
||||
// an ordering (vs interleaving) of operations that's easier to reason about.
|
||||
kapi := &k8smScheduler{internal: k}
|
||||
q := queuer.New(podUpdates)
|
||||
podDeleter := &deleter{
|
||||
api: kapi,
|
||||
qr: q,
|
||||
}
|
||||
podDeleter := operations.NewDeleter(kapi, q)
|
||||
eh := &errorHandler{
|
||||
api: kapi,
|
||||
backoff: backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration),
|
||||
@ -337,7 +312,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
|
||||
api: kapi,
|
||||
podUpdates: podUpdates,
|
||||
},
|
||||
Binder: &binder{api: kapi},
|
||||
Binder: operations.NewBinder(kapi),
|
||||
NextPod: q.Yield,
|
||||
Error: eh.handleSchedulingError,
|
||||
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
|
||||
@ -352,10 +327,10 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
|
||||
|
||||
type PluginConfig struct {
|
||||
*plugin.Config
|
||||
api schedulerInterface
|
||||
api schedapi.SchedulerApi
|
||||
client *client.Client
|
||||
qr *queuer.Queuer
|
||||
deleter *deleter
|
||||
deleter *operations.Deleter
|
||||
starting chan struct{} // startup latch
|
||||
}
|
||||
|
||||
@ -372,10 +347,10 @@ func NewPlugin(c *PluginConfig) PluginInterface {
|
||||
|
||||
type schedulingPlugin struct {
|
||||
config *plugin.Config
|
||||
api schedulerInterface
|
||||
api schedapi.SchedulerApi
|
||||
client *client.Client
|
||||
qr *queuer.Queuer
|
||||
deleter *deleter
|
||||
deleter *operations.Deleter
|
||||
starting chan struct{}
|
||||
}
|
||||
|
||||
@ -440,7 +415,7 @@ func (s *schedulingPlugin) reconcileTask(t *podtask.T) {
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
// attempt to delete
|
||||
if err = s.deleter.deleteOne(&queuer.Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
|
||||
if err = s.deleter.DeleteOne(&queuer.Pod{Pod: &t.Pod}); err != nil && err != merrors.NoSuchPodErr && err != merrors.NoSuchTaskErr {
|
||||
log.Errorf("failed to delete pod: %v: %v", t.Pod.Name, err)
|
||||
}
|
||||
} else {
|
||||
@ -467,7 +442,7 @@ func (s *schedulingPlugin) reconcileTask(t *podtask.T) {
|
||||
s.api.Lock()
|
||||
defer s.api.Unlock()
|
||||
|
||||
if _, state := s.api.tasks().ForPod(podKey); state != podtask.StateUnknown {
|
||||
if _, state := s.api.Tasks().ForPod(podKey); state != podtask.StateUnknown {
|
||||
//TODO(jdef) reconcile the task
|
||||
log.Errorf("task already registered for pod %v", pod.Name)
|
||||
return
|
||||
|
@ -41,13 +41,11 @@ import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
|
||||
malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm"
|
||||
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
|
||||
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
)
|
||||
@ -401,19 +399,6 @@ func (a *EventAssertions) EventWithReason(observer *EventObserver, reason string
|
||||
}, msgAndArgs...)
|
||||
}
|
||||
|
||||
type joinableDriver struct {
|
||||
MockSchedulerDriver
|
||||
joinFunc func() (mesos.Status, error)
|
||||
}
|
||||
|
||||
// Join invokes joinFunc if it has been set, otherwise blocks forever
|
||||
func (m *joinableDriver) Join() (mesos.Status, error) {
|
||||
if m.joinFunc != nil {
|
||||
return m.joinFunc()
|
||||
}
|
||||
select {}
|
||||
}
|
||||
|
||||
// Create mesos.TaskStatus for a given task
|
||||
func newTaskStatusForTask(task *mesos.TaskInfo, state mesos.TaskState) *mesos.TaskStatus {
|
||||
healthy := state == mesos.TaskState_TASK_RUNNING
|
||||
@ -824,7 +809,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
|
||||
|
||||
podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
|
||||
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
|
||||
t, _ := lt.plugin.api.tasks().ForPod(podKey)
|
||||
t, _ := lt.plugin.api.Tasks().ForPod(podKey)
|
||||
return t == nil
|
||||
})
|
||||
|
||||
@ -847,131 +832,3 @@ func TestPlugin_LifeCycle(t *testing.T) {
|
||||
time.Sleep(time.Second / 2)
|
||||
failPodFromExecutor(launchedTask.taskInfo)
|
||||
}
|
||||
|
||||
func TestDeleteOne_NonexistentPod(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
obj := &MockScheduler{}
|
||||
reg := podtask.NewInMemoryRegistry()
|
||||
obj.On("tasks").Return(reg)
|
||||
|
||||
qr := queuer.New(nil)
|
||||
assert.Equal(0, len(qr.PodQueue.List()))
|
||||
d := &deleter{
|
||||
api: obj,
|
||||
qr: qr,
|
||||
}
|
||||
pod := &queuer.Pod{Pod: &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: api.NamespaceDefault,
|
||||
}}}
|
||||
err := d.deleteOne(pod)
|
||||
assert.Equal(err, noSuchPodErr)
|
||||
obj.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestDeleteOne_PendingPod(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
obj := &MockScheduler{}
|
||||
reg := podtask.NewInMemoryRegistry()
|
||||
obj.On("tasks").Return(reg)
|
||||
|
||||
pod := &queuer.Pod{Pod: &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
UID: "foo0",
|
||||
Namespace: api.NamespaceDefault,
|
||||
}}}
|
||||
_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create task: %v", err)
|
||||
}
|
||||
|
||||
// preconditions
|
||||
qr := queuer.New(nil)
|
||||
qr.PodQueue.Add(pod, queue.ReplaceExisting)
|
||||
assert.Equal(1, len(qr.PodQueue.List()))
|
||||
_, found := qr.PodQueue.Get("default/foo")
|
||||
assert.True(found)
|
||||
|
||||
// exec & post conditions
|
||||
d := &deleter{
|
||||
api: obj,
|
||||
qr: qr,
|
||||
}
|
||||
err = d.deleteOne(pod)
|
||||
assert.Nil(err)
|
||||
_, found = qr.PodQueue.Get("foo0")
|
||||
assert.False(found)
|
||||
assert.Equal(0, len(qr.PodQueue.List()))
|
||||
obj.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestDeleteOne_Running(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
obj := &MockScheduler{}
|
||||
reg := podtask.NewInMemoryRegistry()
|
||||
obj.On("tasks").Return(reg)
|
||||
|
||||
pod := &queuer.Pod{Pod: &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
UID: "foo0",
|
||||
Namespace: api.NamespaceDefault,
|
||||
}}}
|
||||
task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
task.Set(podtask.Launched)
|
||||
err = reg.Update(task)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// preconditions
|
||||
qr := queuer.New(nil)
|
||||
qr.PodQueue.Add(pod, queue.ReplaceExisting)
|
||||
assert.Equal(1, len(qr.PodQueue.List()))
|
||||
_, found := qr.PodQueue.Get("default/foo")
|
||||
assert.True(found)
|
||||
|
||||
obj.On("killTask", task.ID).Return(nil)
|
||||
|
||||
// exec & post conditions
|
||||
d := &deleter{
|
||||
api: obj,
|
||||
qr: qr,
|
||||
}
|
||||
err = d.deleteOne(pod)
|
||||
assert.Nil(err)
|
||||
_, found = qr.PodQueue.Get("foo0")
|
||||
assert.False(found)
|
||||
assert.Equal(0, len(qr.PodQueue.List()))
|
||||
obj.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestDeleteOne_badPodNaming(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
obj := &MockScheduler{}
|
||||
pod := &queuer.Pod{Pod: &api.Pod{}}
|
||||
d := &deleter{
|
||||
api: obj,
|
||||
qr: queuer.New(nil),
|
||||
}
|
||||
|
||||
err := d.deleteOne(pod)
|
||||
assert.NotNil(err)
|
||||
|
||||
pod.Pod.ObjectMeta.Name = "foo"
|
||||
err = d.deleteOne(pod)
|
||||
assert.NotNil(err)
|
||||
|
||||
pod.Pod.ObjectMeta.Name = ""
|
||||
pod.Pod.ObjectMeta.Namespace = "bar"
|
||||
err = d.deleteOne(pod)
|
||||
assert.NotNil(err)
|
||||
|
||||
obj.AssertExpectations(t)
|
||||
}
|
||||
|
@ -17,81 +17,10 @@ limitations under the License.
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
mesos "github.com/mesos/mesos-go/mesosproto"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
|
||||
malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
// implements SchedulerInterface
|
||||
type MockScheduler struct {
|
||||
sync.RWMutex
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockScheduler) SlaveHostNameFor(id string) (hostName string) {
|
||||
args := m.Called(id)
|
||||
x := args.Get(0)
|
||||
if x != nil {
|
||||
hostName = x.(string)
|
||||
}
|
||||
return
|
||||
}
|
||||
func (m *MockScheduler) algorithm() (f malgorithm.PodScheduler) {
|
||||
args := m.Called()
|
||||
x := args.Get(0)
|
||||
if x != nil {
|
||||
f = x.(malgorithm.PodScheduler)
|
||||
}
|
||||
return
|
||||
}
|
||||
func (m *MockScheduler) createPodTask(ctx api.Context, pod *api.Pod) (task *podtask.T, err error) {
|
||||
args := m.Called(ctx, pod)
|
||||
x := args.Get(0)
|
||||
if x != nil {
|
||||
task = x.(*podtask.T)
|
||||
}
|
||||
err = args.Error(1)
|
||||
return
|
||||
}
|
||||
func (m *MockScheduler) offers() (f offers.Registry) {
|
||||
args := m.Called()
|
||||
x := args.Get(0)
|
||||
if x != nil {
|
||||
f = x.(offers.Registry)
|
||||
}
|
||||
return
|
||||
}
|
||||
func (m *MockScheduler) tasks() (f podtask.Registry) {
|
||||
args := m.Called()
|
||||
x := args.Get(0)
|
||||
if x != nil {
|
||||
f = x.(podtask.Registry)
|
||||
}
|
||||
return
|
||||
}
|
||||
func (m *MockScheduler) killTask(taskId string) error {
|
||||
args := m.Called(taskId)
|
||||
return args.Error(0)
|
||||
}
|
||||
func (m *MockScheduler) launchTask(task *podtask.T) error {
|
||||
args := m.Called(task)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
// @deprecated this is a placeholder for me to test the mock package
|
||||
func TestNoSlavesYet(t *testing.T) {
|
||||
obj := &MockScheduler{}
|
||||
obj.On("SlaveHostNameFor", "foo").Return(nil)
|
||||
obj.SlaveHostNameFor("foo")
|
||||
obj.AssertExpectations(t)
|
||||
}
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
|
|
||||
| this really belongs in the mesos-go package, but that's being updated soon
|
||||
@ -147,57 +76,84 @@ func (m *MockSchedulerDriver) Init() error {
|
||||
args := m.Called()
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) Start() (mesos.Status, error) {
|
||||
args := m.Called()
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) Stop(b bool) (mesos.Status, error) {
|
||||
args := m.Called(b)
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) Abort() (mesos.Status, error) {
|
||||
args := m.Called()
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) Join() (mesos.Status, error) {
|
||||
args := m.Called()
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) Run() (mesos.Status, error) {
|
||||
args := m.Called()
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) RequestResources(r []*mesos.Request) (mesos.Status, error) {
|
||||
args := m.Called(r)
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) ReconcileTasks(statuses []*mesos.TaskStatus) (mesos.Status, error) {
|
||||
args := m.Called(statuses)
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) LaunchTasks(offerIds []*mesos.OfferID, ti []*mesos.TaskInfo, f *mesos.Filters) (mesos.Status, error) {
|
||||
args := m.Called(offerIds, ti, f)
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) KillTask(tid *mesos.TaskID) (mesos.Status, error) {
|
||||
args := m.Called(tid)
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) DeclineOffer(oid *mesos.OfferID, f *mesos.Filters) (mesos.Status, error) {
|
||||
args := m.Called(oid, f)
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) ReviveOffers() (mesos.Status, error) {
|
||||
args := m.Called()
|
||||
return status(args, 0), args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) SendFrameworkMessage(eid *mesos.ExecutorID, sid *mesos.SlaveID, s string) (mesos.Status, error) {
|
||||
args := m.Called(eid, sid, s)
|
||||
return status(args, 0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) Destroy() {
|
||||
m.Called()
|
||||
}
|
||||
|
||||
func (m *MockSchedulerDriver) Wait() {
|
||||
m.Called()
|
||||
}
|
||||
|
||||
type joinableDriver struct {
|
||||
MockSchedulerDriver
|
||||
joinFunc func() (mesos.Status, error)
|
||||
}
|
||||
|
||||
// Join invokes joinFunc if it has been set, otherwise blocks forever
|
||||
func (m *joinableDriver) Join() (mesos.Status, error) {
|
||||
if m.joinFunc != nil {
|
||||
return m.joinFunc()
|
||||
}
|
||||
select {}
|
||||
}
|
Loading…
Reference in New Issue
Block a user