diff --git a/contrib/mesos/pkg/scheduler/components/framework/framework.go b/contrib/mesos/pkg/scheduler/components/framework/framework.go index 89c5a01a1fa..f5e1a356e0c 100644 --- a/contrib/mesos/pkg/scheduler/components/framework/framework.go +++ b/contrib/mesos/pkg/scheduler/components/framework/framework.go @@ -283,7 +283,7 @@ func (k *framework) onInitialRegistration(driver bindings.SchedulerDriver) { r1 := k.makeTaskRegistryReconciler() r2 := k.makePodRegistryReconciler() - k.tasksReconciler = taskreconciler.New(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2), + k.tasksReconciler = taskreconciler.New(k.asRegisteredMaster, taskreconciler.MakeComposite(k.terminate, r1, r2), k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate) go k.tasksReconciler.Run(driver, k.terminate) @@ -569,58 +569,6 @@ func explicitTaskFilter(t *podtask.T) bool { } } -// invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation -// is cancelled. if any other errors occur the composite reconciler will attempt to complete the -// sequence, reporting only the last generated error. -func (k *framework) makeCompositeReconciler(actions ...taskreconciler.Action) taskreconciler.Action { - if x := len(actions); x == 0 { - // programming error - panic("no actions specified for composite reconciler") - } else if x == 1 { - return actions[0] - } - chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b taskreconciler.Action) <-chan error { - ech := a(d, c) - ch := make(chan error, 1) - go func() { - select { - case <-k.terminate: - case <-c: - case e := <-ech: - if e != nil { - ch <- e - return - } - ech = b(d, c) - select { - case <-k.terminate: - case <-c: - case e := <-ech: - if e != nil { - ch <- e - return - } - close(ch) - return - } - } - ch <- fmt.Errorf("aborting composite reconciler action") - }() - return ch - } - result := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { - return chained(d, c, actions[0], actions[1]) - } - for i := 2; i < len(actions); i++ { - i := i - next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { - return chained(d, c, taskreconciler.Action(result), actions[i]) - } - result = next - } - return taskreconciler.Action(result) -} - // reconciler action factory, performs explicit task reconciliation for non-terminal // tasks listed in the scheduler's internal taskRegistry. func (k *framework) makeTaskRegistryReconciler() taskreconciler.Action { diff --git a/contrib/mesos/pkg/scheduler/components/tasksreconciler/tasksreconciler.go b/contrib/mesos/pkg/scheduler/components/tasksreconciler/tasksreconciler.go index 299dfffa584..3ba4fe6fc67 100644 --- a/contrib/mesos/pkg/scheduler/components/tasksreconciler/tasksreconciler.go +++ b/contrib/mesos/pkg/scheduler/components/tasksreconciler/tasksreconciler.go @@ -17,6 +17,7 @@ limitations under the License. package taskreconciler import ( + "fmt" "time" log "github.com/golang/glog" @@ -180,3 +181,55 @@ requestLoop: } } // for } + +// MakeComposite invokes the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation +// is cancelled. if any other errors occur the composite reconciler will attempt to complete the +// sequence, reporting only the last generated error. +func MakeComposite(done <-chan struct{}, actions ...Action) Action { + if x := len(actions); x == 0 { + // programming error + panic("no actions specified for composite reconciler") + } else if x == 1 { + return actions[0] + } + chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b Action) <-chan error { + ech := a(d, c) + ch := make(chan error, 1) + go func() { + select { + case <-done: + case <-c: + case e := <-ech: + if e != nil { + ch <- e + return + } + ech = b(d, c) + select { + case <-done: + case <-c: + case e := <-ech: + if e != nil { + ch <- e + return + } + close(ch) + return + } + } + ch <- fmt.Errorf("aborting composite reconciler action") + }() + return ch + } + result := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { + return chained(d, c, actions[0], actions[1]) + } + for i := 2; i < len(actions); i++ { + i := i + next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { + return chained(d, c, Action(result), actions[i]) + } + result = next + } + return Action(result) +}