Move reconciler.go into operations

This commit is contained in:
Dr. Stefan Schimanski
2015-10-26 21:21:54 -05:00
parent 23fa56adb1
commit 0bd1666d9b
3 changed files with 19 additions and 21 deletions

View File

@@ -18,9 +18,11 @@ package errors
import ( import (
"errors" "errors"
"fmt"
) )
var ( var (
NoSuchPodErr = errors.New("No such pod exists") NoSuchPodErr = errors.New("No such pod exists")
NoSuchTaskErr = errors.New("No such task exists") NoSuchTaskErr = errors.New("No such task exists")
ReconciliationCancelledErr = fmt.Errorf("explicit task reconciliation cancelled")
) )

View File

@@ -14,23 +14,19 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package scheduler package operations
import ( import (
"fmt"
"time" "time"
log "github.com/golang/glog" log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto" mesos "github.com/mesos/mesos-go/mesosproto"
bindings "github.com/mesos/mesos-go/scheduler" bindings "github.com/mesos/mesos-go/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/proc" "k8s.io/kubernetes/contrib/mesos/pkg/proc"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
) )
var (
reconciliationCancelledErr = fmt.Errorf("explicit task reconciliation cancelled")
)
type ReconcilerAction func(driver bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error type ReconcilerAction func(driver bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error
type Reconciler struct { type Reconciler struct {
@@ -43,7 +39,7 @@ type Reconciler struct {
explicitReconciliationAbortTimeout time.Duration explicitReconciliationAbortTimeout time.Duration
} }
func newReconciler(doer proc.Doer, action ReconcilerAction, func NewReconciler(doer proc.Doer, action ReconcilerAction,
cooldown, explicitReconciliationAbortTimeout time.Duration, done <-chan struct{}) *Reconciler { cooldown, explicitReconciliationAbortTimeout time.Duration, done <-chan struct{}) *Reconciler {
return &Reconciler{ return &Reconciler{
Doer: doer, Doer: doer,
@@ -164,7 +160,7 @@ requestLoop:
metrics.ReconciliationExecuted.WithLabelValues("explicit").Inc() metrics.ReconciliationExecuted.WithLabelValues("explicit").Inc()
defer close(fin) defer close(fin)
err := <-r.Action(driver, cancel) err := <-r.Action(driver, cancel)
if err == reconciliationCancelledErr { if err == merrors.ReconciliationCancelledErr {
metrics.ReconciliationCancelled.WithLabelValues("explicit").Inc() metrics.ReconciliationCancelled.WithLabelValues("explicit").Inc()
log.Infoln(err.Error()) log.Infoln(err.Error())
} else if err != nil { } else if err != nil {

View File

@@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
@@ -55,7 +56,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
plugin "k8s.io/kubernetes/plugin/pkg/scheduler" plugin "k8s.io/kubernetes/plugin/pkg/scheduler"
) )
@@ -101,7 +101,7 @@ type MesosScheduler struct {
// via deferred init // via deferred init
plugin PluginInterface plugin PluginInterface
reconciler *Reconciler reconciler *operations.Reconciler
reconcileCooldown time.Duration reconcileCooldown time.Duration
asRegisteredMaster proc.Doer asRegisteredMaster proc.Doer
terminate <-chan struct{} // signal chan, closes when we should kill background tasks terminate <-chan struct{} // signal chan, closes when we should kill background tasks
@@ -296,7 +296,7 @@ func (k *MesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver)
r1 := k.makeTaskRegistryReconciler() r1 := k.makeTaskRegistryReconciler()
r2 := k.makePodRegistryReconciler() r2 := k.makePodRegistryReconciler()
k.reconciler = newReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2), k.reconciler = operations.NewReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2),
k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate) k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate)
go k.reconciler.Run(driver) go k.reconciler.Run(driver)
@@ -583,14 +583,14 @@ func explicitTaskFilter(t *podtask.T) bool {
// invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation // invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation
// is cancelled. if any other errors occur the composite reconciler will attempt to complete the // is cancelled. if any other errors occur the composite reconciler will attempt to complete the
// sequence, reporting only the last generated error. // sequence, reporting only the last generated error.
func (k *MesosScheduler) makeCompositeReconciler(actions ...ReconcilerAction) ReconcilerAction { func (k *MesosScheduler) makeCompositeReconciler(actions ...operations.ReconcilerAction) operations.ReconcilerAction {
if x := len(actions); x == 0 { if x := len(actions); x == 0 {
// programming error // programming error
panic("no actions specified for composite reconciler") panic("no actions specified for composite reconciler")
} else if x == 1 { } else if x == 1 {
return actions[0] return actions[0]
} }
chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b ReconcilerAction) <-chan error { chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b operations.ReconcilerAction) <-chan error {
ech := a(d, c) ech := a(d, c)
ch := make(chan error, 1) ch := make(chan error, 1)
go func() { go func() {
@@ -625,17 +625,17 @@ func (k *MesosScheduler) makeCompositeReconciler(actions ...ReconcilerAction) Re
for i := 2; i < len(actions); i++ { for i := 2; i < len(actions); i++ {
i := i i := i
next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error {
return chained(d, c, ReconcilerAction(result), actions[i]) return chained(d, c, operations.ReconcilerAction(result), actions[i])
} }
result = next result = next
} }
return ReconcilerAction(result) return operations.ReconcilerAction(result)
} }
// reconciler action factory, performs explicit task reconciliation for non-terminal // reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks listed in the scheduler's internal taskRegistry. // tasks listed in the scheduler's internal taskRegistry.
func (k *MesosScheduler) makeTaskRegistryReconciler() ReconcilerAction { func (k *MesosScheduler) makeTaskRegistryReconciler() operations.ReconcilerAction {
return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
taskToSlave := make(map[string]string) taskToSlave := make(map[string]string)
for _, t := range k.taskRegistry.List(explicitTaskFilter) { for _, t := range k.taskRegistry.List(explicitTaskFilter) {
if t.Spec.SlaveID != "" { if t.Spec.SlaveID != "" {
@@ -648,8 +648,8 @@ func (k *MesosScheduler) makeTaskRegistryReconciler() ReconcilerAction {
// reconciler action factory, performs explicit task reconciliation for non-terminal // reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks identified by annotations in the Kubernetes pod registry. // tasks identified by annotations in the Kubernetes pod registry.
func (k *MesosScheduler) makePodRegistryReconciler() ReconcilerAction { func (k *MesosScheduler) makePodRegistryReconciler() operations.ReconcilerAction {
return ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil { if err != nil {
return proc.ErrorChanf("failed to reconcile pod registry: %v", err) return proc.ErrorChanf("failed to reconcile pod registry: %v", err)
@@ -694,7 +694,7 @@ func (k *MesosScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDrive
select { select {
case <-cancel: case <-cancel:
return reconciliationCancelledErr return merrors.ReconciliationCancelledErr
default: default:
if _, err := driver.ReconcileTasks(statusList); err != nil { if _, err := driver.ReconcileTasks(statusList); err != nil {
return err return err
@@ -711,7 +711,7 @@ func (k *MesosScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDrive
} }
select { select {
case <-cancel: case <-cancel:
return reconciliationCancelledErr return merrors.ReconciliationCancelledErr
case <-time.After(backoff): case <-time.After(backoff):
for taskId := range remaining { for taskId := range remaining {
if task, _ := k.taskRegistry.Get(taskId); task != nil && explicitTaskFilter(task) && task.UpdatedTime.Before(start) { if task, _ := k.taskRegistry.Get(taskId); task != nil && explicitTaskFilter(task) && task.UpdatedTime.Before(start) {