diff --git a/contrib/mesos/pkg/scheduler/errors/errors.go b/contrib/mesos/pkg/scheduler/errors/errors.go index 8823ad84d63..ddeab132aec 100644 --- a/contrib/mesos/pkg/scheduler/errors/errors.go +++ b/contrib/mesos/pkg/scheduler/errors/errors.go @@ -18,9 +18,11 @@ package errors import ( "errors" + "fmt" ) var ( NoSuchPodErr = errors.New("No such pod exists") NoSuchTaskErr = errors.New("No such task exists") + ReconciliationCancelledErr = fmt.Errorf("explicit task reconciliation cancelled") ) diff --git a/contrib/mesos/pkg/scheduler/reconciler.go b/contrib/mesos/pkg/scheduler/operations/reconciler.go similarity index 96% rename from contrib/mesos/pkg/scheduler/reconciler.go rename to contrib/mesos/pkg/scheduler/operations/reconciler.go index 85622298a64..2616852101e 100644 --- a/contrib/mesos/pkg/scheduler/reconciler.go +++ b/contrib/mesos/pkg/scheduler/operations/reconciler.go @@ -14,23 +14,19 @@ See the License for the specific language governing permissions and limitations under the License. */ -package scheduler +package operations import ( - "fmt" "time" log "github.com/golang/glog" mesos "github.com/mesos/mesos-go/mesosproto" bindings "github.com/mesos/mesos-go/scheduler" "k8s.io/kubernetes/contrib/mesos/pkg/proc" + merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" ) -var ( - reconciliationCancelledErr = fmt.Errorf("explicit task reconciliation cancelled") -) - type ReconcilerAction func(driver bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error type Reconciler struct { @@ -43,7 +39,7 @@ type Reconciler struct { explicitReconciliationAbortTimeout time.Duration } -func newReconciler(doer proc.Doer, action ReconcilerAction, +func NewReconciler(doer proc.Doer, action ReconcilerAction, cooldown, explicitReconciliationAbortTimeout time.Duration, done <-chan struct{}) *Reconciler { return &Reconciler{ Doer: doer, @@ -164,7 +160,7 @@ requestLoop: metrics.ReconciliationExecuted.WithLabelValues("explicit").Inc() defer close(fin) err := <-r.Action(driver, cancel) - if err == reconciliationCancelledErr { + if err == merrors.ReconciliationCancelledErr { metrics.ReconciliationCancelled.WithLabelValues("explicit").Inc() log.Infoln(err.Error()) } else if err != nil { diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 9290903df44..130f1d06204 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" + merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" @@ -55,7 +56,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/container" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/util/sets" plugin "k8s.io/kubernetes/plugin/pkg/scheduler" ) @@ -101,7 +101,7 @@ type MesosScheduler struct { // via deferred init plugin PluginInterface - reconciler *Reconciler + reconciler *operations.Reconciler reconcileCooldown time.Duration asRegisteredMaster proc.Doer terminate <-chan struct{} // signal chan, closes when we should kill background tasks @@ -296,7 +296,7 @@ func (k *MesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver) r1 := k.makeTaskRegistryReconciler() r2 := k.makePodRegistryReconciler() - k.reconciler = newReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2), + k.reconciler = operations.NewReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2), k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate) go k.reconciler.Run(driver) @@ -583,14 +583,14 @@ func explicitTaskFilter(t *podtask.T) bool { // invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation // is cancelled. if any other errors occur the composite reconciler will attempt to complete the // sequence, reporting only the last generated error. -func (k *MesosScheduler) makeCompositeReconciler(actions ...ReconcilerAction) ReconcilerAction { +func (k *MesosScheduler) makeCompositeReconciler(actions ...operations.ReconcilerAction) operations.ReconcilerAction { if x := len(actions); x == 0 { // programming error panic("no actions specified for composite reconciler") } else if x == 1 { return actions[0] } - chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b ReconcilerAction) <-chan error { + chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b operations.ReconcilerAction) <-chan error { ech := a(d, c) ch := make(chan error, 1) go func() { @@ -625,17 +625,17 @@ func (k *MesosScheduler) makeCompositeReconciler(actions ...ReconcilerAction) Re for i := 2; i < len(actions); i++ { i := i next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { - return chained(d, c, ReconcilerAction(result), actions[i]) + return chained(d, c, operations.ReconcilerAction(result), actions[i]) } result = next } - return ReconcilerAction(result) + return operations.ReconcilerAction(result) } // reconciler action factory, performs explicit task reconciliation for non-terminal // tasks listed in the scheduler's internal taskRegistry. -func (k *MesosScheduler) makeTaskRegistryReconciler() ReconcilerAction { - return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { +func (k *MesosScheduler) makeTaskRegistryReconciler() operations.ReconcilerAction { + return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { taskToSlave := make(map[string]string) for _, t := range k.taskRegistry.List(explicitTaskFilter) { if t.Spec.SlaveID != "" { @@ -648,8 +648,8 @@ func (k *MesosScheduler) makeTaskRegistryReconciler() ReconcilerAction { // reconciler action factory, performs explicit task reconciliation for non-terminal // tasks identified by annotations in the Kubernetes pod registry. -func (k *MesosScheduler) makePodRegistryReconciler() ReconcilerAction { - return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { +func (k *MesosScheduler) makePodRegistryReconciler() operations.ReconcilerAction { + return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) if err != nil { return proc.ErrorChanf("failed to reconcile pod registry: %v", err) @@ -694,7 +694,7 @@ func (k *MesosScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDrive select { case <-cancel: - return reconciliationCancelledErr + return merrors.ReconciliationCancelledErr default: if _, err := driver.ReconcileTasks(statusList); err != nil { return err @@ -711,7 +711,7 @@ func (k *MesosScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDrive } select { case <-cancel: - return reconciliationCancelledErr + return merrors.ReconciliationCancelledErr case <-time.After(backoff): for taskId := range remaining { if task, _ := k.taskRegistry.Get(taskId); task != nil && explicitTaskFilter(task) && task.UpdatedTime.Before(start) {