From ce7cda603deb1be8aeb6e8007e10077f839320e3 Mon Sep 17 00:00:00 2001
From: "Dr. Stefan Schimanski"
Date: Sun, 25 Oct 2015 11:28:52 -0700
Subject: [PATCH] Move queuer into its own module

---
 contrib/mesos/pkg/scheduler/deleter.go      | 11 +--
 contrib/mesos/pkg/scheduler/plugin.go       | 42 ++++--------
 contrib/mesos/pkg/scheduler/plugin_test.go  | 39 +++++------
 .../mesos/pkg/scheduler/{ => queuer}/pod.go | 18 +++--
 .../pkg/scheduler/{ => queuer}/queuer.go    | 68 +++++++++++--------
 5 files changed, 91 insertions(+), 87 deletions(-)
 rename contrib/mesos/pkg/scheduler/{ => queuer}/pod.go (86%)
 rename contrib/mesos/pkg/scheduler/{ => queuer}/queuer.go (74%)

diff --git a/contrib/mesos/pkg/scheduler/deleter.go b/contrib/mesos/pkg/scheduler/deleter.go
index d9b59992892..0badee9d7a4 100644
--- a/contrib/mesos/pkg/scheduler/deleter.go
+++ b/contrib/mesos/pkg/scheduler/deleter.go
@@ -23,12 +23,13 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
 	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
 	"k8s.io/kubernetes/pkg/api"
 )
 
 type deleter struct {
 	api schedulerInterface
-	qr  *queuer
+	qr  *queuer.Queuer
 }
 
 // currently monitors for "pod deleted" events, upon which handle()
@@ -37,19 +38,19 @@ func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
 	go runtime.Until(func() {
 		for {
 			entry := <-updates
-			pod := entry.Value().(*Pod)
+			pod := entry.Value().(*queuer.Pod)
 			if entry.Is(queue.DELETE_EVENT) {
 				if err := k.deleteOne(pod); err != nil {
 					log.Error(err)
 				}
 			} else if !entry.Is(queue.POP_EVENT) {
-				k.qr.updatesAvailable()
+				k.qr.UpdatesAvailable()
 			}
 		}
 	}, 1*time.Second, done)
 }
 
-func (k *deleter) deleteOne(pod *Pod) error {
+func (k *deleter) deleteOne(pod *queuer.Pod) error {
 	ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
 	podKey, err := podtask.MakePodKey(ctx, pod.Name)
 	if err != nil {
@@ -69,7 +70,7 @@ func (k *deleter) deleteOne(pod *Pod) error {
 	// it's concurrently being scheduled (somewhere between pod scheduling and
 	// binding) - if so, then we'll end up removing it from taskRegistry which
 	// will abort Bind()ing
-	k.qr.dequeue(pod.GetUID())
+	k.qr.Dequeue(pod.GetUID())
 
 	switch task, state := k.api.tasks().ForPod(podKey); state {
 	case podtask.StateUnknown:
diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go
index b227912590b..87fdcb0d0a9 100644
--- a/contrib/mesos/pkg/scheduler/plugin.go
+++ b/contrib/mesos/pkg/scheduler/plugin.go
@@ -29,8 +29,8 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/offers"
 	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
 	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
-	annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/client/cache"
@@ -115,15 +115,6 @@ type kubeScheduler struct {
 	podUpdates queue.FIFO
 }
 
-// recoverAssignedSlave recovers the assigned Mesos slave from a pod by searching
-// the BindingHostKey. For tasks in the registry of the scheduler, the same
-// value is stored in T.Spec.AssignedSlave. Before launching, the BindingHostKey
-// annotation is added and the executor will eventually persist that to the
-// apiserver on binding.
-func recoverAssignedSlave(pod *api.Pod) string {
-	return pod.Annotations[annotation.BindingHostKey]
-}
-
 // Schedule implements the Scheduler interface of Kubernetes.
 // It returns the selectedMachine's name and error (if there's any).
 func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (string, error) {
@@ -230,7 +221,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
 type errorHandler struct {
 	api     schedulerInterface
 	backoff *backoff.Backoff
-	qr      *queuer
+	qr      *queuer.Queuer
 }
 
 // implementation of scheduling plugin's Error func; see plugin/pkg/scheduler
@@ -288,7 +279,7 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) {
 		}
 		delay := k.backoff.Get(podKey)
 		log.V(3).Infof("requeuing pod %v with delay %v", podKey, delay)
-		k.qr.requeue(&Pod{Pod: pod, delay: &delay, notify: breakoutEarly})
+		k.qr.Requeue(&queuer.Pod{Pod: pod, Delay: &delay, Notify: breakoutEarly})
 
 	default:
 		log.V(2).Infof("Task is no longer pending, aborting reschedule for pod %v", podKey)
@@ -313,7 +304,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
 	// the store (cache) to the scheduling queue; its purpose is to maintain
 	// an ordering (vs interleaving) of operations that's easier to reason about.
 	kapi := &k8smScheduler{internal: k}
-	q := newQueuer(podUpdates)
+	q := queuer.NewQueuer(podUpdates)
 	podDeleter := &deleter{
 		api: kapi,
 		qr:  q,
 	}
@@ -331,7 +322,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
 		podDeleter.Run(updates, terminate)
 		q.Run(terminate)
 
-		q.installDebugHandlers(mux)
+		q.InstallDebugHandlers(mux)
 		podtask.InstallDebugHandlers(k.taskRegistry, mux)
 	})
 	return &PluginConfig{
@@ -342,7 +333,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
 				podUpdates: podUpdates,
 			},
 			Binder:   &binder{api: kapi},
-			NextPod:  q.yield,
+			NextPod:  q.Yield,
 			Error:    eh.handleSchedulingError,
 			Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
 		},
@@ -358,7 +349,7 @@ type PluginConfig struct {
 	*plugin.Config
 	api      schedulerInterface
 	client   *client.Client
-	qr       *queuer
+	qr       *queuer.Queuer
 	deleter  *deleter
 	starting chan struct{} // startup latch
 }
@@ -378,7 +369,7 @@ type schedulingPlugin struct {
 	config   *plugin.Config
 	api      schedulerInterface
 	client   *client.Client
-	qr       *queuer
+	qr       *queuer.Queuer
 	deleter  *deleter
 	starting chan struct{}
 }
@@ -444,7 +435,7 @@ func (s *schedulingPlugin) reconcileTask(t *podtask.T) {
 	if err != nil {
 		if errors.IsNotFound(err) {
 			// attempt to delete
-			if err = s.deleter.deleteOne(&Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
+			if err = s.deleter.deleteOne(&queuer.Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
 				log.Errorf("failed to delete pod: %v: %v", t.Pod.Name, err)
 			}
 		} else {
@@ -479,10 +470,7 @@ func (s *schedulingPlugin) reconcileTask(t *podtask.T) {
 			now := time.Now()
 			log.V(3).Infof("reoffering pod %v", podKey)
 
-			s.qr.reoffer(&Pod{
-				Pod:      pod,
-				deadline: &now,
-			})
+			s.qr.Reoffer(queuer.NewPodWithDeadline(pod, &now))
 		} else {
 			// pod is scheduled.
 			// not sure how this happened behind our backs. attempt to reconstruct
@@ -521,22 +509,22 @@ type podStoreAdapter struct {
 
 func (psa *podStoreAdapter) Add(obj interface{}) error {
 	pod := obj.(*api.Pod)
-	return psa.FIFO.Add(&Pod{Pod: pod})
+	return psa.FIFO.Add(&queuer.Pod{Pod: pod})
 }
 
 func (psa *podStoreAdapter) Update(obj interface{}) error {
 	pod := obj.(*api.Pod)
-	return psa.FIFO.Update(&Pod{Pod: pod})
+	return psa.FIFO.Update(&queuer.Pod{Pod: pod})
 }
 
 func (psa *podStoreAdapter) Delete(obj interface{}) error {
 	pod := obj.(*api.Pod)
-	return psa.FIFO.Delete(&Pod{Pod: pod})
+	return psa.FIFO.Delete(&queuer.Pod{Pod: pod})
 }
 
 func (psa *podStoreAdapter) Get(obj interface{}) (interface{}, bool, error) {
 	pod := obj.(*api.Pod)
-	return psa.FIFO.Get(&Pod{Pod: pod})
+	return psa.FIFO.Get(&queuer.Pod{Pod: pod})
 }
 
 // Replace will delete the contents of the store, using instead the
@@ -545,7 +533,7 @@ func (psa *podStoreAdapter) Replace(objs []interface{}, resourceVersion string)
 	newobjs := make([]interface{}, len(objs))
 	for i, v := range objs {
 		pod := v.(*api.Pod)
-		newobjs[i] = &Pod{Pod: pod}
+		newobjs[i] = &queuer.Pod{Pod: pod}
 	}
 	return psa.FIFO.Replace(newobjs, resourceVersion)
 }
diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go
index 8d3fca35ff8..5f018d6f17f 100644
--- a/contrib/mesos/pkg/scheduler/plugin_test.go
+++ b/contrib/mesos/pkg/scheduler/plugin_test.go
@@ -46,6 +46,7 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
 	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	"k8s.io/kubernetes/pkg/util"
 )
@@ -852,13 +853,13 @@ func TestDeleteOne_NonexistentPod(t *testing.T) {
 	reg := podtask.NewInMemoryRegistry()
 	obj.On("tasks").Return(reg)
 
-	qr := newQueuer(nil)
-	assert.Equal(0, len(qr.podQueue.List()))
+	qr := queuer.NewQueuer(nil)
+	assert.Equal(0, len(qr.PodQueue.List()))
 	d := &deleter{
 		api: obj,
 		qr:  qr,
 	}
-	pod := &Pod{Pod: &api.Pod{
+	pod := &queuer.Pod{Pod: &api.Pod{
 		ObjectMeta: api.ObjectMeta{
 			Name:      "foo",
 			Namespace: api.NamespaceDefault,
@@ -874,7 +875,7 @@ func TestDeleteOne_PendingPod(t *testing.T) {
 	reg := podtask.NewInMemoryRegistry()
 	obj.On("tasks").Return(reg)
 
-	pod := &Pod{Pod: &api.Pod{
+	pod := &queuer.Pod{Pod: &api.Pod{
 		ObjectMeta: api.ObjectMeta{
 			Name: "foo",
 			UID:  "foo0",
@@ -886,10 +887,10 @@ func TestDeleteOne_PendingPod(t *testing.T) {
 	}
 
 	// preconditions
-	qr := newQueuer(nil)
-	qr.podQueue.Add(pod, queue.ReplaceExisting)
-	assert.Equal(1, len(qr.podQueue.List()))
-	_, found := qr.podQueue.Get("default/foo")
+	qr := queuer.NewQueuer(nil)
+	qr.PodQueue.Add(pod, queue.ReplaceExisting)
+	assert.Equal(1, len(qr.PodQueue.List()))
+	_, found := qr.PodQueue.Get("default/foo")
 	assert.True(found)
 
 	// exec & post conditions
@@ -899,9 +900,9 @@ func TestDeleteOne_PendingPod(t *testing.T) {
 	}
 	err = d.deleteOne(pod)
 	assert.Nil(err)
-	_, found = qr.podQueue.Get("foo0")
+	_, found = qr.PodQueue.Get("foo0")
 	assert.False(found)
-	assert.Equal(0, len(qr.podQueue.List()))
+	assert.Equal(0, len(qr.PodQueue.List()))
 	obj.AssertExpectations(t)
 }
 
@@ -911,7 +912,7 @@ func TestDeleteOne_Running(t *testing.T) {
 	reg := podtask.NewInMemoryRegistry()
 	obj.On("tasks").Return(reg)
 
-	pod := &Pod{Pod: &api.Pod{
+	pod := &queuer.Pod{Pod: &api.Pod{
 		ObjectMeta: api.ObjectMeta{
 			Name: "foo",
 			UID:  "foo0",
@@ -929,10 +930,10 @@ func TestDeleteOne_Running(t *testing.T) {
 	}
 
 	// preconditions
-	qr := newQueuer(nil)
-	qr.podQueue.Add(pod, queue.ReplaceExisting)
-	assert.Equal(1, len(qr.podQueue.List()))
-	_, found := qr.podQueue.Get("default/foo")
+	qr := queuer.NewQueuer(nil)
+	qr.PodQueue.Add(pod, queue.ReplaceExisting)
+	assert.Equal(1, len(qr.PodQueue.List()))
+	_, found := qr.PodQueue.Get("default/foo")
 	assert.True(found)
 
 	obj.On("killTask", task.ID).Return(nil)
@@ -944,19 +945,19 @@ func TestDeleteOne_Running(t *testing.T) {
 	}
 	err = d.deleteOne(pod)
 	assert.Nil(err)
-	_, found = qr.podQueue.Get("foo0")
+	_, found = qr.PodQueue.Get("foo0")
 	assert.False(found)
-	assert.Equal(0, len(qr.podQueue.List()))
+	assert.Equal(0, len(qr.PodQueue.List()))
 	obj.AssertExpectations(t)
 }
 
 func TestDeleteOne_badPodNaming(t *testing.T) {
 	assert := assert.New(t)
 	obj := &MockScheduler{}
-	pod := &Pod{Pod: &api.Pod{}}
+	pod := &queuer.Pod{Pod: &api.Pod{}}
 	d := &deleter{
 		api: obj,
-		qr:  newQueuer(nil),
+		qr:  queuer.NewQueuer(nil),
 	}
 
 	err := d.deleteOne(pod)
diff --git a/contrib/mesos/pkg/scheduler/pod.go b/contrib/mesos/pkg/scheduler/queuer/pod.go
similarity index 86%
rename from contrib/mesos/pkg/scheduler/pod.go
rename to contrib/mesos/pkg/scheduler/queuer/pod.go
index bf70100bcf9..85a73df18e5 100644
--- a/contrib/mesos/pkg/scheduler/pod.go
+++ b/contrib/mesos/pkg/scheduler/queuer/pod.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package scheduler
+package queuer
 
 import (
 	"fmt"
@@ -29,8 +29,12 @@ import (
 type Pod struct {
 	*api.Pod
 	deadline *time.Time
-	delay    *time.Duration
-	notify   queue.BreakChan
+	Delay    *time.Duration
+	Notify   queue.BreakChan
+}
+
+func NewPodWithDeadline(pod *api.Pod, deadline *time.Time) *Pod {
+	return &Pod{Pod: pod, deadline: deadline}
 }
 
 // implements Copyable
@@ -54,21 +58,21 @@ func (p *Pod) GetUID() string {
 
 // implements Deadlined
 func (dp *Pod) Deadline() (time.Time, bool) {
 	if dp.deadline != nil {
 		return *(dp.deadline), true
 	}
 	return time.Time{}, false
 }
 
 func (dp *Pod) GetDelay() time.Duration {
-	if dp.delay != nil {
-		return *(dp.delay)
+	if dp.Delay != nil {
+		return *(dp.Delay)
 	}
 	return 0
 }
 
 func (p *Pod) Breaker() queue.BreakChan {
-	return p.notify
+	return p.Notify
 }
 
 func (p *Pod) String() string {
diff --git a/contrib/mesos/pkg/scheduler/queuer.go b/contrib/mesos/pkg/scheduler/queuer/queuer.go
similarity index 74%
rename from contrib/mesos/pkg/scheduler/queuer.go
rename to contrib/mesos/pkg/scheduler/queuer/queuer.go
index ef2b96f3f86..58715e60167 100644
--- a/contrib/mesos/pkg/scheduler/queuer.go
+++ b/contrib/mesos/pkg/scheduler/queuer/queuer.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package scheduler
+package queuer
 
 import (
 	"fmt"
@@ -26,28 +26,29 @@ import (
 	log "github.com/golang/glog"
 	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
 	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
+	annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/cache"
 )
 
 const (
-	enqueuePopTimeout  = 200 * time.Millisecond
-	enqueueWaitTimeout = 1 * time.Second
-	yieldPopTimeout    = 200 * time.Millisecond
-	yieldWaitTimeout   = 1 * time.Second
+	enqueuePopTimeout  = 200 * time.Millisecond
+	enqueueWaitTimeout = 1 * time.Second
+	yieldPopTimeout    = 200 * time.Millisecond
+	yieldWaitTimeout   = 1 * time.Second
 )
 
-type queuer struct {
+type Queuer struct {
 	lock            sync.Mutex       // shared by condition variables of this struct
 	podUpdates      queue.FIFO       // queue of pod updates to be processed
-	podQueue        *queue.DelayFIFO // queue of pods to be scheduled
+	PodQueue        *queue.DelayFIFO // queue of pods to be scheduled
 	deltaCond       sync.Cond        // pod changes are available for processing
 	unscheduledCond sync.Cond        // there are unscheduled pods for processing
 }
 
-func newQueuer(store queue.FIFO) *queuer {
-	q := &queuer{
-		podQueue:   queue.NewDelayFIFO(),
+func NewQueuer(store queue.FIFO) *Queuer {
+	q := &Queuer{
+		PodQueue:   queue.NewDelayFIFO(),
 		podUpdates: store,
 	}
 	q.deltaCond.L = &q.lock
@@ -55,9 +56,9 @@ func newQueuer(store queue.FIFO) *queuer {
 	return q
 }
 
-func (q *queuer) installDebugHandlers(mux *http.ServeMux) {
+func (q *Queuer) InstallDebugHandlers(mux *http.ServeMux) {
 	mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) {
-		for _, x := range q.podQueue.List() {
+		for _, x := range q.PodQueue.List() {
 			if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
 				break
 			}
@@ -73,36 +74,36 @@ func (q *queuer) installDebugHandlers(mux *http.ServeMux) {
 }
 
 // signal that there are probably pod updates waiting to be processed
-func (q *queuer) updatesAvailable() {
+func (q *Queuer) UpdatesAvailable() {
 	q.deltaCond.Broadcast()
 }
 
 // delete a pod from the to-be-scheduled queue
-func (q *queuer) dequeue(id string) {
-	q.podQueue.Delete(id)
+func (q *Queuer) Dequeue(id string) {
+	q.PodQueue.Delete(id)
 }
 
 // re-add a pod to the to-be-scheduled queue, will not overwrite existing pod data (that
 // may have already changed).
-func (q *queuer) requeue(pod *Pod) {
+func (q *Queuer) Requeue(pod *Pod) {
 	// use KeepExisting in case the pod has already been updated (can happen if binding fails
 	// due to constraint voilations); we don't want to overwrite a newer entry with stale data.
-	q.podQueue.Add(pod, queue.KeepExisting)
+	q.PodQueue.Add(pod, queue.KeepExisting)
 	q.unscheduledCond.Broadcast()
 }
 
-// same as requeue but calls podQueue.Offer instead of podQueue.Add
-func (q *queuer) reoffer(pod *Pod) {
+// same as Requeue but calls podQueue.Offer instead of podQueue.Add
+func (q *Queuer) Reoffer(pod *Pod) {
 	// use KeepExisting in case the pod has already been updated (can happen if binding fails
 	// due to constraint voilations); we don't want to overwrite a newer entry with stale data.
-	if q.podQueue.Offer(pod, queue.KeepExisting) {
+	if q.PodQueue.Offer(pod, queue.KeepExisting) {
 		q.unscheduledCond.Broadcast()
 	}
 }
 
 // spawns a go-routine to watch for unscheduled pods and queue them up
 // for scheduling. returns immediately.
-func (q *queuer) Run(done <-chan struct{}) {
+func (q *Queuer) Run(done <-chan struct{}) {
 	go runtime.Until(func() {
 		log.Info("Watching for newly created pods")
 		q.lock.Lock()
@@ -121,8 +122,8 @@ func (q *queuer) Run(done <-chan struct{}) {
 				<-signalled // wait for lock re-acquisition
 				log.V(4).Infoln("timed out waiting for a pod update")
 			case <-signalled:
-				// we've acquired the lock and there may be
-				// changes for us to process now
+				// we've acquired the lock and there may be
+				// changes for us to process now
 			}
 			continue
 		}
@@ -130,12 +131,12 @@ func (q *queuer) Run(done <-chan struct{}) {
 			pod := p.(*Pod)
 			if recoverAssignedSlave(pod.Pod) != "" {
 				log.V(3).Infof("dequeuing assigned pod for scheduling: %v", pod.Pod.Name)
-				q.dequeue(pod.GetUID())
+				q.Dequeue(pod.GetUID())
 			} else {
 				// use ReplaceExisting because we are always pushing the latest state
 				now := time.Now()
 				pod.deadline = &now
-				if q.podQueue.Offer(pod, queue.ReplaceExisting) {
+				if q.PodQueue.Offer(pod, queue.ReplaceExisting) {
 					q.unscheduledCond.Broadcast()
 					log.V(3).Infof("queued pod for scheduling: %v", pod.Pod.Name)
 				} else {
@@ -147,7 +148,7 @@ func (q *queuer) Run(done <-chan struct{}) {
 }
 
 // implementation of scheduling plugin's NextPod func; see k8s plugin/pkg/scheduler
-func (q *queuer) yield() *api.Pod {
+func (q *Queuer) Yield() *api.Pod {
 	log.V(2).Info("attempting to yield a pod")
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -155,7 +156,7 @@ func (q *queuer) yield() *api.Pod {
 	for {
 		// limit blocking here to short intervals so that we don't block the
 		// enqueuer Run() routine for very long
-		kpod := q.podQueue.Await(yieldPopTimeout)
+		kpod := q.PodQueue.Await(yieldPopTimeout)
 		if kpod == nil {
 			signalled := runtime.After(q.unscheduledCond.Wait)
 			// lock is yielded at this point and we're going to wait for either
@@ -166,8 +167,8 @@ func (q *queuer) yield() *api.Pod {
 				<-signalled // wait for the go-routine, and the lock
 				log.V(4).Infoln("timed out waiting for a pod to yield")
 			case <-signalled:
-				// we have acquired the lock, and there
-				// may be a pod for us to pop now
+				// we have acquired the lock, and there
+				// may be a pod for us to pop now
 			}
 			continue
 		}
@@ -185,3 +186,12 @@ func (q *queuer) yield() *api.Pod {
 		}
 	}
 }
+
+// recoverAssignedSlave recovers the assigned Mesos slave from a pod by searching
+// the BindingHostKey. For tasks in the registry of the scheduler, the same
+// value is stored in T.Spec.AssignedSlave. Before launching, the BindingHostKey
+// annotation is added and the executor will eventually persist that to the
+// apiserver on binding.
+func recoverAssignedSlave(pod *api.Pod) string {
+	return pod.Annotations[annotation.BindingHostKey]
+}
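
Reviewer note (not part of the patch): the sketch below is a minimal, hypothetical illustration of the surface this change exports from the new queuer package (NewQueuer, Run, Yield, Requeue, Reoffer, Dequeue, InstallDebugHandlers, NewPodWithDeadline), mirroring how plugin.go and deleter.go consume it above. The helper name and the mux/done plumbing are invented for the example.

package scheduler

import (
	"net/http"
	"time"

	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
	"k8s.io/kubernetes/pkg/api"
)

// exampleQueuerWiring is a hypothetical helper showing the exported queuer API
// after the move; it is illustrative only and not introduced by this patch.
func exampleQueuerWiring(podUpdates queue.FIFO, mux *http.ServeMux, done <-chan struct{}) *api.Pod {
	q := queuer.NewQueuer(podUpdates) // was newQueuer(...) inside package scheduler
	q.InstallDebugHandlers(mux)       // was q.installDebugHandlers(mux)
	q.Run(done)                       // watches podUpdates and fills the delay queue

	// The plugin's NextPod func maps onto the exported Yield.
	pod := q.Yield()

	// Error handling re-queues a pod with a backoff delay; Delay (and Notify)
	// are exported fields of queuer.Pod now.
	delay := 5 * time.Second
	q.Requeue(&queuer.Pod{Pod: pod, Delay: &delay})

	// Reconciliation re-offers a pod with a deadline via the new constructor,
	// since the deadline field itself stays unexported.
	now := time.Now()
	q.Reoffer(queuer.NewPodWithDeadline(pod, &now))

	// The deleter removes a pod from the to-be-scheduled queue by UID.
	q.Dequeue(string(pod.UID))

	return pod
}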