Split MesosScheduler from the actual Scheduler code holding the task registry

- rename types.Framework -> types.Scheduler
- rename MesosScheduler -> Framework
- transform MesosFramework into an independent Scheduler gluing together all
  scheduler components, implementing types.Scheduler and owning the task
  registry (see the interface sketch below)
commit 3601292cff
parent 60cc93fff8
Author: Dr. Stefan Schimanski
Date:   2015-11-02 18:01:29 +01:00

19 changed files with 990 additions and 1007 deletions
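For orientation, here is a minimal sketch of the types.Scheduler interface as it can be inferred from the call sites in this diff (Tasks, Offers, KillTask, LaunchTask, Reconcile, plus the embedded locking). The method set and comments are assumptions reconstructed from usage in the changed files, not the authoritative definition in pkg/scheduler/types.

// Sketch (assumption): types.Scheduler as suggested by the call sites in
// this commit; the real definition lives in
// contrib/mesos/pkg/scheduler/types and may differ in detail.
package types

import (
	"sync"

	"k8s.io/kubernetes/contrib/mesos/pkg/offers"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)

type Scheduler interface {
	sync.Locker // components lock the scheduler around task-registry access

	Tasks() podtask.Registry // the task registry is now owned by the Scheduler
	Offers() offers.Registry

	KillTask(id string) error
	LaunchTask(t *podtask.T) error
	Reconcile(t *podtask.T) // e.g. re-handle a launched-but-unbound task that failed
}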

View File

@@ -475,7 +475,7 @@ func (k *KubernetesMesosExecutor) launchTask(driver bindings.ExecutorDriver, tas
// TODO(k8s): use Pods interface for binding once clusters are upgraded
// return b.Pods(binding.Namespace).Bind(binding)
if pod.Spec.NodeName == "" {
//HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/scheduler.go
//HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/framework.go
binding := &api.Binding{
ObjectMeta: api.ObjectMeta{
Namespace: pod.Namespace,
@@ -780,7 +780,7 @@ func (k *KubernetesMesosExecutor) FrameworkMessage(driver bindings.ExecutorDrive
}
log.Infof("Receives message from framework %v\n", message)
//TODO(jdef) master reported a lost task, reconcile this! @see scheduler.go:handleTaskLost
//TODO(jdef) master reported a lost task, reconcile this! @see framework.go:handleTaskLost
if strings.HasPrefix(message, messages.TaskLost+":") {
taskId := message[len(messages.TaskLost)+1:]
if taskId != "" {

View File

@@ -17,48 +17,741 @@ limitations under the License.
package scheduler
import (
"fmt"
"io"
"math"
"net/http"
"reflect"
"sync"
"time"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
mutil "github.com/mesos/mesos-go/mesosutil"
bindings "github.com/mesos/mesos-go/scheduler"
execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/proc"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
)
type MesosFramework struct {
sync.Mutex
MesosScheduler *MesosScheduler
type Framework struct {
// We use a lock here to avoid races
// between invoking the mesos callback
*sync.RWMutex
// Config related, write-once
sched types.Scheduler
schedulerConfig *schedcfg.Config
executor *mesos.ExecutorInfo
executorGroup uint64
client *client.Client
failoverTimeout float64 // in seconds
reconcileInterval int64
nodeRegistrator node.Registrator
storeFrameworkId func(id string)
// Mesos context.
driver bindings.SchedulerDriver // late initialization
frameworkId *mesos.FrameworkID
masterInfo *mesos.MasterInfo
registered bool
registration chan struct{} // signal chan that closes upon first successful registration
onRegistration sync.Once
offers offers.Registry
slaveHostNames *slave.Registry
// via deferred init
tasksReconciler *operations.TasksReconciler
reconcileCooldown time.Duration
asRegisteredMaster proc.Doer
terminate <-chan struct{} // signal chan, closes when we should kill background tasks
}
func (fw *MesosFramework) PodScheduler() podschedulers.PodScheduler {
return fw.MesosScheduler.podScheduler
type Config struct {
SchedulerConfig schedcfg.Config
Executor *mesos.ExecutorInfo
Client *client.Client
StoreFrameworkId func(id string)
FailoverTimeout float64
ReconcileInterval int64
ReconcileCooldown time.Duration
LookupNode node.LookupFunc
}
func (fw *MesosFramework) Offers() offers.Registry {
return fw.MesosScheduler.offers
// New creates a new Framework
func New(config Config) *Framework {
var k *Framework
k = &Framework{
schedulerConfig: &config.SchedulerConfig,
RWMutex: new(sync.RWMutex),
executor: config.Executor,
executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
client: config.Client,
failoverTimeout: config.FailoverTimeout,
reconcileInterval: config.ReconcileInterval,
nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode),
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
// the node must be registered and have up-to-date labels
n := config.LookupNode(o.GetHostname())
if n == nil || !node.IsUpToDate(n, node.SlaveAttributesToLabels(o.GetAttributes())) {
return false
}
// the executor IDs must not identify a kubelet-executor with a group that doesn't match ours
for _, eid := range o.GetExecutorIds() {
execuid := uid.Parse(eid.GetValue())
if execuid.Name() == execcfg.DefaultInfoID && execuid.Group() != k.executorGroup {
return false
}
}
return true
},
DeclineOffer: func(id string) <-chan error {
errOnce := proc.NewErrorOnce(k.terminate)
errOuter := k.asRegisteredMaster.Do(func() {
var err error
defer errOnce.Report(err)
offerId := mutil.NewOfferID(id)
filters := &mesos.Filters{}
_, err = k.driver.DeclineOffer(offerId, filters)
})
return errOnce.Send(errOuter).Err()
},
// remember expired offers so that we can tell if a previously scheduled offer relies on one
LingerTTL: config.SchedulerConfig.OfferLingerTTL.Duration,
TTL: config.SchedulerConfig.OfferTTL.Duration,
ListenerDelay: config.SchedulerConfig.ListenerDelay.Duration,
}),
slaveHostNames: slave.NewRegistry(),
reconcileCooldown: config.ReconcileCooldown,
registration: make(chan struct{}),
asRegisteredMaster: proc.DoerFunc(func(proc.Action) <-chan error {
return proc.ErrorChanf("cannot execute action with unregistered scheduler")
}),
storeFrameworkId: config.StoreFrameworkId,
}
return k
}
func (fw *MesosFramework) Tasks() podtask.Registry {
return fw.MesosScheduler.taskRegistry
func (k *Framework) Init(scheduler *Scheduler, electedMaster proc.Process, mux *http.ServeMux) error {
log.V(1).Infoln("initializing kubernetes mesos scheduler")
k.sched = scheduler
k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error {
if !k.registered {
return proc.ErrorChanf("failed to execute action, scheduler is disconnected")
}
return electedMaster.Do(a)
})
k.terminate = electedMaster.Done()
k.offers.Init(k.terminate)
k.InstallDebugHandlers(mux)
k.nodeRegistrator.Run(k.terminate)
return k.recoverTasks()
}
func (fw *MesosFramework) SlaveHostNameFor(id string) string {
return fw.MesosScheduler.slaveHostNames.HostName(id)
func (k *Framework) asMaster() proc.Doer {
k.RLock()
defer k.RUnlock()
return k.asRegisteredMaster
}
func (fw *MesosFramework) KillTask(taskId string) error {
killTaskId := mutil.NewTaskID(taskId)
_, err := fw.MesosScheduler.driver.KillTask(killTaskId)
func (k *Framework) InstallDebugHandlers(mux *http.ServeMux) {
wrappedHandler := func(uri string, h http.Handler) {
mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) {
ch := make(chan struct{})
closer := runtime.Closer(ch)
proc.OnError(k.asMaster().Do(func() {
defer closer()
h.ServeHTTP(w, r)
}), func(err error) {
defer closer()
log.Warningf("failed HTTP request for %s: %v", uri, err)
w.WriteHeader(http.StatusServiceUnavailable)
}, k.terminate)
select {
case <-time.After(k.schedulerConfig.HttpHandlerTimeout.Duration):
log.Warningf("timed out waiting for request to be processed")
w.WriteHeader(http.StatusServiceUnavailable)
return
case <-ch: // noop
}
})
}
requestReconciliation := func(uri string, requestAction func()) {
wrappedHandler(uri, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestAction()
w.WriteHeader(http.StatusNoContent)
}))
}
requestReconciliation("/debug/actions/requestExplicit", k.tasksReconciler.RequestExplicit)
requestReconciliation("/debug/actions/requestImplicit", k.tasksReconciler.RequestImplicit)
wrappedHandler("/debug/actions/kamikaze", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
slaves := k.slaveHostNames.SlaveIDs()
for _, slaveId := range slaves {
_, err := k.driver.SendFrameworkMessage(
k.executor.ExecutorId,
mutil.NewSlaveID(slaveId),
messages.Kamikaze)
if err != nil {
log.Warningf("failed to send kamikaze message to slave %s: %v", slaveId, err)
} else {
io.WriteString(w, fmt.Sprintf("kamikaze slave %s\n", slaveId))
}
}
io.WriteString(w, "OK")
}))
}
func (k *Framework) Registration() <-chan struct{} {
return k.registration
}
// Registered is called when the scheduler registered with the master successfully.
func (k *Framework) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) {
log.Infof("Scheduler registered with the master: %v with frameworkId: %v\n", mi, fid)
k.driver = drv
k.frameworkId = fid
k.masterInfo = mi
k.registered = true
k.onRegistration.Do(func() { k.onInitialRegistration(drv) })
k.tasksReconciler.RequestExplicit()
}
// Reregistered is called when the scheduler re-registered with the master successfully.
// This happens when the master fails over.
func (k *Framework) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) {
log.Infof("Scheduler reregistered with the master: %v\n", mi)
k.driver = drv
k.masterInfo = mi
k.registered = true
k.onRegistration.Do(func() { k.onInitialRegistration(drv) })
k.tasksReconciler.RequestExplicit()
}
// perform one-time initialization actions upon the first registration event received from Mesos.
func (k *Framework) onInitialRegistration(driver bindings.SchedulerDriver) {
defer close(k.registration)
if k.failoverTimeout > 0 {
refreshInterval := k.schedulerConfig.FrameworkIdRefreshInterval.Duration
if k.failoverTimeout < k.schedulerConfig.FrameworkIdRefreshInterval.Duration.Seconds() {
refreshInterval = time.Duration(math.Max(1, k.failoverTimeout/2)) * time.Second
}
go runtime.Until(func() {
k.storeFrameworkId(k.frameworkId.GetValue())
}, refreshInterval, k.terminate)
}
r1 := k.makeTaskRegistryReconciler()
r2 := k.makePodRegistryReconciler()
k.tasksReconciler = operations.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2),
k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate)
go k.tasksReconciler.Run(driver)
if k.reconcileInterval > 0 {
ri := time.Duration(k.reconcileInterval) * time.Second
time.AfterFunc(k.schedulerConfig.InitialImplicitReconciliationDelay.Duration, func() { runtime.Until(k.tasksReconciler.RequestImplicit, ri, k.terminate) })
log.Infof("will perform implicit task reconciliation at interval: %v after %v", ri, k.schedulerConfig.InitialImplicitReconciliationDelay.Duration)
}
}
// Disconnected is called when the scheduler loses connection to the master.
func (k *Framework) Disconnected(driver bindings.SchedulerDriver) {
log.Infof("Master disconnected!\n")
k.registered = false
// discard all cached offers to avoid unnecessary TASK_LOST updates
k.offers.Invalidate("")
}
// ResourceOffers is called when the scheduler receives some offers from the master.
func (k *Framework) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) {
log.V(2).Infof("Received offers %+v", offers)
// Record the offers in the global offer map as well as each slave's offer map.
k.offers.Add(offers)
for _, offer := range offers {
slaveId := offer.GetSlaveId().GetValue()
k.slaveHostNames.Register(slaveId, offer.GetHostname())
// create the api object if it does not exist already
if k.nodeRegistrator != nil {
labels := node.SlaveAttributesToLabels(offer.GetAttributes())
_, err := k.nodeRegistrator.Register(offer.GetHostname(), labels)
if err != nil {
log.Error(err)
}
}
}
}
// OfferRescinded is called when the resources are rescinded from the scheduler.
func (k *Framework) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) {
log.Infof("Offer rescinded %v\n", offerId)
oid := offerId.GetValue()
k.offers.Delete(oid, offermetrics.OfferRescinded)
}
// StatusUpdate is called when a status update message is sent to the scheduler.
func (k *Framework) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
source, reason := "none", "none"
if taskStatus.Source != nil {
source = (*taskStatus.Source).String()
}
if taskStatus.Reason != nil {
reason = (*taskStatus.Reason).String()
}
taskState := taskStatus.GetState()
metrics.StatusUpdates.WithLabelValues(source, reason, taskState.String()).Inc()
message := "none"
if taskStatus.Message != nil {
message = *taskStatus.Message
}
log.Infof(
"task status update %q from %q for task %q on slave %q executor %q for reason %q with message %q",
taskState.String(),
source,
taskStatus.TaskId.GetValue(),
taskStatus.SlaveId.GetValue(),
taskStatus.ExecutorId.GetValue(),
reason,
message,
)
switch taskState {
case mesos.TaskState_TASK_RUNNING, mesos.TaskState_TASK_FINISHED, mesos.TaskState_TASK_STARTING, mesos.TaskState_TASK_STAGING:
if _, state := k.sched.Tasks().UpdateStatus(taskStatus); state == podtask.StateUnknown {
if taskState != mesos.TaskState_TASK_FINISHED {
//TODO(jdef) what if I receive this after a TASK_LOST or TASK_KILLED?
//I don't want to reincarnate then.. TASK_LOST is a special case because
//the master is stateless and there are scenarios where I may get TASK_LOST
//followed by TASK_RUNNING.
//TODO(jdef) consider running this asynchronously since there are API server
//calls that may be made
k.reconcileNonTerminalTask(driver, taskStatus)
} // else, we don't really care about FINISHED tasks that aren't registered
return
}
if hostName := k.slaveHostNames.HostName(taskStatus.GetSlaveId().GetValue()); hostName == "" {
// a registered task has an update reported by a slave that we don't recognize.
// this should never happen! So we don't reconcile it.
log.Errorf("Ignore status %+v because the slave does not exist", taskStatus)
return
}
case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR:
if task, _ := k.sched.Tasks().UpdateStatus(taskStatus); task != nil {
if task.Has(podtask.Launched) && !task.Has(podtask.Bound) {
go k.sched.Reconcile(task)
return
}
} else {
// unknown task failed, not much we can do about it
return
}
// last-ditch effort to reconcile our records
fallthrough
case mesos.TaskState_TASK_LOST, mesos.TaskState_TASK_KILLED:
k.reconcileTerminalTask(driver, taskStatus)
default:
log.Errorf(
"unknown task status %q from %q for task %q on slave %q executor %q for reason %q with message %q",
taskState.String(),
source,
taskStatus.TaskId.GetValue(),
taskStatus.SlaveId.GetValue(),
taskStatus.ExecutorId.GetValue(),
reason,
message,
)
}
}
func (k *Framework) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
task, state := k.sched.Tasks().UpdateStatus(taskStatus)
if (state == podtask.StateRunning || state == podtask.StatePending) &&
((taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER && taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION) ||
(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED) ||
(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED) ||
(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_EXECUTOR && taskStatus.GetMessage() == messages.ContainersDisappeared)) {
//--
// pod-task has metadata that refers to:
// (1) a task that Mesos no longer knows about, or else
// (2) a pod that the Kubelet will never report as "failed"
// (3) a pod that the kubeletExecutor reported as lost (likely due to docker daemon crash/restart)
// For now, destroy the pod and hope that there's a replication controller backing it up.
// TODO(jdef) for case #2 don't delete the pod, just update its status to Failed
pod := &task.Pod
log.Warningf("deleting rogue pod %v/%v for lost task %v", pod.Namespace, pod.Name, task.ID)
if err := k.client.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil && !errors.IsNotFound(err) {
log.Errorf("failed to delete pod %v/%v for terminal task %v: %v", pod.Namespace, pod.Name, task.ID, err)
}
} else if taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED || taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED {
// attempt to prevent dangling pods in the pod and task registries
log.V(1).Infof("request explicit reconciliation to clean up for task %v after executor reported (terminated/unregistered)", taskStatus.TaskId.GetValue())
k.tasksReconciler.RequestExplicit()
} else if taskStatus.GetState() == mesos.TaskState_TASK_LOST && state == podtask.StateRunning && taskStatus.ExecutorId != nil && taskStatus.SlaveId != nil {
//TODO(jdef) this may not be meaningful once we have proper checkpointing and master detection
//If we're reconciling and receive this then the executor may be
//running a task that we need it to kill. It's possible that the framework
//is unrecognized by the master at this point, so KillTask is not guaranteed
//to do anything. The underlying driver transport may be able to send a
//FrameworkMessage directly to the slave to terminate the task.
log.V(2).Infof("forwarding TASK_LOST message to executor %v on slave %v", taskStatus.ExecutorId, taskStatus.SlaveId)
data := fmt.Sprintf("%s:%s", messages.TaskLost, task.ID) //TODO(jdef) use a real message type
if _, err := driver.SendFrameworkMessage(taskStatus.ExecutorId, taskStatus.SlaveId, data); err != nil {
log.Error(err.Error())
}
}
}
// reconcile an unknown (from the perspective of our registry) non-terminal task
func (k *Framework) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
// attempt to recover task from pod info:
// - task data may contain an api.PodStatusResult; if status.reason == REASON_RECONCILIATION then status.data == nil
// - the Name can be parsed by container.ParseFullName() to yield a pod Name and Namespace
// - pull the pod metadata down from the api server
// - perform task recovery based on pod metadata
taskId := taskStatus.TaskId.GetValue()
if taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION && taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER {
// there will be no data in the task status that we can use to determine the associated pod
switch taskStatus.GetState() {
case mesos.TaskState_TASK_STAGING:
// there is still hope for this task, don't kill it just yet
//TODO(jdef) there should probably be a limit for how long we tolerate tasks stuck in this state
return
default:
// for TASK_{STARTING,RUNNING} we should have already attempted recoverTasks().
// if the scheduler failed over before the executor fired TASK_STARTING, then we should *not*
// be processing this reconciliation update before we process the one from the executor.
// point: we don't know what this task is (perhaps there was unrecoverable metadata in the pod),
// so it gets killed.
log.Errorf("killing non-terminal, unrecoverable task %v", taskId)
}
} else if podStatus, err := podtask.ParsePodStatusResult(taskStatus); err != nil {
// possible rogue pod exists at this point because we can't identify it; should kill the task
log.Errorf("possible rogue pod; illegal task status data for task %v, expected an api.PodStatusResult: %v", taskId, err)
} else if name, namespace, err := container.ParsePodFullName(podStatus.Name); err != nil {
// possible rogue pod exists at this point because we can't identify it; should kill the task
log.Errorf("possible rogue pod; illegal api.PodStatusResult, unable to parse full pod name from: '%v' for task %v: %v",
podStatus.Name, taskId, err)
} else if pod, err := k.client.Pods(namespace).Get(name); err == nil {
if t, ok, err := podtask.RecoverFrom(*pod); ok {
log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name)
_, err := k.sched.Tasks().Register(t, nil)
if err != nil {
// someone beat us to it?!
log.Warningf("failed to register recovered task: %v", err)
return
} else {
k.sched.Tasks().UpdateStatus(taskStatus)
}
return
} else if err != nil {
//should kill the pod and the task
log.Errorf("killing pod, failed to recover task from pod %v/%v: %v", namespace, name, err)
if err := k.client.Pods(namespace).Delete(name, nil); err != nil {
log.Errorf("failed to delete pod %v/%v: %v", namespace, name, err)
}
} else {
//this is pretty unexpected: we received a TASK_{STARTING,RUNNING} message, but the apiserver's pod
//metadata is not appropriate for task reconstruction -- which should almost certainly never
//be the case unless someone swapped out the pod on us (and kept the same namespace/name) while
//we were failed over.
//kill this task, allow the newly launched scheduler to schedule the new pod
log.Warningf("unexpected pod metadata for task %v in apiserver, assuming new unscheduled pod spec: %+v", taskId, pod)
}
} else if errors.IsNotFound(err) {
// pod lookup failed, should delete the task since the pod is no longer valid; may be redundant, that's ok
log.Infof("killing task %v since pod %v/%v no longer exists", taskId, namespace, name)
} else if errors.IsServerTimeout(err) {
log.V(2).Infof("failed to reconcile task due to API server timeout: %v", err)
return
} else {
log.Errorf("unexpected API server error, aborting reconcile for task %v: %v", taskId, err)
return
}
if _, err := driver.KillTask(taskStatus.TaskId); err != nil {
log.Errorf("failed to kill task %v: %v", taskId, err)
}
}
// FrameworkMessage is called when the scheduler receives a message from the executor.
func (k *Framework) FrameworkMessage(driver bindings.SchedulerDriver,
executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) {
log.Infof("Received messages from executor %v of slave %v, %v\n", executorId, slaveId, message)
}
// SlaveLost is called when some slave is lost.
func (k *Framework) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) {
log.Infof("Slave %v is lost\n", slaveId)
sid := slaveId.GetValue()
k.offers.InvalidateForSlave(sid)
// TODO(jdef): delete slave from our internal list? probably not since we may need to reconcile
// tasks. it would be nice to somehow flag the slave as lost so that, perhaps, we can periodically
// flush lost slaves older than X, and for which no tasks or pods reference.
// unfinished tasks/pods will be dropped. use a replication controller if you want pods to
// be restarted when slaves die.
}
// ExecutorLost is called when some executor is lost.
func (k *Framework) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) {
log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status)
// TODO(yifan): Restart any unfinished tasks of the executor.
}
// Error is called when there is an unrecoverable error in the scheduler or scheduler driver.
// The driver should have been aborted before this is invoked.
func (k *Framework) Error(driver bindings.SchedulerDriver, message string) {
log.Fatalf("fatal scheduler error: %v\n", message)
}
// filter func used for explicit task reconciliation, selects only non-terminal tasks which
// have been communicated to mesos (read: launched).
func explicitTaskFilter(t *podtask.T) bool {
switch t.State {
case podtask.StateRunning:
return true
case podtask.StatePending:
return t.Has(podtask.Launched)
default:
return false
}
}
// invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation
// is cancelled. if any other errors occur the composite reconciler will attempt to complete the
// sequence, reporting only the last generated error.
func (k *Framework) makeCompositeReconciler(actions ...operations.ReconcilerAction) operations.ReconcilerAction {
if x := len(actions); x == 0 {
// programming error
panic("no actions specified for composite reconciler")
} else if x == 1 {
return actions[0]
}
chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b operations.ReconcilerAction) <-chan error {
ech := a(d, c)
ch := make(chan error, 1)
go func() {
select {
case <-k.terminate:
case <-c:
case e := <-ech:
if e != nil {
ch <- e
return
}
ech = b(d, c)
select {
case <-k.terminate:
case <-c:
case e := <-ech:
if e != nil {
ch <- e
return
}
close(ch)
return
}
}
ch <- fmt.Errorf("aborting composite reconciler action")
}()
return ch
}
result := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error {
return chained(d, c, actions[0], actions[1])
}
for i := 2; i < len(actions); i++ {
i := i
next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error {
return chained(d, c, operations.ReconcilerAction(result), actions[i])
}
result = next
}
return operations.ReconcilerAction(result)
}
// reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks listed in the scheduler's internal taskRegistry.
func (k *Framework) makeTaskRegistryReconciler() operations.ReconcilerAction {
return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
taskToSlave := make(map[string]string)
for _, t := range k.sched.Tasks().List(explicitTaskFilter) {
if t.Spec.SlaveID != "" {
taskToSlave[t.ID] = t.Spec.SlaveID
}
}
return proc.ErrorChan(k.explicitlyReconcileTasks(drv, taskToSlave, cancel))
})
}
// reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks identified by annotations in the Kubernetes pod registry.
func (k *Framework) makePodRegistryReconciler() operations.ReconcilerAction {
return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil {
return proc.ErrorChanf("failed to reconcile pod registry: %v", err)
}
taskToSlave := make(map[string]string)
for _, pod := range podList.Items {
if len(pod.Annotations) == 0 {
continue
}
taskId, found := pod.Annotations[meta.TaskIdKey]
if !found {
continue
}
slaveId, found := pod.Annotations[meta.SlaveIdKey]
if !found {
continue
}
taskToSlave[taskId] = slaveId
}
return proc.ErrorChan(k.explicitlyReconcileTasks(drv, taskToSlave, cancel))
})
}
// execute an explicit task reconciliation, as per http://mesos.apache.org/documentation/latest/reconciliation/
func (k *Framework) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error {
log.Info("explicit reconcile tasks")
// tell mesos to send us the latest status updates for all the non-terminal tasks that we know about
statusList := []*mesos.TaskStatus{}
remaining := sets.KeySet(reflect.ValueOf(taskToSlave))
for taskId, slaveId := range taskToSlave {
if slaveId == "" {
delete(taskToSlave, taskId)
continue
}
statusList = append(statusList, &mesos.TaskStatus{
TaskId: mutil.NewTaskID(taskId),
SlaveId: mutil.NewSlaveID(slaveId),
State: mesos.TaskState_TASK_RUNNING.Enum(), // req'd field, doesn't have to reflect reality
})
}
select {
case <-cancel:
return merrors.ReconciliationCancelledErr
default:
if _, err := driver.ReconcileTasks(statusList); err != nil {
return err
}
}
start := time.Now()
first := true
for backoff := 1 * time.Second; first || remaining.Len() > 0; backoff = backoff * 2 {
first = false
// nothing to do here other than wait for status updates..
if backoff > k.schedulerConfig.ExplicitReconciliationMaxBackoff.Duration {
backoff = k.schedulerConfig.ExplicitReconciliationMaxBackoff.Duration
}
select {
case <-cancel:
return merrors.ReconciliationCancelledErr
case <-time.After(backoff):
for taskId := range remaining {
if task, _ := k.sched.Tasks().Get(taskId); task != nil && explicitTaskFilter(task) && task.UpdatedTime.Before(start) {
// keep this task in remaining list
continue
}
remaining.Delete(taskId)
}
}
}
return nil
}
func (ks *Framework) recoverTasks() error {
podList, err := ks.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil {
log.V(1).Infof("failed to recover pod registry, madness may ensue: %v", err)
return err
}
recoverSlave := func(t *podtask.T) {
slaveId := t.Spec.SlaveID
ks.slaveHostNames.Register(slaveId, t.Offer.Host())
}
for _, pod := range podList.Items {
if _, isMirrorPod := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; isMirrorPod {
// mirrored pods are never reconciled because the scheduler isn't responsible for
// scheduling them; they're started by the executor/kubelet upon instantiation and
// reflected in the apiserver afterward. the scheduler has no knowledge of them.
continue
}
if t, ok, err := podtask.RecoverFrom(pod); err != nil {
log.Errorf("failed to recover task from pod, will attempt to delete '%v/%v': %v", pod.Namespace, pod.Name, err)
err := ks.client.Pods(pod.Namespace).Delete(pod.Name, nil)
//TODO(jdef) check for temporary or not-found errors
if err != nil {
log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err)
}
} else if ok {
ks.sched.Tasks().Register(t, nil)
recoverSlave(t)
log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name)
}
}
return nil
}
func (ks *Framework) KillTask(id string) error {
killTaskId := mutil.NewTaskID(id)
_, err := ks.driver.KillTask(killTaskId)
return err
}
func (fw *MesosFramework) LaunchTask(task *podtask.T) error {
func (ks *Framework) LaunchTask(t *podtask.T) error {
// assume caller is holding scheduler lock
ei := fw.MesosScheduler.executor
taskList := []*mesos.TaskInfo{task.BuildTaskInfo(ei)}
offerIds := []*mesos.OfferID{task.Offer.Details().Id}
taskList := []*mesos.TaskInfo{t.BuildTaskInfo(ks.executor)}
offerIds := []*mesos.OfferID{t.Offer.Details().Id}
filters := &mesos.Filters{}
_, err := fw.MesosScheduler.driver.LaunchTasks(offerIds, taskList, filters)
_, err := ks.driver.LaunchTasks(offerIds, taskList, filters)
return err
}
func (ks *Framework) Offers() offers.Registry {
return ks.offers
}

View File

@@ -26,11 +26,12 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/proc"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/mock"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)
//get number of non-expired offers from offer registry
@@ -82,12 +83,19 @@ func (r *mockRegistrator) Register(hostName string, labels map[string]string) (b
}
}
func mockScheduler() types.Scheduler {
mockScheduler := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
mockScheduler.On("Tasks").Return(reg)
return mockScheduler
}
//test adding of resource offer, should be added to offer registry and slaves
func TestResourceOffer_Add(t *testing.T) {
assert := assert.New(t)
registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)}
testScheduler := &MesosScheduler{
testFramework := &Framework{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
return true
@@ -102,37 +110,38 @@ func TestResourceOffer_Add(t *testing.T) {
}),
slaveHostNames: slave.NewRegistry(),
nodeRegistrator: registrator,
sched: mockScheduler(),
}
hostname := "h1"
offerID1 := util.NewOfferID("test1")
offer1 := &mesos.Offer{Id: offerID1, Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
offers1 := []*mesos.Offer{offer1}
testScheduler.ResourceOffers(nil, offers1)
testFramework.ResourceOffers(nil, offers1)
assert.Equal(1, len(registrator.store.List()))
assert.Equal(1, getNumberOffers(testScheduler.offers))
assert.Equal(1, getNumberOffers(testFramework.offers))
//check slave hostname
assert.Equal(1, len(testScheduler.slaveHostNames.SlaveIDs()))
assert.Equal(1, len(testFramework.slaveHostNames.SlaveIDs()))
//add another offer
hostname2 := "h2"
offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)}
offers2 := []*mesos.Offer{offer2}
testScheduler.ResourceOffers(nil, offers2)
testFramework.ResourceOffers(nil, offers2)
//check it is stored in registry
assert.Equal(2, getNumberOffers(testScheduler.offers))
assert.Equal(2, getNumberOffers(testFramework.offers))
//check slave hostnames
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs()))
}
//test adding of resource offer, should be added to offer registry and slaves
func TestResourceOffer_Add_Rescind(t *testing.T) {
assert := assert.New(t)
testScheduler := &MesosScheduler{
testFramework := &Framework{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
return true
@@ -146,41 +155,42 @@ func TestResourceOffer_Add_Rescind(t *testing.T) {
ListenerDelay: schedcfg.DefaultListenerDelay,
}),
slaveHostNames: slave.NewRegistry(),
sched: mockScheduler(),
}
hostname := "h1"
offerID1 := util.NewOfferID("test1")
offer1 := &mesos.Offer{Id: offerID1, Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
offers1 := []*mesos.Offer{offer1}
testScheduler.ResourceOffers(nil, offers1)
testFramework.ResourceOffers(nil, offers1)
assert.Equal(1, getNumberOffers(testScheduler.offers))
assert.Equal(1, getNumberOffers(testFramework.offers))
//check slave hostname
assert.Equal(1, len(testScheduler.slaveHostNames.SlaveIDs()))
assert.Equal(1, len(testFramework.slaveHostNames.SlaveIDs()))
//add another offer
hostname2 := "h2"
offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)}
offers2 := []*mesos.Offer{offer2}
testScheduler.ResourceOffers(nil, offers2)
testFramework.ResourceOffers(nil, offers2)
assert.Equal(2, getNumberOffers(testScheduler.offers))
assert.Equal(2, getNumberOffers(testFramework.offers))
//check slave hostnames
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs()))
//next whether offers can be rescinded
testScheduler.OfferRescinded(nil, offerID1)
assert.Equal(1, getNumberOffers(testScheduler.offers))
testFramework.OfferRescinded(nil, offerID1)
assert.Equal(1, getNumberOffers(testFramework.offers))
//next whether offers can be rescinded
testScheduler.OfferRescinded(nil, util.NewOfferID("test2"))
testFramework.OfferRescinded(nil, util.NewOfferID("test2"))
//walk offers again and check it is removed from registry
assert.Equal(0, getNumberOffers(testScheduler.offers))
assert.Equal(0, getNumberOffers(testFramework.offers))
//remove non existing ID
testScheduler.OfferRescinded(nil, util.NewOfferID("notExist"))
testFramework.OfferRescinded(nil, util.NewOfferID("notExist"))
}
//test that when a slave is lost we remove all offers
@@ -188,7 +198,7 @@ func TestSlave_Lost(t *testing.T) {
assert := assert.New(t)
//
testScheduler := &MesosScheduler{
testFramework := &Framework{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
return true
@@ -199,44 +209,45 @@ func TestSlave_Lost(t *testing.T) {
ListenerDelay: schedcfg.DefaultListenerDelay,
}),
slaveHostNames: slave.NewRegistry(),
sched: mockScheduler(),
}
hostname := "h1"
offer1 := &mesos.Offer{Id: util.NewOfferID("test1"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
offers1 := []*mesos.Offer{offer1}
testScheduler.ResourceOffers(nil, offers1)
testFramework.ResourceOffers(nil, offers1)
offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
offers2 := []*mesos.Offer{offer2}
testScheduler.ResourceOffers(nil, offers2)
testFramework.ResourceOffers(nil, offers2)
//add another offer from different slaveID
hostname2 := "h2"
offer3 := &mesos.Offer{Id: util.NewOfferID("test3"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)}
offers3 := []*mesos.Offer{offer3}
testScheduler.ResourceOffers(nil, offers3)
testFramework.ResourceOffers(nil, offers3)
//test precondition
assert.Equal(3, getNumberOffers(testScheduler.offers))
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
assert.Equal(3, getNumberOffers(testFramework.offers))
assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs()))
//remove first slave
testScheduler.SlaveLost(nil, util.NewSlaveID(hostname))
testFramework.SlaveLost(nil, util.NewSlaveID(hostname))
//offers should be removed
assert.Equal(1, getNumberOffers(testScheduler.offers))
assert.Equal(1, getNumberOffers(testFramework.offers))
//slave hostnames should still be all present
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs()))
//remove second slave
testScheduler.SlaveLost(nil, util.NewSlaveID(hostname2))
testFramework.SlaveLost(nil, util.NewSlaveID(hostname2))
//offers should be removed
assert.Equal(0, getNumberOffers(testScheduler.offers))
assert.Equal(0, getNumberOffers(testFramework.offers))
//slave hostnames should still be all present
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs()))
//try to remove non existing slave
testScheduler.SlaveLost(nil, util.NewSlaveID("notExist"))
testFramework.SlaveLost(nil, util.NewSlaveID("notExist"))
}
@@ -245,7 +256,7 @@ func TestDisconnect(t *testing.T) {
assert := assert.New(t)
//
testScheduler := &MesosScheduler{
testFramework := &Framework{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
return true
@@ -256,29 +267,30 @@ func TestDisconnect(t *testing.T) {
ListenerDelay: schedcfg.DefaultListenerDelay,
}),
slaveHostNames: slave.NewRegistry(),
sched: mockScheduler(),
}
hostname := "h1"
offer1 := &mesos.Offer{Id: util.NewOfferID("test1"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
offers1 := []*mesos.Offer{offer1}
testScheduler.ResourceOffers(nil, offers1)
testFramework.ResourceOffers(nil, offers1)
offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
offers2 := []*mesos.Offer{offer2}
testScheduler.ResourceOffers(nil, offers2)
testFramework.ResourceOffers(nil, offers2)
//add another offer from different slaveID
hostname2 := "h2"
offer3 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)}
offers3 := []*mesos.Offer{offer3}
testScheduler.ResourceOffers(nil, offers3)
testFramework.ResourceOffers(nil, offers3)
//disconnect
testScheduler.Disconnected(nil)
testFramework.Disconnected(nil)
//all offers should be removed
assert.Equal(0, getNumberOffers(testScheduler.offers))
assert.Equal(0, getNumberOffers(testFramework.offers))
//slave hostnames should still be all present
assert.Equal(2, len(testScheduler.slaveHostNames.SlaveIDs()))
assert.Equal(2, len(testFramework.slaveHostNames.SlaveIDs()))
}
//test we can handle different status updates, TODO check state transitions
@@ -288,7 +300,7 @@ func TestStatus_Update(t *testing.T) {
// setup expectations
mockdriver.On("KillTask", util.NewTaskID("test-task-001")).Return(mesos.Status_DRIVER_RUNNING, nil)
testScheduler := &MesosScheduler{
testFramework := &Framework{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
return true
@@ -300,26 +312,26 @@ func TestStatus_Update(t *testing.T) {
}),
slaveHostNames: slave.NewRegistry(),
driver: &mockdriver,
taskRegistry: podtask.NewInMemoryRegistry(),
sched: mockScheduler(),
}
taskStatus_task_starting := util.NewTaskStatus(
util.NewTaskID("test-task-001"),
mesos.TaskState_TASK_RUNNING,
)
testScheduler.StatusUpdate(testScheduler.driver, taskStatus_task_starting)
testFramework.StatusUpdate(testFramework.driver, taskStatus_task_starting)
taskStatus_task_running := util.NewTaskStatus(
util.NewTaskID("test-task-001"),
mesos.TaskState_TASK_RUNNING,
)
testScheduler.StatusUpdate(testScheduler.driver, taskStatus_task_running)
testFramework.StatusUpdate(testFramework.driver, taskStatus_task_running)
taskStatus_task_failed := util.NewTaskStatus(
util.NewTaskID("test-task-001"),
mesos.TaskState_TASK_FAILED,
)
testScheduler.StatusUpdate(testScheduler.driver, taskStatus_task_failed)
testFramework.StatusUpdate(testFramework.driver, taskStatus_task_failed)
//assert that mock was invoked
mockdriver.AssertExpectations(t)

View File

@@ -112,10 +112,10 @@ type SchedulerProcess struct {
fin chan struct{}
}
func New(sched bindings.Scheduler) *SchedulerProcess {
func New(framework bindings.Scheduler) *SchedulerProcess {
p := &SchedulerProcess{
Process: proc.New(),
Scheduler: sched,
Scheduler: framework,
stage: initStage,
elected: make(chan struct{}),
failover: make(chan struct{}),

View File

@@ -426,11 +426,10 @@ type lifecycleTest struct {
apiServer *TestServer
driver *mmock.JoinableDriver
eventObs *EventObserver
loop operations.SchedulerLoopInterface
podReconciler *operations.PodReconciler
podsListWatch *MockPodsListWatch
scheduler *MesosScheduler
framework *Framework
schedulerProc *ha.SchedulerProcess
scheduler *Scheduler
t *testing.T
}
@@ -450,7 +449,25 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
)
ei.Data = []byte{0, 1, 2}
// create scheduler
// create framework
client := client.NewOrDie(&client.Config{
Host: apiServer.server.URL,
Version: testapi.Default.Version(),
})
c := *schedcfg.CreateDefaultConfig()
framework := New(Config{
Executor: ei,
Client: client,
SchedulerConfig: c,
LookupNode: apiServer.LookupNode,
})
// TODO(sttts): re-enable the following tests
// assert.NotNil(framework.client, "client is nil")
// assert.NotNil(framework.executor, "executor is nil")
// assert.NotNil(framework.offers, "offer registry is nil")
// create pod scheduler
strategy := podschedulers.NewAllocationStrategy(
podtask.NewDefaultPredicate(
mresource.DefaultDefaultContainerCPULimit,
@@ -461,33 +478,15 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
mresource.DefaultDefaultContainerMemLimit,
),
)
client := client.NewOrDie(&client.Config{
Host: apiServer.server.URL,
Version: testapi.Default.Version(),
})
c := *schedcfg.CreateDefaultConfig()
mesosScheduler := New(Config{
Executor: ei,
Client: client,
PodScheduler: podschedulers.NewFCFSPodScheduler(strategy, apiServer.LookupNode),
SchedulerConfig: c,
LookupNode: apiServer.LookupNode,
})
// TODO(sttts): re-enable the following tests
// assert.NotNil(mesosScheduler.client, "client is nil")
// assert.NotNil(mesosScheduler.executor, "executor is nil")
// assert.NotNil(mesosScheduler.offers, "offer registry is nil")
fcfs := podschedulers.NewFCFSPodScheduler(strategy, apiServer.LookupNode)
// create scheduler process
schedulerProc := ha.New(mesosScheduler)
schedulerProc := ha.New(framework)
// create scheduler loop
fw := &MesosFramework{MesosScheduler: mesosScheduler}
// create scheduler
eventObs := NewEventObserver()
loop, podReconciler := operations.NewScheduler(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch)
assert.NotNil(loop)
scheduler := NewScheduler(&c, framework, fcfs, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch)
assert.NotNil(scheduler)
// create mock mesos scheduler driver
driver := &mmock.JoinableDriver{}
@@ -496,23 +495,22 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
apiServer: apiServer,
driver: driver,
eventObs: eventObs,
loop: loop,
podsListWatch: podsListWatch,
podReconciler: podReconciler,
scheduler: mesosScheduler,
framework: framework,
schedulerProc: schedulerProc,
scheduler: scheduler,
t: t,
}
}
func (lt lifecycleTest) Start() <-chan LaunchedTask {
assert := &EventAssertions{*assert.New(lt.t)}
lt.loop.Run(lt.schedulerProc.Terminal())
lt.scheduler.Run(lt.schedulerProc.Terminal())
// init scheduler
err := lt.scheduler.Init(
// init framework
err := lt.framework.Init(
lt.scheduler,
lt.schedulerProc.Master(),
lt.podReconciler,
http.DefaultServeMux,
)
assert.NoError(err)
@@ -565,7 +563,7 @@ func (lt lifecycleTest) Start() <-chan LaunchedTask {
<-started
// tell scheduler to be registered
lt.scheduler.Registered(
lt.framework.Registered(
lt.driver,
mesosutil.NewFrameworkID("kubernetes-id"),
mesosutil.NewMasterInfo("master-id", (192<<24)+(168<<16)+(0<<8)+1, 5050),
@@ -605,25 +603,25 @@ func TestScheduler_LifeCycle(t *testing.T) {
// add some matching offer
offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
lt.scheduler.ResourceOffers(nil, offers)
lt.framework.ResourceOffers(nil, offers)
// first offer is declined because node is not available yet
lt.apiServer.WaitForNode("some_hostname")
// add one more offer
lt.scheduler.ResourceOffers(nil, offers)
lt.framework.ResourceOffers(nil, offers)
// and wait for scheduled pod
assert.EventWithReason(lt.eventObs, operations.Scheduled)
select {
case launchedTask := <-launchedTasks:
// report back that the task has been staged, and then started by mesos
lt.scheduler.StatusUpdate(
lt.framework.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING),
)
lt.scheduler.StatusUpdate(
lt.framework.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING),
)
@@ -634,7 +632,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
// report back that the task has been lost
lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0)
lt.scheduler.StatusUpdate(
lt.framework.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST),
)
@@ -654,14 +652,14 @@ func TestScheduler_LifeCycle(t *testing.T) {
assert.EventWithReason(lt.eventObs, operations.FailedScheduling, "failedScheduling event not received")
// supply a matching offer
lt.scheduler.ResourceOffers(lt.driver, offers)
lt.framework.ResourceOffers(lt.driver, offers)
for _, offer := range offers {
if _, ok := offeredNodes[offer.GetHostname()]; !ok {
offeredNodes[offer.GetHostname()] = struct{}{}
lt.apiServer.WaitForNode(offer.GetHostname())
// reoffer since it must have been declined above
lt.scheduler.ResourceOffers(lt.driver, []*mesos.Offer{offer})
lt.framework.ResourceOffers(lt.driver, []*mesos.Offer{offer})
}
}
@@ -696,11 +694,11 @@ func TestScheduler_LifeCycle(t *testing.T) {
pod, launchedTask, offer := launchPodWithOffers(pod, offers)
if pod != nil {
// report back status
lt.scheduler.StatusUpdate(
lt.framework.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING),
)
lt.scheduler.StatusUpdate(
lt.framework.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING),
)
@@ -736,7 +734,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
select {
case <-killTaskCalled:
// report back that the task is finished
lt.scheduler.StatusUpdate(
lt.framework.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED),
)
@@ -761,8 +759,8 @@ func TestScheduler_LifeCycle(t *testing.T) {
assert.Equal(offers[1].Id.GetValue(), usedOffer.Id.GetValue())
assert.Equal(pod.Spec.NodeName, *usedOffer.Hostname)
lt.scheduler.OfferRescinded(lt.driver, offers[0].Id)
lt.scheduler.OfferRescinded(lt.driver, offers[2].Id)
lt.framework.OfferRescinded(lt.driver, offers[0].Id)
lt.framework.OfferRescinded(lt.driver, offers[2].Id)
// start pods:
// - which are failing while binding,
@@ -774,7 +772,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
status := newTaskStatusForTask(task, mesos.TaskState_TASK_FAILED)
message := messages.CreateBindingFailure
status.Message = &message
lt.scheduler.StatusUpdate(lt.driver, status)
lt.framework.StatusUpdate(lt.driver, status)
// wait until pod is looked up at the apiserver
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
@@ -796,7 +794,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
t, _ := lt.scheduler.taskRegistry.ForPod(podKey)
t, _ := lt.scheduler.Tasks().ForPod(podKey)
return t == nil
})

View File

@@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api"
@@ -31,14 +32,16 @@ import (
// SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface
type SchedulerAlgorithm struct {
fw types.Framework
podUpdates queue.FIFO
sched types.Scheduler
podUpdates queue.FIFO
podScheduler podschedulers.PodScheduler
}
func NewSchedulerAlgorithm(fw types.Framework, podUpdates queue.FIFO) *SchedulerAlgorithm {
func NewSchedulerAlgorithm(sched types.Scheduler, podUpdates queue.FIFO, podScheduler podschedulers.PodScheduler) *SchedulerAlgorithm {
return &SchedulerAlgorithm{
fw: fw,
podUpdates: podUpdates,
sched: sched,
podUpdates: podUpdates,
podScheduler: podScheduler,
}
}
@@ -54,10 +57,10 @@ func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
return "", err
}
k.fw.Lock()
defer k.fw.Unlock()
k.sched.Lock()
defer k.sched.Unlock()
switch task, state := k.fw.Tasks().ForPod(podKey); state {
switch task, state := k.sched.Tasks().ForPod(podKey); state {
case podtask.StateUnknown:
// There's a bit of a potential race here, a pod could have been yielded() and
// then before we get *here* it could be deleted.
@@ -77,7 +80,7 @@ func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err)
return "", err
}
return k.doSchedule(k.fw.Tasks().Register(podTask, nil))
return k.doSchedule(k.sched.Tasks().Register(podTask, nil))
//TODO(jdef) it's possible that the pod state has diverged from what
//we knew previously, we should probably update the task.Pod state here
@@ -107,19 +110,19 @@ func (k *SchedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, err
if task.HasAcceptedOffer() {
// verify that the offer is still on the table
offerId := task.GetOfferId()
if offer, ok := k.fw.Offers().Get(offerId); ok && !offer.HasExpired() {
if offer, ok := k.sched.Offers().Get(offerId); ok && !offer.HasExpired() {
// skip tasks that already have assigned offers
offer = task.Offer
} else {
task.Offer.Release()
task.Reset()
if err = k.fw.Tasks().Update(task); err != nil {
if err = k.sched.Tasks().Update(task); err != nil {
return "", err
}
}
}
if err == nil && offer == nil {
offer, err = k.fw.PodScheduler().SchedulePod(k.fw.Offers(), k.fw, task)
offer, err = k.podScheduler.SchedulePod(k.sched.Offers(), task)
}
if err != nil {
return "", err
@@ -128,24 +131,16 @@ func (k *SchedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, err
if details == nil {
return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
}
slaveId := details.GetSlaveId().GetValue()
if slaveHostName := k.fw.SlaveHostNameFor(slaveId); slaveHostName == "" {
// not much sense in Release()ing the offer here since its owner died
offer.Release()
k.fw.Offers().Invalidate(details.Id.GetValue())
return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
} else {
if task.Offer != nil && task.Offer != offer {
return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
}
task.Offer = offer
k.fw.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
if err := k.fw.Tasks().Update(task); err != nil {
offer.Release()
return "", err
}
return slaveHostName, nil
if task.Offer != nil && task.Offer != offer {
return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
}
task.Offer = offer
k.podScheduler.Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
if err := k.sched.Tasks().Update(task); err != nil {
offer.Release()
return "", err
}
return details.GetHostname(), nil
}

View File

@@ -29,12 +29,12 @@ import (
)
type Binder struct {
fw types.Framework
sched types.Scheduler
}
func NewBinder(fw types.Framework) *Binder {
func NewBinder(sched types.Scheduler) *Binder {
return &Binder{
fw: fw,
sched: sched,
}
}
@@ -49,10 +49,10 @@ func (b *Binder) Bind(binding *api.Binding) error {
return err
}
b.fw.Lock()
defer b.fw.Unlock()
b.sched.Lock()
defer b.sched.Unlock()
switch task, state := b.fw.Tasks().ForPod(podKey); state {
switch task, state := b.sched.Tasks().ForPod(podKey); state {
case podtask.StatePending:
return b.bind(ctx, binding, task)
default:
@@ -66,7 +66,7 @@ func (b *Binder) Bind(binding *api.Binding) error {
func (b *Binder) rollback(task *podtask.T, err error) error {
task.Offer.Release()
task.Reset()
if err2 := b.fw.Tasks().Update(task); err2 != nil {
if err2 := b.sched.Tasks().Update(task); err2 != nil {
log.Errorf("failed to update pod task: %v", err2)
}
return err
@@ -88,7 +88,7 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
// By this time, there is a chance that the slave is disconnected.
offerId := task.GetOfferId()
if offer, ok := b.fw.Offers().Get(offerId); !ok || offer.HasExpired() {
if offer, ok := b.sched.Offers().Get(offerId); !ok || offer.HasExpired() {
// already rescinded or timed out or otherwise invalidated
return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID))
}
@@ -96,10 +96,10 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
if err = b.fw.LaunchTask(task); err == nil {
b.fw.Offers().Invalidate(offerId)
if err = b.sched.LaunchTask(task); err == nil {
b.sched.Offers().Invalidate(offerId)
task.Set(podtask.Launched)
if err = b.fw.Tasks().Update(task); err != nil {
if err = b.sched.Tasks().Update(task); err != nil {
// this should only happen if the task has been removed or has changed status,
// which SHOULD NOT HAPPEN as long as we're synchronizing correctly
log.Errorf("failed to update task w/ Launched status: %v", err)

View File

@@ -30,14 +30,14 @@ import (
)
type Deleter struct {
fw types.Framework
qr *queuer.Queuer
sched types.Scheduler
qr *queuer.Queuer
}
func NewDeleter(fw types.Framework, qr *queuer.Queuer) *Deleter {
func NewDeleter(sched types.Scheduler, qr *queuer.Queuer) *Deleter {
return &Deleter{
fw: fw,
qr: qr,
sched: sched,
qr: qr,
}
}
@@ -72,8 +72,8 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
// removing the pod from the scheduling queue. this makes the concurrent
// execution of scheduler-error-handling and delete-handling easier to
// reason about.
k.fw.Lock()
defer k.fw.Unlock()
k.sched.Lock()
defer k.sched.Unlock()
// prevent the scheduler from attempting to pop this; it's also possible that
// it's concurrently being scheduled (somewhere between pod scheduling and
@@ -81,7 +81,7 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
// will abort Bind()ing
k.qr.Dequeue(pod.GetUID())
switch task, state := k.fw.Tasks().ForPod(podKey); state {
switch task, state := k.sched.Tasks().ForPod(podKey); state {
case podtask.StateUnknown:
log.V(2).Infof("Could not resolve pod '%s' to task id", podKey)
return merrors.NoSuchPodErr
@@ -96,11 +96,11 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
task.Reset()
task.Set(podtask.Deleted)
//TODO(jdef) probably want better handling here
if err := k.fw.Tasks().Update(task); err != nil {
if err := k.sched.Tasks().Update(task); err != nil {
return err
}
}
k.fw.Tasks().Unregister(task)
k.sched.Tasks().Unregister(task)
return nil
}
fallthrough
@@ -108,10 +108,10 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
case podtask.StateRunning:
// signal to watchers that the related pod is going down
task.Set(podtask.Deleted)
if err := k.fw.Tasks().Update(task); err != nil {
if err := k.sched.Tasks().Update(task); err != nil {
log.Errorf("failed to update task w/ Deleted status: %v", err)
}
return k.fw.KillTask(task.ID)
return k.sched.KillTask(task.ID)
default:
log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID)

View File

@@ -15,6 +15,6 @@ limitations under the License.
*/
// Package operations implements independent aspects of the scheduler which
// do not use MesosScheduler internals, but rely solely on the Framework
// do not use Framework internals, but rely solely on the Scheduler
// interface.
package operations
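To make that boundary concrete: a minimal sketch of an operations-style component, assuming a hypothetical pendingCounter type; the real components (Binder, Deleter, ErrorHandler, PodReconciler) follow the same pattern in the surrounding hunks.

package operations

import (
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
	types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
)

// pendingCounter is a hypothetical component: it holds only the
// types.Scheduler interface and never reaches into Framework internals.
type pendingCounter struct {
	sched types.Scheduler
}

// count returns how many registered tasks are still pending, using the
// lock/read/unlock contract exposed by types.Scheduler.
func (c *pendingCounter) count() int {
	c.sched.Lock()
	defer c.sched.Unlock()
	return len(c.sched.Tasks().List(func(t *podtask.T) bool {
		return t.State == podtask.StatePending
	}))
}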

View File

@@ -31,16 +31,18 @@ import (
)
type ErrorHandler struct {
fw types.Framework
backoff *backoff.Backoff
qr *queuer.Queuer
sched types.Scheduler
backoff *backoff.Backoff
qr *queuer.Queuer
podScheduler podschedulers.PodScheduler
}
func NewErrorHandler(fw types.Framework, backoff *backoff.Backoff, qr *queuer.Queuer) *ErrorHandler {
func NewErrorHandler(sched types.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, podScheduler podschedulers.PodScheduler) *ErrorHandler {
return &ErrorHandler{
fw: fw,
backoff: backoff,
qr: qr,
sched: sched,
backoff: backoff,
qr: qr,
podScheduler: podScheduler,
}
}
@@ -64,10 +66,10 @@ func (k *ErrorHandler) Error(pod *api.Pod, schedulingErr error) {
}
k.backoff.GC()
k.fw.Lock()
defer k.fw.Unlock()
k.sched.Lock()
defer k.sched.Unlock()
switch task, state := k.fw.Tasks().ForPod(podKey); state {
switch task, state := k.sched.Tasks().ForPod(podKey); state {
case podtask.StateUnknown:
// if we don't have a mapping here any more then someone deleted the pod
log.V(2).Infof("Could not resolve pod to task, aborting pod reschdule: %s", podKey)
@@ -81,16 +83,16 @@ func (k *ErrorHandler) Error(pod *api.Pod, schedulingErr error) {
breakoutEarly := queue.BreakChan(nil)
if schedulingErr == podschedulers.NoSuitableOffersErr {
log.V(3).Infof("adding backoff breakout handler for pod %v", podKey)
breakoutEarly = queue.BreakChan(k.fw.Offers().Listen(podKey, func(offer *mesos.Offer) bool {
k.fw.Lock()
defer k.fw.Unlock()
switch task, state := k.fw.Tasks().Get(task.ID); state {
breakoutEarly = queue.BreakChan(k.sched.Offers().Listen(podKey, func(offer *mesos.Offer) bool {
k.sched.Lock()
defer k.sched.Unlock()
switch task, state := k.sched.Tasks().Get(task.ID); state {
case podtask.StatePending:
// Assess fitness of pod with the current offer. The scheduler normally
// "backs off" when it can't find an offer that matches up with a pod.
// The backoff period for a pod can terminate sooner if an offer becomes
// available that matches up.
return !task.Has(podtask.Launched) && k.fw.PodScheduler().FitPredicate()(task, offer, nil)
return !task.Has(podtask.Launched) && k.podScheduler.FitPredicate()(task, offer, nil)
default:
// no point in continuing to check for matching offers
return true

View File

@@ -31,15 +31,15 @@ import (
// PodReconciler reconciles a pod with the apiserver
type PodReconciler struct {
fw types.Framework
sched types.Scheduler
client *client.Client
qr *queuer.Queuer
deleter *Deleter
}
func NewPodReconciler(fw types.Framework, client *client.Client, qr *queuer.Queuer, deleter *Deleter) *PodReconciler {
func NewPodReconciler(sched types.Scheduler, client *client.Client, qr *queuer.Queuer, deleter *Deleter) *PodReconciler {
return &PodReconciler{
fw: fw,
sched: sched,
client: client,
qr: qr,
deleter: deleter,
@@ -88,10 +88,10 @@ func (s *PodReconciler) Reconcile(t *podtask.T) {
return
}
s.fw.Lock()
defer s.fw.Unlock()
s.sched.Lock()
defer s.sched.Unlock()
if _, state := s.fw.Tasks().ForPod(podKey); state != podtask.StateUnknown {
if _, state := s.sched.Tasks().ForPod(podKey); state != podtask.StateUnknown {
//TODO(jdef) reconcile the task
log.Errorf("task already registered for pod %v", pod.Name)
return

View File

@@ -17,19 +17,11 @@ limitations under the License.
package operations
import (
"net/http"
"time"
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/backoff"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
)
@@ -55,47 +47,6 @@ type SchedulerLoop struct {
started chan<- struct{} // startup latch
}
func NewScheduler(c *config.Config, fw types.Framework, client *client.Client, recorder record.EventRecorder,
terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) (SchedulerLoopInterface, *PodReconciler) {
// Watch and queue pods that need scheduling.
updates := make(chan queue.Entry, c.UpdatesBacklog)
podUpdates := &podStoreAdapter{queue.NewHistorical(updates)}
reflector := cache.NewReflector(podsWatcher, &api.Pod{}, podUpdates, 0)
// lock that guards critical sections that involve transferring pods from
// the store (cache) to the scheduling queue; its purpose is to maintain
// an ordering (vs interleaving) of operations that's easier to reason about.
q := queuer.New(podUpdates)
algorithm := NewSchedulerAlgorithm(fw, podUpdates)
podDeleter := NewDeleter(fw, q)
podReconciler := NewPodReconciler(fw, client, q, podDeleter)
bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration)
errorHandler := NewErrorHandler(fw, bo, q)
binder := NewBinder(fw)
startLatch := make(chan struct{})
eventBroadcaster := record.NewBroadcaster()
runtime.On(startLatch, func() {
eventBroadcaster.StartRecordingToSink(client.Events(""))
reflector.Run() // TODO(jdef) should listen for termination
podDeleter.Run(updates, terminate)
q.Run(terminate)
q.InstallDebugHandlers(mux)
podtask.InstallDebugHandlers(fw.Tasks(), mux)
})
return NewSchedulerLoop(client, algorithm, recorder, q.Yield, errorHandler.Error, binder, startLatch), podReconciler
}
func NewSchedulerLoop(client *client.Client, algorithm *SchedulerAlgorithm,
recorder record.EventRecorder, nextPod func() *api.Pod, error func(pod *api.Pod, schedulingErr error),
binder *Binder, started chan<- struct{}) *SchedulerLoop {

View File

@@ -62,7 +62,7 @@ func NewFCFSPodScheduler(as AllocationStrategy, lookupNode node.LookupFunc) PodS
}
// A first-come-first-serve scheduler: acquires the first offer that can support the task
func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, unused SlaveIndex, task *podtask.T) (offers.Perishable, error) {
func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, task *podtask.T) (offers.Perishable, error) {
podName := fmt.Sprintf("%s/%s", task.Pod.Namespace, task.Pod.Name)
var acceptedOffer offers.Perishable
err := r.Walk(func(p offers.Perishable) (bool, error) {

View File

@@ -37,14 +37,13 @@ type PodScheduler interface {
// SchedulePod implements how to schedule pods among slaves.
// We can have different implementation for different scheduling policy.
//
// The function accepts a group of slaves (each contains offers from
// that slave) and a single pod, which aligns well with the k8s scheduling
// algorithm. It returns an offerId that is acceptable for the pod, otherwise
// nil. The caller is responsible for filling in task state w/ relevant offer
// details.
// The function accepts a set of offers and a single pod, which aligns well
// with the k8s scheduling algorithm. It returns an offer that is acceptable
// for the pod, otherwise nil. The caller is responsible for filling in task
// state w/ relevant offer details.
//
// See the FCFSPodScheduler for example.
SchedulePod(r offers.Registry, slaves SlaveIndex, task *podtask.T) (offers.Perishable, error)
SchedulePod(r offers.Registry, task *podtask.T) (offers.Perishable, error)
}
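A minimal sketch of a function with the new SchedulePod shape; the firstOffer name is hypothetical, and the fitness checking and offer acquisition that fcfsPodScheduler performs are deliberately left out.

package podschedulers

import (
	"k8s.io/kubernetes/contrib/mesos/pkg/offers"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)

// firstOffer mirrors the new SchedulePod signature: only the offer
// registry and the task are needed. It naively takes the first offer it
// walks over; a real implementation also checks whether the offer fits
// the task and acquires it.
func firstOffer(r offers.Registry, task *podtask.T) (offers.Perishable, error) {
	var picked offers.Perishable
	err := r.Walk(func(p offers.Perishable) (bool, error) {
		picked = p
		return true, nil // true stops the walk (walker contract assumed, as used by fcfsPodScheduler)
	})
	if err != nil {
		return nil, err
	}
	if picked == nil {
		return nil, NoSuitableOffersErr
	}
	return picked, nil
}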
// A minimal placeholder

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package operations
package scheduler
import (
"k8s.io/kubernetes/contrib/mesos/pkg/queue"

View File

@@ -17,736 +17,98 @@ limitations under the License.
package scheduler
import (
"fmt"
"io"
"math"
"net/http"
"sync"
"time"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
mutil "github.com/mesos/mesos-go/mesosutil"
bindings "github.com/mesos/mesos-go/scheduler"
execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/backoff"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/proc"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
)
// KubernetesScheduler implements:
// 1: A mesos scheduler.
// 2: A kubernetes pod.Registry.
type MesosScheduler struct {
// We use a lock here to avoid races
// between invoking the mesos callback
// and invoking the pod registry interfaces.
// In particular, changes to podtask.T objects are currently guarded by this lock.
*sync.RWMutex
podScheduler podschedulers.PodScheduler
// Config related, write-once
schedulerConfig *schedcfg.Config
executor *mesos.ExecutorInfo
executorGroup uint64
client *client.Client
failoverTimeout float64 // in seconds
reconcileInterval int64
nodeRegistrator node.Registrator
storeFrameworkId func(id string)
// Mesos context.
driver bindings.SchedulerDriver // late initialization
frameworkId *mesos.FrameworkID
masterInfo *mesos.MasterInfo
registered bool
registration chan struct{} // signal chan that closes upon first successful registration
onRegistration sync.Once
offers offers.Registry
slaveHostNames *slave.Registry
// unsafe state, needs to be guarded
// Scheduler implements types.Scheduler
type Scheduler struct {
podReconciler *operations.PodReconciler
framework *Framework
loop *operations.SchedulerLoop
// unsafe state, needs to be guarded, especially changes to podtask.T objects
sync.RWMutex
taskRegistry podtask.Registry
// via deferred init
podReconciler *operations.PodReconciler
tasksReconciler *operations.TasksReconciler
reconcileCooldown time.Duration
asRegisteredMaster proc.Doer
terminate <-chan struct{} // signal chan, closes when we should kill background tasks
}
type Config struct {
SchedulerConfig schedcfg.Config
Executor *mesos.ExecutorInfo
PodScheduler podschedulers.PodScheduler
Client *client.Client
StoreFrameworkId func(id string)
FailoverTimeout float64
ReconcileInterval int64
ReconcileCooldown time.Duration
LookupNode node.LookupFunc
}
func NewScheduler(c *config.Config, framework *Framework, podScheduler podschedulers.PodScheduler,
client *client.Client, recorder record.EventRecorder, terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) *Scheduler {
// New creates a new MesosScheduler
func New(config Config) *MesosScheduler {
var k *MesosScheduler
k = &MesosScheduler{
schedulerConfig: &config.SchedulerConfig,
RWMutex: new(sync.RWMutex),
executor: config.Executor,
executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
podScheduler: config.PodScheduler,
client: config.Client,
failoverTimeout: config.FailoverTimeout,
reconcileInterval: config.ReconcileInterval,
nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode),
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
// the node must be registered and have up-to-date labels
n := config.LookupNode(o.GetHostname())
if n == nil || !node.IsUpToDate(n, node.SlaveAttributesToLabels(o.GetAttributes())) {
return false
}
// the executor IDs must not identify a kubelet-executor with a group that doesn't match ours
for _, eid := range o.GetExecutorIds() {
execuid := uid.Parse(eid.GetValue())
if execuid.Name() == execcfg.DefaultInfoID && execuid.Group() != k.executorGroup {
return false
}
}
return true
},
DeclineOffer: func(id string) <-chan error {
errOnce := proc.NewErrorOnce(k.terminate)
errOuter := k.asRegisteredMaster.Do(func() {
var err error
defer errOnce.Report(err)
offerId := mutil.NewOfferID(id)
filters := &mesos.Filters{}
_, err = k.driver.DeclineOffer(offerId, filters)
})
return errOnce.Send(errOuter).Err()
},
// remember expired offers so that we can tell if a previously scheduled task relies on one
LingerTTL: config.SchedulerConfig.OfferLingerTTL.Duration,
TTL: config.SchedulerConfig.OfferTTL.Duration,
ListenerDelay: config.SchedulerConfig.ListenerDelay.Duration,
}),
slaveHostNames: slave.NewRegistry(),
taskRegistry: podtask.NewInMemoryRegistry(),
reconcileCooldown: config.ReconcileCooldown,
registration: make(chan struct{}),
asRegisteredMaster: proc.DoerFunc(func(proc.Action) <-chan error {
return proc.ErrorChanf("cannot execute action with unregistered scheduler")
}),
storeFrameworkId: config.StoreFrameworkId,
core := &Scheduler{
framework: framework,
taskRegistry: podtask.NewInMemoryRegistry(),
}
return k
}
func (k *MesosScheduler) Init(electedMaster proc.Process, pr *operations.PodReconciler, mux *http.ServeMux) error {
log.V(1).Infoln("initializing kubernetes mesos scheduler")
// Watch and queue pods that need scheduling.
updates := make(chan queue.Entry, c.UpdatesBacklog)
podUpdates := &podStoreAdapter{queue.NewHistorical(updates)}
reflector := cache.NewReflector(podsWatcher, &api.Pod{}, podUpdates, 0)
k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error {
if !k.registered {
return proc.ErrorChanf("failed to execute action, scheduler is disconnected")
}
return electedMaster.Do(a)
q := queuer.New(podUpdates)
algorithm := operations.NewSchedulerAlgorithm(core, podUpdates, podScheduler)
podDeleter := operations.NewDeleter(core, q)
core.podReconciler = operations.NewPodReconciler(core, client, q, podDeleter)
bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration)
errorHandler := operations.NewErrorHandler(core, bo, q, podScheduler)
binder := operations.NewBinder(core)
startLatch := make(chan struct{})
eventBroadcaster := record.NewBroadcaster()
runtime.On(startLatch, func() {
eventBroadcaster.StartRecordingToSink(client.Events(""))
reflector.Run() // TODO(jdef) should listen for termination
podDeleter.Run(updates, terminate)
q.Run(terminate)
q.InstallDebugHandlers(mux)
podtask.InstallDebugHandlers(core.Tasks(), mux)
})
k.terminate = electedMaster.Done()
k.podReconciler = pr
k.offers.Init(k.terminate)
k.InstallDebugHandlers(mux)
k.nodeRegistrator.Run(k.terminate)
return k.recoverTasks()
core.loop = operations.NewSchedulerLoop(client, algorithm, recorder, q.Yield, errorHandler.Error, binder, startLatch)
return core
}
func (k *MesosScheduler) asMaster() proc.Doer {
k.RLock()
defer k.RUnlock()
return k.asRegisteredMaster
func (c *Scheduler) Run(done <-chan struct{}) {
c.loop.Run(done)
}
func (k *MesosScheduler) InstallDebugHandlers(mux *http.ServeMux) {
wrappedHandler := func(uri string, h http.Handler) {
mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) {
ch := make(chan struct{})
closer := runtime.Closer(ch)
proc.OnError(k.asMaster().Do(func() {
defer closer()
h.ServeHTTP(w, r)
}), func(err error) {
defer closer()
log.Warningf("failed HTTP request for %s: %v", uri, err)
w.WriteHeader(http.StatusServiceUnavailable)
}, k.terminate)
select {
case <-time.After(k.schedulerConfig.HttpHandlerTimeout.Duration):
log.Warningf("timed out waiting for request to be processed")
w.WriteHeader(http.StatusServiceUnavailable)
return
case <-ch: // noop
}
})
}
requestReconciliation := func(uri string, requestAction func()) {
wrappedHandler(uri, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestAction()
w.WriteHeader(http.StatusNoContent)
}))
}
requestReconciliation("/debug/actions/requestExplicit", k.tasksReconciler.RequestExplicit)
requestReconciliation("/debug/actions/requestImplicit", k.tasksReconciler.RequestImplicit)
wrappedHandler("/debug/actions/kamikaze", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
slaves := k.slaveHostNames.SlaveIDs()
for _, slaveId := range slaves {
_, err := k.driver.SendFrameworkMessage(
k.executor.ExecutorId,
mutil.NewSlaveID(slaveId),
messages.Kamikaze)
if err != nil {
log.Warningf("failed to send kamikaze message to slave %s: %v", slaveId, err)
} else {
io.WriteString(w, fmt.Sprintf("kamikaze slave %s\n", slaveId))
}
}
io.WriteString(w, "OK")
}))
func (c *Scheduler) Reconcile(t *podtask.T) {
c.podReconciler.Reconcile(t)
}
func (k *MesosScheduler) Registration() <-chan struct{} {
return k.registration
func (c *Scheduler) Tasks() podtask.Registry {
return c.taskRegistry
}
// Registered is called when the scheduler registered with the master successfully.
func (k *MesosScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) {
log.Infof("Scheduler registered with the master: %v with frameworkId: %v\n", mi, fid)
k.driver = drv
k.frameworkId = fid
k.masterInfo = mi
k.registered = true
k.onRegistration.Do(func() { k.onInitialRegistration(drv) })
k.tasksReconciler.RequestExplicit()
func (c *Scheduler) Offers() offers.Registry {
return c.framework.offers
}
// Reregistered is called when the scheduler re-registered with the master successfully.
// This happens when the master fails over.
func (k *MesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) {
log.Infof("Scheduler reregistered with the master: %v\n", mi)
k.driver = drv
k.masterInfo = mi
k.registered = true
k.onRegistration.Do(func() { k.onInitialRegistration(drv) })
k.tasksReconciler.RequestExplicit()
func (c *Scheduler) KillTask(id string) error {
return c.framework.KillTask(id)
}
// perform one-time initialization actions upon the first registration event received from Mesos.
func (k *MesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver) {
defer close(k.registration)
if k.failoverTimeout > 0 {
refreshInterval := k.schedulerConfig.FrameworkIdRefreshInterval.Duration
if k.failoverTimeout < k.schedulerConfig.FrameworkIdRefreshInterval.Duration.Seconds() {
refreshInterval = time.Duration(math.Max(1, k.failoverTimeout/2)) * time.Second
}
go runtime.Until(func() {
k.storeFrameworkId(k.frameworkId.GetValue())
}, refreshInterval, k.terminate)
}
r1 := k.makeTaskRegistryReconciler()
r2 := k.makePodRegistryReconciler()
k.tasksReconciler = operations.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2),
k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate)
go k.tasksReconciler.Run(driver)
if k.reconcileInterval > 0 {
ri := time.Duration(k.reconcileInterval) * time.Second
time.AfterFunc(k.schedulerConfig.InitialImplicitReconciliationDelay.Duration, func() { runtime.Until(k.tasksReconciler.RequestImplicit, ri, k.terminate) })
log.Infof("will perform implicit task reconciliation at interval: %v after %v", ri, k.schedulerConfig.InitialImplicitReconciliationDelay.Duration)
}
}
// Disconnected is called when the scheduler loses connection to the master.
func (k *MesosScheduler) Disconnected(driver bindings.SchedulerDriver) {
log.Infof("Master disconnected!\n")
k.registered = false
// discard all cached offers to avoid unnecessary TASK_LOST updates
k.offers.Invalidate("")
}
// ResourceOffers is called when the scheduler receives some offers from the master.
func (k *MesosScheduler) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) {
log.V(2).Infof("Received offers %+v", offers)
// Record the offers in the global offer map as well as each slave's offer map.
k.offers.Add(offers)
for _, offer := range offers {
slaveId := offer.GetSlaveId().GetValue()
k.slaveHostNames.Register(slaveId, offer.GetHostname())
// create api object if not existing already
if k.nodeRegistrator != nil {
labels := node.SlaveAttributesToLabels(offer.GetAttributes())
_, err := k.nodeRegistrator.Register(offer.GetHostname(), labels)
if err != nil {
log.Error(err)
}
}
}
}
// OfferRescinded is called when the resources are rescinded from the scheduler.
func (k *MesosScheduler) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) {
log.Infof("Offer rescinded %v\n", offerId)
oid := offerId.GetValue()
k.offers.Delete(oid, offermetrics.OfferRescinded)
}
// StatusUpdate is called when a status update message is sent to the scheduler.
func (k *MesosScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
source, reason := "none", "none"
if taskStatus.Source != nil {
source = (*taskStatus.Source).String()
}
if taskStatus.Reason != nil {
reason = (*taskStatus.Reason).String()
}
taskState := taskStatus.GetState()
metrics.StatusUpdates.WithLabelValues(source, reason, taskState.String()).Inc()
message := "none"
if taskStatus.Message != nil {
message = *taskStatus.Message
}
log.Infof(
"task status update %q from %q for task %q on slave %q executor %q for reason %q with message %q",
taskState.String(),
source,
taskStatus.TaskId.GetValue(),
taskStatus.SlaveId.GetValue(),
taskStatus.ExecutorId.GetValue(),
reason,
message,
)
switch taskState {
case mesos.TaskState_TASK_RUNNING, mesos.TaskState_TASK_FINISHED, mesos.TaskState_TASK_STARTING, mesos.TaskState_TASK_STAGING:
if _, state := k.taskRegistry.UpdateStatus(taskStatus); state == podtask.StateUnknown {
if taskState != mesos.TaskState_TASK_FINISHED {
//TODO(jdef) what if I receive this after a TASK_LOST or TASK_KILLED?
//I don't want to reincarnate then.. TASK_LOST is a special case because
//the master is stateless and there are scenarios where I may get TASK_LOST
//followed by TASK_RUNNING.
//TODO(jdef) consider running this asynchronously since there are API server
//calls that may be made
k.reconcileNonTerminalTask(driver, taskStatus)
} // else, we don't really care about FINISHED tasks that aren't registered
return
}
if hostName := k.slaveHostNames.HostName(taskStatus.GetSlaveId().GetValue()); hostName == "" {
// a registered task has an update reported by a slave that we don't recognize.
// this should never happen! So we don't reconcile it.
log.Errorf("Ignore status %+v because the slave does not exist", taskStatus)
return
}
case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR:
if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil {
if task.Has(podtask.Launched) && !task.Has(podtask.Bound) {
go k.podReconciler.Reconcile(task)
return
}
} else {
// unknown task failed, not much we can do about it
return
}
// last-ditch effort to reconcile our records
fallthrough
case mesos.TaskState_TASK_LOST, mesos.TaskState_TASK_KILLED:
k.reconcileTerminalTask(driver, taskStatus)
default:
log.Errorf(
"unknown task status %q from %q for task %q on slave %q executor %q for reason %q with message %q",
taskState.String(),
source,
taskStatus.TaskId.GetValue(),
taskStatus.SlaveId.GetValue(),
taskStatus.ExecutorId.GetValue(),
reason,
message,
)
}
}
func (k *MesosScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
task, state := k.taskRegistry.UpdateStatus(taskStatus)
if (state == podtask.StateRunning || state == podtask.StatePending) &&
((taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER && taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION) ||
(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED) ||
(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED) ||
(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_EXECUTOR && taskStatus.GetMessage() == messages.ContainersDisappeared)) {
//--
// pod-task has metadata that refers to:
// (1) a task that Mesos no longer knows about, or else
// (2) a pod that the Kubelet will never report as "failed"
// (3) a pod that the kubeletExecutor reported as lost (likely due to docker daemon crash/restart)
// For now, destroy the pod and hope that there's a replication controller backing it up.
// TODO(jdef) for case #2 don't delete the pod, just update its status to Failed
pod := &task.Pod
log.Warningf("deleting rogue pod %v/%v for lost task %v", pod.Namespace, pod.Name, task.ID)
if err := k.client.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil && !errors.IsNotFound(err) {
log.Errorf("failed to delete pod %v/%v for terminal task %v: %v", pod.Namespace, pod.Name, task.ID, err)
}
} else if taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED || taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED {
// attempt to prevent dangling pods in the pod and task registries
log.V(1).Infof("request explicit reconciliation to clean up for task %v after executor reported (terminated/unregistered)", taskStatus.TaskId.GetValue())
k.tasksReconciler.RequestExplicit()
} else if taskStatus.GetState() == mesos.TaskState_TASK_LOST && state == podtask.StateRunning && taskStatus.ExecutorId != nil && taskStatus.SlaveId != nil {
//TODO(jdef) this may not be meaningful once we have proper checkpointing and master detection
//If we're reconciling and receive this then the executor may be
//running a task that we need it to kill. It's possible that the framework
//is unrecognized by the master at this point, so KillTask is not guaranteed
//to do anything. The underlying driver transport may be able to send a
//FrameworkMessage directly to the slave to terminate the task.
log.V(2).Info("forwarding TASK_LOST message to executor %v on slave %v", taskStatus.ExecutorId, taskStatus.SlaveId)
data := fmt.Sprintf("%s:%s", messages.TaskLost, task.ID) //TODO(jdef) use a real message type
if _, err := driver.SendFrameworkMessage(taskStatus.ExecutorId, taskStatus.SlaveId, data); err != nil {
log.Error(err.Error())
}
}
}
// reconcile an unknown (from the perspective of our registry) non-terminal task
func (k *MesosScheduler) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
// attempt to recover task from pod info:
// - task data may contain an api.PodStatusResult; if status.reason == REASON_RECONCILIATION then status.data == nil
// - the Name can be parsed by container.ParseFullName() to yield a pod Name and Namespace
// - pull the pod metadata down from the api server
// - perform task recovery based on pod metadata
taskId := taskStatus.TaskId.GetValue()
if taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION && taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER {
// there will be no data in the task status that we can use to determine the associated pod
switch taskStatus.GetState() {
case mesos.TaskState_TASK_STAGING:
// there is still hope for this task, don't kill it just yet
//TODO(jdef) there should probably be a limit for how long we tolerate tasks stuck in this state
return
default:
// for TASK_{STARTING,RUNNING} we should have already attempted recovery via recoverTasks().
// if the scheduler failed over before the executor fired TASK_STARTING, then we should *not*
// be processing this reconciliation update before we process the one from the executor.
// point: we don't know what this task is (perhaps there was unrecoverable metadata in the pod),
// so it gets killed.
log.Errorf("killing non-terminal, unrecoverable task %v", taskId)
}
} else if podStatus, err := podtask.ParsePodStatusResult(taskStatus); err != nil {
// possible rogue pod exists at this point because we can't identify it; should kill the task
log.Errorf("possible rogue pod; illegal task status data for task %v, expected an api.PodStatusResult: %v", taskId, err)
} else if name, namespace, err := container.ParsePodFullName(podStatus.Name); err != nil {
// possible rogue pod exists at this point because we can't identify it; should kill the task
log.Errorf("possible rogue pod; illegal api.PodStatusResult, unable to parse full pod name from: '%v' for task %v: %v",
podStatus.Name, taskId, err)
} else if pod, err := k.client.Pods(namespace).Get(name); err == nil {
if t, ok, err := podtask.RecoverFrom(*pod); ok {
log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name)
_, err := k.taskRegistry.Register(t, nil)
if err != nil {
// someone beat us to it?!
log.Warningf("failed to register recovered task: %v", err)
return
} else {
k.taskRegistry.UpdateStatus(taskStatus)
}
return
} else if err != nil {
//should kill the pod and the task
log.Errorf("killing pod, failed to recover task from pod %v/%v: %v", namespace, name, err)
if err := k.client.Pods(namespace).Delete(name, nil); err != nil {
log.Errorf("failed to delete pod %v/%v: %v", namespace, name, err)
}
} else {
//this is pretty unexpected: we received a TASK_{STARTING,RUNNING} message, but the apiserver's pod
//metadata is not appropriate for task reconstruction -- which should almost certainly never
//be the case unless someone swapped out the pod on us (and kept the same namespace/name) while
//we were failed over.
//kill this task, allow the newly launched scheduler to schedule the new pod
log.Warningf("unexpected pod metadata for task %v in apiserver, assuming new unscheduled pod spec: %+v", taskId, pod)
}
} else if errors.IsNotFound(err) {
// pod lookup failed, should delete the task since the pod is no longer valid; may be redundant, that's ok
log.Infof("killing task %v since pod %v/%v no longer exists", taskId, namespace, name)
} else if errors.IsServerTimeout(err) {
log.V(2).Infof("failed to reconcile task due to API server timeout: %v", err)
return
} else {
log.Errorf("unexpected API server error, aborting reconcile for task %v: %v", taskId, err)
return
}
if _, err := driver.KillTask(taskStatus.TaskId); err != nil {
log.Errorf("failed to kill task %v: %v", taskId, err)
}
}
// FrameworkMessage is called when the scheduler receives a message from the executor.
func (k *MesosScheduler) FrameworkMessage(driver bindings.SchedulerDriver,
executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) {
log.Infof("Received messages from executor %v of slave %v, %v\n", executorId, slaveId, message)
}
// SlaveLost is called when some slave is lost.
func (k *MesosScheduler) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) {
log.Infof("Slave %v is lost\n", slaveId)
sid := slaveId.GetValue()
k.offers.InvalidateForSlave(sid)
// TODO(jdef): delete slave from our internal list? probably not since we may need to reconcile
// tasks. it would be nice to somehow flag the slave as lost so that, perhaps, we can periodically
// flush lost slaves older than X, and for which no tasks or pods reference.
// unfinished tasks/pods will be dropped. use a replication controller if you want pods to
// be restarted when slaves die.
}
// ExecutorLost is called when some executor is lost.
func (k *MesosScheduler) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) {
log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status)
// TODO(yifan): Restart any unfinished tasks of the executor.
}
// Error is called when there is an unrecoverable error in the scheduler or scheduler driver.
// The driver should have been aborted before this is invoked.
func (k *MesosScheduler) Error(driver bindings.SchedulerDriver, message string) {
log.Fatalf("fatal scheduler error: %v\n", message)
}
// filter func used for explicit task reconciliation, selects only non-terminal tasks which
// have been communicated to mesos (read: launched).
func explicitTaskFilter(t *podtask.T) bool {
switch t.State {
case podtask.StateRunning:
return true
case podtask.StatePending:
return t.Has(podtask.Launched)
default:
return false
}
}
// invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation
// is cancelled. if any other errors occur the composite reconciler will attempt to complete the
// sequence, reporting only the last generated error.
func (k *MesosScheduler) makeCompositeReconciler(actions ...operations.ReconcilerAction) operations.ReconcilerAction {
if x := len(actions); x == 0 {
// programming error
panic("no actions specified for composite reconciler")
} else if x == 1 {
return actions[0]
}
chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b operations.ReconcilerAction) <-chan error {
ech := a(d, c)
ch := make(chan error, 1)
go func() {
select {
case <-k.terminate:
case <-c:
case e := <-ech:
if e != nil {
ch <- e
return
}
ech = b(d, c)
select {
case <-k.terminate:
case <-c:
case e := <-ech:
if e != nil {
ch <- e
return
}
close(ch)
return
}
}
ch <- fmt.Errorf("aborting composite reconciler action")
}()
return ch
}
result := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error {
return chained(d, c, actions[0], actions[1])
}
for i := 2; i < len(actions); i++ {
i := i
next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error {
return chained(d, c, operations.ReconcilerAction(result), actions[i])
}
result = next
}
return operations.ReconcilerAction(result)
}
// reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks listed in the scheduler's internal taskRegistry.
func (k *MesosScheduler) makeTaskRegistryReconciler() operations.ReconcilerAction {
return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
taskToSlave := make(map[string]string)
for _, t := range k.taskRegistry.List(explicitTaskFilter) {
if t.Spec.SlaveID != "" {
taskToSlave[t.ID] = t.Spec.SlaveID
}
}
return proc.ErrorChan(k.explicitlyReconcileTasks(drv, taskToSlave, cancel))
})
}
// reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks identified by annotations in the Kubernetes pod registry.
func (k *MesosScheduler) makePodRegistryReconciler() operations.ReconcilerAction {
return operations.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil {
return proc.ErrorChanf("failed to reconcile pod registry: %v", err)
}
taskToSlave := make(map[string]string)
for _, pod := range podList.Items {
if len(pod.Annotations) == 0 {
continue
}
taskId, found := pod.Annotations[meta.TaskIdKey]
if !found {
continue
}
slaveId, found := pod.Annotations[meta.SlaveIdKey]
if !found {
continue
}
taskToSlave[taskId] = slaveId
}
return proc.ErrorChan(k.explicitlyReconcileTasks(drv, taskToSlave, cancel))
})
}
// execute an explicit task reconciliation, as per http://mesos.apache.org/documentation/latest/reconciliation/
func (k *MesosScheduler) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error {
log.Info("explicit reconcile tasks")
// tell mesos to send us the latest status updates for all the non-terminal tasks that we know about
statusList := []*mesos.TaskStatus{}
remaining := sets.StringKeySet(taskToSlave)
for taskId, slaveId := range taskToSlave {
if slaveId == "" {
delete(taskToSlave, taskId)
continue
}
statusList = append(statusList, &mesos.TaskStatus{
TaskId: mutil.NewTaskID(taskId),
SlaveId: mutil.NewSlaveID(slaveId),
State: mesos.TaskState_TASK_RUNNING.Enum(), // req'd field, doesn't have to reflect reality
})
}
select {
case <-cancel:
return merrors.ReconciliationCancelledErr
default:
if _, err := driver.ReconcileTasks(statusList); err != nil {
return err
}
}
start := time.Now()
first := true
for backoff := 1 * time.Second; first || remaining.Len() > 0; backoff = backoff * 2 {
first = false
// nothing to do here other than wait for status updates..
if backoff > k.schedulerConfig.ExplicitReconciliationMaxBackoff.Duration {
backoff = k.schedulerConfig.ExplicitReconciliationMaxBackoff.Duration
}
select {
case <-cancel:
return merrors.ReconciliationCancelledErr
case <-time.After(backoff):
for taskId := range remaining {
if task, _ := k.taskRegistry.Get(taskId); task != nil && explicitTaskFilter(task) && task.UpdatedTime.Before(start) {
// keep this task in remaining list
continue
}
remaining.Delete(taskId)
}
}
}
return nil
}
func (ks *MesosScheduler) recoverTasks() error {
podList, err := ks.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil {
log.V(1).Infof("failed to recover pod registry, madness may ensue: %v", err)
return err
}
recoverSlave := func(t *podtask.T) {
slaveId := t.Spec.SlaveID
ks.slaveHostNames.Register(slaveId, t.Offer.Host())
}
for _, pod := range podList.Items {
if _, isMirrorPod := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; isMirrorPod {
// mirrored pods are never reconciled because the scheduler isn't responsible for
// scheduling them; they're started by the executor/kubelet upon instantiation and
// reflected in the apiserver afterward. the scheduler has no knowledge of them.
continue
}
if t, ok, err := podtask.RecoverFrom(pod); err != nil {
log.Errorf("failed to recover task from pod, will attempt to delete '%v/%v': %v", pod.Namespace, pod.Name, err)
err := ks.client.Pods(pod.Namespace).Delete(pod.Name, nil)
//TODO(jdef) check for temporary or not-found errors
if err != nil {
log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err)
}
} else if ok {
ks.taskRegistry.Register(t, nil)
recoverSlave(t)
log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name)
}
}
return nil
func (c *Scheduler) LaunchTask(t *podtask.T) error {
return c.framework.LaunchTask(t)
}

View File

@@ -59,7 +59,6 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
@@ -720,10 +719,9 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
}
fcfs := podschedulers.NewFCFSPodScheduler(as, lookupNode)
mesosScheduler := scheduler.New(scheduler.Config{
framework := scheduler.New(scheduler.Config{
SchedulerConfig: *sc,
Executor: executor,
PodScheduler: fcfs,
Client: client,
FailoverTimeout: s.failoverTimeout,
ReconcileInterval: s.reconcileInterval,
@@ -744,7 +742,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
log.Fatalf("Misconfigured mesos framework: %v", err)
}
schedulerProcess := ha.New(mesosScheduler)
schedulerProcess := ha.New(framework)
dconfig := &bindings.DriverConfig{
Scheduler: schedulerProcess,
Framework: info,
@@ -761,18 +759,17 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
}
// create scheduler loop
fw := &scheduler.MesosFramework{MesosScheduler: mesosScheduler}
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"})
lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything())
loop, pr := operations.NewScheduler(sc, fw, client, recorder, schedulerProcess.Terminal(), s.mux, lw)
scheduler := scheduler.NewScheduler(sc, framework, fcfs, client, recorder, schedulerProcess.Terminal(), s.mux, lw)
runtime.On(mesosScheduler.Registration(), func() { loop.Run(schedulerProcess.Terminal()) })
runtime.On(mesosScheduler.Registration(), s.newServiceWriter(schedulerProcess.Terminal()))
runtime.On(framework.Registration(), func() { scheduler.Run(schedulerProcess.Terminal()) })
runtime.On(framework.Registration(), s.newServiceWriter(schedulerProcess.Terminal()))
driverFactory := ha.DriverFactory(func() (drv bindings.SchedulerDriver, err error) {
log.V(1).Infoln("performing deferred initialization")
if err = mesosScheduler.Init(schedulerProcess.Master(), pr, s.mux); err != nil {
if err = framework.Init(scheduler, schedulerProcess.Master(), s.mux); err != nil {
return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err)
}
log.V(1).Infoln("deferred init complete")

View File

@@ -20,21 +20,19 @@ import (
"sync"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)
// Framework abstracts everything other components of the scheduler need from
// the actual MesosScheduler implementation.
type Framework interface {
sync.Locker // synchronize scheduler plugin operations
podschedulers.SlaveIndex
PodScheduler() podschedulers.PodScheduler
Offers() offers.Registry
// Scheduler abstracts everything other components of the scheduler need
// to access from each other
type Scheduler interface {
Tasks() podtask.Registry
sync.Locker // synchronize changes to tasks, i.e. lock, get task, change task, store task, unlock
// driver calls
KillTask(taskId string) error
LaunchTask(*podtask.T) error
Offers() offers.Registry
Reconcile(t *podtask.T)
KillTask(id string) error
LaunchTask(t *podtask.T) error
Run(done <-chan struct{})
}
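The sync.Locker comment spells out the intended call sequence; a minimal sketch, assuming a hypothetical markLaunched helper:

package types

import (
	"fmt"

	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)

// markLaunched shows the lock / get task / change task / store task /
// unlock sequence described above. The helper itself is hypothetical.
func markLaunched(sched Scheduler, taskID string) error {
	sched.Lock()
	defer sched.Unlock()

	task, state := sched.Tasks().Get(taskID)
	if task == nil || state != podtask.StatePending {
		return fmt.Errorf("task %q is not pending", taskID)
	}
	task.Set(podtask.Launched)
	return sched.Tasks().Update(task)
}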

View File

@@ -18,54 +18,25 @@ package types
import (
"sync"
"testing"
"github.com/stretchr/testify/mock"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/pkg/api"
"time"
)
// @deprecated this is a placeholder for me to test the mock package
func TestNoSlavesYet(t *testing.T) {
obj := &MockScheduler{}
obj.On("SlaveHostNameFor", "foo").Return(nil)
obj.SlaveHostNameFor("foo")
obj.AssertExpectations(t)
}
// MockScheduler implements SchedulerApi
type MockScheduler struct {
sync.RWMutex
mock.Mock
}
func (m *MockScheduler) SlaveHostNameFor(id string) (hostName string) {
args := m.Called(id)
x := args.Get(0)
if x != nil {
hostName = x.(string)
}
return
}
func (m *MockScheduler) PodScheduler() (f podschedulers.PodScheduler) {
args := m.Called()
x := args.Get(0)
if x != nil {
f = x.(podschedulers.PodScheduler)
}
return
}
func (m *MockScheduler) CreatePodTask(ctx api.Context, pod *api.Pod) (task *podtask.T, err error) {
args := m.Called(ctx, pod)
x := args.Get(0)
if x != nil {
task = x.(*podtask.T)
}
err = args.Error(1)
func (m *MockScheduler) Run(done <-chan struct{}) {
_ = m.Called()
runtime.Until(func() {
time.Sleep(time.Second)
}, time.Second, done)
return
}
@@ -96,3 +67,8 @@ func (m *MockScheduler) LaunchTask(task *podtask.T) error {
args := m.Called(task)
return args.Error(0)
}
func (m *MockScheduler) Reconcile(task *podtask.T) {
_ = m.Called()
return
}
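A usage sketch for the mock, relying only on the methods and imports already shown in this file; the test name and the empty task value are illustrative.

func TestMockSchedulerLaunchTask(t *testing.T) {
	obj := &MockScheduler{}
	task := &podtask.T{}
	obj.On("LaunchTask", task).Return(nil)
	if err := obj.LaunchTask(task); err != nil {
		t.Fatalf("unexpected error from mocked LaunchTask: %v", err)
	}
	obj.AssertExpectations(t)
}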