diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go
index 7d1a078a442..83340dc07ba 100644
--- a/contrib/mesos/pkg/executor/executor.go
+++ b/contrib/mesos/pkg/executor/executor.go
@@ -475,7 +475,7 @@ func (k *KubernetesMesosExecutor) launchTask(driver bindings.ExecutorDriver, tas
 	// TODO(k8s): use Pods interface for binding once clusters are upgraded
 	// return b.Pods(binding.Namespace).Bind(binding)
 	if pod.Spec.NodeName == "" {
-		//HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/scheduler.go
+		//HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/framework.go
 		binding := &api.Binding{
 			ObjectMeta: api.ObjectMeta{
 				Namespace: pod.Namespace,
@@ -780,7 +780,7 @@ func (k *KubernetesMesosExecutor) FrameworkMessage(driver bindings.ExecutorDrive
 	}
 	log.Infof("Receives message from framework %v\n", message)
 
-	//TODO(jdef) master reported a lost task, reconcile this! @see scheduler.go:handleTaskLost
+	//TODO(jdef) master reported a lost task, reconcile this! @see framework.go:handleTaskLost
 	if strings.HasPrefix(message, messages.TaskLost+":") {
 		taskId := message[len(messages.TaskLost)+1:]
 		if taskId != "" {
diff --git a/contrib/mesos/pkg/scheduler/framework.go b/contrib/mesos/pkg/scheduler/framework.go
index 2b21ab8ff89..c8ad65f8fd7 100644
--- a/contrib/mesos/pkg/scheduler/framework.go
+++ b/contrib/mesos/pkg/scheduler/framework.go
@@ -17,48 +17,741 @@ limitations under the License.
 package scheduler
 
 import (
+	"fmt"
+	"io"
+	"math"
+	"net/http"
+	"reflect"
 	"sync"
+	"time"
 
+	log "github.com/golang/glog"
 	mesos "github.com/mesos/mesos-go/mesosproto"
 	mutil "github.com/mesos/mesos-go/mesosutil"
+	bindings "github.com/mesos/mesos-go/scheduler"
+	execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
+	"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
+	"k8s.io/kubernetes/contrib/mesos/pkg/node"
 	"k8s.io/kubernetes/contrib/mesos/pkg/offers"
-	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
+	offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics"
+	"k8s.io/kubernetes/contrib/mesos/pkg/proc"
+	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
+	schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
+	merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/errors"
+	client "k8s.io/kubernetes/pkg/client/unversioned"
+	"k8s.io/kubernetes/pkg/fields"
+	"k8s.io/kubernetes/pkg/kubelet/container"
+	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/pkg/util/sets"
 )
 
-type MesosFramework struct {
-	sync.Mutex
-	MesosScheduler *MesosScheduler
+type Framework struct {
+	// We use a lock here to avoid races between
+	// invoking the mesos callbacks and the scheduling loop
+	*sync.RWMutex
+
+	// Config related, write-once
+	sched             types.Scheduler
+	schedulerConfig   *schedcfg.Config
+	executor          *mesos.ExecutorInfo
+	executorGroup     uint64
+	client            *client.Client
+	failoverTimeout   float64 // in seconds
+	reconcileInterval int64
+	nodeRegistrator   node.Registrator
+	storeFrameworkId  func(id string)
+
+	// Mesos context.
+ driver bindings.SchedulerDriver // late initialization + frameworkId *mesos.FrameworkID + masterInfo *mesos.MasterInfo + registered bool + registration chan struct{} // signal chan that closes upon first successful registration + onRegistration sync.Once + offers offers.Registry + slaveHostNames *slave.Registry + + // via deferred init + tasksReconciler *operations.TasksReconciler + reconcileCooldown time.Duration + asRegisteredMaster proc.Doer + terminate <-chan struct{} // signal chan, closes when we should kill background tasks } -func (fw *MesosFramework) PodScheduler() podschedulers.PodScheduler { - return fw.MesosScheduler.podScheduler +type Config struct { + SchedulerConfig schedcfg.Config + Executor *mesos.ExecutorInfo + Client *client.Client + StoreFrameworkId func(id string) + FailoverTimeout float64 + ReconcileInterval int64 + ReconcileCooldown time.Duration + LookupNode node.LookupFunc } -func (fw *MesosFramework) Offers() offers.Registry { - return fw.MesosScheduler.offers +// New creates a new Framework +func New(config Config) *Framework { + var k *Framework + k = &Framework{ + schedulerConfig: &config.SchedulerConfig, + RWMutex: new(sync.RWMutex), + executor: config.Executor, + executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(), + client: config.Client, + failoverTimeout: config.FailoverTimeout, + reconcileInterval: config.ReconcileInterval, + nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode), + offers: offers.CreateRegistry(offers.RegistryConfig{ + Compat: func(o *mesos.Offer) bool { + // the node must be registered and have up-to-date labels + n := config.LookupNode(o.GetHostname()) + if n == nil || !node.IsUpToDate(n, node.SlaveAttributesToLabels(o.GetAttributes())) { + return false + } + + // the executor IDs must not identify a kubelet-executor with a group that doesn't match ours + for _, eid := range o.GetExecutorIds() { + execuid := uid.Parse(eid.GetValue()) + if execuid.Name() == execcfg.DefaultInfoID && execuid.Group() != k.executorGroup { + return false + } + } + + return true + }, + DeclineOffer: func(id string) <-chan error { + errOnce := proc.NewErrorOnce(k.terminate) + errOuter := k.asRegisteredMaster.Do(func() { + var err error + defer errOnce.Report(err) + offerId := mutil.NewOfferID(id) + filters := &mesos.Filters{} + _, err = k.driver.DeclineOffer(offerId, filters) + }) + return errOnce.Send(errOuter).Err() + }, + // remember expired offers so that we can tell if a previously scheduler offer relies on one + LingerTTL: config.SchedulerConfig.OfferLingerTTL.Duration, + TTL: config.SchedulerConfig.OfferTTL.Duration, + ListenerDelay: config.SchedulerConfig.ListenerDelay.Duration, + }), + slaveHostNames: slave.NewRegistry(), + reconcileCooldown: config.ReconcileCooldown, + registration: make(chan struct{}), + asRegisteredMaster: proc.DoerFunc(func(proc.Action) <-chan error { + return proc.ErrorChanf("cannot execute action with unregistered scheduler") + }), + storeFrameworkId: config.StoreFrameworkId, + } + return k } -func (fw *MesosFramework) Tasks() podtask.Registry { - return fw.MesosScheduler.taskRegistry +func (k *Framework) Init(scheduler *Scheduler, electedMaster proc.Process, mux *http.ServeMux) error { + log.V(1).Infoln("initializing kubernetes mesos scheduler") + + k.sched = scheduler + k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error { + if !k.registered { + return proc.ErrorChanf("failed to execute action, scheduler is disconnected") + } + return electedMaster.Do(a) + }) + 
k.terminate = electedMaster.Done()
+	k.offers.Init(k.terminate)
+	k.InstallDebugHandlers(mux)
+	k.nodeRegistrator.Run(k.terminate)
+	return k.recoverTasks()
 }
 
-func (fw *MesosFramework) SlaveHostNameFor(id string) string {
-	return fw.MesosScheduler.slaveHostNames.HostName(id)
+func (k *Framework) asMaster() proc.Doer {
+	k.RLock()
+	defer k.RUnlock()
+	return k.asRegisteredMaster
 }
 
-func (fw *MesosFramework) KillTask(taskId string) error {
-	killTaskId := mutil.NewTaskID(taskId)
-	_, err := fw.MesosScheduler.driver.KillTask(killTaskId)
+func (k *Framework) InstallDebugHandlers(mux *http.ServeMux) {
+	wrappedHandler := func(uri string, h http.Handler) {
+		mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) {
+			ch := make(chan struct{})
+			closer := runtime.Closer(ch)
+			proc.OnError(k.asMaster().Do(func() {
+				defer closer()
+				h.ServeHTTP(w, r)
+			}), func(err error) {
+				defer closer()
+				log.Warningf("failed HTTP request for %s: %v", uri, err)
+				w.WriteHeader(http.StatusServiceUnavailable)
+			}, k.terminate)
+			select {
+			case <-time.After(k.schedulerConfig.HttpHandlerTimeout.Duration):
+				log.Warningf("timed out waiting for request to be processed")
+				w.WriteHeader(http.StatusServiceUnavailable)
+				return
+			case <-ch: // noop
+			}
+		})
+	}
+	requestReconciliation := func(uri string, requestAction func()) {
+		wrappedHandler(uri, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			requestAction()
+			w.WriteHeader(http.StatusNoContent)
+		}))
+	}
+	requestReconciliation("/debug/actions/requestExplicit", k.tasksReconciler.RequestExplicit)
+	requestReconciliation("/debug/actions/requestImplicit", k.tasksReconciler.RequestImplicit)
+
+	wrappedHandler("/debug/actions/kamikaze", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		slaves := k.slaveHostNames.SlaveIDs()
+		for _, slaveId := range slaves {
+			_, err := k.driver.SendFrameworkMessage(
+				k.executor.ExecutorId,
+				mutil.NewSlaveID(slaveId),
+				messages.Kamikaze)
+			if err != nil {
+				log.Warningf("failed to send kamikaze message to slave %s: %v", slaveId, err)
+			} else {
+				io.WriteString(w, fmt.Sprintf("kamikaze slave %s\n", slaveId))
+			}
+		}
+		io.WriteString(w, "OK")
+	}))
+}
+
+func (k *Framework) Registration() <-chan struct{} {
+	return k.registration
+}
+
+// Registered is called when the scheduler successfully registers with the master.
+func (k *Framework) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) {
+	log.Infof("Scheduler registered with the master: %v with frameworkId: %v\n", mi, fid)
+
+	k.driver = drv
+	k.frameworkId = fid
+	k.masterInfo = mi
+	k.registered = true
+
+	k.onRegistration.Do(func() { k.onInitialRegistration(drv) })
+	k.tasksReconciler.RequestExplicit()
+}
+
+// Reregistered is called when the scheduler successfully re-registers with the master.
+// This happens when the master fails over.
+func (k *Framework) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) {
+	log.Infof("Scheduler reregistered with the master: %v\n", mi)
+
+	k.driver = drv
+	k.masterInfo = mi
+	k.registered = true
+
+	k.onRegistration.Do(func() { k.onInitialRegistration(drv) })
+	k.tasksReconciler.RequestExplicit()
+}
+
+// perform one-time initialization actions upon the first registration event received from Mesos.
+func (k *Framework) onInitialRegistration(driver bindings.SchedulerDriver) {
+	defer close(k.registration)
+
+	if k.failoverTimeout > 0 {
+		refreshInterval := k.schedulerConfig.FrameworkIdRefreshInterval.Duration
+		if k.failoverTimeout < k.schedulerConfig.FrameworkIdRefreshInterval.Duration.Seconds() {
+			refreshInterval = time.Duration(math.Max(1, k.failoverTimeout/2)) * time.Second
+		}
+		go runtime.Until(func() {
+			k.storeFrameworkId(k.frameworkId.GetValue())
+		}, refreshInterval, k.terminate)
+	}
+
+	r1 := k.makeTaskRegistryReconciler()
+	r2 := k.makePodRegistryReconciler()
+
+	k.tasksReconciler = operations.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2),
+		k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate)
+	go k.tasksReconciler.Run(driver)
+
+	if k.reconcileInterval > 0 {
+		ri := time.Duration(k.reconcileInterval) * time.Second
+		time.AfterFunc(k.schedulerConfig.InitialImplicitReconciliationDelay.Duration, func() { runtime.Until(k.tasksReconciler.RequestImplicit, ri, k.terminate) })
+		log.Infof("will perform implicit task reconciliation at interval: %v after %v", ri, k.schedulerConfig.InitialImplicitReconciliationDelay.Duration)
+	}
+}
+
+// Disconnected is called when the scheduler loses connection to the master.
+func (k *Framework) Disconnected(driver bindings.SchedulerDriver) {
+	log.Infof("Master disconnected!\n")
+
+	k.registered = false
+
+	// discard all cached offers to avoid unnecessary TASK_LOST updates
+	k.offers.Invalidate("")
+}
+
+// ResourceOffers is called when the scheduler receives some offers from the master.
+func (k *Framework) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) {
+	log.V(2).Infof("Received offers %+v", offers)
+
+	// Record the offers in the global offer map as well as each slave's offer map.
+	k.offers.Add(offers)
+	for _, offer := range offers {
+		slaveId := offer.GetSlaveId().GetValue()
+		k.slaveHostNames.Register(slaveId, offer.GetHostname())
+
+		// create the api node object if it does not exist already
+		if k.nodeRegistrator != nil {
+			labels := node.SlaveAttributesToLabels(offer.GetAttributes())
+			_, err := k.nodeRegistrator.Register(offer.GetHostname(), labels)
+			if err != nil {
+				log.Error(err)
+			}
+		}
+	}
+}
+
+// OfferRescinded is called when the resources are rescinded from the scheduler.
+func (k *Framework) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) {
+	log.Infof("Offer rescinded %v\n", offerId)
+
+	oid := offerId.GetValue()
+	k.offers.Delete(oid, offermetrics.OfferRescinded)
+}
+
+// StatusUpdate is called when a status update message is sent to the scheduler.
+func (k *Framework) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { + + source, reason := "none", "none" + if taskStatus.Source != nil { + source = (*taskStatus.Source).String() + } + if taskStatus.Reason != nil { + reason = (*taskStatus.Reason).String() + } + taskState := taskStatus.GetState() + metrics.StatusUpdates.WithLabelValues(source, reason, taskState.String()).Inc() + + message := "none" + if taskStatus.Message != nil { + message = *taskStatus.Message + } + + log.Infof( + "task status update %q from %q for task %q on slave %q executor %q for reason %q with message %q", + taskState.String(), + source, + taskStatus.TaskId.GetValue(), + taskStatus.SlaveId.GetValue(), + taskStatus.ExecutorId.GetValue(), + reason, + message, + ) + + switch taskState { + case mesos.TaskState_TASK_RUNNING, mesos.TaskState_TASK_FINISHED, mesos.TaskState_TASK_STARTING, mesos.TaskState_TASK_STAGING: + if _, state := k.sched.Tasks().UpdateStatus(taskStatus); state == podtask.StateUnknown { + if taskState != mesos.TaskState_TASK_FINISHED { + //TODO(jdef) what if I receive this after a TASK_LOST or TASK_KILLED? + //I don't want to reincarnate then.. TASK_LOST is a special case because + //the master is stateless and there are scenarios where I may get TASK_LOST + //followed by TASK_RUNNING. + //TODO(jdef) consider running this asynchronously since there are API server + //calls that may be made + k.reconcileNonTerminalTask(driver, taskStatus) + } // else, we don't really care about FINISHED tasks that aren't registered + return + } + if hostName := k.slaveHostNames.HostName(taskStatus.GetSlaveId().GetValue()); hostName == "" { + // a registered task has an update reported by a slave that we don't recognize. + // this should never happen! So we don't reconcile it. 
+			log.Errorf("Ignoring status %+v because the slave does not exist", taskStatus)
+			return
+		}
+	case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR:
+		if task, _ := k.sched.Tasks().UpdateStatus(taskStatus); task != nil {
+			if task.Has(podtask.Launched) && !task.Has(podtask.Bound) {
+				go k.sched.Reconcile(task)
+				return
+			}
+		} else {
+			// unknown task failed, not much we can do about it
+			return
+		}
+		// last-ditch effort to reconcile our records
+		fallthrough
+	case mesos.TaskState_TASK_LOST, mesos.TaskState_TASK_KILLED:
+		k.reconcileTerminalTask(driver, taskStatus)
+	default:
+		log.Errorf(
+			"unknown task status %q from %q for task %q on slave %q executor %q for reason %q with message %q",
+			taskState.String(),
+			source,
+			taskStatus.TaskId.GetValue(),
+			taskStatus.SlaveId.GetValue(),
+			taskStatus.ExecutorId.GetValue(),
+			reason,
+			message,
+		)
+	}
+}
+
+func (k *Framework) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
+	task, state := k.sched.Tasks().UpdateStatus(taskStatus)
+
+	if (state == podtask.StateRunning || state == podtask.StatePending) &&
+		((taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER && taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION) ||
+			(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED) ||
+			(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED) ||
+			(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_EXECUTOR && taskStatus.GetMessage() == messages.ContainersDisappeared)) {
+		//--
+		// pod-task has metadata that refers to:
+		// (1) a task that Mesos no longer knows about, or else
+		// (2) a pod that the Kubelet will never report as "failed"
+		// (3) a pod that the kubeletExecutor reported as lost (likely due to docker daemon crash/restart)
+		// For now, destroy the pod and hope that there's a replication controller backing it up.
+		// TODO(jdef) for case #2 don't delete the pod, just update its status to Failed
+		pod := &task.Pod
+		log.Warningf("deleting rogue pod %v/%v for lost task %v", pod.Namespace, pod.Name, task.ID)
+		if err := k.client.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil && !errors.IsNotFound(err) {
+			log.Errorf("failed to delete pod %v/%v for terminal task %v: %v", pod.Namespace, pod.Name, task.ID, err)
+		}
+	} else if taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED || taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED {
+		// attempt to prevent dangling pods in the pod and task registries
+		log.V(1).Infof("request explicit reconciliation to clean up for task %v after executor reported (terminated/unregistered)", taskStatus.TaskId.GetValue())
+		k.tasksReconciler.RequestExplicit()
+	} else if taskStatus.GetState() == mesos.TaskState_TASK_LOST && state == podtask.StateRunning && taskStatus.ExecutorId != nil && taskStatus.SlaveId != nil {
+		//TODO(jdef) this may not be meaningful once we have proper checkpointing and master detection
+		//If we're reconciling and receive this then the executor may be
+		//running a task that we need it to kill. It's possible that the framework
+		//is unrecognized by the master at this point, so KillTask is not guaranteed
+		//to do anything. The underlying driver transport may be able to send a
+		//FrameworkMessage directly to the slave to terminate the task.
+		log.V(2).Infof("forwarding TASK_LOST message to executor %v on slave %v", taskStatus.ExecutorId, taskStatus.SlaveId)
+		data := fmt.Sprintf("%s:%s", messages.TaskLost, task.ID) //TODO(jdef) use a real message type
+		if _, err := driver.SendFrameworkMessage(taskStatus.ExecutorId, taskStatus.SlaveId, data); err != nil {
+			log.Error(err.Error())
+		}
+	}
+}
+
+// reconcile an unknown (from the perspective of our registry) non-terminal task
+func (k *Framework) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
+	// attempt to recover task from pod info:
+	// - task data may contain an api.PodStatusResult; if status.reason == REASON_RECONCILIATION then status.data == nil
+	// - the Name can be parsed by container.ParsePodFullName() to yield a pod Name and Namespace
+	// - pull the pod metadata down from the api server
+	// - perform task recovery based on pod metadata
+	taskId := taskStatus.TaskId.GetValue()
+	if taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION && taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER {
+		// there will be no data in the task status that we can use to determine the associated pod
+		switch taskStatus.GetState() {
+		case mesos.TaskState_TASK_STAGING:
+			// there is still hope for this task, don't kill it just yet
+			//TODO(jdef) there should probably be a limit for how long we tolerate tasks stuck in this state
+			return
+		default:
+			// for TASK_{STARTING,RUNNING} we should have already attempted recoverTasks().
+			// if the scheduler failed over before the executor fired TASK_STARTING, then we should *not*
+			// be processing this reconciliation update before we process the one from the executor.
+			// point: we don't know what this task is (perhaps there was unrecoverable metadata in the pod),
+			// so it gets killed.
+			log.Errorf("killing non-terminal, unrecoverable task %v", taskId)
+		}
+	} else if podStatus, err := podtask.ParsePodStatusResult(taskStatus); err != nil {
+		// possible rogue pod exists at this point because we can't identify it; should kill the task
+		log.Errorf("possible rogue pod; illegal task status data for task %v, expected an api.PodStatusResult: %v", taskId, err)
+	} else if name, namespace, err := container.ParsePodFullName(podStatus.Name); err != nil {
+		// possible rogue pod exists at this point because we can't identify it; should kill the task
+		log.Errorf("possible rogue pod; illegal api.PodStatusResult, unable to parse full pod name from: '%v' for task %v: %v",
+			podStatus.Name, taskId, err)
+	} else if pod, err := k.client.Pods(namespace).Get(name); err == nil {
+		if t, ok, err := podtask.RecoverFrom(*pod); ok {
+			log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name)
+			_, err := k.sched.Tasks().Register(t, nil)
+			if err != nil {
+				// someone beat us to it?!
+ log.Warningf("failed to register recovered task: %v", err) + return + } else { + k.sched.Tasks().UpdateStatus(taskStatus) + } + return + } else if err != nil { + //should kill the pod and the task + log.Errorf("killing pod, failed to recover task from pod %v/%v: %v", namespace, name, err) + if err := k.client.Pods(namespace).Delete(name, nil); err != nil { + log.Errorf("failed to delete pod %v/%v: %v", namespace, name, err) + } + } else { + //this is pretty unexpected: we received a TASK_{STARTING,RUNNING} message, but the apiserver's pod + //metadata is not appropriate for task reconstruction -- which should almost certainly never + //be the case unless someone swapped out the pod on us (and kept the same namespace/name) while + //we were failed over. + + //kill this task, allow the newly launched scheduler to schedule the new pod + log.Warningf("unexpected pod metadata for task %v in apiserver, assuming new unscheduled pod spec: %+v", taskId, pod) + } + } else if errors.IsNotFound(err) { + // pod lookup failed, should delete the task since the pod is no longer valid; may be redundant, that's ok + log.Infof("killing task %v since pod %v/%v no longer exists", taskId, namespace, name) + } else if errors.IsServerTimeout(err) { + log.V(2).Infof("failed to reconcile task due to API server timeout: %v", err) + return + } else { + log.Errorf("unexpected API server error, aborting reconcile for task %v: %v", taskId, err) + return + } + if _, err := driver.KillTask(taskStatus.TaskId); err != nil { + log.Errorf("failed to kill task %v: %v", taskId, err) + } +} + +// FrameworkMessage is called when the scheduler receives a message from the executor. +func (k *Framework) FrameworkMessage(driver bindings.SchedulerDriver, + executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) { + log.Infof("Received messages from executor %v of slave %v, %v\n", executorId, slaveId, message) +} + +// SlaveLost is called when some slave is lost. +func (k *Framework) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) { + log.Infof("Slave %v is lost\n", slaveId) + + sid := slaveId.GetValue() + k.offers.InvalidateForSlave(sid) + + // TODO(jdef): delete slave from our internal list? probably not since we may need to reconcile + // tasks. it would be nice to somehow flag the slave as lost so that, perhaps, we can periodically + // flush lost slaves older than X, and for which no tasks or pods reference. + + // unfinished tasks/pods will be dropped. use a replication controller if you want pods to + // be restarted when slaves die. +} + +// ExecutorLost is called when some executor is lost. +func (k *Framework) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) { + log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status) + // TODO(yifan): Restart any unfinished tasks of the executor. +} + +// Error is called when there is an unrecoverable error in the scheduler or scheduler driver. +// The driver should have been aborted before this is invoked. +func (k *Framework) Error(driver bindings.SchedulerDriver, message string) { + log.Fatalf("fatal scheduler error: %v\n", message) +} + +// filter func used for explicit task reconciliation, selects only non-terminal tasks which +// have been communicated to mesos (read: launched). 
+func explicitTaskFilter(t *podtask.T) bool { + switch t.State { + case podtask.StateRunning: + return true + case podtask.StatePending: + return t.Has(podtask.Launched) + default: + return false + } +} + +// invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation +// is cancelled. if any other errors occur the composite reconciler will attempt to complete the +// sequence, reporting only the last generated error. +func (k *Framework) makeCompositeReconciler(actions ...operations.ReconcilerAction) operations.ReconcilerAction { + if x := len(actions); x == 0 { + // programming error + panic("no actions specified for composite reconciler") + } else if x == 1 { + return actions[0] + } + chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b operations.ReconcilerAction) <-chan error { + ech := a(d, c) + ch := make(chan error, 1) + go func() { + select { + case <-k.terminate: + case <-c: + case e := <-ech: + if e != nil { + ch <- e + return + } + ech = b(d, c) + select { + case <-k.terminate: + case <-c: + case e := <-ech: + if e != nil { + ch <- e + return + } + close(ch) + return + } + } + ch <- fmt.Errorf("aborting composite reconciler action") + }() + return ch + } + result := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { + return chained(d, c, actions[0], actions[1]) + } + for i := 2; i < len(actions); i++ { + i := i + next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { + return chained(d, c, operations.ReconcilerAction(result), actions[i]) + } + result = next + } + return operations.ReconcilerAction(result) +} + +// reconciler action factory, performs explicit task reconciliation for non-terminal +// tasks listed in the scheduler's internal taskRegistry. +func (k *Framework) makeTaskRegistryReconciler() operations.ReconcilerAction { + return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { + taskToSlave := make(map[string]string) + for _, t := range k.sched.Tasks().List(explicitTaskFilter) { + if t.Spec.SlaveID != "" { + taskToSlave[t.ID] = t.Spec.SlaveID + } + } + return proc.ErrorChan(k.explicitlyReconcileTasks(drv, taskToSlave, cancel)) + }) +} + +// reconciler action factory, performs explicit task reconciliation for non-terminal +// tasks identified by annotations in the Kubernetes pod registry. 
+func (k *Framework) makePodRegistryReconciler() operations.ReconcilerAction { + return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { + podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) + if err != nil { + return proc.ErrorChanf("failed to reconcile pod registry: %v", err) + } + taskToSlave := make(map[string]string) + for _, pod := range podList.Items { + if len(pod.Annotations) == 0 { + continue + } + taskId, found := pod.Annotations[meta.TaskIdKey] + if !found { + continue + } + slaveId, found := pod.Annotations[meta.SlaveIdKey] + if !found { + continue + } + taskToSlave[taskId] = slaveId + } + return proc.ErrorChan(k.explicitlyReconcileTasks(drv, taskToSlave, cancel)) + }) +} + +// execute an explicit task reconciliation, as per http://mesos.apache.org/documentation/latest/reconciliation/ +func (k *Framework) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error { + log.Info("explicit reconcile tasks") + + // tell mesos to send us the latest status updates for all the non-terminal tasks that we know about + statusList := []*mesos.TaskStatus{} + remaining := sets.KeySet(reflect.ValueOf(taskToSlave)) + for taskId, slaveId := range taskToSlave { + if slaveId == "" { + delete(taskToSlave, taskId) + continue + } + statusList = append(statusList, &mesos.TaskStatus{ + TaskId: mutil.NewTaskID(taskId), + SlaveId: mutil.NewSlaveID(slaveId), + State: mesos.TaskState_TASK_RUNNING.Enum(), // req'd field, doesn't have to reflect reality + }) + } + + select { + case <-cancel: + return merrors.ReconciliationCancelledErr + default: + if _, err := driver.ReconcileTasks(statusList); err != nil { + return err + } + } + + start := time.Now() + first := true + for backoff := 1 * time.Second; first || remaining.Len() > 0; backoff = backoff * 2 { + first = false + // nothing to do here other than wait for status updates.. + if backoff > k.schedulerConfig.ExplicitReconciliationMaxBackoff.Duration { + backoff = k.schedulerConfig.ExplicitReconciliationMaxBackoff.Duration + } + select { + case <-cancel: + return merrors.ReconciliationCancelledErr + case <-time.After(backoff): + for taskId := range remaining { + if task, _ := k.sched.Tasks().Get(taskId); task != nil && explicitTaskFilter(task) && task.UpdatedTime.Before(start) { + // keep this task in remaining list + continue + } + remaining.Delete(taskId) + } + } + } + return nil +} + +func (ks *Framework) recoverTasks() error { + podList, err := ks.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) + if err != nil { + log.V(1).Infof("failed to recover pod registry, madness may ensue: %v", err) + return err + } + recoverSlave := func(t *podtask.T) { + + slaveId := t.Spec.SlaveID + ks.slaveHostNames.Register(slaveId, t.Offer.Host()) + } + for _, pod := range podList.Items { + if _, isMirrorPod := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; isMirrorPod { + // mirrored pods are never reconciled because the scheduler isn't responsible for + // scheduling them; they're started by the executor/kubelet upon instantiation and + // reflected in the apiserver afterward. the scheduler has no knowledge of them. 
+ continue + } + if t, ok, err := podtask.RecoverFrom(pod); err != nil { + log.Errorf("failed to recover task from pod, will attempt to delete '%v/%v': %v", pod.Namespace, pod.Name, err) + err := ks.client.Pods(pod.Namespace).Delete(pod.Name, nil) + //TODO(jdef) check for temporary or not-found errors + if err != nil { + log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err) + } + } else if ok { + ks.sched.Tasks().Register(t, nil) + recoverSlave(t) + log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name) + } + } + return nil +} + +func (ks *Framework) KillTask(id string) error { + killTaskId := mutil.NewTaskID(id) + _, err := ks.driver.KillTask(killTaskId) return err } -func (fw *MesosFramework) LaunchTask(task *podtask.T) error { +func (ks *Framework) LaunchTask(t *podtask.T) error { // assume caller is holding scheduler lock - ei := fw.MesosScheduler.executor - taskList := []*mesos.TaskInfo{task.BuildTaskInfo(ei)} - offerIds := []*mesos.OfferID{task.Offer.Details().Id} + taskList := []*mesos.TaskInfo{t.BuildTaskInfo(ks.executor)} + offerIds := []*mesos.OfferID{t.Offer.Details().Id} filters := &mesos.Filters{} - _, err := fw.MesosScheduler.driver.LaunchTasks(offerIds, taskList, filters) + _, err := ks.driver.LaunchTasks(offerIds, taskList, filters) return err } + +func (ks *Framework) Offers() offers.Registry { + return ks.offers +} diff --git a/contrib/mesos/pkg/scheduler/scheduler_test.go b/contrib/mesos/pkg/scheduler/framework_test.go similarity index 76% rename from contrib/mesos/pkg/scheduler/scheduler_test.go rename to contrib/mesos/pkg/scheduler/framework_test.go index 8c714f1106f..689d21d1291 100644 --- a/contrib/mesos/pkg/scheduler/scheduler_test.go +++ b/contrib/mesos/pkg/scheduler/framework_test.go @@ -26,11 +26,12 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/proc" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/mock" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" ) //get number of non-expired offers from offer registry @@ -82,12 +83,19 @@ func (r *mockRegistrator) Register(hostName string, labels map[string]string) (b } } +func mockScheduler() types.Scheduler { + mockScheduler := &types.MockScheduler{} + reg := podtask.NewInMemoryRegistry() + mockScheduler.On("Tasks").Return(reg) + return mockScheduler +} + //test adding of ressource offer, should be added to offer registry and slaves func TestResourceOffer_Add(t *testing.T) { assert := assert.New(t) registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)} - testScheduler := &MesosScheduler{ + testFramework := &Framework{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -102,37 +110,38 @@ func TestResourceOffer_Add(t *testing.T) { }), slaveHostNames: slave.NewRegistry(), nodeRegistrator: registrator, + sched: mockScheduler(), } hostname := "h1" offerID1 := util.NewOfferID("test1") offer1 := &mesos.Offer{Id: offerID1, Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)} offers1 := []*mesos.Offer{offer1} - testScheduler.ResourceOffers(nil, offers1) + testFramework.ResourceOffers(nil, offers1) assert.Equal(1, len(registrator.store.List())) - assert.Equal(1, 
getNumberOffers(testScheduler.offers)) + assert.Equal(1, getNumberOffers(testFramework.offers)) //check slave hostname - assert.Equal(1, len(testScheduler.slaveHostNames.SlaveIDs())) + assert.Equal(1, len(testFramework.slaveHostNames.SlaveIDs())) //add another offer hostname2 := "h2" offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)} offers2 := []*mesos.Offer{offer2} - testScheduler.ResourceOffers(nil, offers2) + testFramework.ResourceOffers(nil, offers2) //check it is stored in registry - assert.Equal(2, getNumberOffers(testScheduler.offers)) + assert.Equal(2, getNumberOffers(testFramework.offers)) //check slave hostnames - assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs())) + assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs())) } //test adding of ressource offer, should be added to offer registry and slavesf func TestResourceOffer_Add_Rescind(t *testing.T) { assert := assert.New(t) - testScheduler := &MesosScheduler{ + testFramework := &Framework{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -146,41 +155,42 @@ func TestResourceOffer_Add_Rescind(t *testing.T) { ListenerDelay: schedcfg.DefaultListenerDelay, }), slaveHostNames: slave.NewRegistry(), + sched: mockScheduler(), } hostname := "h1" offerID1 := util.NewOfferID("test1") offer1 := &mesos.Offer{Id: offerID1, Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)} offers1 := []*mesos.Offer{offer1} - testScheduler.ResourceOffers(nil, offers1) + testFramework.ResourceOffers(nil, offers1) - assert.Equal(1, getNumberOffers(testScheduler.offers)) + assert.Equal(1, getNumberOffers(testFramework.offers)) //check slave hostname - assert.Equal(1, len(testScheduler.slaveHostNames.SlaveIDs())) + assert.Equal(1, len(testFramework.slaveHostNames.SlaveIDs())) //add another offer hostname2 := "h2" offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)} offers2 := []*mesos.Offer{offer2} - testScheduler.ResourceOffers(nil, offers2) + testFramework.ResourceOffers(nil, offers2) - assert.Equal(2, getNumberOffers(testScheduler.offers)) + assert.Equal(2, getNumberOffers(testFramework.offers)) //check slave hostnames - assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs())) + assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs())) //next whether offers can be rescinded - testScheduler.OfferRescinded(nil, offerID1) - assert.Equal(1, getNumberOffers(testScheduler.offers)) + testFramework.OfferRescinded(nil, offerID1) + assert.Equal(1, getNumberOffers(testFramework.offers)) //next whether offers can be rescinded - testScheduler.OfferRescinded(nil, util.NewOfferID("test2")) + testFramework.OfferRescinded(nil, util.NewOfferID("test2")) //walk offers again and check it is removed from registry - assert.Equal(0, getNumberOffers(testScheduler.offers)) + assert.Equal(0, getNumberOffers(testFramework.offers)) //remove non existing ID - testScheduler.OfferRescinded(nil, util.NewOfferID("notExist")) + testFramework.OfferRescinded(nil, util.NewOfferID("notExist")) } //test that when a slave is lost we remove all offers @@ -188,7 +198,7 @@ func TestSlave_Lost(t *testing.T) { assert := assert.New(t) // - testScheduler := &MesosScheduler{ + testFramework := &Framework{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -199,44 +209,45 @@ func TestSlave_Lost(t *testing.T) { ListenerDelay: 
schedcfg.DefaultListenerDelay, }), slaveHostNames: slave.NewRegistry(), + sched: mockScheduler(), } hostname := "h1" offer1 := &mesos.Offer{Id: util.NewOfferID("test1"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)} offers1 := []*mesos.Offer{offer1} - testScheduler.ResourceOffers(nil, offers1) + testFramework.ResourceOffers(nil, offers1) offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)} offers2 := []*mesos.Offer{offer2} - testScheduler.ResourceOffers(nil, offers2) + testFramework.ResourceOffers(nil, offers2) //add another offer from different slaveID hostname2 := "h2" offer3 := &mesos.Offer{Id: util.NewOfferID("test3"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)} offers3 := []*mesos.Offer{offer3} - testScheduler.ResourceOffers(nil, offers3) + testFramework.ResourceOffers(nil, offers3) //test precondition - assert.Equal(3, getNumberOffers(testScheduler.offers)) - assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs())) + assert.Equal(3, getNumberOffers(testFramework.offers)) + assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs())) //remove first slave - testScheduler.SlaveLost(nil, util.NewSlaveID(hostname)) + testFramework.SlaveLost(nil, util.NewSlaveID(hostname)) //offers should be removed - assert.Equal(1, getNumberOffers(testScheduler.offers)) + assert.Equal(1, getNumberOffers(testFramework.offers)) //slave hostnames should still be all present - assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs())) + assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs())) //remove second slave - testScheduler.SlaveLost(nil, util.NewSlaveID(hostname2)) + testFramework.SlaveLost(nil, util.NewSlaveID(hostname2)) //offers should be removed - assert.Equal(0, getNumberOffers(testScheduler.offers)) + assert.Equal(0, getNumberOffers(testFramework.offers)) //slave hostnames should still be all present - assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs())) + assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs())) //try to remove non existing slave - testScheduler.SlaveLost(nil, util.NewSlaveID("notExist")) + testFramework.SlaveLost(nil, util.NewSlaveID("notExist")) } @@ -245,7 +256,7 @@ func TestDisconnect(t *testing.T) { assert := assert.New(t) // - testScheduler := &MesosScheduler{ + testFramework := &Framework{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -256,29 +267,30 @@ func TestDisconnect(t *testing.T) { ListenerDelay: schedcfg.DefaultListenerDelay, }), slaveHostNames: slave.NewRegistry(), + sched: mockScheduler(), } hostname := "h1" offer1 := &mesos.Offer{Id: util.NewOfferID("test1"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)} offers1 := []*mesos.Offer{offer1} - testScheduler.ResourceOffers(nil, offers1) + testFramework.ResourceOffers(nil, offers1) offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)} offers2 := []*mesos.Offer{offer2} - testScheduler.ResourceOffers(nil, offers2) + testFramework.ResourceOffers(nil, offers2) //add another offer from different slaveID hostname2 := "h2" offer3 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)} offers3 := []*mesos.Offer{offer3} - testScheduler.ResourceOffers(nil, offers3) + testFramework.ResourceOffers(nil, offers3) //disconnect - testScheduler.Disconnected(nil) + testFramework.Disconnected(nil) //all offers should be removed - assert.Equal(0, 
getNumberOffers(testScheduler.offers)) + assert.Equal(0, getNumberOffers(testFramework.offers)) //slave hostnames should still be all present - assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs())) + assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs())) } //test we can handle different status updates, TODO check state transitions @@ -288,7 +300,7 @@ func TestStatus_Update(t *testing.T) { // setup expectations mockdriver.On("KillTask", util.NewTaskID("test-task-001")).Return(mesos.Status_DRIVER_RUNNING, nil) - testScheduler := &MesosScheduler{ + testFramework := &Framework{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -300,26 +312,26 @@ func TestStatus_Update(t *testing.T) { }), slaveHostNames: slave.NewRegistry(), driver: &mockdriver, - taskRegistry: podtask.NewInMemoryRegistry(), + sched: mockScheduler(), } taskStatus_task_starting := util.NewTaskStatus( util.NewTaskID("test-task-001"), mesos.TaskState_TASK_RUNNING, ) - testScheduler.StatusUpdate(testScheduler.driver, taskStatus_task_starting) + testFramework.StatusUpdate(testFramework.driver, taskStatus_task_starting) taskStatus_task_running := util.NewTaskStatus( util.NewTaskID("test-task-001"), mesos.TaskState_TASK_RUNNING, ) - testScheduler.StatusUpdate(testScheduler.driver, taskStatus_task_running) + testFramework.StatusUpdate(testFramework.driver, taskStatus_task_running) taskStatus_task_failed := util.NewTaskStatus( util.NewTaskID("test-task-001"), mesos.TaskState_TASK_FAILED, ) - testScheduler.StatusUpdate(testScheduler.driver, taskStatus_task_failed) + testFramework.StatusUpdate(testFramework.driver, taskStatus_task_failed) //assert that mock was invoked mockdriver.AssertExpectations(t) diff --git a/contrib/mesos/pkg/scheduler/ha/ha.go b/contrib/mesos/pkg/scheduler/ha/ha.go index 6c745067ffe..eb44c34a79b 100644 --- a/contrib/mesos/pkg/scheduler/ha/ha.go +++ b/contrib/mesos/pkg/scheduler/ha/ha.go @@ -112,10 +112,10 @@ type SchedulerProcess struct { fin chan struct{} } -func New(sched bindings.Scheduler) *SchedulerProcess { +func New(framework bindings.Scheduler) *SchedulerProcess { p := &SchedulerProcess{ Process: proc.New(), - Scheduler: sched, + Scheduler: framework, stage: initStage, elected: make(chan struct{}), failover: make(chan struct{}), diff --git a/contrib/mesos/pkg/scheduler/integration_test.go b/contrib/mesos/pkg/scheduler/integration_test.go index 3e5802c2a4a..18d551c9231 100644 --- a/contrib/mesos/pkg/scheduler/integration_test.go +++ b/contrib/mesos/pkg/scheduler/integration_test.go @@ -426,11 +426,10 @@ type lifecycleTest struct { apiServer *TestServer driver *mmock.JoinableDriver eventObs *EventObserver - loop operations.SchedulerLoopInterface - podReconciler *operations.PodReconciler podsListWatch *MockPodsListWatch - scheduler *MesosScheduler + framework *Framework schedulerProc *ha.SchedulerProcess + scheduler *Scheduler t *testing.T } @@ -450,7 +449,25 @@ func newLifecycleTest(t *testing.T) lifecycleTest { ) ei.Data = []byte{0, 1, 2} - // create scheduler + // create framework + client := client.NewOrDie(&client.Config{ + Host: apiServer.server.URL, + Version: testapi.Default.Version(), + }) + c := *schedcfg.CreateDefaultConfig() + framework := New(Config{ + Executor: ei, + Client: client, + SchedulerConfig: c, + LookupNode: apiServer.LookupNode, + }) + + // TODO(sttts): re-enable the following tests + // assert.NotNil(framework.client, "client is nil") + // assert.NotNil(framework.executor, "executor is nil") + // 
assert.NotNil(framework.offers, "offer registry is nil") + + // create pod scheduler strategy := podschedulers.NewAllocationStrategy( podtask.NewDefaultPredicate( mresource.DefaultDefaultContainerCPULimit, @@ -461,33 +478,15 @@ func newLifecycleTest(t *testing.T) lifecycleTest { mresource.DefaultDefaultContainerMemLimit, ), ) - - client := client.NewOrDie(&client.Config{ - Host: apiServer.server.URL, - Version: testapi.Default.Version(), - }) - c := *schedcfg.CreateDefaultConfig() - mesosScheduler := New(Config{ - Executor: ei, - Client: client, - PodScheduler: podschedulers.NewFCFSPodScheduler(strategy, apiServer.LookupNode), - SchedulerConfig: c, - LookupNode: apiServer.LookupNode, - }) - - // TODO(sttts): re-enable the following tests - // assert.NotNil(mesosScheduler.client, "client is nil") - // assert.NotNil(mesosScheduler.executor, "executor is nil") - // assert.NotNil(mesosScheduler.offers, "offer registry is nil") + fcfs := podschedulers.NewFCFSPodScheduler(strategy, apiServer.LookupNode) // create scheduler process - schedulerProc := ha.New(mesosScheduler) + schedulerProc := ha.New(framework) - // create scheduler loop - fw := &MesosFramework{MesosScheduler: mesosScheduler} + // create scheduler eventObs := NewEventObserver() - loop, podReconciler := operations.NewScheduler(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch) - assert.NotNil(loop) + scheduler := NewScheduler(&c, framework, fcfs, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch) + assert.NotNil(scheduler) // create mock mesos scheduler driver driver := &mmock.JoinableDriver{} @@ -496,23 +495,22 @@ func newLifecycleTest(t *testing.T) lifecycleTest { apiServer: apiServer, driver: driver, eventObs: eventObs, - loop: loop, podsListWatch: podsListWatch, - podReconciler: podReconciler, - scheduler: mesosScheduler, + framework: framework, schedulerProc: schedulerProc, + scheduler: scheduler, t: t, } } func (lt lifecycleTest) Start() <-chan LaunchedTask { assert := &EventAssertions{*assert.New(lt.t)} - lt.loop.Run(lt.schedulerProc.Terminal()) + lt.scheduler.Run(lt.schedulerProc.Terminal()) - // init scheduler - err := lt.scheduler.Init( + // init framework + err := lt.framework.Init( + lt.scheduler, lt.schedulerProc.Master(), - lt.podReconciler, http.DefaultServeMux, ) assert.NoError(err) @@ -565,7 +563,7 @@ func (lt lifecycleTest) Start() <-chan LaunchedTask { <-started // tell scheduler to be registered - lt.scheduler.Registered( + lt.framework.Registered( lt.driver, mesosutil.NewFrameworkID("kubernetes-id"), mesosutil.NewMasterInfo("master-id", (192<<24)+(168<<16)+(0<<8)+1, 5050), @@ -605,25 +603,25 @@ func TestScheduler_LifeCycle(t *testing.T) { // add some matching offer offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))} - lt.scheduler.ResourceOffers(nil, offers) + lt.framework.ResourceOffers(nil, offers) // first offer is declined because node is not available yet lt.apiServer.WaitForNode("some_hostname") // add one more offer - lt.scheduler.ResourceOffers(nil, offers) + lt.framework.ResourceOffers(nil, offers) // and wait for scheduled pod assert.EventWithReason(lt.eventObs, operations.Scheduled) select { case launchedTask := <-launchedTasks: // report back that the task has been staged, and then started by mesos - lt.scheduler.StatusUpdate( + lt.framework.StatusUpdate( lt.driver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING), ) - lt.scheduler.StatusUpdate( + lt.framework.StatusUpdate( 
lt.driver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING), ) @@ -634,7 +632,7 @@ func TestScheduler_LifeCycle(t *testing.T) { // report back that the task has been lost lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0) - lt.scheduler.StatusUpdate( + lt.framework.StatusUpdate( lt.driver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST), ) @@ -654,14 +652,14 @@ func TestScheduler_LifeCycle(t *testing.T) { assert.EventWithReason(lt.eventObs, operations.FailedScheduling, "failedScheduling event not received") // supply a matching offer - lt.scheduler.ResourceOffers(lt.driver, offers) + lt.framework.ResourceOffers(lt.driver, offers) for _, offer := range offers { if _, ok := offeredNodes[offer.GetHostname()]; !ok { offeredNodes[offer.GetHostname()] = struct{}{} lt.apiServer.WaitForNode(offer.GetHostname()) // reoffer since it must have been declined above - lt.scheduler.ResourceOffers(lt.driver, []*mesos.Offer{offer}) + lt.framework.ResourceOffers(lt.driver, []*mesos.Offer{offer}) } } @@ -696,11 +694,11 @@ func TestScheduler_LifeCycle(t *testing.T) { pod, launchedTask, offer := launchPodWithOffers(pod, offers) if pod != nil { // report back status - lt.scheduler.StatusUpdate( + lt.framework.StatusUpdate( lt.driver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING), ) - lt.scheduler.StatusUpdate( + lt.framework.StatusUpdate( lt.driver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING), ) @@ -736,7 +734,7 @@ func TestScheduler_LifeCycle(t *testing.T) { select { case <-killTaskCalled: // report back that the task is finished - lt.scheduler.StatusUpdate( + lt.framework.StatusUpdate( lt.driver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED), ) @@ -761,8 +759,8 @@ func TestScheduler_LifeCycle(t *testing.T) { assert.Equal(offers[1].Id.GetValue(), usedOffer.Id.GetValue()) assert.Equal(pod.Spec.NodeName, *usedOffer.Hostname) - lt.scheduler.OfferRescinded(lt.driver, offers[0].Id) - lt.scheduler.OfferRescinded(lt.driver, offers[2].Id) + lt.framework.OfferRescinded(lt.driver, offers[0].Id) + lt.framework.OfferRescinded(lt.driver, offers[2].Id) // start pods: // - which are failing while binding, @@ -774,7 +772,7 @@ func TestScheduler_LifeCycle(t *testing.T) { status := newTaskStatusForTask(task, mesos.TaskState_TASK_FAILED) message := messages.CreateBindingFailure status.Message = &message - lt.scheduler.StatusUpdate(lt.driver, status) + lt.framework.StatusUpdate(lt.driver, status) // wait until pod is looked up at the apiserver assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { @@ -796,7 +794,7 @@ func TestScheduler_LifeCycle(t *testing.T) { podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name) assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { - t, _ := lt.scheduler.taskRegistry.ForPod(podKey) + t, _ := lt.scheduler.Tasks().ForPod(podKey) return t == nil }) diff --git a/contrib/mesos/pkg/scheduler/operations/algorithm.go b/contrib/mesos/pkg/scheduler/operations/algorithm.go index 59c0bd0b1c6..b102e180359 100644 --- a/contrib/mesos/pkg/scheduler/operations/algorithm.go +++ b/contrib/mesos/pkg/scheduler/operations/algorithm.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/queue" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" types 
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" "k8s.io/kubernetes/pkg/api" @@ -31,14 +32,16 @@ import ( // SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface type SchedulerAlgorithm struct { - fw types.Framework - podUpdates queue.FIFO + sched types.Scheduler + podUpdates queue.FIFO + podScheduler podschedulers.PodScheduler } -func NewSchedulerAlgorithm(fw types.Framework, podUpdates queue.FIFO) *SchedulerAlgorithm { +func NewSchedulerAlgorithm(sched types.Scheduler, podUpdates queue.FIFO, podScheduler podschedulers.PodScheduler) *SchedulerAlgorithm { return &SchedulerAlgorithm{ - fw: fw, - podUpdates: podUpdates, + sched: sched, + podUpdates: podUpdates, + podScheduler: podScheduler, } } @@ -54,10 +57,10 @@ func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) { return "", err } - k.fw.Lock() - defer k.fw.Unlock() + k.sched.Lock() + defer k.sched.Unlock() - switch task, state := k.fw.Tasks().ForPod(podKey); state { + switch task, state := k.sched.Tasks().ForPod(podKey); state { case podtask.StateUnknown: // There's a bit of a potential race here, a pod could have been yielded() and // then before we get *here* it could be deleted. @@ -77,7 +80,7 @@ func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) { log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err) return "", err } - return k.doSchedule(k.fw.Tasks().Register(podTask, nil)) + return k.doSchedule(k.sched.Tasks().Register(podTask, nil)) //TODO(jdef) it's possible that the pod state has diverged from what //we knew previously, we should probably update the task.Pod state here @@ -107,19 +110,19 @@ func (k *SchedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, err if task.HasAcceptedOffer() { // verify that the offer is still on the table offerId := task.GetOfferId() - if offer, ok := k.fw.Offers().Get(offerId); ok && !offer.HasExpired() { + if offer, ok := k.sched.Offers().Get(offerId); ok && !offer.HasExpired() { // skip tasks that have already have assigned offers offer = task.Offer } else { task.Offer.Release() task.Reset() - if err = k.fw.Tasks().Update(task); err != nil { + if err = k.sched.Tasks().Update(task); err != nil { return "", err } } } if err == nil && offer == nil { - offer, err = k.fw.PodScheduler().SchedulePod(k.fw.Offers(), k.fw, task) + offer, err = k.podScheduler.SchedulePod(k.sched.Offers(), task) } if err != nil { return "", err @@ -128,24 +131,16 @@ func (k *SchedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, err if details == nil { return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID) } - slaveId := details.GetSlaveId().GetValue() - if slaveHostName := k.fw.SlaveHostNameFor(slaveId); slaveHostName == "" { - // not much sense in Release()ing the offer here since its owner died - offer.Release() - k.fw.Offers().Invalidate(details.Id.GetValue()) - return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID) - } else { - if task.Offer != nil && task.Offer != offer { - return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer) - } - - task.Offer = offer - k.fw.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here? 
- - if err := k.fw.Tasks().Update(task); err != nil { - offer.Release() - return "", err - } - return slaveHostName, nil + if task.Offer != nil && task.Offer != offer { + return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer) } + + task.Offer = offer + k.podScheduler.Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here? + + if err := k.sched.Tasks().Update(task); err != nil { + offer.Release() + return "", err + } + return details.GetHostname(), nil } diff --git a/contrib/mesos/pkg/scheduler/operations/binder.go b/contrib/mesos/pkg/scheduler/operations/binder.go index 6d344e600af..b0d2a9fbeed 100644 --- a/contrib/mesos/pkg/scheduler/operations/binder.go +++ b/contrib/mesos/pkg/scheduler/operations/binder.go @@ -29,12 +29,12 @@ import ( ) type Binder struct { - fw types.Framework + sched types.Scheduler } -func NewBinder(fw types.Framework) *Binder { +func NewBinder(sched types.Scheduler) *Binder { return &Binder{ - fw: fw, + sched: sched, } } @@ -49,10 +49,10 @@ func (b *Binder) Bind(binding *api.Binding) error { return err } - b.fw.Lock() - defer b.fw.Unlock() + b.sched.Lock() + defer b.sched.Unlock() - switch task, state := b.fw.Tasks().ForPod(podKey); state { + switch task, state := b.sched.Tasks().ForPod(podKey); state { case podtask.StatePending: return b.bind(ctx, binding, task) default: @@ -66,7 +66,7 @@ func (b *Binder) Bind(binding *api.Binding) error { func (b *Binder) rollback(task *podtask.T, err error) error { task.Offer.Release() task.Reset() - if err2 := b.fw.Tasks().Update(task); err2 != nil { + if err2 := b.sched.Tasks().Update(task); err2 != nil { log.Errorf("failed to update pod task: %v", err2) } return err @@ -88,7 +88,7 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e // By this time, there is a chance that the slave is disconnected. 
offerId := task.GetOfferId() - if offer, ok := b.fw.Offers().Get(offerId); !ok || offer.HasExpired() { + if offer, ok := b.sched.Offers().Get(offerId); !ok || offer.HasExpired() { // already rescinded or timed out or otherwise invalidated return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID)) } @@ -96,10 +96,10 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil { log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB", task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory) - if err = b.fw.LaunchTask(task); err == nil { - b.fw.Offers().Invalidate(offerId) + if err = b.sched.LaunchTask(task); err == nil { + b.sched.Offers().Invalidate(offerId) task.Set(podtask.Launched) - if err = b.fw.Tasks().Update(task); err != nil { + if err = b.sched.Tasks().Update(task); err != nil { // this should only happen if the task has been removed or has changed status, // which SHOULD NOT HAPPEN as long as we're synchronizing correctly log.Errorf("failed to update task w/ Launched status: %v", err) diff --git a/contrib/mesos/pkg/scheduler/operations/deleter.go b/contrib/mesos/pkg/scheduler/operations/deleter.go index d70d0db97a4..027813a1600 100644 --- a/contrib/mesos/pkg/scheduler/operations/deleter.go +++ b/contrib/mesos/pkg/scheduler/operations/deleter.go @@ -30,14 +30,14 @@ import ( ) type Deleter struct { - fw types.Framework - qr *queuer.Queuer + sched types.Scheduler + qr *queuer.Queuer } -func NewDeleter(fw types.Framework, qr *queuer.Queuer) *Deleter { +func NewDeleter(sched types.Scheduler, qr *queuer.Queuer) *Deleter { return &Deleter{ - fw: fw, - qr: qr, + sched: sched, + qr: qr, } } @@ -72,8 +72,8 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error { // removing the pod from the scheduling queue. this makes the concurrent // execution of scheduler-error-handling and delete-handling easier to // reason about. 
-	k.fw.Lock()
-	defer k.fw.Unlock()
+	k.sched.Lock()
+	defer k.sched.Unlock()

 	// prevent the scheduler from attempting to pop this; it's also possible that
 	// it's concurrently being scheduled (somewhere between pod scheduling and
@@ -81,7 +81,7 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
 	// will abort Bind()ing
 	k.qr.Dequeue(pod.GetUID())

-	switch task, state := k.fw.Tasks().ForPod(podKey); state {
+	switch task, state := k.sched.Tasks().ForPod(podKey); state {
 	case podtask.StateUnknown:
 		log.V(2).Infof("Could not resolve pod '%s' to task id", podKey)
 		return merrors.NoSuchPodErr
@@ -96,11 +96,11 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
 			task.Reset()
 			task.Set(podtask.Deleted)
 			//TODO(jdef) probably want better handling here
-			if err := k.fw.Tasks().Update(task); err != nil {
+			if err := k.sched.Tasks().Update(task); err != nil {
 				return err
 			}
 		}
-		k.fw.Tasks().Unregister(task)
+		k.sched.Tasks().Unregister(task)
 		return nil
 	}
 	fallthrough
@@ -108,10 +108,10 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
 	case podtask.StateRunning:
 		// signal to watchers that the related pod is going down
 		task.Set(podtask.Deleted)
-		if err := k.fw.Tasks().Update(task); err != nil {
+		if err := k.sched.Tasks().Update(task); err != nil {
 			log.Errorf("failed to update task w/ Deleted status: %v", err)
 		}
-		return k.fw.KillTask(task.ID)
+		return k.sched.KillTask(task.ID)

 	default:
 		log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID)
diff --git a/contrib/mesos/pkg/scheduler/operations/doc.go b/contrib/mesos/pkg/scheduler/operations/doc.go
index bac6081044e..3136d479113 100644
--- a/contrib/mesos/pkg/scheduler/operations/doc.go
+++ b/contrib/mesos/pkg/scheduler/operations/doc.go
@@ -15,6 +15,6 @@ limitations under the License.
 */

 // Package operations implements independent aspects of the scheduler which
-// do not use MesosScheduler internals, but rely solely on the Framework
+// do not use Framework internals, but rely solely on the Scheduler
package operations diff --git a/contrib/mesos/pkg/scheduler/operations/errorhandler.go b/contrib/mesos/pkg/scheduler/operations/errorhandler.go index 14474042daf..d97d22a227f 100644 --- a/contrib/mesos/pkg/scheduler/operations/errorhandler.go +++ b/contrib/mesos/pkg/scheduler/operations/errorhandler.go @@ -31,16 +31,18 @@ import ( ) type ErrorHandler struct { - fw types.Framework - backoff *backoff.Backoff - qr *queuer.Queuer + sched types.Scheduler + backoff *backoff.Backoff + qr *queuer.Queuer + podScheduler podschedulers.PodScheduler } -func NewErrorHandler(fw types.Framework, backoff *backoff.Backoff, qr *queuer.Queuer) *ErrorHandler { +func NewErrorHandler(sched types.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, podScheduler podschedulers.PodScheduler) *ErrorHandler { return &ErrorHandler{ - fw: fw, - backoff: backoff, - qr: qr, + sched: sched, + backoff: backoff, + qr: qr, + podScheduler: podScheduler, } } @@ -64,10 +66,10 @@ func (k *ErrorHandler) Error(pod *api.Pod, schedulingErr error) { } k.backoff.GC() - k.fw.Lock() - defer k.fw.Unlock() + k.sched.Lock() + defer k.sched.Unlock() - switch task, state := k.fw.Tasks().ForPod(podKey); state { + switch task, state := k.sched.Tasks().ForPod(podKey); state { case podtask.StateUnknown: // if we don't have a mapping here any more then someone deleted the pod log.V(2).Infof("Could not resolve pod to task, aborting pod reschdule: %s", podKey) @@ -81,16 +83,16 @@ func (k *ErrorHandler) Error(pod *api.Pod, schedulingErr error) { breakoutEarly := queue.BreakChan(nil) if schedulingErr == podschedulers.NoSuitableOffersErr { log.V(3).Infof("adding backoff breakout handler for pod %v", podKey) - breakoutEarly = queue.BreakChan(k.fw.Offers().Listen(podKey, func(offer *mesos.Offer) bool { - k.fw.Lock() - defer k.fw.Unlock() - switch task, state := k.fw.Tasks().Get(task.ID); state { + breakoutEarly = queue.BreakChan(k.sched.Offers().Listen(podKey, func(offer *mesos.Offer) bool { + k.sched.Lock() + defer k.sched.Unlock() + switch task, state := k.sched.Tasks().Get(task.ID); state { case podtask.StatePending: // Assess fitness of pod with the current offer. The scheduler normally // "backs off" when it can't find an offer that matches up with a pod. // The backoff period for a pod can terminate sooner if an offer becomes // available that matches up. 
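The breakout mechanism described in the comment above hinges on Offers().Listen returning a signal channel that fires once an offer passing the supplied predicate arrives; queue.BreakChan, by this usage, is simply a conversion of that channel. A consumer-side sketch, assuming Listen yields a <-chan struct{}, with offerFits and backoffPeriod as hypothetical stand-ins:

	wake := sched.Offers().Listen(podKey, func(offer *mesos.Offer) bool {
		return offerFits(offer) // hypothetical fit check
	})
	select {
	case <-wake:
		// a matching offer arrived; end the backoff early and retry
	case <-time.After(backoffPeriod):
		// backoff elapsed without a matching offer
	}
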
- return !task.Has(podtask.Launched) && k.fw.PodScheduler().FitPredicate()(task, offer, nil) + return !task.Has(podtask.Launched) && k.podScheduler.FitPredicate()(task, offer, nil) default: // no point in continuing to check for matching offers return true diff --git a/contrib/mesos/pkg/scheduler/operations/podreconciler.go b/contrib/mesos/pkg/scheduler/operations/podreconciler.go index 79c31815270..9f0132577b2 100644 --- a/contrib/mesos/pkg/scheduler/operations/podreconciler.go +++ b/contrib/mesos/pkg/scheduler/operations/podreconciler.go @@ -31,15 +31,15 @@ import ( // PodReconciler reconciles a pod with the apiserver type PodReconciler struct { - fw types.Framework + sched types.Scheduler client *client.Client qr *queuer.Queuer deleter *Deleter } -func NewPodReconciler(fw types.Framework, client *client.Client, qr *queuer.Queuer, deleter *Deleter) *PodReconciler { +func NewPodReconciler(sched types.Scheduler, client *client.Client, qr *queuer.Queuer, deleter *Deleter) *PodReconciler { return &PodReconciler{ - fw: fw, + sched: sched, client: client, qr: qr, deleter: deleter, @@ -88,10 +88,10 @@ func (s *PodReconciler) Reconcile(t *podtask.T) { return } - s.fw.Lock() - defer s.fw.Unlock() + s.sched.Lock() + defer s.sched.Unlock() - if _, state := s.fw.Tasks().ForPod(podKey); state != podtask.StateUnknown { + if _, state := s.sched.Tasks().ForPod(podKey); state != podtask.StateUnknown { //TODO(jdef) reconcile the task log.Errorf("task already registered for pod %v", pod.Name) return diff --git a/contrib/mesos/pkg/scheduler/operations/schedulerloop.go b/contrib/mesos/pkg/scheduler/operations/schedulerloop.go index 406db4277e6..cef1ce69c84 100644 --- a/contrib/mesos/pkg/scheduler/operations/schedulerloop.go +++ b/contrib/mesos/pkg/scheduler/operations/schedulerloop.go @@ -17,19 +17,11 @@ limitations under the License. package operations import ( - "net/http" "time" log "github.com/golang/glog" - "k8s.io/kubernetes/contrib/mesos/pkg/backoff" - "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" - types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" ) @@ -55,47 +47,6 @@ type SchedulerLoop struct { started chan<- struct{} // startup latch } -func NewScheduler(c *config.Config, fw types.Framework, client *client.Client, recorder record.EventRecorder, - terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) (SchedulerLoopInterface, *PodReconciler) { - - // Watch and queue pods that need scheduling. - updates := make(chan queue.Entry, c.UpdatesBacklog) - podUpdates := &podStoreAdapter{queue.NewHistorical(updates)} - reflector := cache.NewReflector(podsWatcher, &api.Pod{}, podUpdates, 0) - - // lock that guards critial sections that involve transferring pods from - // the store (cache) to the scheduling queue; its purpose is to maintain - // an ordering (vs interleaving) of operations that's easier to reason about. 
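Before the wiring continues below, note that the same concurrency contract recurs in the binder, deleter, error handler, and pod reconciler alike: take the coarse scheduler lock, resolve the task by pod key, mutate it, then store it. Distilled into a hypothetical helper against the new types.Scheduler interface (fmt, podtask, and types imports assumed; not part of this patch):

	// mutatePendingTask spells out the lock/get/change/store cycle that the
	// operations components follow around podtask.T objects.
	func mutatePendingTask(sched types.Scheduler, podKey string, mutate func(*podtask.T)) error {
		sched.Lock()
		defer sched.Unlock()

		task, state := sched.Tasks().ForPod(podKey)
		if state != podtask.StatePending {
			return fmt.Errorf("no pending task for pod %s", podKey)
		}
		mutate(task)
		return sched.Tasks().Update(task)
	}
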
-
-	q := queuer.New(podUpdates)
-	algorithm := NewSchedulerAlgorithm(fw, podUpdates)
-	podDeleter := NewDeleter(fw, q)
-	podReconciler := NewPodReconciler(fw, client, q, podDeleter)
-	bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration)
-	errorHandler := NewErrorHandler(fw, bo, q)
-	binder := NewBinder(fw)
-	startLatch := make(chan struct{})
-	eventBroadcaster := record.NewBroadcaster()
-	runtime.On(startLatch, func() {
-		eventBroadcaster.StartRecordingToSink(client.Events(""))
-		reflector.Run() // TODO(jdef) should listen for termination
-		podDeleter.Run(updates, terminate)
-		q.Run(terminate)
-		q.InstallDebugHandlers(mux)
-		podtask.InstallDebugHandlers(fw.Tasks(), mux)
-	})
-	return NewSchedulerLoop(client, algorithm, recorder, q.Yield, errorHandler.Error, binder, startLatch), podReconciler
-}
-
 func NewSchedulerLoop(client *client.Client, algorithm *SchedulerAlgorithm, recorder record.EventRecorder,
 	nextPod func() *api.Pod, error func(pod *api.Pod, schedulingErr error), binder *Binder, started chan<- struct{}) *SchedulerLoop {
diff --git a/contrib/mesos/pkg/scheduler/podschedulers/fcfs.go b/contrib/mesos/pkg/scheduler/podschedulers/fcfs.go
index c2bf3766fa6..12872595429 100644
--- a/contrib/mesos/pkg/scheduler/podschedulers/fcfs.go
+++ b/contrib/mesos/pkg/scheduler/podschedulers/fcfs.go
@@ -62,7 +62,7 @@ func NewFCFSPodScheduler(as AllocationStrategy, lookupNode node.LookupFunc) PodS
 }

 // A first-come-first-serve scheduler: acquires the first offer that can support the task
-func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, unused SlaveIndex, task *podtask.T) (offers.Perishable, error) {
+func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, task *podtask.T) (offers.Perishable, error) {
 	podName := fmt.Sprintf("%s/%s", task.Pod.Namespace, task.Pod.Name)
 	var acceptedOffer offers.Perishable
 	err := r.Walk(func(p offers.Perishable) (bool, error) {
diff --git a/contrib/mesos/pkg/scheduler/podschedulers/types.go b/contrib/mesos/pkg/scheduler/podschedulers/types.go
index 94d63e19a03..14eb13c8ab9 100644
--- a/contrib/mesos/pkg/scheduler/podschedulers/types.go
+++ b/contrib/mesos/pkg/scheduler/podschedulers/types.go
@@ -37,14 +37,13 @@ type PodScheduler interface {
 	// SchedulePod implements how to schedule pods among slaves.
 	// We can have different implementation for different scheduling policy.
 	//
-	// The function accepts a group of slaves (each contains offers from
-	// that slave) and a single pod, which aligns well with the k8s scheduling
-	// algorithm. It returns an offerId that is acceptable for the pod, otherwise
-	// nil. The caller is responsible for filling in task state w/ relevant offer
-	// details.
+	// The function accepts a set of offers and a single pod, which aligns well
+	// with the k8s scheduling algorithm. It returns an offer that is acceptable
+	// for the pod, otherwise nil. The caller is responsible for filling in task
+	// state w/ relevant offer details.
 	//
 	// See the FCFSPodScheduler for example.
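To make that example concrete under the slimmed-down signature: the FCFS walk shown in fcfs.go above boils down to the following shape. The Details(), Acquire(), and FitPredicate() calls are assumptions drawn from their use elsewhere in this patch, and the error handling is condensed; illustrative only:

	func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, task *podtask.T) (offers.Perishable, error) {
		var acceptedOffer offers.Perishable
		err := r.Walk(func(p offers.Perishable) (bool, error) {
			offer := p.Details()
			// fit check and acquisition, as used by the error handler above
			if offer != nil && fps.FitPredicate()(task, offer, nil) && p.Acquire() {
				acceptedOffer = p
				return true, nil // stop the walk
			}
			return false, nil // keep walking
		})
		if err != nil {
			return nil, err
		}
		if acceptedOffer == nil {
			return nil, NoSuitableOffersErr
		}
		return acceptedOffer, nil
	}
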
- SchedulePod(r offers.Registry, slaves SlaveIndex, task *podtask.T) (offers.Perishable, error) + SchedulePod(r offers.Registry, task *podtask.T) (offers.Perishable, error) } // A minimal placeholder diff --git a/contrib/mesos/pkg/scheduler/operations/podstoreadapter.go b/contrib/mesos/pkg/scheduler/podstoreadapter.go similarity index 99% rename from contrib/mesos/pkg/scheduler/operations/podstoreadapter.go rename to contrib/mesos/pkg/scheduler/podstoreadapter.go index 3b7c698787c..889faf28576 100644 --- a/contrib/mesos/pkg/scheduler/operations/podstoreadapter.go +++ b/contrib/mesos/pkg/scheduler/podstoreadapter.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package operations +package scheduler import ( "k8s.io/kubernetes/contrib/mesos/pkg/queue" diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 03dde8a4239..abfc4e339c3 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -17,736 +17,98 @@ limitations under the License. package scheduler import ( - "fmt" - "io" - "math" "net/http" "sync" - "time" - log "github.com/golang/glog" - mesos "github.com/mesos/mesos-go/mesosproto" - mutil "github.com/mesos/mesos-go/mesosutil" - bindings "github.com/mesos/mesos-go/scheduler" - execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" - "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" - "k8s.io/kubernetes/contrib/mesos/pkg/node" + "k8s.io/kubernetes/contrib/mesos/pkg/backoff" "k8s.io/kubernetes/contrib/mesos/pkg/offers" - offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics" - "k8s.io/kubernetes/contrib/mesos/pkg/proc" + "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" - schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" - merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/kubelet/container" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/util/sets" ) -// KubernetesScheduler implements: -// 1: A mesos scheduler. -// 2: A kubernetes pod.Registry. -type MesosScheduler struct { - // We use a lock here to avoid races - // between invoking the mesos callback - // and the invoking the pod registry interfaces. - // In particular, changes to podtask.T objects are currently guarded by this lock. 
- *sync.RWMutex - podScheduler podschedulers.PodScheduler - - // Config related, write-once - - schedulerConfig *schedcfg.Config - executor *mesos.ExecutorInfo - executorGroup uint64 - client *client.Client - failoverTimeout float64 // in seconds - reconcileInterval int64 - nodeRegistrator node.Registrator - storeFrameworkId func(id string) - - // Mesos context. - - driver bindings.SchedulerDriver // late initialization - frameworkId *mesos.FrameworkID - masterInfo *mesos.MasterInfo - registered bool - registration chan struct{} // signal chan that closes upon first successful registration - onRegistration sync.Once - offers offers.Registry - slaveHostNames *slave.Registry - - // unsafe state, needs to be guarded +// Scheduler implements types.Scheduler +type Scheduler struct { + podReconciler *operations.PodReconciler + framework *Framework + loop *operations.SchedulerLoop + // unsafe state, needs to be guarded, especially changes to podtask.T objects + sync.RWMutex taskRegistry podtask.Registry - - // via deferred init - podReconciler *operations.PodReconciler - tasksReconciler *operations.TasksReconciler - reconcileCooldown time.Duration - asRegisteredMaster proc.Doer - terminate <-chan struct{} // signal chan, closes when we should kill background tasks } -type Config struct { - SchedulerConfig schedcfg.Config - Executor *mesos.ExecutorInfo - PodScheduler podschedulers.PodScheduler - Client *client.Client - StoreFrameworkId func(id string) - FailoverTimeout float64 - ReconcileInterval int64 - ReconcileCooldown time.Duration - LookupNode node.LookupFunc -} +func NewScheduler(c *config.Config, framework *Framework, podScheduler podschedulers.PodScheduler, + client *client.Client, recorder record.EventRecorder, terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) *Scheduler { -// New creates a new MesosScheduler -func New(config Config) *MesosScheduler { - var k *MesosScheduler - k = &MesosScheduler{ - schedulerConfig: &config.SchedulerConfig, - RWMutex: new(sync.RWMutex), - executor: config.Executor, - executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(), - podScheduler: config.PodScheduler, - client: config.Client, - failoverTimeout: config.FailoverTimeout, - reconcileInterval: config.ReconcileInterval, - nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode), - offers: offers.CreateRegistry(offers.RegistryConfig{ - Compat: func(o *mesos.Offer) bool { - // the node must be registered and have up-to-date labels - n := config.LookupNode(o.GetHostname()) - if n == nil || !node.IsUpToDate(n, node.SlaveAttributesToLabels(o.GetAttributes())) { - return false - } - - // the executor IDs must not identify a kubelet-executor with a group that doesn't match ours - for _, eid := range o.GetExecutorIds() { - execuid := uid.Parse(eid.GetValue()) - if execuid.Name() == execcfg.DefaultInfoID && execuid.Group() != k.executorGroup { - return false - } - } - - return true - }, - DeclineOffer: func(id string) <-chan error { - errOnce := proc.NewErrorOnce(k.terminate) - errOuter := k.asRegisteredMaster.Do(func() { - var err error - defer errOnce.Report(err) - offerId := mutil.NewOfferID(id) - filters := &mesos.Filters{} - _, err = k.driver.DeclineOffer(offerId, filters) - }) - return errOnce.Send(errOuter).Err() - }, - // remember expired offers so that we can tell if a previously scheduler offer relies on one - LingerTTL: config.SchedulerConfig.OfferLingerTTL.Duration, - TTL: config.SchedulerConfig.OfferTTL.Duration, - ListenerDelay: 
config.SchedulerConfig.ListenerDelay.Duration, - }), - slaveHostNames: slave.NewRegistry(), - taskRegistry: podtask.NewInMemoryRegistry(), - reconcileCooldown: config.ReconcileCooldown, - registration: make(chan struct{}), - asRegisteredMaster: proc.DoerFunc(func(proc.Action) <-chan error { - return proc.ErrorChanf("cannot execute action with unregistered scheduler") - }), - storeFrameworkId: config.StoreFrameworkId, + core := &Scheduler{ + framework: framework, + taskRegistry: podtask.NewInMemoryRegistry(), } - return k -} -func (k *MesosScheduler) Init(electedMaster proc.Process, pr *operations.PodReconciler, mux *http.ServeMux) error { - log.V(1).Infoln("initializing kubernetes mesos scheduler") + // Watch and queue pods that need scheduling. + updates := make(chan queue.Entry, c.UpdatesBacklog) + podUpdates := &podStoreAdapter{queue.NewHistorical(updates)} + reflector := cache.NewReflector(podsWatcher, &api.Pod{}, podUpdates, 0) - k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error { - if !k.registered { - return proc.ErrorChanf("failed to execute action, scheduler is disconnected") - } - return electedMaster.Do(a) + q := queuer.New(podUpdates) + + algorithm := operations.NewSchedulerAlgorithm(core, podUpdates, podScheduler) + + podDeleter := operations.NewDeleter(core, q) + + core.podReconciler = operations.NewPodReconciler(core, client, q, podDeleter) + + bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration) + errorHandler := operations.NewErrorHandler(core, bo, q, podScheduler) + + binder := operations.NewBinder(core) + + startLatch := make(chan struct{}) + eventBroadcaster := record.NewBroadcaster() + + runtime.On(startLatch, func() { + eventBroadcaster.StartRecordingToSink(client.Events("")) + reflector.Run() // TODO(jdef) should listen for termination + podDeleter.Run(updates, terminate) + q.Run(terminate) + + q.InstallDebugHandlers(mux) + podtask.InstallDebugHandlers(core.Tasks(), mux) }) - k.terminate = electedMaster.Done() - k.podReconciler = pr - k.offers.Init(k.terminate) - k.InstallDebugHandlers(mux) - k.nodeRegistrator.Run(k.terminate) - return k.recoverTasks() + + core.loop = operations.NewSchedulerLoop(client, algorithm, recorder, q.Yield, errorHandler.Error, binder, startLatch) + return core } -func (k *MesosScheduler) asMaster() proc.Doer { - k.RLock() - defer k.RUnlock() - return k.asRegisteredMaster +func (c *Scheduler) Run(done <-chan struct{}) { + c.loop.Run(done) } -func (k *MesosScheduler) InstallDebugHandlers(mux *http.ServeMux) { - wrappedHandler := func(uri string, h http.Handler) { - mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) { - ch := make(chan struct{}) - closer := runtime.Closer(ch) - proc.OnError(k.asMaster().Do(func() { - defer closer() - h.ServeHTTP(w, r) - }), func(err error) { - defer closer() - log.Warningf("failed HTTP request for %s: %v", uri, err) - w.WriteHeader(http.StatusServiceUnavailable) - }, k.terminate) - select { - case <-time.After(k.schedulerConfig.HttpHandlerTimeout.Duration): - log.Warningf("timed out waiting for request to be processed") - w.WriteHeader(http.StatusServiceUnavailable) - return - case <-ch: // noop - } - }) - } - requestReconciliation := func(uri string, requestAction func()) { - wrappedHandler(uri, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - requestAction() - w.WriteHeader(http.StatusNoContent) - })) - } - requestReconciliation("/debug/actions/requestExplicit", k.tasksReconciler.RequestExplicit) - 
requestReconciliation("/debug/actions/requestImplicit", k.tasksReconciler.RequestImplicit) - - wrappedHandler("/debug/actions/kamikaze", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - slaves := k.slaveHostNames.SlaveIDs() - for _, slaveId := range slaves { - _, err := k.driver.SendFrameworkMessage( - k.executor.ExecutorId, - mutil.NewSlaveID(slaveId), - messages.Kamikaze) - if err != nil { - log.Warningf("failed to send kamikaze message to slave %s: %v", slaveId, err) - } else { - io.WriteString(w, fmt.Sprintf("kamikaze slave %s\n", slaveId)) - } - } - io.WriteString(w, "OK") - })) +func (c *Scheduler) Reconcile(t *podtask.T) { + c.podReconciler.Reconcile(t) } -func (k *MesosScheduler) Registration() <-chan struct{} { - return k.registration +func (c *Scheduler) Tasks() podtask.Registry { + return c.taskRegistry } -// Registered is called when the scheduler registered with the master successfully. -func (k *MesosScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) { - log.Infof("Scheduler registered with the master: %v with frameworkId: %v\n", mi, fid) - - k.driver = drv - k.frameworkId = fid - k.masterInfo = mi - k.registered = true - - k.onRegistration.Do(func() { k.onInitialRegistration(drv) }) - k.tasksReconciler.RequestExplicit() +func (c *Scheduler) Offers() offers.Registry { + return c.framework.offers } -// Reregistered is called when the scheduler re-registered with the master successfully. -// This happends when the master fails over. -func (k *MesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) { - log.Infof("Scheduler reregistered with the master: %v\n", mi) - - k.driver = drv - k.masterInfo = mi - k.registered = true - - k.onRegistration.Do(func() { k.onInitialRegistration(drv) }) - k.tasksReconciler.RequestExplicit() +func (c *Scheduler) KillTask(id string) error { + return c.framework.KillTask(id) } -// perform one-time initialization actions upon the first registration event received from Mesos. -func (k *MesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver) { - defer close(k.registration) - - if k.failoverTimeout > 0 { - refreshInterval := k.schedulerConfig.FrameworkIdRefreshInterval.Duration - if k.failoverTimeout < k.schedulerConfig.FrameworkIdRefreshInterval.Duration.Seconds() { - refreshInterval = time.Duration(math.Max(1, k.failoverTimeout/2)) * time.Second - } - go runtime.Until(func() { - k.storeFrameworkId(k.frameworkId.GetValue()) - }, refreshInterval, k.terminate) - } - - r1 := k.makeTaskRegistryReconciler() - r2 := k.makePodRegistryReconciler() - - k.tasksReconciler = operations.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2), - k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate) - go k.tasksReconciler.Run(driver) - - if k.reconcileInterval > 0 { - ri := time.Duration(k.reconcileInterval) * time.Second - time.AfterFunc(k.schedulerConfig.InitialImplicitReconciliationDelay.Duration, func() { runtime.Until(k.tasksReconciler.RequestImplicit, ri, k.terminate) }) - log.Infof("will perform implicit task reconciliation at interval: %v after %v", ri, k.schedulerConfig.InitialImplicitReconciliationDelay.Duration) - } -} - -// Disconnected is called when the scheduler loses connection to the master. 
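With these delegating methods in place (LaunchTask follows further down), *Scheduler satisfies the new types.Scheduler interface. The operations constructors already enforce that at compile time, but the contract can be made explicit with a blank-identifier assertion; a possible addition, which would also require importing the types package into scheduler.go:

	// compile-time check that *Scheduler implements types.Scheduler
	var _ types.Scheduler = (*Scheduler)(nil)
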
-func (k *MesosScheduler) Disconnected(driver bindings.SchedulerDriver) { - log.Infof("Master disconnected!\n") - - k.registered = false - - // discard all cached offers to avoid unnecessary TASK_LOST updates - k.offers.Invalidate("") -} - -// ResourceOffers is called when the scheduler receives some offers from the master. -func (k *MesosScheduler) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) { - log.V(2).Infof("Received offers %+v", offers) - - // Record the offers in the global offer map as well as each slave's offer map. - k.offers.Add(offers) - for _, offer := range offers { - slaveId := offer.GetSlaveId().GetValue() - k.slaveHostNames.Register(slaveId, offer.GetHostname()) - - // create api object if not existing already - if k.nodeRegistrator != nil { - labels := node.SlaveAttributesToLabels(offer.GetAttributes()) - _, err := k.nodeRegistrator.Register(offer.GetHostname(), labels) - if err != nil { - log.Error(err) - } - } - } -} - -// OfferRescinded is called when the resources are recinded from the scheduler. -func (k *MesosScheduler) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) { - log.Infof("Offer rescinded %v\n", offerId) - - oid := offerId.GetValue() - k.offers.Delete(oid, offermetrics.OfferRescinded) -} - -// StatusUpdate is called when a status update message is sent to the scheduler. -func (k *MesosScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { - - source, reason := "none", "none" - if taskStatus.Source != nil { - source = (*taskStatus.Source).String() - } - if taskStatus.Reason != nil { - reason = (*taskStatus.Reason).String() - } - taskState := taskStatus.GetState() - metrics.StatusUpdates.WithLabelValues(source, reason, taskState.String()).Inc() - - message := "none" - if taskStatus.Message != nil { - message = *taskStatus.Message - } - - log.Infof( - "task status update %q from %q for task %q on slave %q executor %q for reason %q with message %q", - taskState.String(), - source, - taskStatus.TaskId.GetValue(), - taskStatus.SlaveId.GetValue(), - taskStatus.ExecutorId.GetValue(), - reason, - message, - ) - - switch taskState { - case mesos.TaskState_TASK_RUNNING, mesos.TaskState_TASK_FINISHED, mesos.TaskState_TASK_STARTING, mesos.TaskState_TASK_STAGING: - if _, state := k.taskRegistry.UpdateStatus(taskStatus); state == podtask.StateUnknown { - if taskState != mesos.TaskState_TASK_FINISHED { - //TODO(jdef) what if I receive this after a TASK_LOST or TASK_KILLED? - //I don't want to reincarnate then.. TASK_LOST is a special case because - //the master is stateless and there are scenarios where I may get TASK_LOST - //followed by TASK_RUNNING. - //TODO(jdef) consider running this asynchronously since there are API server - //calls that may be made - k.reconcileNonTerminalTask(driver, taskStatus) - } // else, we don't really care about FINISHED tasks that aren't registered - return - } - if hostName := k.slaveHostNames.HostName(taskStatus.GetSlaveId().GetValue()); hostName == "" { - // a registered task has an update reported by a slave that we don't recognize. - // this should never happen! So we don't reconcile it. 
- log.Errorf("Ignore status %+v because the slave does not exist", taskStatus) - return - } - case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR: - if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil { - if task.Has(podtask.Launched) && !task.Has(podtask.Bound) { - go k.podReconciler.Reconcile(task) - return - } - } else { - // unknown task failed, not much we can do about it - return - } - // last-ditch effort to reconcile our records - fallthrough - case mesos.TaskState_TASK_LOST, mesos.TaskState_TASK_KILLED: - k.reconcileTerminalTask(driver, taskStatus) - default: - log.Errorf( - "unknown task status %q from %q for task %q on slave %q executor %q for reason %q with message %q", - taskState.String(), - source, - taskStatus.TaskId.GetValue(), - taskStatus.SlaveId.GetValue(), - taskStatus.ExecutorId.GetValue(), - reason, - message, - ) - } -} - -func (k *MesosScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { - task, state := k.taskRegistry.UpdateStatus(taskStatus) - - if (state == podtask.StateRunning || state == podtask.StatePending) && - ((taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER && taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION) || - (taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED) || - (taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED) || - (taskStatus.GetSource() == mesos.TaskStatus_SOURCE_EXECUTOR && taskStatus.GetMessage() == messages.ContainersDisappeared)) { - //-- - // pod-task has metadata that refers to: - // (1) a task that Mesos no longer knows about, or else - // (2) a pod that the Kubelet will never report as "failed" - // (3) a pod that the kubeletExecutor reported as lost (likely due to docker daemon crash/restart) - // For now, destroy the pod and hope that there's a replication controller backing it up. - // TODO(jdef) for case #2 don't delete the pod, just update it's status to Failed - pod := &task.Pod - log.Warningf("deleting rogue pod %v/%v for lost task %v", pod.Namespace, pod.Name, task.ID) - if err := k.client.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil && !errors.IsNotFound(err) { - log.Errorf("failed to delete pod %v/%v for terminal task %v: %v", pod.Namespace, pod.Name, task.ID, err) - } - } else if taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED || taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED { - // attempt to prevent dangling pods in the pod and task registries - log.V(1).Infof("request explicit reconciliation to clean up for task %v after executor reported (terminated/unregistered)", taskStatus.TaskId.GetValue()) - k.tasksReconciler.RequestExplicit() - } else if taskStatus.GetState() == mesos.TaskState_TASK_LOST && state == podtask.StateRunning && taskStatus.ExecutorId != nil && taskStatus.SlaveId != nil { - //TODO(jdef) this may not be meaningful once we have proper checkpointing and master detection - //If we're reconciling and receive this then the executor may be - //running a task that we need it to kill. It's possible that the framework - //is unrecognized by the master at this point, so KillTask is not guaranteed - //to do anything. The underlying driver transport may be able to send a - //FrameworkMessage directly to the slave to terminate the task. 
- log.V(2).Info("forwarding TASK_LOST message to executor %v on slave %v", taskStatus.ExecutorId, taskStatus.SlaveId) - data := fmt.Sprintf("%s:%s", messages.TaskLost, task.ID) //TODO(jdef) use a real message type - if _, err := driver.SendFrameworkMessage(taskStatus.ExecutorId, taskStatus.SlaveId, data); err != nil { - log.Error(err.Error()) - } - } -} - -// reconcile an unknown (from the perspective of our registry) non-terminal task -func (k *MesosScheduler) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { - // attempt to recover task from pod info: - // - task data may contain an api.PodStatusResult; if status.reason == REASON_RECONCILIATION then status.data == nil - // - the Name can be parsed by container.ParseFullName() to yield a pod Name and Namespace - // - pull the pod metadata down from the api server - // - perform task recovery based on pod metadata - taskId := taskStatus.TaskId.GetValue() - if taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION && taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER { - // there will be no data in the task status that we can use to determine the associated pod - switch taskStatus.GetState() { - case mesos.TaskState_TASK_STAGING: - // there is still hope for this task, don't kill it just yet - //TODO(jdef) there should probably be a limit for how long we tolerate tasks stuck in this state - return - default: - // for TASK_{STARTING,RUNNING} we should have already attempted to recoverTasks() for. - // if the scheduler failed over before the executor fired TASK_STARTING, then we should *not* - // be processing this reconciliation update before we process the one from the executor. - // point: we don't know what this task is (perhaps there was unrecoverable metadata in the pod), - // so it gets killed. - log.Errorf("killing non-terminal, unrecoverable task %v", taskId) - } - } else if podStatus, err := podtask.ParsePodStatusResult(taskStatus); err != nil { - // possible rogue pod exists at this point because we can't identify it; should kill the task - log.Errorf("possible rogue pod; illegal task status data for task %v, expected an api.PodStatusResult: %v", taskId, err) - } else if name, namespace, err := container.ParsePodFullName(podStatus.Name); err != nil { - // possible rogue pod exists at this point because we can't identify it; should kill the task - log.Errorf("possible rogue pod; illegal api.PodStatusResult, unable to parse full pod name from: '%v' for task %v: %v", - podStatus.Name, taskId, err) - } else if pod, err := k.client.Pods(namespace).Get(name); err == nil { - if t, ok, err := podtask.RecoverFrom(*pod); ok { - log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name) - _, err := k.taskRegistry.Register(t, nil) - if err != nil { - // someone beat us to it?! 
- log.Warningf("failed to register recovered task: %v", err) - return - } else { - k.taskRegistry.UpdateStatus(taskStatus) - } - return - } else if err != nil { - //should kill the pod and the task - log.Errorf("killing pod, failed to recover task from pod %v/%v: %v", namespace, name, err) - if err := k.client.Pods(namespace).Delete(name, nil); err != nil { - log.Errorf("failed to delete pod %v/%v: %v", namespace, name, err) - } - } else { - //this is pretty unexpected: we received a TASK_{STARTING,RUNNING} message, but the apiserver's pod - //metadata is not appropriate for task reconstruction -- which should almost certainly never - //be the case unless someone swapped out the pod on us (and kept the same namespace/name) while - //we were failed over. - - //kill this task, allow the newly launched scheduler to schedule the new pod - log.Warningf("unexpected pod metadata for task %v in apiserver, assuming new unscheduled pod spec: %+v", taskId, pod) - } - } else if errors.IsNotFound(err) { - // pod lookup failed, should delete the task since the pod is no longer valid; may be redundant, that's ok - log.Infof("killing task %v since pod %v/%v no longer exists", taskId, namespace, name) - } else if errors.IsServerTimeout(err) { - log.V(2).Infof("failed to reconcile task due to API server timeout: %v", err) - return - } else { - log.Errorf("unexpected API server error, aborting reconcile for task %v: %v", taskId, err) - return - } - if _, err := driver.KillTask(taskStatus.TaskId); err != nil { - log.Errorf("failed to kill task %v: %v", taskId, err) - } -} - -// FrameworkMessage is called when the scheduler receives a message from the executor. -func (k *MesosScheduler) FrameworkMessage(driver bindings.SchedulerDriver, - executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) { - log.Infof("Received messages from executor %v of slave %v, %v\n", executorId, slaveId, message) -} - -// SlaveLost is called when some slave is lost. -func (k *MesosScheduler) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) { - log.Infof("Slave %v is lost\n", slaveId) - - sid := slaveId.GetValue() - k.offers.InvalidateForSlave(sid) - - // TODO(jdef): delete slave from our internal list? probably not since we may need to reconcile - // tasks. it would be nice to somehow flag the slave as lost so that, perhaps, we can periodically - // flush lost slaves older than X, and for which no tasks or pods reference. - - // unfinished tasks/pods will be dropped. use a replication controller if you want pods to - // be restarted when slaves die. -} - -// ExecutorLost is called when some executor is lost. -func (k *MesosScheduler) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) { - log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status) - // TODO(yifan): Restart any unfinished tasks of the executor. -} - -// Error is called when there is an unrecoverable error in the scheduler or scheduler driver. -// The driver should have been aborted before this is invoked. -func (k *MesosScheduler) Error(driver bindings.SchedulerDriver, message string) { - log.Fatalf("fatal scheduler error: %v\n", message) -} - -// filter func used for explicit task reconciliation, selects only non-terminal tasks which -// have been communicated to mesos (read: launched). 
-func explicitTaskFilter(t *podtask.T) bool { - switch t.State { - case podtask.StateRunning: - return true - case podtask.StatePending: - return t.Has(podtask.Launched) - default: - return false - } -} - -// invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation -// is cancelled. if any other errors occur the composite reconciler will attempt to complete the -// sequence, reporting only the last generated error. -func (k *MesosScheduler) makeCompositeReconciler(actions ...operations.ReconcilerAction) operations.ReconcilerAction { - if x := len(actions); x == 0 { - // programming error - panic("no actions specified for composite reconciler") - } else if x == 1 { - return actions[0] - } - chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b operations.ReconcilerAction) <-chan error { - ech := a(d, c) - ch := make(chan error, 1) - go func() { - select { - case <-k.terminate: - case <-c: - case e := <-ech: - if e != nil { - ch <- e - return - } - ech = b(d, c) - select { - case <-k.terminate: - case <-c: - case e := <-ech: - if e != nil { - ch <- e - return - } - close(ch) - return - } - } - ch <- fmt.Errorf("aborting composite reconciler action") - }() - return ch - } - result := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { - return chained(d, c, actions[0], actions[1]) - } - for i := 2; i < len(actions); i++ { - i := i - next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { - return chained(d, c, operations.ReconcilerAction(result), actions[i]) - } - result = next - } - return operations.ReconcilerAction(result) -} - -// reconciler action factory, performs explicit task reconciliation for non-terminal -// tasks listed in the scheduler's internal taskRegistry. -func (k *MesosScheduler) makeTaskRegistryReconciler() operations.ReconcilerAction { - return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { - taskToSlave := make(map[string]string) - for _, t := range k.taskRegistry.List(explicitTaskFilter) { - if t.Spec.SlaveID != "" { - taskToSlave[t.ID] = t.Spec.SlaveID - } - } - return proc.ErrorChan(k.explicitlyReconcileTasks(drv, taskToSlave, cancel)) - }) -} - -// reconciler action factory, performs explicit task reconciliation for non-terminal -// tasks identified by annotations in the Kubernetes pod registry. 
-func (k *MesosScheduler) makePodRegistryReconciler() operations.ReconcilerAction { - return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { - podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) - if err != nil { - return proc.ErrorChanf("failed to reconcile pod registry: %v", err) - } - taskToSlave := make(map[string]string) - for _, pod := range podList.Items { - if len(pod.Annotations) == 0 { - continue - } - taskId, found := pod.Annotations[meta.TaskIdKey] - if !found { - continue - } - slaveId, found := pod.Annotations[meta.SlaveIdKey] - if !found { - continue - } - taskToSlave[taskId] = slaveId - } - return proc.ErrorChan(k.explicitlyReconcileTasks(drv, taskToSlave, cancel)) - }) -} - -// execute an explicit task reconciliation, as per http://mesos.apache.org/documentation/latest/reconciliation/ -func (k *MesosScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error { - log.Info("explicit reconcile tasks") - - // tell mesos to send us the latest status updates for all the non-terminal tasks that we know about - statusList := []*mesos.TaskStatus{} - remaining := sets.StringKeySet(taskToSlave) - for taskId, slaveId := range taskToSlave { - if slaveId == "" { - delete(taskToSlave, taskId) - continue - } - statusList = append(statusList, &mesos.TaskStatus{ - TaskId: mutil.NewTaskID(taskId), - SlaveId: mutil.NewSlaveID(slaveId), - State: mesos.TaskState_TASK_RUNNING.Enum(), // req'd field, doesn't have to reflect reality - }) - } - - select { - case <-cancel: - return merrors.ReconciliationCancelledErr - default: - if _, err := driver.ReconcileTasks(statusList); err != nil { - return err - } - } - - start := time.Now() - first := true - for backoff := 1 * time.Second; first || remaining.Len() > 0; backoff = backoff * 2 { - first = false - // nothing to do here other than wait for status updates.. - if backoff > k.schedulerConfig.ExplicitReconciliationMaxBackoff.Duration { - backoff = k.schedulerConfig.ExplicitReconciliationMaxBackoff.Duration - } - select { - case <-cancel: - return merrors.ReconciliationCancelledErr - case <-time.After(backoff): - for taskId := range remaining { - if task, _ := k.taskRegistry.Get(taskId); task != nil && explicitTaskFilter(task) && task.UpdatedTime.Before(start) { - // keep this task in remaining list - continue - } - remaining.Delete(taskId) - } - } - } - return nil -} - -func (ks *MesosScheduler) recoverTasks() error { - podList, err := ks.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) - if err != nil { - log.V(1).Infof("failed to recover pod registry, madness may ensue: %v", err) - return err - } - recoverSlave := func(t *podtask.T) { - - slaveId := t.Spec.SlaveID - ks.slaveHostNames.Register(slaveId, t.Offer.Host()) - } - for _, pod := range podList.Items { - if _, isMirrorPod := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; isMirrorPod { - // mirrored pods are never reconciled because the scheduler isn't responsible for - // scheduling them; they're started by the executor/kubelet upon instantiation and - // reflected in the apiserver afterward. the scheduler has no knowledge of them. 
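A note alongside the explicit reconciliation above: the implicit variant requested via RequestImplicit is, on the wire, just ReconcileTasks with an empty status list, which asks the master to re-send the latest status for every task it knows for this framework. A minimal sketch against the same mesos-go driver bindings:

	// implicit reconciliation: no per-task statuses are supplied, so the
	// master streams updates for all of this framework's tasks
	if _, err := driver.ReconcileTasks([]*mesos.TaskStatus{}); err != nil {
		log.Errorf("failed to request implicit task reconciliation: %v", err)
	}
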
- continue - } - if t, ok, err := podtask.RecoverFrom(pod); err != nil { - log.Errorf("failed to recover task from pod, will attempt to delete '%v/%v': %v", pod.Namespace, pod.Name, err) - err := ks.client.Pods(pod.Namespace).Delete(pod.Name, nil) - //TODO(jdef) check for temporary or not-found errors - if err != nil { - log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err) - } - } else if ok { - ks.taskRegistry.Register(t, nil) - recoverSlave(t) - log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name) - } - } - return nil +func (c *Scheduler) LaunchTask(t *podtask.T) error { + return c.framework.LaunchTask(t) } diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 43a3d30e90a..6da5c2d83e2 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -59,7 +59,6 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" @@ -720,10 +719,9 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config } fcfs := podschedulers.NewFCFSPodScheduler(as, lookupNode) - mesosScheduler := scheduler.New(scheduler.Config{ + framework := scheduler.New(scheduler.Config{ SchedulerConfig: *sc, Executor: executor, - PodScheduler: fcfs, Client: client, FailoverTimeout: s.failoverTimeout, ReconcileInterval: s.reconcileInterval, @@ -744,7 +742,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config log.Fatalf("Misconfigured mesos framework: %v", err) } - schedulerProcess := ha.New(mesosScheduler) + schedulerProcess := ha.New(framework) dconfig := &bindings.DriverConfig{ Scheduler: schedulerProcess, Framework: info, @@ -761,18 +759,17 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config } // create scheduler loop - fw := &scheduler.MesosFramework{MesosScheduler: mesosScheduler} eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}) lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything()) - loop, pr := operations.NewScheduler(sc, fw, client, recorder, schedulerProcess.Terminal(), s.mux, lw) + scheduler := scheduler.NewScheduler(sc, framework, fcfs, client, recorder, schedulerProcess.Terminal(), s.mux, lw) - runtime.On(mesosScheduler.Registration(), func() { loop.Run(schedulerProcess.Terminal()) }) - runtime.On(mesosScheduler.Registration(), s.newServiceWriter(schedulerProcess.Terminal())) + runtime.On(framework.Registration(), func() { scheduler.Run(schedulerProcess.Terminal()) }) + runtime.On(framework.Registration(), s.newServiceWriter(schedulerProcess.Terminal())) driverFactory := ha.DriverFactory(func() (drv bindings.SchedulerDriver, err error) { log.V(1).Infoln("performing deferred initialization") - if err = mesosScheduler.Init(schedulerProcess.Master(), pr, s.mux); err != nil { + if err = framework.Init(scheduler, schedulerProcess.Master(), s.mux); err != nil { return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err) } log.V(1).Infoln("deferred init complete") diff --git 
a/contrib/mesos/pkg/scheduler/types/types.go b/contrib/mesos/pkg/scheduler/types/scheduler.go
similarity index 65%
rename from contrib/mesos/pkg/scheduler/types/types.go
rename to contrib/mesos/pkg/scheduler/types/scheduler.go
index 032101f14b7..db46aaa2309 100644
--- a/contrib/mesos/pkg/scheduler/types/types.go
+++ b/contrib/mesos/pkg/scheduler/types/scheduler.go
@@ -20,21 +20,19 @@ import (
 	"sync"

 	"k8s.io/kubernetes/contrib/mesos/pkg/offers"
-	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
 )

-// Framework abstracts everything other components of the scheduler need from
-// the actual MesosScheduler implementation.
-type Framework interface {
-	sync.Locker // synchronize scheduler plugin operations
-
-	podschedulers.SlaveIndex
-	PodScheduler() podschedulers.PodScheduler
-	Offers() offers.Registry
+// Scheduler abstracts everything that the other components of the scheduler
+// need from each other.
+type Scheduler interface {
 	Tasks() podtask.Registry
+	sync.Locker // synchronize changes to tasks, i.e. lock, get task, change task, store task, unlock

-	// driver calls
-	KillTask(taskId string) error
-	LaunchTask(*podtask.T) error
+	Offers() offers.Registry
+	Reconcile(t *podtask.T)
+	KillTask(id string) error
+	LaunchTask(t *podtask.T) error
+
+	Run(done <-chan struct{})
 }
diff --git a/contrib/mesos/pkg/scheduler/types/mock.go b/contrib/mesos/pkg/scheduler/types/scheduler_mock.go
similarity index 61%
rename from contrib/mesos/pkg/scheduler/types/mock.go
rename to contrib/mesos/pkg/scheduler/types/scheduler_mock.go
index a5d8180a0c4..4d1f5b6171c 100644
--- a/contrib/mesos/pkg/scheduler/types/mock.go
+++ b/contrib/mesos/pkg/scheduler/types/scheduler_mock.go
@@ -18,54 +18,25 @@ package types

 import (
 	"sync"
-	"testing"

 	"github.com/stretchr/testify/mock"
 	"k8s.io/kubernetes/contrib/mesos/pkg/offers"
-	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
+	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
-	"k8s.io/kubernetes/pkg/api"
+	"time"
 )

-// @deprecated this is a placeholder for me to test the mock package
-func TestNoSlavesYet(t *testing.T) {
-	obj := &MockScheduler{}
-	obj.On("SlaveHostNameFor", "foo").Return(nil)
-	obj.SlaveHostNameFor("foo")
-	obj.AssertExpectations(t)
-}
-
 // MockScheduler implements SchedulerApi
 type MockScheduler struct {
 	sync.RWMutex
 	mock.Mock
 }

-func (m *MockScheduler) SlaveHostNameFor(id string) (hostName string) {
-	args := m.Called(id)
-	x := args.Get(0)
-	if x != nil {
-		hostName = x.(string)
-	}
-	return
-}
-
-func (m *MockScheduler) PodScheduler() (f podschedulers.PodScheduler) {
-	args := m.Called()
-	x := args.Get(0)
-	if x != nil {
-		f = x.(podschedulers.PodScheduler)
-	}
-	return
-}
-
-func (m *MockScheduler) CreatePodTask(ctx api.Context, pod *api.Pod) (task *podtask.T, err error) {
-	args := m.Called(ctx, pod)
-	x := args.Get(0)
-	if x != nil {
-		task = x.(*podtask.T)
-	}
-	err = args.Error(1)
+func (m *MockScheduler) Run(done <-chan struct{}) {
+	_ = m.Called()
+	runtime.Until(func() {
+		time.Sleep(time.Second)
+	}, time.Second, done)
 	return
 }
@@ -96,3 +67,8 @@ func (m *MockScheduler) LaunchTask(task *podtask.T) error {
 	args := m.Called(task)
 	return args.Error(0)
 }
+
+func (m *MockScheduler) Reconcile(task *podtask.T) {
+	_ = m.Called()
+	return
+}
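Finally, a sketch of how the reshaped mock might be exercised from a test. It assumes the KillTask stub carried over unchanged in the middle of this file (not shown in the hunks above), and it would have to live in a separate _test.go file now that the in-package testing import is gone; names are illustrative:

	func TestMockSchedulerKillTask(t *testing.T) {
		m := &MockScheduler{}
		m.On("KillTask", "pod-task-1").Return(nil)

		if err := m.KillTask("pod-task-1"); err != nil {
			t.Fatalf("unexpected KillTask error: %v", err)
		}
		m.AssertExpectations(t)
	}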