Break dependency cycle between deleter and queuer

This commit is contained in:
Dr. Stefan Schimanski
2015-11-09 11:53:15 +01:00
parent d35cb3b211
commit 91c5c2d3b6
7 changed files with 163 additions and 169 deletions

View File

@@ -36,10 +36,10 @@ type Deleter interface {
type deleter struct { type deleter struct {
sched scheduler.Scheduler sched scheduler.Scheduler
qr *queuer.Queuer qr queuer.Queuer
} }
func New(sched scheduler.Scheduler, qr *queuer.Queuer) Deleter { func New(sched scheduler.Scheduler, qr queuer.Queuer) Deleter {
return &deleter{ return &deleter{
sched: sched, sched: sched,
qr: qr, qr: qr,

View File

@@ -14,7 +14,137 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package deleter_test package deleter
// Due to access to private members of Queuer the deleter tests are moved to the import (
// queuer package. "testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
"k8s.io/kubernetes/pkg/api"
)
// TestDeleteOne_NonexistentPod verifies that DeleteOne returns
// errors.NoSuchPodErr when the pod has never been registered as a task
// and is not present in the scheduling queue.
func TestDeleteOne_NonexistentPod(t *testing.T) {
assert := assert.New(t)
// mock scheduler backed by an empty in-memory task registry
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
// queuer is constructed over an explicit DelayFIFO so the test can
// inspect queue contents directly (no private Queuer access needed)
q := queue.NewDelayFIFO()
qr := queuer.New(q, nil)
assert.Equal(0, len(q.List()))
d := New(obj, qr)
pod := &queuer.Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
}}}
err := d.DeleteOne(pod)
// unknown pod must surface as NoSuchPodErr
assert.Equal(err, errors.NoSuchPodErr)
obj.AssertExpectations(t)
}
// TestDeleteOne_PendingPod verifies that deleting a pod that is registered
// but not yet launched removes it from the scheduling queue without
// requiring a KillTask call on the scheduler.
func TestDeleteOne_PendingPod(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
pod := &queuer.Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
// register the pod as a task so DeleteOne finds it in pending state
_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
if err != nil {
t.Fatalf("failed to create task: %v", err)
}
// preconditions: pod is queued and retrievable under "namespace/name"
q := queue.NewDelayFIFO()
qr := queuer.New(q, nil)
q.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(q.List()))
_, found := q.Get("default/foo")
assert.True(found)
// exec & post conditions: deletion succeeds and the queue is emptied
d := New(obj, qr)
err = d.DeleteOne(pod)
assert.Nil(err)
_, found = q.Get("foo0")
assert.False(found)
assert.Equal(0, len(q.List()))
obj.AssertExpectations(t)
}
// TestDeleteOne_Running verifies that deleting a pod whose task has already
// been launched dequeues the pod AND triggers a KillTask call on the
// scheduler for the task's ID.
func TestDeleteOne_Running(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
pod := &queuer.Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// advance the task to Launched so DeleteOne takes the kill path
task.Set(podtask.Launched)
err = reg.Update(task)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// preconditions: pod is queued and retrievable under "namespace/name"
q := queue.NewDelayFIFO()
qr := queuer.New(q, nil)
q.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(q.List()))
_, found := q.Get("default/foo")
assert.True(found)
// a launched task must be killed on delete; expectation enforced below
// by AssertExpectations
obj.On("KillTask", task.ID).Return(nil)
// exec & post conditions: deletion succeeds and the queue is emptied
d := New(obj, qr)
err = d.DeleteOne(pod)
assert.Nil(err)
_, found = q.Get("foo0")
assert.False(found)
assert.Equal(0, len(q.List()))
obj.AssertExpectations(t)
}
// TestDeleteOne_badPodNaming verifies that DeleteOne rejects pods whose
// metadata cannot form a valid "namespace/name" key: missing both fields,
// missing the namespace, and missing the name.
func TestDeleteOne_badPodNaming(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
pod := &queuer.Pod{Pod: &api.Pod{}}
q := queue.NewDelayFIFO()
qr := queuer.New(q, nil)
d := New(obj, qr)
// empty name and namespace
err := d.DeleteOne(pod)
assert.NotNil(err)
// name without namespace
pod.Pod.ObjectMeta.Name = "foo"
err = d.DeleteOne(pod)
assert.NotNil(err)
// namespace without name
pod.Pod.ObjectMeta.Name = ""
pod.Pod.ObjectMeta.Namespace = "bar"
err = d.DeleteOne(pod)
assert.NotNil(err)
obj.AssertExpectations(t)
}

View File

