Make slave assignment before binding persistent

- move the assigned slave to T.Spec.AssignedSlave
- only create the BindingHost annotation in prepareTaskForLaunch
- recover the assigned slave from the annotation and write it back to the T.Spec field

Before this patch the annotation was used to store the assigned slave. But due
to the cloning of tasks in the registry, this value was never persisted in the
registry.

This patch stores the assigned slave in the Spec of a task and only creates the
annotation at the last minute, just before launching.

Without this patch, pods which fail before binding stay in the registry, but they
are never rescheduled. The reason: the BindingHost annotation exists neither in
the registry nor on the apiserver (compare the reconcilePod function).
Dr. Stefan Schimanski 2015-08-11 17:53:45 +02:00
parent 5a9b36b703
commit f1a560718c
4 changed files with 87 additions and 72 deletions
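
For illustration, the intended flow is roughly the following self-contained Go sketch. Task, Pod and bindingHostKey are simplified stand-ins for podtask.T, api.Pod and annotation.BindingHostKey (not the project's real types); the actual changes are in the diff below.

package main

import "fmt"

// bindingHostKey stands in for annotation.BindingHostKey in the real code.
const bindingHostKey = "bindingHost"

// Task is a stand-in for podtask.T: the assigned slave now lives on the task's
// Spec, so it survives the cloning of tasks in the registry.
type Task struct {
	Spec struct {
		AssignedSlave string
	}
}

// Pod is a stand-in for api.Pod.
type Pod struct {
	Annotations map[string]string
}

// prepareForLaunch writes the BindingHost annotation at the last minute, just
// before launching; the executor later persists it to the apiserver on binding.
func prepareForLaunch(t *Task, p *Pod) {
	if p.Annotations == nil {
		p.Annotations = make(map[string]string)
	}
	p.Annotations[bindingHostKey] = t.Spec.AssignedSlave
}

// recoverAssignedSlave reads the assignment back from the annotation, e.g.
// when tasks are recovered after a scheduler restart.
func recoverAssignedSlave(p *Pod) string {
	return p.Annotations[bindingHostKey]
}

func main() {
	t := &Task{}
	t.Spec.AssignedSlave = "slave1.example.com" // recorded when the offer is accepted
	p := &Pod{}
	prepareForLaunch(t, p)
	fmt.Println(recoverAssignedSlave(p)) // prints: slave1.example.com
}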


@@ -198,8 +198,12 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
 	oemCt := pod.Spec.Containers
 	pod.Spec.Containers = append([]api.Container{}, oemCt...) // (shallow) clone before mod
 
-	annotateForExecutorOnSlave(&pod, machine)
+	if pod.Annotations == nil {
+		pod.Annotations = make(map[string]string)
+	}
+
 	task.SaveRecoveryInfo(pod.Annotations)
+	pod.Annotations[annotation.BindingHostKey] = task.Spec.AssignedSlave
 
 	for _, entry := range task.Spec.PortMap {
 		oemPorts := pod.Spec.Containers[entry.ContainerIdx].Ports
@@ -233,29 +237,13 @@ type kubeScheduler struct {
 	defaultContainerMemLimit mresource.MegaBytes
 }
 
-// annotatedForExecutor checks whether a pod is assigned to a Mesos slave, and
-// possibly already launched. It can, but doesn't have to be scheduled already
-// in the sense of kubernetes, i.e. the NodeName field might still be empty.
-func annotatedForExecutor(pod *api.Pod) bool {
-	_, ok := pod.ObjectMeta.Annotations[annotation.BindingHostKey]
-	return ok
-}
-
-// annotateForExecutorOnSlave sets the BindingHostKey annotation which
-// marks the pod to be processed by the scheduler and launched as a Mesos
-// task. The executor on the slave will to the final binding to finish the
-// scheduling in the kubernetes sense.
-func annotateForExecutorOnSlave(pod *api.Pod, slave string) {
-	if pod.Annotations == nil {
-		pod.Annotations = make(map[string]string)
-	} else {
-		oemAnn := pod.Annotations
-		pod.Annotations = make(map[string]string)
-		for k, v := range oemAnn {
-			pod.Annotations[k] = v
-		}
-	}
-	pod.Annotations[annotation.BindingHostKey] = slave
-}
+// recoverAssignedSlave recovers the assigned Mesos slave from a pod by searching
+// the BindingHostKey. For tasks in the registry of the scheduler, the same
+// value is stored in T.Spec.AssignedSlave. Before launching, the BindingHostKey
+// annotation is added and the executor will eventually persist that to the
+// apiserver on binding.
+func recoverAssignedSlave(pod *api.Pod) string {
+	return pod.Annotations[annotation.BindingHostKey]
+}
 
 // Schedule implements the Scheduler interface of Kubernetes.
@@ -462,7 +450,7 @@ func (q *queuer) Run(done <-chan struct{}) {
 			}
 			pod := p.(*Pod)
-			if annotatedForExecutor(pod.Pod) {
+			if recoverAssignedSlave(pod.Pod) != "" {
 				log.V(3).Infof("dequeuing pod for scheduling: %v", pod.Pod.Name)
 				q.dequeue(pod.GetUID())
 			} else {
@@ -511,7 +499,7 @@ func (q *queuer) yield() *api.Pod {
 			log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err)
 		} else if !q.podUpdates.Poll(podName, queue.POP_EVENT) {
 			log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod)
-		} else if annotatedForExecutor(pod) {
+		} else if recoverAssignedSlave(pod) != "" {
 			// should never happen if enqueuePods is filtering properly
 			log.Warningf("yield popped an already-scheduled pod, skipping: %+v", pod)
 		} else {
@@ -801,25 +789,27 @@ func (s *schedulingPlugin) scheduleOne() {
 // host="..." | host="..." ; perhaps no updates to process?
 //
 // TODO(jdef) this needs an integration test
-func (s *schedulingPlugin) reconcilePod(oldPod api.Pod) {
-	log.V(1).Infof("reconcile pod %v", oldPod.Name)
-	ctx := api.WithNamespace(api.NewDefaultContext(), oldPod.Namespace)
-	pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(oldPod.Name)
+func (s *schedulingPlugin) reconcileTask(t *podtask.T) {
+	log.V(1).Infof("reconcile pod %v, assigned to slave %q", t.Pod.Name, t.Spec.AssignedSlave)
+	ctx := api.WithNamespace(api.NewDefaultContext(), t.Pod.Namespace)
+	pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(t.Pod.Name)
 	if err != nil {
 		if errors.IsNotFound(err) {
 			// attempt to delete
-			if err = s.deleter.deleteOne(&Pod{Pod: &oldPod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
-				log.Errorf("failed to delete pod: %v: %v", oldPod.Name, err)
+			if err = s.deleter.deleteOne(&Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
+				log.Errorf("failed to delete pod: %v: %v", t.Pod.Name, err)
 			}
 		} else {
 			//TODO(jdef) other errors should probably trigger a retry (w/ backoff).
 			//For now, drop the pod on the floor
-			log.Warning("aborting reconciliation for pod %v: %v", oldPod.Name, err)
+			log.Warning("aborting reconciliation for pod %v: %v", t.Pod.Name, err)
 		}
 		return
 	}
-	if oldPod.Spec.NodeName != pod.Spec.NodeName {
-		if annotatedForExecutor(pod) {
+
+	log.Infof("pod %v scheduled on %q according to apiserver", pod.Name, pod.Spec.NodeName)
+	if t.Spec.AssignedSlave != pod.Spec.NodeName {
+		if pod.Spec.NodeName == "" {
 			// pod is unscheduled.
 			// it's possible that we dropped the pod in the scheduler error handler
 			// because of task misalignment with the pod (task.Has(podtask.Launched) == true)


@@ -199,7 +199,7 @@ func NewTestPod() (*api.Pod, int) {
 		TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
 		ObjectMeta: api.ObjectMeta{
 			Name:      name,
-			Namespace: "default",
+			Namespace: api.NamespaceDefault,
 			SelfLink:  fmt.Sprintf("http://1.2.3.4/api/v1beta1/pods/%s", name),
 		},
 		Spec: api.PodSpec{
@@ -418,7 +418,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
 	c.Recorder = eventObserver
 
 	// create plugin
-	p := NewPlugin(c)
+	p := NewPlugin(c).(*schedulingPlugin)
 	assert.NotNil(p)
 
 	// run plugin
@@ -514,11 +514,8 @@ func TestPlugin_LifeCycle(t *testing.T) {
 		t.Fatalf("timed out waiting for launchTasks call")
 	}
 
-	// define generic pod startup
-	startPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
-		// notify watchers of new pod
-		podListWatch.Add(pod, true)
-
+	// Launch a pod and wait until the scheduler driver is called
+	schedulePodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
 		// wait for failedScheduling event because there is no offer
 		assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received")
@@ -531,8 +528,6 @@ func TestPlugin_LifeCycle(t *testing.T) {
 		// wait for driver.launchTasks call
 		select {
 		case launchedTask := <-launchedTasks:
-			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
-			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
 			for _, offer := range offers {
 				if offer.Id.GetValue() == launchedTask.offerId.GetValue() {
 					return pod, &launchedTask, offer
@@ -540,12 +535,30 @@ func TestPlugin_LifeCycle(t *testing.T) {
 			}
 			t.Fatalf("unknown offer used to start a pod")
 			return nil, nil, nil
 
 		case <-time.After(5 * time.Second):
 			t.Fatal("timed out waiting for launchTasks")
 			return nil, nil, nil
 		}
 	}
+
+	// Launch a pod and wait until the scheduler driver is called
+	launchPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
+		podListWatch.Add(pod, true)
+		return schedulePodWithOffers(pod, offers)
+	}
+
+	// Launch a pod, wait until the scheduler driver is called and report back that it is running
+	startPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
+		// notify about pod, offer resources and wait for scheduling
+		pod, launchedTask, offer := launchPodWithOffers(pod, offers)
+		if pod != nil {
+			// report back status
+			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
+			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
			return pod, launchedTask, offer
+		}
+		return nil, nil, nil
+	}
 
 	startTestPod := func() (*api.Pod, *LaunchedTask, *mesos.Offer) {
 		pod, i := NewTestPod()
@@ -610,31 +623,42 @@ func TestPlugin_LifeCycle(t *testing.T) {
 		// wait until pod is looked up at the apiserver
 		assertext.EventuallyTrue(t, time.Second, func() bool {
 			return testApiServer.Stats(pod.Name) == beforePodLookups+1
-		}, "expect that reconcilePod will access apiserver for pod %v", pod.Name)
+		}, "expect that reconcileTask will access apiserver for pod %v", pod.Name)
+	}
+
+	launchTestPod := func() (*api.Pod, *LaunchedTask, *mesos.Offer) {
+		pod, i := NewTestPod()
+		offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
+		return launchPodWithOffers(pod, offers)
 	}
 
 	// 1. with pod deleted from the apiserver
-	pod, launchedTask, _ = startTestPod()
+	//    expected: pod is removed from internal task registry
+	pod, launchedTask, _ = launchTestPod()
 	podListWatch.Delete(pod, false) // not notifying the watchers
 	failPodFromExecutor(launchedTask.taskInfo)
+
+	podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
+	assertext.EventuallyTrue(t, time.Second, func() bool {
+		t, _ := p.api.tasks().ForPod(podKey)
+		return t == nil
+	})
 
 	// 2. with pod still on the apiserver, not bound
-	pod, launchedTask, _ = startTestPod()
+	//    expected: pod is rescheduled
+	pod, launchedTask, _ = launchTestPod()
 	failPodFromExecutor(launchedTask.taskInfo)
 
-	// 3. with pod still on the apiserver, bound i.e. host!=""
-	pod, launchedTask, usedOffer = startTestPod()
-	pod.Annotations = map[string]string{
-		meta.BindingHostKey: *usedOffer.Hostname,
-	}
-	podListWatch.Modify(pod, false) // not notifying the watchers
-	failPodFromExecutor(launchedTask.taskInfo)
+	retryOffers := []*mesos.Offer{NewTestOffer("retry-offer")}
+	schedulePodWithOffers(pod, retryOffers)
 
-	// 4. with pod still on the apiserver, bound i.e. host!="", notified via ListWatch
+	// 3. with pod still on the apiserver, bound, notified via ListWatch
+	//    expected: nothing, pod updates not supported, compare ReconcileTask function
 	pod, launchedTask, usedOffer = startTestPod()
 	pod.Annotations = map[string]string{
 		meta.BindingHostKey: *usedOffer.Hostname,
 	}
+	pod.Spec.NodeName = *usedOffer.Hostname
 	podListWatch.Modify(pod, true) // notifying the watchers
 	time.Sleep(time.Second / 2)
 	failPodFromExecutor(launchedTask.taskInfo)


@@ -71,12 +71,13 @@ type T struct {
 }
 
 type Spec struct {
-	SlaveID string
-	CPU     mresource.CPUShares
-	Memory  mresource.MegaBytes
-	PortMap []HostPortMapping
-	Ports   []uint64
-	Data    []byte
+	SlaveID       string
+	AssignedSlave string
+	CPU           mresource.CPUShares
+	Memory        mresource.MegaBytes
+	PortMap       []HostPortMapping
+	Ports         []uint64
+	Data          []byte
 }
 
 // mostly-clone this pod task. the clone will actually share the some fields:
@@ -161,9 +162,10 @@ func (t *T) FillFromDetails(details *mesos.Offer) error {
 	log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
 
 	t.Spec = Spec{
 		SlaveID: details.GetSlaveId().GetValue(),
-		CPU:     cpu,
-		Memory:  mem,
+		AssignedSlave: *details.Hostname,
+		CPU:           cpu,
+		Memory:        mem,
 	}
 
 	// fill in port mapping
@@ -346,8 +348,7 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
 		bindTime:   now,
 	}
 	var (
-		offerId  string
-		hostname string
+		offerId string
 	)
 	for _, k := range []string{
 		annotation.BindingHostKey,
@@ -362,7 +363,7 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
 		}
 		switch k {
 		case annotation.BindingHostKey:
-			hostname = v
+			t.Spec.AssignedSlave = v
 		case annotation.SlaveIdKey:
 			t.Spec.SlaveID = v
 		case annotation.OfferIdKey:
@@ -375,7 +376,7 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
 			t.executor = &mesos.ExecutorInfo{ExecutorId: mutil.NewExecutorID(v)}
 		}
 	}
-	t.Offer = offers.Expired(offerId, hostname, 0)
+	t.Offer = offers.Expired(offerId, t.Spec.AssignedSlave, 0)
 	t.Flags[Launched] = struct{}{}
 	t.Flags[Bound] = struct{}{}
 	return t, true, nil


@@ -102,7 +102,7 @@ func (self *slaveStorage) getSlave(slaveId string) (*Slave, bool) {
 type PluginInterface interface {
 	// the apiserver may have a different state for the pod than we do
 	// so reconcile our records, but only for this one pod
-	reconcilePod(api.Pod)
+	reconcileTask(*podtask.T)
 
 	// execute the Scheduling plugin, should start a go routine and return immediately
 	Run(<-chan struct{})
@@ -432,7 +432,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
 	case mesos.TaskState_TASK_FAILED:
 		if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil {
 			if task.Has(podtask.Launched) && !task.Has(podtask.Bound) {
-				go k.plugin.reconcilePod(task.Pod)
+				go k.plugin.reconcileTask(task)
 				return
 			}
 		} else {