diff --git a/contrib/mesos/pkg/scheduler/components/deleter/deleter.go b/contrib/mesos/pkg/scheduler/components/deleter/deleter.go index 94ee4d74086..2ce14ea3c23 100644 --- a/contrib/mesos/pkg/scheduler/components/deleter/deleter.go +++ b/contrib/mesos/pkg/scheduler/components/deleter/deleter.go @@ -36,10 +36,10 @@ type Deleter interface { type deleter struct { sched scheduler.Scheduler - qr *queuer.Queuer + qr queuer.Queuer } -func New(sched scheduler.Scheduler, qr *queuer.Queuer) Deleter { +func New(sched scheduler.Scheduler, qr queuer.Queuer) Deleter { return &deleter{ sched: sched, qr: qr, diff --git a/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go b/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go index f9c822073e7..51926712dc5 100644 --- a/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go +++ b/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go @@ -14,7 +14,137 @@ See the License for the specific language governing permissions and limitations under the License. */ -package deleter_test +package deleter -// Due to access to private members of Queuer the deleter tests are moved to the -// queuer package. +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/contrib/mesos/pkg/queue" + types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" + "k8s.io/kubernetes/pkg/api" +) + +func TestDeleteOne_NonexistentPod(t *testing.T) { + assert := assert.New(t) + obj := &types.MockScheduler{} + reg := podtask.NewInMemoryRegistry() + obj.On("Tasks").Return(reg) + + q := queue.NewDelayFIFO() + qr := queuer.New(q, nil) + assert.Equal(0, len(q.List())) + d := New(obj, qr) + pod := &queuer.Pod{Pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }}} + err := d.DeleteOne(pod) + assert.Equal(err, errors.NoSuchPodErr) + obj.AssertExpectations(t) +} + +func TestDeleteOne_PendingPod(t *testing.T) { + assert := assert.New(t) + obj := &types.MockScheduler{} + reg := podtask.NewInMemoryRegistry() + obj.On("Tasks").Return(reg) + + pod := &queuer.Pod{Pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + UID: "foo0", + Namespace: api.NamespaceDefault, + }}} + _, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod)) + if err != nil { + t.Fatalf("failed to create task: %v", err) + } + + // preconditions + q := queue.NewDelayFIFO() + qr := queuer.New(q, nil) + q.Add(pod, queue.ReplaceExisting) + assert.Equal(1, len(q.List())) + _, found := q.Get("default/foo") + assert.True(found) + + // exec & post conditions + d := New(obj, qr) + err = d.DeleteOne(pod) + assert.Nil(err) + _, found = q.Get("foo0") + assert.False(found) + assert.Equal(0, len(q.List())) + obj.AssertExpectations(t) +} + +func TestDeleteOne_Running(t *testing.T) { + assert := assert.New(t) + obj := &types.MockScheduler{} + reg := podtask.NewInMemoryRegistry() + obj.On("Tasks").Return(reg) + + pod := &queuer.Pod{Pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + UID: "foo0", + Namespace: api.NamespaceDefault, + }}} + task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + task.Set(podtask.Launched) + err = reg.Update(task) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // preconditions + q := queue.NewDelayFIFO() + qr := queuer.New(q, nil) + q.Add(pod, queue.ReplaceExisting) + assert.Equal(1, len(q.List())) + _, found := q.Get("default/foo") + assert.True(found) + + obj.On("KillTask", task.ID).Return(nil) + + // exec & post conditions + d := New(obj, qr) + err = d.DeleteOne(pod) + assert.Nil(err) + _, found = q.Get("foo0") + assert.False(found) + assert.Equal(0, len(q.List())) + obj.AssertExpectations(t) +} + +func TestDeleteOne_badPodNaming(t *testing.T) { + assert := assert.New(t) + obj := &types.MockScheduler{} + pod := &queuer.Pod{Pod: &api.Pod{}} + q := queue.NewDelayFIFO() + qr := queuer.New(q, nil) + d := New(obj, qr) + + err := d.DeleteOne(pod) + assert.NotNil(err) + + pod.Pod.ObjectMeta.Name = "foo" + err = d.DeleteOne(pod) + assert.NotNil(err) + + pod.Pod.ObjectMeta.Name = "" + pod.Pod.ObjectMeta.Namespace = "bar" + err = d.DeleteOne(pod) + assert.NotNil(err) + + obj.AssertExpectations(t) +} diff --git a/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go b/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go index a756a248618..92153bc6758 100644 --- a/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go +++ b/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go @@ -35,11 +35,11 @@ type ErrorHandler interface { type errorHandler struct { sched scheduler.Scheduler backoff *backoff.Backoff - qr *queuer.Queuer + qr queuer.Queuer newBreakChan func(podKey string) queue.BreakChan } -func New(sched scheduler.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, newBC func(podKey string) queue.BreakChan) ErrorHandler { +func New(sched scheduler.Scheduler, backoff *backoff.Backoff, qr queuer.Queuer, newBC func(podKey string) queue.BreakChan) ErrorHandler { return &errorHandler{ sched: sched, backoff: backoff, diff --git a/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go b/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go index 900b6e729e0..a2c8652d510 100644 --- a/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go +++ b/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go @@ -38,11 +38,11 @@ type PodReconciler interface { type podReconciler struct { sched scheduler.Scheduler client *client.Client - qr *queuer.Queuer + qr queuer.Queuer deleter deleter.Deleter } -func New(sched scheduler.Scheduler, client *client.Client, qr *queuer.Queuer, deleter deleter.Deleter) PodReconciler { +func New(sched scheduler.Scheduler, client *client.Client, qr queuer.Queuer, deleter deleter.Deleter) PodReconciler { return &podReconciler{ sched: sched, client: client, diff --git a/contrib/mesos/pkg/scheduler/components/scheduler.go b/contrib/mesos/pkg/scheduler/components/scheduler.go index aac1eb62856..39671b85f83 100644 --- a/contrib/mesos/pkg/scheduler/components/scheduler.go +++ b/contrib/mesos/pkg/scheduler/components/scheduler.go @@ -67,7 +67,7 @@ func New(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler podUpdates := &podStoreAdapter{queue.NewHistorical(updates)} reflector := cache.NewReflector(lw, &api.Pod{}, podUpdates, 0) - q := queuer.New(podUpdates) + q := queuer.New(queue.NewDelayFIFO(), podUpdates) algorithm := algorithm.New(core, podUpdates, ps) diff --git a/contrib/mesos/pkg/scheduler/queuer/deleter_test.go b/contrib/mesos/pkg/scheduler/queuer/deleter_test.go deleted file mode 100644 index a6731ad0c1a..00000000000 --- a/contrib/mesos/pkg/scheduler/queuer/deleter_test.go +++ /dev/null @@ -1,148 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package queuer - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/kubernetes/contrib/mesos/pkg/queue" - types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/deleter" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - "k8s.io/kubernetes/pkg/api" -) - -// The following deleter tests are here in queuer package because they require -// private access to the Queuer. - -func TestDeleteOne_NonexistentPod(t *testing.T) { - assert := assert.New(t) - obj := &types.MockScheduler{} - reg := podtask.NewInMemoryRegistry() - obj.On("Tasks").Return(reg) - - qr := New(nil) - assert.Equal(0, len(qr.queue.List())) - d := deleter.New(obj, qr) - pod := &Pod{Pod: &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: api.NamespaceDefault, - }}} - err := d.DeleteOne(pod) - assert.Equal(err, errors.NoSuchPodErr) - obj.AssertExpectations(t) -} - -func TestDeleteOne_PendingPod(t *testing.T) { - assert := assert.New(t) - obj := &types.MockScheduler{} - reg := podtask.NewInMemoryRegistry() - obj.On("Tasks").Return(reg) - - pod := &Pod{Pod: &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - UID: "foo0", - Namespace: api.NamespaceDefault, - }}} - _, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod)) - if err != nil { - t.Fatalf("failed to create task: %v", err) - } - - // preconditions - qr := New(nil) - qr.queue.Add(pod, queue.ReplaceExisting) - assert.Equal(1, len(qr.queue.List())) - _, found := qr.queue.Get("default/foo") - assert.True(found) - - // exec & post conditions - d := deleter.New(obj, qr) - err = d.DeleteOne(pod) - assert.Nil(err) - _, found = qr.queue.Get("foo0") - assert.False(found) - assert.Equal(0, len(qr.queue.List())) - obj.AssertExpectations(t) -} - -func TestDeleteOne_Running(t *testing.T) { - assert := assert.New(t) - obj := &types.MockScheduler{} - reg := podtask.NewInMemoryRegistry() - obj.On("Tasks").Return(reg) - - pod := &Pod{Pod: &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - UID: "foo0", - Namespace: api.NamespaceDefault, - }}} - task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod)) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - task.Set(podtask.Launched) - err = reg.Update(task) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // preconditions - qr := New(nil) - qr.queue.Add(pod, queue.ReplaceExisting) - assert.Equal(1, len(qr.queue.List())) - _, found := qr.queue.Get("default/foo") - assert.True(found) - - obj.On("KillTask", task.ID).Return(nil) - - // exec & post conditions - d := deleter.New(obj, qr) - err = d.DeleteOne(pod) - assert.Nil(err) - _, found = qr.queue.Get("foo0") - assert.False(found) - assert.Equal(0, len(qr.queue.List())) - obj.AssertExpectations(t) -} - -func TestDeleteOne_badPodNaming(t *testing.T) { - assert := assert.New(t) - obj := &types.MockScheduler{} - pod := &Pod{Pod: &api.Pod{}} - d := deleter.New(obj, New(nil)) - - err := d.DeleteOne(pod) - assert.NotNil(err) - - pod.Pod.ObjectMeta.Name = "foo" - err = d.DeleteOne(pod) - assert.NotNil(err) - - pod.Pod.ObjectMeta.Name = "" - pod.Pod.ObjectMeta.Namespace = "bar" - err = d.DeleteOne(pod) - assert.NotNil(err) - - obj.AssertExpectations(t) -} diff --git a/contrib/mesos/pkg/scheduler/queuer/queuer.go b/contrib/mesos/pkg/scheduler/queuer/queuer.go index f6256570a80..f23620a1987 100644 --- a/contrib/mesos/pkg/scheduler/queuer/queuer.go +++ b/contrib/mesos/pkg/scheduler/queuer/queuer.go @@ -38,7 +38,19 @@ const ( yieldWaitTimeout = 1 * time.Second ) -type Queuer struct { +type Queuer interface { + InstallDebugHandlers(mux *http.ServeMux) + UpdatesAvailable() + Dequeue(id string) + Requeue(pod *Pod) + Reoffer(pod *Pod) + + Yield() *api.Pod + + Run(done <-chan struct{}) +} + +type queuer struct { lock sync.Mutex // shared by condition variables of this struct updates queue.FIFO // queue of pod updates to be processed queue *queue.DelayFIFO // queue of pods to be scheduled @@ -46,9 +58,9 @@ type Queuer struct { unscheduledCond sync.Cond // there are unscheduled pods for processing } -func New(updates queue.FIFO) *Queuer { - q := &Queuer{ - queue: queue.NewDelayFIFO(), +func New(queue *queue.DelayFIFO, updates queue.FIFO) Queuer { + q := &queuer{ + queue: queue, updates: updates, } q.deltaCond.L = &q.lock @@ -56,7 +68,7 @@ func New(updates queue.FIFO) *Queuer { return q } -func (q *Queuer) InstallDebugHandlers(mux *http.ServeMux) { +func (q *queuer) InstallDebugHandlers(mux *http.ServeMux) { mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) { for _, x := range q.queue.List() { if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil { @@ -74,18 +86,18 @@ func (q *Queuer) InstallDebugHandlers(mux *http.ServeMux) { } // signal that there are probably pod updates waiting to be processed -func (q *Queuer) UpdatesAvailable() { +func (q *queuer) UpdatesAvailable() { q.deltaCond.Broadcast() } // delete a pod from the to-be-scheduled queue -func (q *Queuer) Dequeue(id string) { +func (q *queuer) Dequeue(id string) { q.queue.Delete(id) } // re-add a pod to the to-be-scheduled queue, will not overwrite existing pod data (that // may have already changed). -func (q *Queuer) Requeue(pod *Pod) { +func (q *queuer) Requeue(pod *Pod) { // use KeepExisting in case the pod has already been updated (can happen if binding fails // due to constraint voilations); we don't want to overwrite a newer entry with stale data. q.queue.Add(pod, queue.KeepExisting) @@ -93,7 +105,7 @@ func (q *Queuer) Requeue(pod *Pod) { } // same as Requeue but calls podQueue.Offer instead of podQueue.Add -func (q *Queuer) Reoffer(pod *Pod) { +func (q *queuer) Reoffer(pod *Pod) { // use KeepExisting in case the pod has already been updated (can happen if binding fails // due to constraint voilations); we don't want to overwrite a newer entry with stale data. if q.queue.Offer(pod, queue.KeepExisting) { @@ -103,7 +115,7 @@ func (q *Queuer) Reoffer(pod *Pod) { // spawns a go-routine to watch for unscheduled pods and queue them up // for scheduling. returns immediately. -func (q *Queuer) Run(done <-chan struct{}) { +func (q *queuer) Run(done <-chan struct{}) { go runtime.Until(func() { log.Info("Watching for newly created pods") q.lock.Lock() @@ -148,7 +160,7 @@ func (q *Queuer) Run(done <-chan struct{}) { } // implementation of scheduling plugin's NextPod func; see k8s plugin/pkg/scheduler -func (q *Queuer) Yield() *api.Pod { +func (q *queuer) Yield() *api.Pod { log.V(2).Info("attempting to yield a pod") q.lock.Lock() defer q.lock.Unlock()