@@ -35,11 +35,11 @@ type ErrorHandler interface {
type errorHandler struct { type errorHandler struct {
sched scheduler.Scheduler sched scheduler.Scheduler
backoff *backoff.Backoff backoff *backoff.Backoff
qr *queuer.Queuer qr queuer.Queuer
newBreakChan func(podKey string) queue.BreakChan newBreakChan func(podKey string) queue.BreakChan
} }
func New(sched scheduler.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, newBC func(podKey string) queue.BreakChan) ErrorHandler { func New(sched scheduler.Scheduler, backoff *backoff.Backoff, qr queuer.Queuer, newBC func(podKey string) queue.BreakChan) ErrorHandler {
return &errorHandler{ return &errorHandler{
sched: sched, sched: sched,
backoff: backoff, backoff: backoff,

View File

@@ -38,11 +38,11 @@ type PodReconciler interface {
type podReconciler struct { type podReconciler struct {
sched scheduler.Scheduler sched scheduler.Scheduler
client *client.Client client *client.Client
qr *queuer.Queuer qr queuer.Queuer
deleter deleter.Deleter deleter deleter.Deleter
} }
func New(sched scheduler.Scheduler, client *client.Client, qr *queuer.Queuer, deleter deleter.Deleter) PodReconciler { func New(sched scheduler.Scheduler, client *client.Client, qr queuer.Queuer, deleter deleter.Deleter) PodReconciler {
return &podReconciler{ return &podReconciler{
sched: sched, sched: sched,
client: client, client: client,

View File

@@ -67,7 +67,7 @@ func New(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler
podUpdates := &podStoreAdapter{queue.NewHistorical(updates)} podUpdates := &podStoreAdapter{queue.NewHistorical(updates)}
reflector := cache.NewReflector(lw, &api.Pod{}, podUpdates, 0) reflector := cache.NewReflector(lw, &api.Pod{}, podUpdates, 0)
q := queuer.New(podUpdates) q := queuer.New(queue.NewDelayFIFO(), podUpdates)
algorithm := algorithm.New(core, podUpdates, ps) algorithm := algorithm.New(core, podUpdates, ps)

View File

@@ -1,148 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queuer
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/deleter"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/pkg/api"
)
// The following deleter tests are here in queuer package because they require
// private access to the Queuer.
// TestDeleteOne_NonexistentPod: pre-refactor version (removed by this
// commit) — lived in package queuer because it reached into the private
// qr.queue field. Verifies DeleteOne returns errors.NoSuchPodErr for an
// unregistered pod.
func TestDeleteOne_NonexistentPod(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
// direct access to the unexported queue field — the reason these tests
// were coupled to the queuer package
qr := New(nil)
assert.Equal(0, len(qr.queue.List()))
d := deleter.New(obj, qr)
pod := &Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
}}}
err := d.DeleteOne(pod)
assert.Equal(err, errors.NoSuchPodErr)
obj.AssertExpectations(t)
}
// TestDeleteOne_PendingPod: pre-refactor version (removed by this commit).
// Verifies a registered-but-not-launched pod is removed from the queue,
// using private access to qr.queue.
func TestDeleteOne_PendingPod(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
pod := &Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
if err != nil {
t.Fatalf("failed to create task: %v", err)
}
// preconditions
qr := New(nil)
qr.queue.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(qr.queue.List()))
_, found := qr.queue.Get("default/foo")
assert.True(found)
// exec & post conditions
d := deleter.New(obj, qr)
err = d.DeleteOne(pod)
assert.Nil(err)
_, found = qr.queue.Get("foo0")
assert.False(found)
assert.Equal(0, len(qr.queue.List()))
obj.AssertExpectations(t)
}
// TestDeleteOne_Running: pre-refactor version (removed by this commit).
// Verifies deleting a launched pod dequeues it and triggers KillTask,
// using private access to qr.queue.
func TestDeleteOne_Running(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
pod := &Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// move the task into the Launched state so the kill path is exercised
task.Set(podtask.Launched)
err = reg.Update(task)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// preconditions
qr := New(nil)
qr.queue.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(qr.queue.List()))
_, found := qr.queue.Get("default/foo")
assert.True(found)
obj.On("KillTask", task.ID).Return(nil)
// exec & post conditions
d := deleter.New(obj, qr)
err = d.DeleteOne(pod)
assert.Nil(err)
_, found = qr.queue.Get("foo0")
assert.False(found)
assert.Equal(0, len(qr.queue.List()))
obj.AssertExpectations(t)
}
// TestDeleteOne_badPodNaming: pre-refactor version (removed by this
// commit). Verifies DeleteOne rejects pods with invalid name/namespace
// combinations.
func TestDeleteOne_badPodNaming(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
pod := &Pod{Pod: &api.Pod{}}
d := deleter.New(obj, New(nil))
// empty name and namespace
err := d.DeleteOne(pod)
assert.NotNil(err)
// name without namespace
pod.Pod.ObjectMeta.Name = "foo"
err = d.DeleteOne(pod)
assert.NotNil(err)
// namespace without name
pod.Pod.ObjectMeta.Name = ""
pod.Pod.ObjectMeta.Namespace = "bar"
err = d.DeleteOne(pod)
assert.NotNil(err)
obj.AssertExpectations(t)
}

View File

@@ -38,7 +38,19 @@ const (
yieldWaitTimeout = 1 * time.Second yieldWaitTimeout = 1 * time.Second
) )
type Queuer struct { type Queuer interface {
InstallDebugHandlers(mux *http.ServeMux)
UpdatesAvailable()
Dequeue(id string)
Requeue(pod *Pod)
Reoffer(pod *Pod)
Yield() *api.Pod
Run(done <-chan struct{})
}
type queuer struct {
lock sync.Mutex // shared by condition variables of this struct lock sync.Mutex // shared by condition variables of this struct
updates queue.FIFO // queue of pod updates to be processed updates queue.FIFO // queue of pod updates to be processed
queue *queue.DelayFIFO // queue of pods to be scheduled queue *queue.DelayFIFO // queue of pods to be scheduled
@@ -46,9 +58,9 @@ type Queuer struct {
unscheduledCond sync.Cond // there are unscheduled pods for processing unscheduledCond sync.Cond // there are unscheduled pods for processing
} }
func New(updates queue.FIFO) *Queuer { func New(queue *queue.DelayFIFO, updates queue.FIFO) Queuer {
q := &Queuer{ q := &queuer{
queue: queue.NewDelayFIFO(), queue: queue,
updates: updates, updates: updates,
} }
q.deltaCond.L = &q.lock q.deltaCond.L = &q.lock
@@ -56,7 +68,7 @@ func New(updates queue.FIFO) *Queuer {
return q return q
} }
func (q *Queuer) InstallDebugHandlers(mux *http.ServeMux) { func (q *queuer) InstallDebugHandlers(mux *http.ServeMux) {
mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) {
for _, x := range q.queue.List() { for _, x := range q.queue.List() {
if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil { if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
@@ -74,18 +86,18 @@ func (q *Queuer) InstallDebugHandlers(mux *http.ServeMux) {
} }
// signal that there are probably pod updates waiting to be processed // signal that there are probably pod updates waiting to be processed
func (q *Queuer) UpdatesAvailable() { func (q *queuer) UpdatesAvailable() {
q.deltaCond.Broadcast() q.deltaCond.Broadcast()
} }
// delete a pod from the to-be-scheduled queue // delete a pod from the to-be-scheduled queue
func (q *Queuer) Dequeue(id string) { func (q *queuer) Dequeue(id string) {
q.queue.Delete(id) q.queue.Delete(id)
} }
// re-add a pod to the to-be-scheduled queue, will not overwrite existing pod data (that // re-add a pod to the to-be-scheduled queue, will not overwrite existing pod data (that
// may have already changed). // may have already changed).
func (q *Queuer) Requeue(pod *Pod) { func (q *queuer) Requeue(pod *Pod) {
// use KeepExisting in case the pod has already been updated (can happen if binding fails // use KeepExisting in case the pod has already been updated (can happen if binding fails
// due to constraint violations); we don't want to overwrite a newer entry with stale data. // due to constraint violations); we don't want to overwrite a newer entry with stale data.
q.queue.Add(pod, queue.KeepExisting) q.queue.Add(pod, queue.KeepExisting)
@@ -93,7 +105,7 @@ func (q *Queuer) Requeue(pod *Pod) {
} }
// same as Requeue but calls podQueue.Offer instead of podQueue.Add // same as Requeue but calls podQueue.Offer instead of podQueue.Add
func (q *Queuer) Reoffer(pod *Pod) { func (q *queuer) Reoffer(pod *Pod) {
// use KeepExisting in case the pod has already been updated (can happen if binding fails // use KeepExisting in case the pod has already been updated (can happen if binding fails
// due to constraint violations); we don't want to overwrite a newer entry with stale data. // due to constraint violations); we don't want to overwrite a newer entry with stale data.
if q.queue.Offer(pod, queue.KeepExisting) { if q.queue.Offer(pod, queue.KeepExisting) {
@@ -103,7 +115,7 @@ func (q *Queuer) Reoffer(pod *Pod) {
// spawns a go-routine to watch for unscheduled pods and queue them up // spawns a go-routine to watch for unscheduled pods and queue them up
// for scheduling. returns immediately. // for scheduling. returns immediately.
func (q *Queuer) Run(done <-chan struct{}) { func (q *queuer) Run(done <-chan struct{}) {
go runtime.Until(func() { go runtime.Until(func() {
log.Info("Watching for newly created pods") log.Info("Watching for newly created pods")
q.lock.Lock() q.lock.Lock()
@@ -148,7 +160,7 @@ func (q *Queuer) Run(done <-chan struct{}) {
} }
// implementation of scheduling plugin's NextPod func; see k8s plugin/pkg/scheduler // implementation of scheduling plugin's NextPod func; see k8s plugin/pkg/scheduler
func (q *Queuer) Yield() *api.Pod { func (q *queuer) Yield() *api.Pod {
log.V(2).Info("attempting to yield a pod") log.V(2).Info("attempting to yield a pod")
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()