From af95e3fe0e14f9ec23fc3483ca573d457d06fa8c Mon Sep 17 00:00:00 2001
From: James DeFelice
Date: Sun, 29 Nov 2015 19:34:27 +0000
Subject: [PATCH 1/2] - forward updated labels/annotations for downward API compat
- refactor queue.Pod construction to take functional options, privatize Pod fields
- refactor DelayFIFO and HistoricalFIFO to offer consistent, more useful Pop() funcs
- refactor pod update processing; long term we should somehow combine this with the special pod config source that we are using for mirror pods
- task launch timer cleanup
---
 contrib/mesos/pkg/executor/executor.go        | 194 ++++++------------
 contrib/mesos/pkg/executor/executor_test.go   |  74 +++++++
 contrib/mesos/pkg/executor/node.go            |  64 ++++++
 contrib/mesos/pkg/executor/observer.go        | 171 +++++++++++++++
 contrib/mesos/pkg/node/registrator.go         |   2 +-
 contrib/mesos/pkg/offers/offers.go            |   2 +-
 contrib/mesos/pkg/queue/delay.go              |  26 ++-
 contrib/mesos/pkg/queue/delay_test.go         |   4 +-
 contrib/mesos/pkg/queue/historical.go         |  22 +-
 contrib/mesos/pkg/queue/historical_test.go    |  14 +-
 contrib/mesos/pkg/queue/interface.go          |   4 +-
 .../components/errorhandler/errorhandler.go   |   2 +-
 .../components/podreconciler/podreconciler.go |   2 +-
 contrib/mesos/pkg/scheduler/queuer/pod.go     |  42 +++-
 14 files changed, 454 insertions(+), 169 deletions(-)
 create mode 100644 contrib/mesos/pkg/executor/node.go
 create mode 100644 contrib/mesos/pkg/executor/observer.go

diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go
index b92e3e12a43..8100b6239f5 100644
--- a/contrib/mesos/pkg/executor/executor.go
+++ b/contrib/mesos/pkg/executor/executor.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"math"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -34,13 +35,12 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
 	"k8s.io/kubernetes/contrib/mesos/pkg/node"
 	"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
+	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/pkg/api"
-	unversionedapi "k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/client/cache"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
-	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -89,16 +89,12 @@ func (s *stateType) transitionTo(to stateType, unless ...stateType) bool {

 type kuberTask struct {
 	mesosTaskInfo *mesos.TaskInfo
-	podName       string // empty until pod is sent to kubelet and registed in KubernetesExecutor.pods
+	launchTimer *time.Timer // launchTimer expires when the launch-task process duration exceeds launchGracePeriod
+	podName     string      // empty until pod is sent to kubelet and registered in KubernetesExecutor.pods
 }

 type podStatusFunc func() (*api.PodStatus, error)

-type NodeInfo struct {
-	Cores int
-	Mem   int64 // in bytes
-}
-
 // KubernetesExecutor is an mesos executor that runs pods
 // in a minion machine.
 type Executor struct {
@@ -118,9 +114,9 @@ type Executor struct {
 	exitFunc             func(int)
 	podStatusFunc        func(*api.Pod) (*api.PodStatus, error)
 	staticPodsConfigPath string
-	podController        *framework.Controller
 	launchGracePeriod    time.Duration
 	nodeInfos            chan<- NodeInfo
+	initCompleted        chan struct{} // closes upon completion of Init()
 }

 type Config struct {
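The new initCompleted channel is an init barrier: goroutines registered via runtime.On before Init() only start once the channel closes. A minimal, self-contained sketch of the idiom; the on helper below is an illustrative stand-in for contrib/mesos/pkg/runtime.On, whose exact semantics we are assuming:

```go
package main

import (
	"fmt"
	"time"
)

// on is an illustrative stand-in for contrib/mesos/pkg/runtime.On: it parks a
// goroutine until ch closes, then invokes f.
func on(ch <-chan struct{}, f func()) {
	go func() {
		<-ch
		f()
	}()
}

type executor struct {
	initCompleted chan struct{} // closes upon completion of Init()
}

func (e *executor) runSendLoop() { fmt.Println("send loop running") }

func (e *executor) Init() {
	defer close(e.initCompleted) // releases everything parked by on()
	fmt.Println("one-time initialization")
}

func main() {
	e := &executor{initCompleted: make(chan struct{})}
	on(e.initCompleted, e.runSendLoop) // wired up before Init, runs only after it
	e.Init()
	time.Sleep(50 * time.Millisecond) // crude wait so the loop goroutine can print
}
```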
@@ -144,6 +140,13 @@ func (k *Executor) isConnected() bool {

 // New creates a new kubernetes executor.
 func New(config Config) *Executor {
+	launchGracePeriod := config.LaunchGracePeriod
+	if launchGracePeriod == 0 {
+		// this is the equivalent of saying "the timer never expires" and simplifies nil
+		// timer checks elsewhere in the code. it's a little hacky but less code to
+		// maintain than alternative approaches.
+		launchGracePeriod = time.Duration(math.MaxInt64)
+	}
 	k := &Executor{
 		updateChan:           config.Updates,
 		state:                disconnectedState,
@@ -160,41 +163,22 @@ func New(config Config) *Executor {
 		exitFunc:             config.ExitFunc,
 		podStatusFunc:        config.PodStatusFunc,
 		staticPodsConfigPath: config.StaticPodsConfigPath,
-		launchGracePeriod:    config.LaunchGracePeriod,
+		launchGracePeriod:    launchGracePeriod,
 		nodeInfos:            config.NodeInfos,
+		initCompleted:        make(chan struct{}),
 	}
+	runtime.On(k.initCompleted, k.runSendLoop)

-	// watch pods from the given pod ListWatch
-	if config.PodLW == nil {
-		// fail early to make debugging easier
-		panic("cannot create executor with nil PodLW")
-	}
-	_, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			pod := obj.(*api.Pod)
-			log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name)
-			k.handleChangedApiserverPod(pod)
-		},
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			pod := newObj.(*api.Pod)
-			log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name)
-			k.handleChangedApiserverPod(pod)
-		},
-		DeleteFunc: func(obj interface{}) {
-			pod := obj.(*api.Pod)
-			log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name)
-		},
-	})
+	po := newPodObserver(config.PodLW, k.updateTask, k.terminate)
+	runtime.On(k.initCompleted, po.run)

 	return k
 }

 func (k *Executor) Init(driver bindings.ExecutorDriver) {
+	defer close(k.initCompleted)
 	k.killKubeletContainers()
 	k.resetSuicideWatch(driver)
-
-	go k.podController.Run(k.terminate)
-	go k.sendLoop()
 	//TODO(jdef) monitor kubeletFinished and shutdown if it happens
 }

@@ -251,7 +235,7 @@ func (k *Executor) Registered(
 		}
 	}

-	annotations, err := executorInfoToAnnotations(executorInfo)
+	annotations, err := annotationsFor(executorInfo)
 	if err != nil {
 		log.Errorf(
 			"cannot get node annotations from executor info %v error %v",
@@ -374,10 +358,10 @@ func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.Ta
 		return
 	}

+	taskId := taskInfo.GetTaskId().GetValue()
 	k.lock.Lock()
 	defer k.lock.Unlock()

-	taskId := taskInfo.GetTaskId().GetValue()
 	if _, found := k.tasks[taskId]; found {
 		log.Errorf("task already launched\n")
 		// Not to send back TASK_RUNNING here, because
@@ -390,51 +374,13 @@ func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.Ta
 	// (c) we're leaving podName == "" for now, indicates we don't need to delete containers
 	k.tasks[taskId] = &kuberTask{
 		mesosTaskInfo: taskInfo,
+		launchTimer:   time.NewTimer(k.launchGracePeriod),
 	}
 	k.resetSuicideWatch(driver)

 	go k.launchTask(driver, taskId, pod)
 }

-func (k *Executor) handleChangedApiserverPod(pod *api.Pod) {
-	// exclude "pre-scheduled" pods which have a NodeName set to this node without being scheduled already
-	taskId := pod.Annotations[meta.TaskIdKey]
-	if taskId == "" {
-		log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
-		return
-	}
-
-	k.lock.Lock()
-	defer k.lock.Unlock()
-
-	// exclude tasks which are already deleted from our task registry
-	task := k.tasks[taskId]
-	if task == nil {
-		log.Warningf("task %s for pod %s/%s not
found", taskId, pod.Namespace, pod.Name) - return - } - - oldPod := k.pods[task.podName] - - // terminating pod? - if oldPod != nil && pod.Status.Phase == api.PodRunning { - timeModified := differentTime(oldPod.DeletionTimestamp, pod.DeletionTimestamp) - graceModified := differentPeriod(oldPod.DeletionGracePeriodSeconds, pod.DeletionGracePeriodSeconds) - if timeModified || graceModified { - log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet", pod.Namespace, pod.Name, *pod.DeletionTimestamp, *pod.DeletionGracePeriodSeconds) - - // modify the pod in our registry instead of sending the new pod. The later - // would allow that other changes bleed into the kubelet. For now we are - // very conservative changing this behaviour. - // TODO(sttts): check whether we can and should send all changes down to the kubelet - oldPod.DeletionTimestamp = pod.DeletionTimestamp - oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds - - k.sendPodsSnapshot() - } - } -} - // determine whether we need to start a suicide countdown. if so, then start // a timer that, upon expiration, causes this executor to commit suicide. // this implementation runs asynchronously. callers that wish to wait for the @@ -619,17 +565,10 @@ func (k *Executor) launchTask(driver bindings.ExecutorDriver, taskId string, pod psf := podStatusFunc(func() (*api.PodStatus, error) { return k.podStatusFunc(pod) }) - go k._launchTask(driver, taskId, podFullName, psf) + go k._launchTask(driver, taskId, podFullName, psf, task.launchTimer.C) } -func (k *Executor) _launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) { - - expired := make(chan struct{}) - - if k.launchGracePeriod > 0 { - time.AfterFunc(k.launchGracePeriod, func() { close(expired) }) - } - +func (k *Executor) _launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc, expired <-chan time.Time) { getMarshalledInfo := func() (data []byte, cancel bool) { // potentially long call.. 
 		if podStatus, err := psf(); err == nil && podStatus != nil {
@@ -677,7 +616,8 @@ waitForRunningPod:
 	} else {
 		k.lock.Lock()
 		defer k.lock.Unlock()
-		if _, found := k.tasks[taskId]; !found {
+		task, found := k.tasks[taskId]
+		if !found {
 			goto reportLost
 		}

@@ -689,6 +629,7 @@ waitForRunningPod:
 	}
 	k.sendStatus(driver, statusUpdate)
+	task.launchTimer.Stop()

 	// continue to monitor the health of the pod
 	go k.__launchTask(driver, taskId, podFullName, psf)
@@ -946,7 +887,7 @@ func (k *Executor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg stri
 	}
 }

-func (k *Executor) sendLoop() {
+func (k *Executor) runSendLoop() {
 	defer log.V(1).Info("sender loop exiting")
 	for {
 		select {
@@ -982,53 +923,7 @@ func (k *Executor) sendLoop() {
 	}
 }

-func differentTime(a, b *unversionedapi.Time) bool {
-	return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
-}
-
-func differentPeriod(a, b *int64) bool {
-	return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
-}
-
-func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
-	var executorCPU, executorMem float64
-
-	// get executor resources
-	if ei != nil {
-		for _, r := range ei.GetResources() {
-			if r == nil || r.GetType() != mesos.Value_SCALAR {
-				continue
-			}
-			switch r.GetName() {
-			case "cpus":
-				executorCPU = r.GetScalar().GetValue()
-			case "mem":
-				executorMem = r.GetScalar().GetValue()
-			}
-		}
-	}
-
-	// get resource capacity of the node
-	ni := NodeInfo{}
-	for _, r := range si.GetResources() {
-		if r == nil || r.GetType() != mesos.Value_SCALAR {
-			continue
-		}
-
-		switch r.GetName() {
-		case "cpus":
-			// We intentionally take the floor of executorCPU because cores are integers
-			// and we would loose a complete cpu here if the value is <1.
-			// TODO(sttts): switch to float64 when "Machine Allocables" are implemented
-			ni.Cores = int(r.GetScalar().GetValue() - float64(int(executorCPU)))
-		case "mem":
-			ni.Mem = int64(r.GetScalar().GetValue()-executorMem) * 1024 * 1024
-		}
-	}
-	return ni
-}
-
-func executorInfoToAnnotations(ei *mesos.ExecutorInfo) (annotations map[string]string, err error) {
+func annotationsFor(ei *mesos.ExecutorInfo) (annotations map[string]string, err error) {
 	annotations = map[string]string{}
 	if ei == nil {
 		return
@@ -1044,3 +939,36 @@ func executorInfoToAnnotations(ei *mesos.ExecutorInfo) (annotations map[string]s

 	return
 }
+
+// updateTask executes some mutating operation for the given task/pod, blocking until the update is either
+// attempted or discarded. uses the executor state lock to synchronize concurrent invocation. returns true
+// only if the specified update operation was attempted and itself returned true; a result of true also
+// indicates that changes have been sent upstream to the kubelet.
+func (k *Executor) updateTask(taskId string, f func(*kuberTask, *api.Pod) bool) (changed bool, err error) {
+	k.lock.Lock()
+	defer k.lock.Unlock()
+
+	// exclude tasks which are already deleted from our task registry
+	task := k.tasks[taskId]
+	if task == nil {
+		// the pod has completed the launch-task-binding phase because it's been annotated with
+		// the task-id, but we don't have a record of it; it's best to let the scheduler reconcile.
+		// it's also possible that our update queue is backed up and hasn't caught up with the state
+		// of the world yet.
+
+		// TODO(jdef) should we hint to the scheduler (via TASK_FAILED, reason=PodRefersToUnknownTask)?
+
+		err = fmt.Errorf("task %s not found", taskId)
+		return
+	}
+
+	oldPod := k.pods[task.podName]
+	changed = f(task, oldPod)
+
+	// TODO(jdef) this abstraction isn't perfect since changes that only impact the task struct,
+	// and not the pod, don't require a new pod snapshot sent back to the kubelet.
+	if changed {
+		k.sendPodsSnapshot()
+	}
+	return
+}
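updateTask is now the single choke point for mutating task/pod state: the callback runs under the executor lock, and a true result doubles as "snapshot pushed to the kubelet". A self-contained sketch of that transaction shape, using toy types rather than the executor's own:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type task struct {
	labels map[string]string
}

type registry struct {
	sync.Mutex
	tasks map[string]*task
}

// updateTask mirrors the transaction shape above: hand the caller the record
// while holding the lock, let it mutate, and report whether anything changed.
func (r *registry) updateTask(id string, f func(*task) bool) (bool, error) {
	r.Lock()
	defer r.Unlock()
	t, ok := r.tasks[id]
	if !ok {
		return false, errors.New("task " + id + " not found")
	}
	return f(t), nil
}

func main() {
	r := &registry{tasks: map[string]*task{"t1": {labels: map[string]string{}}}}

	changed, err := r.updateTask("t1", func(t *task) bool {
		t.labels["app"] = "web" // mutation happens under the registry lock
		return true             // true signals "push a new snapshot upstream"
	})
	fmt.Println(changed, err) // true <nil>

	_, err = r.updateTask("missing", func(*task) bool { return true })
	fmt.Println(err) // task missing not found
}
```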
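To make the arithmetic in nodeInfo concrete, here is a dependency-free restatement with sample numbers; plain floats stand in for the mesos scalar resources, and the values are purely illustrative:

```go
package main

import "fmt"

// capacity restates nodeInfo's arithmetic without the mesos proto types.
// nodeCPU/nodeMem are the slave's advertised scalars (mem in MB, per mesos);
// executorCPU/executorMem are the executor's own reservation.
func capacity(nodeCPU, nodeMem, executorCPU, executorMem float64) (cores int, memBytes int64) {
	// floor executorCPU before subtracting: cores are whole numbers, and a
	// fractional executor reservation (<1.0) shouldn't cost the node a full core
	cores = int(nodeCPU - float64(int(executorCPU)))
	memBytes = int64(nodeMem-executorMem) * 1024 * 1024 // MB -> bytes
	return
}

func main() {
	c, m := capacity(8, 4096, 0.25, 128)
	fmt.Println(c, m) // 8 4160749568: the 0.25 cpu reservation floors to 0

	c, m = capacity(8, 4096, 1.5, 128)
	fmt.Println(c, m) // 7 4160749568: 1.5 floors to 1, costing one core
}
```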
diff --git a/contrib/mesos/pkg/executor/observer.go b/contrib/mesos/pkg/executor/observer.go
new file mode 100644
index 00000000000..8ef92dcb240
--- /dev/null
+++ b/contrib/mesos/pkg/executor/observer.go
@@ -0,0 +1,171 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package executor
+
+import (
+	log "github.com/golang/glog"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/client/cache"
+	"k8s.io/kubernetes/pkg/controller/framework"
+)
+
+// taskUpdateTx executes a task update transaction f for the task identified by
+// taskId. if no such task exists then f is not invoked and an error is
+// returned. if f is invoked then taskUpdateTx returns the bool result of f.
+type taskUpdateTx func(taskId string, f func(*kuberTask, *api.Pod) bool) (changed bool, err error)
+
+// podObserver receives callbacks for every pod state change on the apiserver and
+// for each decides whether to execute a task update transaction.
+type podObserver struct {
+	podController *framework.Controller
+	terminate     <-chan struct{}
+	taskUpdateTx  taskUpdateTx
+}
+
+func newPodObserver(podLW cache.ListerWatcher, taskUpdateTx taskUpdateTx, terminate <-chan struct{}) *podObserver {
+	// watch pods from the given pod ListWatch
+	if podLW == nil {
+		// fail early to make debugging easier
+		panic("cannot create podObserver with nil podLW")
+	}
+
+	p := &podObserver{
+		terminate:    terminate,
+		taskUpdateTx: taskUpdateTx,
+	}
+	_, p.podController = framework.NewInformer(podLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			pod := obj.(*api.Pod)
+			log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name)
+			p.handleChangedApiserverPod(pod)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			pod := newObj.(*api.Pod)
+			log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name)
+			p.handleChangedApiserverPod(pod)
+		},
+		DeleteFunc: func(obj interface{}) {
+			pod := obj.(*api.Pod)
+			log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name)
+		},
+	})
+	return p
+}
+
+// run begins observing pod state changes; blocks until the terminate chan closes.
+func (p *podObserver) run() {
+	p.podController.Run(p.terminate)
+}
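The termination check in handleChangedApiserverPod below leans on two pointer-comparison helpers defined at the bottom of this file. A standalone illustration of the idiom, with *time.Time standing in for *unversioned.Time so the snippet stays dependency-free: two optional fields differ when exactly one is nil, or when both are set but unequal.

```go
package main

import (
	"fmt"
	"time"
)

// reproduced from the bottom of observer.go, modulo the stand-in time type
func differentTime(a, b *time.Time) bool {
	return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
}

func differentPeriod(a, b *int64) bool {
	return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
}

func main() {
	now := time.Now()
	grace := int64(30)

	fmt.Println(differentTime(nil, &now))     // true: DeletionTimestamp newly set
	fmt.Println(differentTime(&now, &now))    // false: unchanged
	fmt.Println(differentPeriod(nil, &grace)) // true: grace period newly set
	fmt.Println(differentPeriod(nil, nil))    // false: both unset
}
```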
+
+// handleChangedApiserverPod is invoked for pod add/update state changes and decides whether
+// task updates are necessary. if so, a task update is executed via taskUpdateTx.
+func (p *podObserver) handleChangedApiserverPod(pod *api.Pod) {
+	// Don't do anything for pods without a task annotation, which means:
+	// - "pre-scheduled" pods which have a NodeName set to this node without being scheduled already.
+	// - static/mirror pods: they'll never have a TaskID annotation, and we don't expect them to ever change.
+	// - all other pods that haven't passed through the launch-task-binding phase, which would set annotations.
+	taskId := pod.Annotations[meta.TaskIdKey]
+	if taskId == "" {
+		// There also could be a race between the overall launch-task process and this update, but here we
+		// will never be able to process such a stale update because the "update pod" that we're receiving
+		// in this func won't yet have a task ID annotation. It follows that we can safely drop such a stale
+		// update on the floor because we'll get another update later that, in addition to the changes that
+		// we're dropping now, will also include the changes from the binding process.
+		log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
+		return
+	}
+
+	_, err := p.taskUpdateTx(taskId, func(_ *kuberTask, relatedPod *api.Pod) (sendSnapshot bool) {
+		if relatedPod == nil {
+			// should never happen because:
+			// (a) the update pod record has already passed through the binding phase in launchTasks()
+			// (b) all remaining updates to executor.{pods,tasks} are sync'd in unison
+			log.Errorf("internal state error: pod not found for task %s", taskId)
+			return
+		}
+
+		// TODO(sttts): check whether we can and should send all "semantic" changes down to the kubelet
+		// see kubelet/config/config.go for semantic change detection
+
+		// check for updated labels/annotations: need to forward these for the downward API
+		sendSnapshot = sendSnapshot || updateMetaMap(&relatedPod.Labels, pod.Labels)
+		sendSnapshot = sendSnapshot || updateMetaMap(&relatedPod.Annotations, pod.Annotations)
+
+		// terminating pod?
+		if pod.Status.Phase == api.PodRunning {
+			timeModified := differentTime(relatedPod.DeletionTimestamp, pod.DeletionTimestamp)
+			graceModified := differentPeriod(relatedPod.DeletionGracePeriodSeconds, pod.DeletionGracePeriodSeconds)
+			if timeModified || graceModified {
+				log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet",
+					pod.Namespace, pod.Name, *pod.DeletionTimestamp, *pod.DeletionGracePeriodSeconds)
+
+				// modify the pod in our registry instead of sending the new pod. The latter
+				// would let other changes bleed into the kubelet. For now we are very
+				// conservative about changing this behaviour.
+				relatedPod.DeletionTimestamp = pod.DeletionTimestamp
+				relatedPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
+				sendSnapshot = true
+			}
+		}
+		return
+	})
+	if err != nil {
+		log.Errorf("failed to update pod %s/%s: %+v", pod.Namespace, pod.Name, err)
+	}
+}
+
+// updateMetaMap looks for differences between src and dest; if there are differences
+// then dest is changed (possibly to point to src) and this func returns true.
+func updateMetaMap(dest *map[string]string, src map[string]string) (changed bool) {
+	// check for things in dest that are missing in src
+	for k := range *dest {
+		if _, ok := src[k]; !ok {
+			changed = true
+			break
+		}
+	}
+	if !changed {
+		if len(*dest) == 0 {
+			if len(src) > 0 {
+				changed = true
+				goto finished
+			}
+			// no update needed
+			return
+		}
+		// check for things in src that are missing/different in dest
+		for k, v := range src {
+			if vv, ok := (*dest)[k]; !ok || vv != v {
+				changed = true
+				break
+			}
+		}
+	}
+finished:
+	*dest = src
+	return
+}
+
+func differentTime(a, b *unversioned.Time) bool {
+	return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
+}
+
+func differentPeriod(a, b *int64) bool {
+	return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
+}
diff --git a/contrib/mesos/pkg/node/registrator.go b/contrib/mesos/pkg/node/registrator.go
index 5c8bc94d46d..1bdb1daf9f9 100644
--- a/contrib/mesos/pkg/node/registrator.go
+++ b/contrib/mesos/pkg/node/registrator.go
@@ -78,7 +78,7 @@ func (r *clientRegistrator) Run(terminate <-chan struct{}) error {
 	loop := func() {
 	RegistrationLoop:
 		for {
-			obj := r.queue.CancelablePop(terminate)
+			obj := r.queue.Pop(terminate)
 			if obj == nil {
 				break RegistrationLoop
 			}
diff --git a/contrib/mesos/pkg/offers/offers.go b/contrib/mesos/pkg/offers/offers.go
index e758e037604..3a0229d554a 100644
--- a/contrib/mesos/pkg/offers/offers.go
+++ b/contrib/mesos/pkg/offers/offers.go
@@ -441,7 +441,7 @@ func (s *offerStorage) ageOffers() {
 }

 func (s *offerStorage) nextListener() *offerListener {
-	obj := s.listeners.Pop()
+	obj := s.listeners.Pop(nil)
 	if listen, ok := obj.(*offerListener); !ok {
 		//programming error
 		panic(fmt.Sprintf("unexpected listener object %v", obj))
diff --git a/contrib/mesos/pkg/queue/delay.go b/contrib/mesos/pkg/queue/delay.go
index be240223ade..327ba10edb4 100644
--- a/contrib/mesos/pkg/queue/delay.go
+++ b/contrib/mesos/pkg/queue/delay.go
@@ -302,12 +302,18 @@ func (f *DelayFIFO) Get(id string) (UniqueID, bool) {

 // Variant of DelayQueue.Pop() for UniqueDelayed items
 func (q *DelayFIFO) Await(timeout time.Duration) UniqueID {
-	cancel := make(chan struct{})
-	ch := make(chan interface{}, 1)
+	var (
+		cancel = make(chan struct{})
+		ch     = make(chan interface{}, 1)
+		t      = time.NewTimer(timeout)
+	)
+	defer t.Stop()
+
 	go func() { ch <- q.pop(cancel) }()
+
 	var x interface{}
 	select {
-	case <-time.After(timeout):
+	case <-t.C:
 		close(cancel)
x = <-ch case x = <-ch: @@ -319,13 +325,19 @@ func (q *DelayFIFO) Await(timeout time.Duration) UniqueID { return nil } -// Variant of DelayQueue.Pop() for UniqueDelayed items -func (q *DelayFIFO) Pop() UniqueID { - return q.pop(nil).(UniqueID) +// Pop blocks until either there is an item available to dequeue or else the specified +// cancel chan is closed. Callers that have no interest in providing a cancel chan +// should specify nil, or else WithoutCancel() (for readability). +func (q *DelayFIFO) Pop(cancel <-chan struct{}) UniqueID { + x := q.pop(cancel) + if x == nil { + return nil + } + return x.(UniqueID) } // variant of DelayQueue.Pop that implements optional cancellation -func (q *DelayFIFO) pop(cancel chan struct{}) interface{} { +func (q *DelayFIFO) pop(cancel <-chan struct{}) interface{} { next := func() *qitem { q.lock() defer q.unlock() diff --git a/contrib/mesos/pkg/queue/delay_test.go b/contrib/mesos/pkg/queue/delay_test.go index b0b98b970a7..82512d0ed81 100644 --- a/contrib/mesos/pkg/queue/delay_test.go +++ b/contrib/mesos/pkg/queue/delay_test.go @@ -358,7 +358,7 @@ func TestDFIFO_sanity_check(t *testing.T) { // pop last before := time.Now() - x := df.Pop() + x := df.Pop(nil) assert.Equal(a.(*testjob).instance, 2) now := time.Now() @@ -395,7 +395,7 @@ func TestDFIFO_Offer(t *testing.T) { } before := time.Now() - x := dq.Pop() + x := dq.Pop(nil) now := time.Now() waitPeriod := now.Sub(before) diff --git a/contrib/mesos/pkg/queue/historical.go b/contrib/mesos/pkg/queue/historical.go index 9ef42eaf80b..79304c511d6 100644 --- a/contrib/mesos/pkg/queue/historical.go +++ b/contrib/mesos/pkg/queue/historical.go @@ -220,22 +220,28 @@ func (f *HistoricalFIFO) Poll(id string, t EventType) bool { // Variant of DelayQueue.Pop() for UniqueDelayed items func (q *HistoricalFIFO) Await(timeout time.Duration) interface{} { - cancel := make(chan struct{}) - ch := make(chan interface{}, 1) - go func() { ch <- q.CancelablePop(cancel) }() + var ( + cancel = make(chan struct{}) + ch = make(chan interface{}, 1) + t = time.NewTimer(timeout) + ) + defer t.Stop() + + go func() { ch <- q.Pop(cancel) }() + select { - case <-time.After(timeout): + case <-t.C: close(cancel) return <-ch case x := <-ch: return x } } -func (f *HistoricalFIFO) Pop() interface{} { - return f.CancelablePop(nil) -} -func (f *HistoricalFIFO) CancelablePop(cancel <-chan struct{}) interface{} { +// Pop blocks until either there is an item available to dequeue or else the specified +// cancel chan is closed. Callers that have no interest in providing a cancel chan +// should specify nil, or else WithoutCancel() (for readability). 
+func (f *HistoricalFIFO) Pop(cancel <-chan struct{}) interface{} { popEvent := (Entry)(nil) defer func() { f.carrier(popEvent) diff --git a/contrib/mesos/pkg/queue/historical_test.go b/contrib/mesos/pkg/queue/historical_test.go index 587629be6df..1f1de907837 100644 --- a/contrib/mesos/pkg/queue/historical_test.go +++ b/contrib/mesos/pkg/queue/historical_test.go @@ -75,7 +75,7 @@ func TestFIFO_basic(t *testing.T) { lastInt := _int(0) lastUint := _uint(0) for i := 0; i < amount*2; i++ { - switch obj := f.Pop().(type) { + switch obj := f.Pop(nil).(type) { case _int: if obj <= lastInt { t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) @@ -100,7 +100,7 @@ func TestFIFO_addUpdate(t *testing.T) { got := make(chan *testObj, 2) go func() { for { - got <- f.Pop().(*testObj) + got <- f.Pop(nil).(*testObj) } }() @@ -126,7 +126,7 @@ func TestFIFO_addReplace(t *testing.T) { got := make(chan *testObj, 2) go func() { for { - got <- f.Pop().(*testObj) + got <- f.Pop(nil).(*testObj) } }() @@ -158,24 +158,24 @@ func TestFIFO_detectLineJumpers(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) - if e, a := 13, f.Pop().(*testObj).value; a != e { + if e, a := 13, f.Pop(nil).(*testObj).value; a != e { err = fmt.Errorf("expected %d, got %d", e, a) return } f.Add(&testObj{"foo", 14}) // ensure foo doesn't jump back in line - if e, a := 1, f.Pop().(*testObj).value; a != e { + if e, a := 1, f.Pop(nil).(*testObj).value; a != e { err = fmt.Errorf("expected %d, got %d", e, a) return } - if e, a := 30, f.Pop().(*testObj).value; a != e { + if e, a := 30, f.Pop(nil).(*testObj).value; a != e { err = fmt.Errorf("expected %d, got %d", e, a) return } - if e, a := 14, f.Pop().(*testObj).value; a != e { + if e, a := 14, f.Pop(nil).(*testObj).value; a != e { err = fmt.Errorf("expected %d, got %d", e, a) return } diff --git a/contrib/mesos/pkg/queue/interface.go b/contrib/mesos/pkg/queue/interface.go index de10a47904a..33e43c0c9f7 100644 --- a/contrib/mesos/pkg/queue/interface.go +++ b/contrib/mesos/pkg/queue/interface.go @@ -59,7 +59,7 @@ type FIFO interface { // ready, they are returned in the order in which they were added/updated. // The item is removed from the queue (and the store) before it is returned, // so if you don't successfully process it, you need to add it back with Add(). 
-	Pop() interface{}
+	Pop(cancel <-chan struct{}) interface{}

 	// Await attempts to Pop within the given interval; upon success the non-nil
 	// item is returned, otherwise nil
@@ -101,3 +101,5 @@ type UniqueDeadlined interface {
 	UniqueID
 	Deadlined
 }
+
+func WithoutCancel() <-chan struct{} { return nil }
diff --git a/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go b/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go
index 92153bc6758..ebe5a0ea138 100644
--- a/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go
+++ b/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go
@@ -89,7 +89,7 @@ func (k *errorHandler) Error(pod *api.Pod, schedulingErr error) {
 		}
 		delay := k.backoff.Get(podKey)
 		log.V(3).Infof("requeuing pod %v with delay %v", podKey, delay)
-		k.qr.Requeue(&queuer.Pod{Pod: pod, Delay: &delay, Notify: breakoutEarly})
+		k.qr.Requeue(queuer.NewPod(pod, queuer.Delay(delay), queuer.Notify(breakoutEarly)))

 	default:
 		log.V(2).Infof("Task is no longer pending, aborting reschedule for pod %v", podKey)
diff --git a/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go b/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go
index a2c8652d510..f9bc2269428 100644
--- a/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go
+++ b/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go
@@ -104,7 +104,7 @@ func (s *podReconciler) Reconcile(t *podtask.T) {
 			now := time.Now()
 			log.V(3).Infof("reoffering pod %v", podKey)
-			s.qr.Reoffer(queuer.NewPodWithDeadline(pod, &now))
+			s.qr.Reoffer(queuer.NewPod(pod, queuer.Deadline(now)))
 		} else {
 			// pod is scheduled.
 			// not sure how this happened behind our backs. attempt to reconstruct
diff --git a/contrib/mesos/pkg/scheduler/queuer/pod.go b/contrib/mesos/pkg/scheduler/queuer/pod.go
index 85a73df18e5..07132a8f6b6 100644
--- a/contrib/mesos/pkg/scheduler/queuer/pod.go
+++ b/contrib/mesos/pkg/scheduler/queuer/pod.go
@@ -25,16 +25,44 @@ import (
 	"k8s.io/kubernetes/pkg/client/cache"
 )

+// PodOpt is a functional option for Pod
+type PodOpt func(*Pod)
+
 // wrapper for the k8s pod type so that we can define additional methods on a "pod"
 type Pod struct {
 	*api.Pod
 	deadline *time.Time
-	Delay    *time.Duration
-	Notify   queue.BreakChan
+	delay    *time.Duration
+	notify   queue.BreakChan
 }

-func NewPodWithDeadline(pod *api.Pod, deadline *time.Time) *Pod {
-	return &Pod{Pod: pod, deadline: deadline}
+func NewPod(pod *api.Pod, opt ...PodOpt) *Pod {
+	p := &Pod{Pod: pod}
+	for _, f := range opt {
+		f(p)
+	}
+	return p
+}
+
+// Deadline sets the deadline for a Pod
+func Deadline(deadline time.Time) PodOpt {
+	return func(pod *Pod) {
+		pod.deadline = &deadline
+	}
+}
+
+// Delay sets the delay for a Pod
+func Delay(delay time.Duration) PodOpt {
+	return func(pod *Pod) {
+		pod.delay = &delay
+	}
+}
+
+// Notify sets the breakout notification channel for a Pod
+func Notify(notify queue.BreakChan) PodOpt {
+	return func(pod *Pod) {
+		pod.notify = notify
+	}
+}

 // implements Copyable
@@ -65,14 +93,14 @@ func (dp *Pod) Deadline() (time.Time, bool) {
 }

 func (dp *Pod) GetDelay() time.Duration {
-	if dp.Delay != nil {
-		return *(dp.Delay)
+	if dp.delay != nil {
+		return *(dp.delay)
 	}
 	return 0
 }

 func (p *Pod) Breaker() queue.BreakChan {
-	return p.Notify
+	return p.notify
 }

 func (p *Pod) String() string {
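The queuer.Pod rework above is the classic functional-options pattern: fields become private, and callers state only what they need via option constructors. A self-contained replica of the shape, with toy types whose names merely mirror the patch:

```go
package main

import (
	"fmt"
	"time"
)

// pod is a toy stand-in for queuer.Pod; fields stay unexported so options are
// the only way to set them.
type pod struct {
	name     string
	deadline *time.Time
	delay    *time.Duration
}

type podOpt func(*pod)

// each option captures its argument and writes it into the pod under construction
func deadline(t time.Time) podOpt  { return func(p *pod) { p.deadline = &t } }
func delay(d time.Duration) podOpt { return func(p *pod) { p.delay = &d } }

func newPod(name string, opts ...podOpt) *pod {
	p := &pod{name: name}
	for _, f := range opts {
		f(p)
	}
	return p
}

func main() {
	p := newPod("web-1", delay(5*time.Second))
	fmt.Println(p.name, p.deadline == nil, *p.delay) // web-1 true 5s
}
```

The payoff shows in the call sites changed above: adding a new optional field later means adding one option function, not touching every constructor caller.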
From 26593f1a6a16c15d0309200b12edf61f0968eda0 Mon Sep 17 00:00:00 2001
From: James DeFelice
Date: Sat, 12 Dec 2015 15:33:48 +0000
Subject: [PATCH 2/2] use WithoutCancel interface w/ Pop(nil)
---
 contrib/mesos/pkg/offers/offers.go         |  2 +-
 contrib/mesos/pkg/queue/delay_test.go      |  4 ++--
 contrib/mesos/pkg/queue/historical_test.go | 14 +++++++-------
 contrib/mesos/pkg/queue/interface.go       |  1 +
 4 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/contrib/mesos/pkg/offers/offers.go b/contrib/mesos/pkg/offers/offers.go
index 3a0229d554a..09e35ce1f0c 100644
--- a/contrib/mesos/pkg/offers/offers.go
+++ b/contrib/mesos/pkg/offers/offers.go
@@ -441,7 +441,7 @@ func (s *offerStorage) ageOffers() {
 }

 func (s *offerStorage) nextListener() *offerListener {
-	obj := s.listeners.Pop(nil)
+	obj := s.listeners.Pop(queue.WithoutCancel())
 	if listen, ok := obj.(*offerListener); !ok {
 		//programming error
 		panic(fmt.Sprintf("unexpected listener object %v", obj))
diff --git a/contrib/mesos/pkg/queue/delay_test.go b/contrib/mesos/pkg/queue/delay_test.go
index 82512d0ed81..4d192bdc122 100644
--- a/contrib/mesos/pkg/queue/delay_test.go
+++ b/contrib/mesos/pkg/queue/delay_test.go
@@ -358,7 +358,7 @@ func TestDFIFO_sanity_check(t *testing.T) {

 	// pop last
 	before := time.Now()
-	x := df.Pop(nil)
+	x := df.Pop(WithoutCancel())
 	assert.Equal(a.(*testjob).instance, 2)

 	now := time.Now()
@@ -395,7 +395,7 @@ func TestDFIFO_Offer(t *testing.T) {
 	}

 	before := time.Now()
-	x := dq.Pop(nil)
+	x := dq.Pop(WithoutCancel())

 	now := time.Now()
 	waitPeriod := now.Sub(before)
diff --git a/contrib/mesos/pkg/queue/historical_test.go b/contrib/mesos/pkg/queue/historical_test.go
index 1f1de907837..8597526c684 100644
--- a/contrib/mesos/pkg/queue/historical_test.go
+++ b/contrib/mesos/pkg/queue/historical_test.go
@@ -75,7 +75,7 @@ func TestFIFO_basic(t *testing.T) {
 	lastInt := _int(0)
 	lastUint := _uint(0)
 	for i := 0; i < amount*2; i++ {
-		switch obj := f.Pop(nil).(type) {
+		switch obj := f.Pop(WithoutCancel()).(type) {
 		case _int:
 			if obj <= lastInt {
 				t.Errorf("got %v (int) out of order, last was %v", obj, lastInt)
@@ -100,7 +100,7 @@ func TestFIFO_addUpdate(t *testing.T) {
 	got := make(chan *testObj, 2)
 	go func() {
 		for {
-			got <- f.Pop(nil).(*testObj)
+			got <- f.Pop(WithoutCancel()).(*testObj)
 		}
 	}()

@@ -126,7 +126,7 @@ func TestFIFO_addReplace(t *testing.T) {
 	got := make(chan *testObj, 2)
 	go func() {
 		for {
-			got <- f.Pop(nil).(*testObj)
+			got <- f.Pop(WithoutCancel()).(*testObj)
 		}
 	}()

@@ -158,24 +158,24 @@ func TestFIFO_detectLineJumpers(t *testing.T) {
 	done := make(chan struct{})
 	go func() {
 		defer close(done)
-		if e, a := 13, f.Pop(nil).(*testObj).value; a != e {
+		if e, a := 13, f.Pop(WithoutCancel()).(*testObj).value; a != e {
 			err = fmt.Errorf("expected %d, got %d", e, a)
 			return
 		}

 		f.Add(&testObj{"foo", 14}) // ensure foo doesn't jump back in line

-		if e, a := 1, f.Pop(nil).(*testObj).value; a != e {
+		if e, a := 1, f.Pop(WithoutCancel()).(*testObj).value; a != e {
 			err = fmt.Errorf("expected %d, got %d", e, a)
 			return
 		}

-		if e, a := 30, f.Pop(nil).(*testObj).value; a != e {
+		if e, a := 30, f.Pop(WithoutCancel()).(*testObj).value; a != e {
 			err = fmt.Errorf("expected %d, got %d", e, a)
 			return
 		}

-		if e, a := 14, f.Pop(nil).(*testObj).value; a != e {
+		if e, a := 14, f.Pop(WithoutCancel()).(*testObj).value; a != e {
 			err = fmt.Errorf("expected %d, got %d", e, a)
 			return
 		}
diff --git a/contrib/mesos/pkg/queue/interface.go b/contrib/mesos/pkg/queue/interface.go
index 33e43c0c9f7..0cba9522a30 100644
--- a/contrib/mesos/pkg/queue/interface.go
+++ b/contrib/mesos/pkg/queue/interface.go
@@ -102,4 +102,5 @@ type UniqueDeadlined interface {
 	Deadlined
 }

+// WithoutCancel returns a nil chan that is never closed; receives on it block forever
func WithoutCancel() <-chan struct{} { return nil }
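WithoutCancel works because receiving from a nil channel blocks forever, so a Pop given that chan waits indefinitely for an item. A minimal sketch of the contract against a toy queue (the polling loop stands in for the real queues' condition-variable wakeups; only the cancel semantics are the point):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// withoutCancel mirrors queue.WithoutCancel: a nil receive-only chan.
func withoutCancel() <-chan struct{} { return nil }

type toyQueue struct {
	mu    sync.Mutex
	items []string
}

// Pop blocks until an item is available or the cancel chan closes; a nil
// cancel chan therefore means "block until an item arrives".
func (q *toyQueue) Pop(cancel <-chan struct{}) interface{} {
	for {
		q.mu.Lock()
		if len(q.items) > 0 {
			x := q.items[0]
			q.items = q.items[1:]
			q.mu.Unlock()
			return x
		}
		q.mu.Unlock()
		select {
		case <-cancel: // closed chan: give up and return nil
			return nil
		case <-time.After(10 * time.Millisecond): // poll again
		}
	}
}

func main() {
	q := &toyQueue{items: []string{"offer-1"}}
	fmt.Println(q.Pop(withoutCancel())) // offer-1: item available immediately

	terminate := make(chan struct{})
	close(terminate)
	fmt.Println(q.Pop(terminate)) // <nil>: queue empty and cancel chan closed
}
```

This is the same shape the registrator uses above: it passes its terminate chan so a shutdown unblocks the registration loop, while callers like nextListener() opt out via WithoutCancel().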