Move queuer into its own module

Dr. Stefan Schimanski 2015-10-25 11:28:52 -07:00
parent 26338dcd4d
commit ce7cda603d
5 changed files with 91 additions and 87 deletions
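This change promotes the scheduler's package-private queuer into its own package, k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer. The Queuer type, its constructor and the methods the plugin and deleter call (Requeue, Reoffer, Dequeue, UpdatesAvailable, InstallDebugHandlers, Run, Yield) are exported; the Queuer's PodQueue field and the Pod wrapper's Delay and Notify fields become public too, while the deadline field stays private behind the new NewPodWithDeadline constructor. A minimal caller-side sketch of the exported surface, mirroring the call sites in the diffs below (the concrete pod values are illustrative only):

package main

import (
	"time"

	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
	"k8s.io/kubernetes/pkg/api"
)

func main() {
	// NewQueuer(nil) mirrors the unit tests below; NewPluginConfig passes a
	// real pod-updates FIFO instead of nil.
	q := queuer.NewQueuer(nil)

	pod := &api.Pod{ObjectMeta: api.ObjectMeta{
		Name:      "foo",
		Namespace: api.NamespaceDefault,
	}}

	// Requeue: put a pod back after a scheduling error, keeping any newer
	// queue entry (queue.KeepExisting under the hood).
	delay := 5 * time.Second
	qpod := &queuer.Pod{Pod: pod, Delay: &delay}
	q.Requeue(qpod)

	// Reoffer: like Requeue but offers with a deadline; deadline is still
	// unexported, hence the new constructor.
	now := time.Now()
	q.Reoffer(queuer.NewPodWithDeadline(pod, &now))

	// Dequeue: drop the pod from the to-be-scheduled queue again.
	q.Dequeue(qpod.GetUID())

	// UpdatesAvailable: signal that pod updates are waiting to be processed.
	q.UpdatesAvailable()
}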

View File

@ -23,12 +23,13 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
"k8s.io/kubernetes/pkg/api"
)
type deleter struct {
api schedulerInterface
qr *queuer
qr *queuer.Queuer
}
// currently monitors for "pod deleted" events, upon which handle()
@ -37,19 +38,19 @@ func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
go runtime.Until(func() {
for {
entry := <-updates
pod := entry.Value().(*Pod)
pod := entry.Value().(*queuer.Pod)
if entry.Is(queue.DELETE_EVENT) {
if err := k.deleteOne(pod); err != nil {
log.Error(err)
}
} else if !entry.Is(queue.POP_EVENT) {
k.qr.updatesAvailable()
k.qr.UpdatesAvailable()
}
}
}, 1*time.Second, done)
}
func (k *deleter) deleteOne(pod *Pod) error {
func (k *deleter) deleteOne(pod *queuer.Pod) error {
ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
podKey, err := podtask.MakePodKey(ctx, pod.Name)
if err != nil {
@ -69,7 +70,7 @@ func (k *deleter) deleteOne(pod *Pod) error {
// it's concurrently being scheduled (somewhere between pod scheduling and
// binding) - if so, then we'll end up removing it from taskRegistry which
// will abort Bind()ing
k.qr.dequeue(pod.GetUID())
k.qr.Dequeue(pod.GetUID())
switch task, state := k.api.tasks().ForPod(podKey); state {
case podtask.StateUnknown:

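The deleter's loop dispatches purely on the entry's event type: deletions go to deleteOne, pops are ignored, and anything else just pings the queuer through the now-exported UpdatesAvailable. A stand-alone sketch of the same consumer pattern, with handleDelete as a hypothetical stand-in for deleteOne:

package main

import (
	log "github.com/golang/glog"

	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
)

// consume drains the same kind of updates channel the deleter watches.
func consume(updates <-chan queue.Entry, q *queuer.Queuer, handleDelete func(*queuer.Pod) error) {
	for entry := range updates {
		pod := entry.Value().(*queuer.Pod)
		switch {
		case entry.Is(queue.DELETE_EVENT):
			// the pod is gone from the store: tear its task down
			if err := handleDelete(pod); err != nil {
				log.Error(err)
			}
		case entry.Is(queue.POP_EVENT):
			// pops are triggered by the queuer itself; nothing to do
		default:
			// fresh pod data is waiting; wake the queuer's update loop
			q.UpdatesAvailable()
		}
	}
}

func main() {
	updates := make(chan queue.Entry)
	close(updates) // an already-closed channel so the sketch terminates
	consume(updates, queuer.NewQueuer(nil), func(*queuer.Pod) error { return nil })
}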
View File

@ -29,8 +29,8 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
@ -115,15 +115,6 @@ type kubeScheduler struct {
podUpdates queue.FIFO
}
// recoverAssignedSlave recovers the assigned Mesos slave from a pod by searching
// the BindingHostKey. For tasks in the registry of the scheduler, the same
// value is stored in T.Spec.AssignedSlave. Before launching, the BindingHostKey
// annotation is added and the executor will eventually persist that to the
// apiserver on binding.
func recoverAssignedSlave(pod *api.Pod) string {
return pod.Annotations[annotation.BindingHostKey]
}
// Schedule implements the Scheduler interface of Kubernetes.
// It returns the selected machine's name and an error (if any).
func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (string, error) {
@ -230,7 +221,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
type errorHandler struct {
api schedulerInterface
backoff *backoff.Backoff
qr *queuer
qr *queuer.Queuer
}
// implementation of scheduling plugin's Error func; see plugin/pkg/scheduler
@ -288,7 +279,7 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
}
delay := k.backoff.Get(podKey)
log.V(3).Infof("requeuing pod %v with delay %v", podKey, delay)
k.qr.requeue(&Pod{Pod: pod, delay: &delay, notify: breakoutEarly})
k.qr.Requeue(&queuer.Pod{Pod: pod, Delay: &delay, Notify: breakoutEarly})
default:
log.V(2).Infof("Task is no longer pending, aborting reschedule for pod %v", podKey)
@ -313,7 +304,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
// the store (cache) to the scheduling queue; its purpose is to maintain
// an ordering (vs interleaving) of operations that's easier to reason about.
kapi := &k8smScheduler{internal: k}
q := newQueuer(podUpdates)
q := queuer.NewQueuer(podUpdates)
podDeleter := &deleter{
api: kapi,
qr: q,
@ -331,7 +322,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
podDeleter.Run(updates, terminate)
q.Run(terminate)
q.installDebugHandlers(mux)
q.InstallDebugHandlers(mux)
podtask.InstallDebugHandlers(k.taskRegistry, mux)
})
return &PluginConfig{
@ -342,7 +333,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
podUpdates: podUpdates,
},
Binder: &binder{api: kapi},
NextPod: q.yield,
NextPod: q.Yield,
Error: eh.handleSchedulingError,
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
},
@ -358,7 +349,7 @@ type PluginConfig struct {
*plugin.Config
api schedulerInterface
client *client.Client
qr *queuer
qr *queuer.Queuer
deleter *deleter
starting chan struct{} // startup latch
}
@ -378,7 +369,7 @@ type schedulingPlugin struct {
config *plugin.Config
api schedulerInterface
client *client.Client
qr *queuer
qr *queuer.Queuer
deleter *deleter
starting chan struct{}
}
@ -444,7 +435,7 @@ func (s *schedulingPlugin) reconcileTask(t *podtask.T) {
if err != nil {
if errors.IsNotFound(err) {
// attempt to delete
if err = s.deleter.deleteOne(&Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
if err = s.deleter.deleteOne(&queuer.Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
log.Errorf("failed to delete pod: %v: %v", t.Pod.Name, err)
}
} else {
@ -479,10 +470,7 @@ func (s *schedulingPlugin) reconcileTask(t *podtask.T) {
now := time.Now()
log.V(3).Infof("reoffering pod %v", podKey)
s.qr.reoffer(&Pod{
Pod: pod,
deadline: &now,
})
s.qr.Reoffer(queuer.NewPodWithDeadline(pod, &now))
} else {
// pod is scheduled.
// not sure how this happened behind our backs. attempt to reconstruct
@ -521,22 +509,22 @@ type podStoreAdapter struct {
func (psa *podStoreAdapter) Add(obj interface{}) error {
pod := obj.(*api.Pod)
return psa.FIFO.Add(&Pod{Pod: pod})
return psa.FIFO.Add(&queuer.Pod{Pod: pod})
}
func (psa *podStoreAdapter) Update(obj interface{}) error {
pod := obj.(*api.Pod)
return psa.FIFO.Update(&Pod{Pod: pod})
return psa.FIFO.Update(&queuer.Pod{Pod: pod})
}
func (psa *podStoreAdapter) Delete(obj interface{}) error {
pod := obj.(*api.Pod)
return psa.FIFO.Delete(&Pod{Pod: pod})
return psa.FIFO.Delete(&queuer.Pod{Pod: pod})
}
func (psa *podStoreAdapter) Get(obj interface{}) (interface{}, bool, error) {
pod := obj.(*api.Pod)
return psa.FIFO.Get(&Pod{Pod: pod})
return psa.FIFO.Get(&queuer.Pod{Pod: pod})
}
// Replace will delete the contents of the store, using instead the
@ -545,7 +533,7 @@ func (psa *podStoreAdapter) Replace(objs []interface{}, resourceVersion string)
newobjs := make([]interface{}, len(objs))
for i, v := range objs {
pod := v.(*api.Pod)
newobjs[i] = &Pod{Pod: pod}
newobjs[i] = &queuer.Pod{Pod: pod}
}
return psa.FIFO.Replace(newobjs, resourceVersion)
}

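The podStoreAdapter changes are mechanical: every api.Pod handed to the adapter is wrapped in a queuer.Pod before it reaches the underlying FIFO, so the queue only ever stores the richer type. A self-contained sketch of that decorator pattern with simplified stand-in types (Store, pod and wrapped are illustrative, not the real cache/queue interfaces):

package main

import "fmt"

// Store is a pared-down stand-in for the FIFO the adapter decorates.
type Store interface {
	Add(obj interface{}) error
	List() []interface{}
}

type sliceStore struct{ items []interface{} }

func (s *sliceStore) Add(obj interface{}) error { s.items = append(s.items, obj); return nil }
func (s *sliceStore) List() []interface{}       { return s.items }

// pod and wrapped mirror api.Pod and queuer.Pod in shape only.
type pod struct{ Name string }
type wrapped struct{ *pod }

// adapter wraps every incoming *pod before delegating, just as
// podStoreAdapter wraps api.Pod in queuer.Pod.
type adapter struct{ Store }

func (a *adapter) Add(obj interface{}) error {
	p := obj.(*pod)
	return a.Store.Add(&wrapped{pod: p})
}

func main() {
	s := &sliceStore{}
	a := &adapter{Store: s}
	_ = a.Add(&pod{Name: "foo"})
	fmt.Printf("%T\n", s.List()[0]) // *main.wrapped
}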
View File

@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/util"
)
@ -852,13 +853,13 @@ func TestDeleteOne_NonexistentPod(t *testing.T) {
reg := podtask.NewInMemoryRegistry()
obj.On("tasks").Return(reg)
qr := newQueuer(nil)
assert.Equal(0, len(qr.podQueue.List()))
qr := queuer.NewQueuer(nil)
assert.Equal(0, len(qr.PodQueue.List()))
d := &deleter{
api: obj,
qr: qr,
}
pod := &Pod{Pod: &api.Pod{
pod := &queuer.Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
@ -874,7 +875,7 @@ func TestDeleteOne_PendingPod(t *testing.T) {
reg := podtask.NewInMemoryRegistry()
obj.On("tasks").Return(reg)
pod := &Pod{Pod: &api.Pod{
pod := &queuer.Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
UID: "foo0",
@ -886,10 +887,10 @@ func TestDeleteOne_PendingPod(t *testing.T) {
}
// preconditions
qr := newQueuer(nil)
qr.podQueue.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(qr.podQueue.List()))
_, found := qr.podQueue.Get("default/foo")
qr := queuer.NewQueuer(nil)
qr.PodQueue.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(qr.PodQueue.List()))
_, found := qr.PodQueue.Get("default/foo")
assert.True(found)
// exec & post conditions
@ -899,9 +900,9 @@ func TestDeleteOne_PendingPod(t *testing.T) {
}
err = d.deleteOne(pod)
assert.Nil(err)
_, found = qr.podQueue.Get("foo0")
_, found = qr.PodQueue.Get("foo0")
assert.False(found)
assert.Equal(0, len(qr.podQueue.List()))
assert.Equal(0, len(qr.PodQueue.List()))
obj.AssertExpectations(t)
}
@ -911,7 +912,7 @@ func TestDeleteOne_Running(t *testing.T) {
reg := podtask.NewInMemoryRegistry()
obj.On("tasks").Return(reg)
pod := &Pod{Pod: &api.Pod{
pod := &queuer.Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
UID: "foo0",
@ -929,10 +930,10 @@ func TestDeleteOne_Running(t *testing.T) {
}
// preconditions
qr := newQueuer(nil)
qr.podQueue.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(qr.podQueue.List()))
_, found := qr.podQueue.Get("default/foo")
qr := queuer.NewQueuer(nil)
qr.PodQueue.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(qr.PodQueue.List()))
_, found := qr.PodQueue.Get("default/foo")
assert.True(found)
obj.On("killTask", task.ID).Return(nil)
@ -944,19 +945,19 @@ func TestDeleteOne_Running(t *testing.T) {
}
err = d.deleteOne(pod)
assert.Nil(err)
_, found = qr.podQueue.Get("foo0")
_, found = qr.PodQueue.Get("foo0")
assert.False(found)
assert.Equal(0, len(qr.podQueue.List()))
assert.Equal(0, len(qr.PodQueue.List()))
obj.AssertExpectations(t)
}
func TestDeleteOne_badPodNaming(t *testing.T) {
assert := assert.New(t)
obj := &MockScheduler{}
pod := &Pod{Pod: &api.Pod{}}
pod := &queuer.Pod{Pod: &api.Pod{}}
d := &deleter{
api: obj,
qr: newQueuer(nil),
qr: queuer.NewQueuer(nil),
}
err := d.deleteOne(pod)

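Because the tests now reach the queue only through exported names (queuer.NewQueuer, Queuer.PodQueue, queuer.Pod), an equivalent check could also live outside the scheduler package. A minimal, hypothetical companion test built from the same calls as the assertions above:

package queuer_test

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
	"k8s.io/kubernetes/pkg/api"
)

func TestDequeueRemovesPod(t *testing.T) {
	qr := queuer.NewQueuer(nil)
	assert.Equal(t, 0, len(qr.PodQueue.List()))

	pod := &queuer.Pod{Pod: &api.Pod{ObjectMeta: api.ObjectMeta{
		Name:      "foo",
		UID:       "foo0",
		Namespace: api.NamespaceDefault,
	}}}
	qr.PodQueue.Add(pod, queue.ReplaceExisting)
	assert.Equal(t, 1, len(qr.PodQueue.List()))

	qr.Dequeue(pod.GetUID())
	assert.Equal(t, 0, len(qr.PodQueue.List()))
}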
View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
package queuer
import (
"fmt"
@ -29,8 +29,12 @@ import (
type Pod struct {
*api.Pod
deadline *time.Time
delay *time.Duration
notify queue.BreakChan
Delay *time.Duration
Notify queue.BreakChan
}
func NewPodWithDeadline(pod *api.Pod, deadline *time.Time) *Pod {
return &Pod{Pod: pod, deadline: deadline}
}
// implements Copyable
@ -54,21 +58,21 @@ func (p *Pod) GetUID() string {
// implements Deadlined
func (dp *Pod) Deadline() (time.Time, bool) {
if dp.deadline != nil {
return *(dp.deadline), true
}
return time.Time{}, false
}
func (dp *Pod) GetDelay() time.Duration {
if dp.delay != nil {
return *(dp.delay)
if dp.Delay != nil {
return *(dp.Delay)
}
return 0
}
func (p *Pod) Breaker() queue.BreakChan {
return p.notify
return p.Notify
}
func (p *Pod) String() string {

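The only semantic change in pod.go is visibility: Delay and Notify become settable from other packages, while deadline stays private behind NewPodWithDeadline. The sketch below shows one plausible way a delay queue could consume the two accessors; item and readyAt are illustrative assumptions, not the real DelayFIFO:

package main

import (
	"fmt"
	"time"
)

// item mirrors the shape of queuer.Pod's timing fields.
type item struct {
	deadline *time.Time
	delay    *time.Duration
}

// Deadline reports an explicit ready-time, if one was set.
func (i *item) Deadline() (time.Time, bool) {
	if i.deadline != nil {
		return *i.deadline, true
	}
	return time.Time{}, false
}

// GetDelay reports a relative delay, defaulting to zero.
func (i *item) GetDelay() time.Duration {
	if i.delay != nil {
		return *i.delay
	}
	return 0
}

// readyAt is one plausible way a queue could combine the two accessors:
// an explicit deadline wins, otherwise "now plus delay".
func readyAt(i *item, now time.Time) time.Time {
	if d, ok := i.Deadline(); ok {
		return d
	}
	return now.Add(i.GetDelay())
}

func main() {
	now := time.Now()
	d := 3 * time.Second
	fmt.Println(readyAt(&item{delay: &d}, now).Sub(now)) // 3s
}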
View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
package queuer
import (
"fmt"
@ -26,6 +26,7 @@ import (
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
)
@ -37,17 +38,17 @@ const (
yieldWaitTimeout = 1 * time.Second
)
type queuer struct {
type Queuer struct {
lock sync.Mutex // shared by condition variables of this struct
podUpdates queue.FIFO // queue of pod updates to be processed
podQueue *queue.DelayFIFO // queue of pods to be scheduled
PodQueue *queue.DelayFIFO // queue of pods to be scheduled
deltaCond sync.Cond // pod changes are available for processing
unscheduledCond sync.Cond // there are unscheduled pods for processing
}
func newQueuer(store queue.FIFO) *queuer {
q := &queuer{
podQueue: queue.NewDelayFIFO(),
func NewQueuer(store queue.FIFO) *Queuer {
q := &Queuer{
PodQueue: queue.NewDelayFIFO(),
podUpdates: store,
}
q.deltaCond.L = &q.lock
@ -55,9 +56,9 @@ func newQueuer(store queue.FIFO) *queuer {
return q
}
func (q *queuer) installDebugHandlers(mux *http.ServeMux) {
func (q *Queuer) InstallDebugHandlers(mux *http.ServeMux) {
mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) {
for _, x := range q.podQueue.List() {
for _, x := range q.PodQueue.List() {
if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
break
}
@ -73,36 +74,36 @@ func (q *queuer) installDebugHandlers(mux *http.ServeMux) {
}
// signal that there are probably pod updates waiting to be processed
func (q *queuer) updatesAvailable() {
func (q *Queuer) UpdatesAvailable() {
q.deltaCond.Broadcast()
}
// delete a pod from the to-be-scheduled queue
func (q *queuer) dequeue(id string) {
q.podQueue.Delete(id)
func (q *Queuer) Dequeue(id string) {
q.PodQueue.Delete(id)
}
// re-add a pod to the to-be-scheduled queue, will not overwrite existing pod data (that
// may have already changed).
func (q *queuer) requeue(pod *Pod) {
func (q *Queuer) Requeue(pod *Pod) {
// use KeepExisting in case the pod has already been updated (can happen if binding fails
// due to constraint violations); we don't want to overwrite a newer entry with stale data.
q.podQueue.Add(pod, queue.KeepExisting)
q.PodQueue.Add(pod, queue.KeepExisting)
q.unscheduledCond.Broadcast()
}
// same as requeue but calls podQueue.Offer instead of podQueue.Add
func (q *queuer) reoffer(pod *Pod) {
// same as Requeue but calls podQueue.Offer instead of podQueue.Add
func (q *Queuer) Reoffer(pod *Pod) {
// use KeepExisting in case the pod has already been updated (can happen if binding fails
// due to constraint violations); we don't want to overwrite a newer entry with stale data.
if q.podQueue.Offer(pod, queue.KeepExisting) {
if q.PodQueue.Offer(pod, queue.KeepExisting) {
q.unscheduledCond.Broadcast()
}
}
// spawns a go-routine to watch for unscheduled pods and queue them up
// for scheduling. returns immediately.
func (q *queuer) Run(done <-chan struct{}) {
func (q *Queuer) Run(done <-chan struct{}) {
go runtime.Until(func() {
log.Info("Watching for newly created pods")
q.lock.Lock()
@ -130,12 +131,12 @@ func (q *queuer) Run(done <-chan struct{}) {
pod := p.(*Pod)
if recoverAssignedSlave(pod.Pod) != "" {
log.V(3).Infof("dequeuing assigned pod for scheduling: %v", pod.Pod.Name)
q.dequeue(pod.GetUID())
q.Dequeue(pod.GetUID())
} else {
// use ReplaceExisting because we are always pushing the latest state
now := time.Now()
pod.deadline = &now
if q.podQueue.Offer(pod, queue.ReplaceExisting) {
if q.PodQueue.Offer(pod, queue.ReplaceExisting) {
q.unscheduledCond.Broadcast()
log.V(3).Infof("queued pod for scheduling: %v", pod.Pod.Name)
} else {
@ -147,7 +148,7 @@ func (q *queuer) Run(done <-chan struct{}) {
}
// implementation of scheduling plugin's NextPod func; see k8s plugin/pkg/scheduler
func (q *queuer) yield() *api.Pod {
func (q *Queuer) Yield() *api.Pod {
log.V(2).Info("attempting to yield a pod")
q.lock.Lock()
defer q.lock.Unlock()
@ -155,7 +156,7 @@ func (q *queuer) yield() *api.Pod {
for {
// limit blocking here to short intervals so that we don't block the
// enqueuer Run() routine for very long
kpod := q.podQueue.Await(yieldPopTimeout)
kpod := q.PodQueue.Await(yieldPopTimeout)
if kpod == nil {
signalled := runtime.After(q.unscheduledCond.Wait)
// lock is yielded at this point and we're going to wait for either
@ -185,3 +186,12 @@ func (q *queuer) yield() *api.Pod {
}
}
}
// recoverAssignedSlave recovers the assigned Mesos slave from a pod by searching
// the BindingHostKey. For tasks in the registry of the scheduler, the same
// value is stored in T.Spec.AssignedSlave. Before launching, the BindingHostKey
// annotation is added and the executor will eventually persist that to the
// apiserver on binding.
func recoverAssignedSlave(pod *api.Pod) string {
return pod.Annotations[annotation.BindingHostKey]
}
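
recoverAssignedSlave moves here unchanged; it is a plain annotation lookup, and Run() above treats an empty result as "not yet assigned, keep the pod queued". A small illustration (the "slave-1" value is made up):

package main

import (
	"fmt"

	annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
	"k8s.io/kubernetes/pkg/api"
)

func main() {
	unassigned := &api.Pod{}
	assigned := &api.Pod{ObjectMeta: api.ObjectMeta{
		Annotations: map[string]string{annotation.BindingHostKey: "slave-1"},
	}}

	// no annotation: the queuer keeps (or puts) the pod in its queue
	fmt.Println(unassigned.Annotations[annotation.BindingHostKey]) // ""

	// annotation present: the pod is already bound, so it is dequeued
	fmt.Println(assigned.Annotations[annotation.BindingHostKey]) // "slave-1"
}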