diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go
index ac03cc1336f..5506026dd02 100644
--- a/contrib/mesos/pkg/scheduler/plugin.go
+++ b/contrib/mesos/pkg/scheduler/plugin.go
@@ -198,8 +198,12 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
 	oemCt := pod.Spec.Containers
 	pod.Spec.Containers = append([]api.Container{}, oemCt...) // (shallow) clone before mod
 
-	annotateForExecutorOnSlave(&pod, machine)
+	if pod.Annotations == nil {
+		pod.Annotations = make(map[string]string)
+	}
+
 	task.SaveRecoveryInfo(pod.Annotations)
+	pod.Annotations[annotation.BindingHostKey] = task.Spec.AssignedSlave
 
 	for _, entry := range task.Spec.PortMap {
 		oemPorts := pod.Spec.Containers[entry.ContainerIdx].Ports
@@ -233,29 +237,13 @@ type kubeScheduler struct {
 	defaultContainerMemLimit mresource.MegaBytes
 }
 
-// annotatedForExecutor checks whether a pod is assigned to a Mesos slave, and
-// possibly already launched. It can, but doesn't have to be scheduled already
-// in the sense of kubernetes, i.e. the NodeName field might still be empty.
-func annotatedForExecutor(pod *api.Pod) bool {
-	_, ok := pod.ObjectMeta.Annotations[annotation.BindingHostKey]
-	return ok
-}
-
-// annotateForExecutorOnSlave sets the BindingHostKey annotation which
-// marks the pod to be processed by the scheduler and launched as a Mesos
-// task. The executor on the slave will to the final binding to finish the
-// scheduling in the kubernetes sense.
-func annotateForExecutorOnSlave(pod *api.Pod, slave string) {
-	if pod.Annotations == nil {
-		pod.Annotations = make(map[string]string)
-	} else {
-		oemAnn := pod.Annotations
-		pod.Annotations = make(map[string]string)
-		for k, v := range oemAnn {
-			pod.Annotations[k] = v
-		}
-	}
-	pod.Annotations[annotation.BindingHostKey] = slave
+// recoverAssignedSlave recovers the assigned Mesos slave from a pod by searching
+// the BindingHostKey. For tasks in the registry of the scheduler, the same
+// value is stored in T.Spec.AssignedSlave. Before launching, the BindingHostKey
+// annotation is added and the executor will eventually persist that to the
+// apiserver on binding.
+func recoverAssignedSlave(pod *api.Pod) string {
+	return pod.Annotations[annotation.BindingHostKey]
 }
 
 // Schedule implements the Scheduler interface of Kubernetes.
@@ -462,7 +450,7 @@ func (q *queuer) Run(done <-chan struct{}) {
 			}
 
 			pod := p.(*Pod)
-			if annotatedForExecutor(pod.Pod) {
+			if recoverAssignedSlave(pod.Pod) != "" {
 				log.V(3).Infof("dequeuing pod for scheduling: %v", pod.Pod.Name)
 				q.dequeue(pod.GetUID())
 			} else {
@@ -511,7 +499,7 @@ func (q *queuer) yield() *api.Pod {
 			log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err)
 		} else if !q.podUpdates.Poll(podName, queue.POP_EVENT) {
 			log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod)
-		} else if annotatedForExecutor(pod) {
+		} else if recoverAssignedSlave(pod) != "" {
 			// should never happen if enqueuePods is filtering properly
 			log.Warningf("yield popped an already-scheduled pod, skipping: %+v", pod)
 		} else {
@@ -801,25 +789,27 @@ func (s *schedulingPlugin) scheduleOne() {
 // host="..." | host="..."       ; perhaps no updates to process?
 //
 // TODO(jdef) this needs an integration test
-func (s *schedulingPlugin) reconcilePod(oldPod api.Pod) {
-	log.V(1).Infof("reconcile pod %v", oldPod.Name)
-	ctx := api.WithNamespace(api.NewDefaultContext(), oldPod.Namespace)
-	pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(oldPod.Name)
+func (s *schedulingPlugin) reconcileTask(t *podtask.T) {
+	log.V(1).Infof("reconcile pod %v, assigned to slave %q", t.Pod.Name, t.Spec.AssignedSlave)
+	ctx := api.WithNamespace(api.NewDefaultContext(), t.Pod.Namespace)
+	pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(t.Pod.Name)
 	if err != nil {
 		if errors.IsNotFound(err) {
 			// attempt to delete
-			if err = s.deleter.deleteOne(&Pod{Pod: &oldPod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
-				log.Errorf("failed to delete pod: %v: %v", oldPod.Name, err)
+			if err = s.deleter.deleteOne(&Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
+				log.Errorf("failed to delete pod: %v: %v", t.Pod.Name, err)
 			}
 		} else {
 			//TODO(jdef) other errors should probably trigger a retry (w/ backoff).
 			//For now, drop the pod on the floor
-			log.Warning("aborting reconciliation for pod %v: %v", oldPod.Name, err)
+			log.Warningf("aborting reconciliation for pod %v: %v", t.Pod.Name, err)
 		}
 		return
 	}
-	if oldPod.Spec.NodeName != pod.Spec.NodeName {
-		if annotatedForExecutor(pod) {
+
+	log.Infof("pod %v scheduled on %q according to apiserver", pod.Name, pod.Spec.NodeName)
+	if t.Spec.AssignedSlave != pod.Spec.NodeName {
+		if pod.Spec.NodeName == "" {
 			// pod is unscheduled.
 			// it's possible that we dropped the pod in the scheduler error handler
 			// because of task misalignment with the pod (task.Has(podtask.Launched) == true)
diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go
index 69df27807ef..6f74dcd3c00 100644
--- a/contrib/mesos/pkg/scheduler/plugin_test.go
+++ b/contrib/mesos/pkg/scheduler/plugin_test.go
@@ -199,7 +199,7 @@ func NewTestPod() (*api.Pod, int) {
 		TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
 		ObjectMeta: api.ObjectMeta{
 			Name:      name,
-			Namespace: "default",
+			Namespace: api.NamespaceDefault,
 			SelfLink:  fmt.Sprintf("http://1.2.3.4/api/v1beta1/pods/%s", name),
 		},
 		Spec: api.PodSpec{
@@ -418,7 +418,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
 	c.Recorder = eventObserver
 
 	// create plugin
-	p := NewPlugin(c)
+	p := NewPlugin(c).(*schedulingPlugin)
 	assert.NotNil(p)
 
 	// run plugin
@@ -514,11 +514,8 @@ func TestPlugin_LifeCycle(t *testing.T) {
 		t.Fatalf("timed out waiting for launchTasks call")
 	}
 
-	// define generic pod startup
-	startPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
-		// notify watchers of new pod
-		podListWatch.Add(pod, true)
-
+	// Offer resources for an already-submitted pod and wait until the scheduler driver is called
+	schedulePodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
 		// wait for failedScheduling event because there is no offer
 		assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received")
 
@@ -531,8 +528,6 @@ func TestPlugin_LifeCycle(t *testing.T) {
 		// wait for driver.launchTasks call
 		select {
 		case launchedTask := <-launchedTasks:
-			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
-			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
 			for _, offer := range offers {
 				if offer.Id.GetValue() == launchedTask.offerId.GetValue() {
 					return pod, &launchedTask, offer
@@ -540,12 +535,30 @@ func TestPlugin_LifeCycle(t *testing.T) {
 				}
 			}
 			t.Fatalf("unknown offer used to start a pod")
 			return nil, nil, nil
-
 		case <-time.After(5 * time.Second):
 			t.Fatal("timed out waiting for launchTasks")
 			return nil, nil, nil
 		}
 	}
 
+	// Launch a pod and wait until the scheduler driver is called
+	launchPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
+		podListWatch.Add(pod, true)
+		return schedulePodWithOffers(pod, offers)
+	}
+
+	// Launch a pod, wait until the scheduler driver is called and report back that it is running
+	startPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
+		// notify about pod, offer resources and wait for scheduling
+		pod, launchedTask, offer := launchPodWithOffers(pod, offers)
+		if pod != nil {
+			// report back status
+			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
+			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
+			return pod, launchedTask, offer
+		}
+
+		return nil, nil, nil
+	}
 
 	startTestPod := func() (*api.Pod, *LaunchedTask, *mesos.Offer) {
 		pod, i := NewTestPod()
@@ -610,31 +623,42 @@ func TestPlugin_LifeCycle(t *testing.T) {
 		// wait until pod is looked up at the apiserver
 		assertext.EventuallyTrue(t, time.Second, func() bool {
 			return testApiServer.Stats(pod.Name) == beforePodLookups+1
-		}, "expect that reconcilePod will access apiserver for pod %v", pod.Name)
+		}, "expect that reconcileTask will access apiserver for pod %v", pod.Name)
+	}
+
+	launchTestPod := func() (*api.Pod, *LaunchedTask, *mesos.Offer) {
+		pod, i := NewTestPod()
+		offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
+		return launchPodWithOffers(pod, offers)
 	}
 
 	// 1. with pod deleted from the apiserver
-	pod, launchedTask, _ = startTestPod()
+	// expected: pod is removed from internal task registry
+	pod, launchedTask, _ = launchTestPod()
 	podListWatch.Delete(pod, false) // not notifying the watchers
 	failPodFromExecutor(launchedTask.taskInfo)
 
+	podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
+	assertext.EventuallyTrue(t, time.Second, func() bool {
+		t, _ := p.api.tasks().ForPod(podKey)
+		return t == nil
+	})
+
 	// 2. with pod still on the apiserver, not bound
-	pod, launchedTask, _ = startTestPod()
+	// expected: pod is rescheduled
+	pod, launchedTask, _ = launchTestPod()
 	failPodFromExecutor(launchedTask.taskInfo)
 
-	// 3. with pod still on the apiserver, bound i.e. host!=""
-	pod, launchedTask, usedOffer = startTestPod()
-	pod.Annotations = map[string]string{
-		meta.BindingHostKey: *usedOffer.Hostname,
-	}
-	podListWatch.Modify(pod, false) // not notifying the watchers
-	failPodFromExecutor(launchedTask.taskInfo)
-
-	// 4. with pod still on the apiserver, bound i.e. host!="", notified via ListWatch
+	retryOffers := []*mesos.Offer{NewTestOffer("retry-offer")}
+	schedulePodWithOffers(pod, retryOffers)
+
+	// 3. with pod still on the apiserver, bound, notified via ListWatch
+	// expected: nothing happens, pod updates are not supported; see the reconcileTask function
 	pod, launchedTask, usedOffer = startTestPod()
 	pod.Annotations = map[string]string{
 		meta.BindingHostKey: *usedOffer.Hostname,
 	}
+	pod.Spec.NodeName = *usedOffer.Hostname
 	podListWatch.Modify(pod, true) // notifying the watchers
 	time.Sleep(time.Second / 2)
 	failPodFromExecutor(launchedTask.taskInfo)
diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task.go b/contrib/mesos/pkg/scheduler/podtask/pod_task.go
index 6fbdf082efb..df5faef06cc 100644
--- a/contrib/mesos/pkg/scheduler/podtask/pod_task.go
+++ b/contrib/mesos/pkg/scheduler/podtask/pod_task.go
@@ -71,12 +71,13 @@ type T struct {
 }
 
 type Spec struct {
-	SlaveID string
-	CPU     mresource.CPUShares
-	Memory  mresource.MegaBytes
-	PortMap []HostPortMapping
-	Ports   []uint64
-	Data    []byte
+	SlaveID       string
+	AssignedSlave string
+	CPU           mresource.CPUShares
+	Memory        mresource.MegaBytes
+	PortMap       []HostPortMapping
+	Ports         []uint64
+	Data          []byte
 }
 
 // mostly-clone this pod task. the clone will actually share the some fields:
@@ -161,9 +162,10 @@ func (t *T) FillFromDetails(details *mesos.Offer) error {
 	log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
 
 	t.Spec = Spec{
-		SlaveID: details.GetSlaveId().GetValue(),
-		CPU:     cpu,
-		Memory:  mem,
+		SlaveID:       details.GetSlaveId().GetValue(),
+		AssignedSlave: *details.Hostname,
+		CPU:           cpu,
+		Memory:        mem,
 	}
 
 	// fill in port mapping
@@ -346,8 +348,7 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
 		bindTime:   now,
 	}
 	var (
-		offerId  string
-		hostname string
+		offerId string
 	)
 	for _, k := range []string{
 		annotation.BindingHostKey,
@@ -362,7 +363,7 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
 		}
 		switch k {
 		case annotation.BindingHostKey:
-			hostname = v
+			t.Spec.AssignedSlave = v
 		case annotation.SlaveIdKey:
 			t.Spec.SlaveID = v
 		case annotation.OfferIdKey:
@@ -375,7 +376,7 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
 			t.executor = &mesos.ExecutorInfo{ExecutorId: mutil.NewExecutorID(v)}
 		}
 	}
-	t.Offer = offers.Expired(offerId, hostname, 0)
+	t.Offer = offers.Expired(offerId, t.Spec.AssignedSlave, 0)
 	t.Flags[Launched] = struct{}{}
 	t.Flags[Bound] = struct{}{}
 	return t, true, nil
diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go
index a7977060f67..8155f59c748 100644
--- a/contrib/mesos/pkg/scheduler/scheduler.go
+++ b/contrib/mesos/pkg/scheduler/scheduler.go
@@ -102,7 +102,7 @@ func (self *slaveStorage) getSlave(slaveId string) (*Slave, bool) {
 type PluginInterface interface {
 	// the apiserver may have a different state for the pod than we do
 	// so reconcile our records, but only for this one pod
-	reconcilePod(api.Pod)
+	reconcileTask(*podtask.T)
 
 	// execute the Scheduling plugin, should start a go routine and return immediately
 	Run(<-chan struct{})
@@ -432,7 +432,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
 	case mesos.TaskState_TASK_FAILED:
 		if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil {
 			if task.Has(podtask.Launched) && !task.Has(podtask.Bound) {
-				go k.plugin.reconcilePod(task.Pod)
+				go k.plugin.reconcileTask(task)
 				return
 			}
 		} else {
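The sketch below is a minimal, self-contained illustration of the annotation round-trip that the plugin.go and pod_task.go changes rely on. It uses hypothetical stand-in types and a placeholder key rather than the real api.Pod, podtask.T and annotation.BindingHostKey, so treat it as an assumption-laden sketch, not the scheduler's actual code: the assigned slave is written into the pod's annotations before launch, a recoverAssignedSlave-style lookup returns "" for a pod that was never assigned, and reconciliation compares the recorded slave with the NodeName reported by the apiserver.

package main

import "fmt"

// bindingHostKey is a placeholder for the scheduler's annotation.BindingHostKey constant.
const bindingHostKey = "k8s.mesosphere.io/bindingHost"

// pod is a hypothetical stand-in for api.Pod, carrying only what this sketch needs.
type pod struct {
	Annotations map[string]string
	NodeName    string
}

// annotateForLaunch mirrors the prepareTaskForLaunch hunk: allocate the map lazily,
// then record the assigned slave under the binding-host key.
func annotateForLaunch(p *pod, assignedSlave string) {
	if p.Annotations == nil {
		p.Annotations = make(map[string]string)
	}
	p.Annotations[bindingHostKey] = assignedSlave
}

// recoverAssignedSlave mirrors the new helper: a missing annotation yields "",
// which callers treat as "not yet assigned to a slave".
func recoverAssignedSlave(p *pod) string {
	return p.Annotations[bindingHostKey]
}

func main() {
	p := &pod{}
	fmt.Printf("before launch: assigned slave = %q\n", recoverAssignedSlave(p))

	annotateForLaunch(p, "slave-1.example.com")
	fmt.Printf("after launch:  assigned slave = %q\n", recoverAssignedSlave(p))

	// Reconciliation compares the slave recorded for the task with the NodeName the
	// apiserver reports; an empty NodeName means the pod is still unscheduled from
	// Kubernetes' point of view even though the scheduler has picked a slave.
	if recoverAssignedSlave(p) != p.NodeName && p.NodeName == "" {
		fmt.Println("pod is annotated for a slave but not yet bound by the executor")
	}
}

The nil-map check matches the shape of the prepareTaskForLaunch change: reading from a nil map is safe in Go, but writing requires the map to be allocated first.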
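Similarly, here is a hedged sketch of the RecoverFrom-style recovery path touched in pod_task.go, again with hypothetical keys and a stripped-down Spec, and simplified to range over the annotation map instead of the fixed key list used by the real function. The point it shows: after this change the binding host is written straight into Spec.AssignedSlave rather than a separate hostname variable, so the recovered value and the value used at launch time live in the same field.

package main

import "fmt"

// Hypothetical placeholders for the annotation keys consumed during recovery.
const (
	bindingHostKey = "k8s.mesosphere.io/bindingHost"
	slaveIdKey     = "k8s.mesosphere.io/slaveId"
	offerIdKey     = "k8s.mesosphere.io/offerId"
)

// spec is a stripped-down stand-in for podtask.Spec after this change:
// the assigned slave is a first-class field next to the slave ID.
type spec struct {
	SlaveID       string
	AssignedSlave string
}

// recoverSpec rebuilds a spec from pod annotations. It mirrors the shape of the
// RecoverFrom switch, writing the binding host directly into AssignedSlave.
func recoverSpec(annotations map[string]string) (spec, string) {
	var (
		s       spec
		offerId string
	)
	for k, v := range annotations {
		switch k {
		case bindingHostKey:
			s.AssignedSlave = v
		case slaveIdKey:
			s.SlaveID = v
		case offerIdKey:
			offerId = v
		}
	}
	return s, offerId
}

func main() {
	s, offerId := recoverSpec(map[string]string{
		bindingHostKey: "slave-1.example.com",
		slaveIdKey:     "20150720-000000-1-0001",
		offerIdKey:     "offer-42",
	})
	fmt.Printf("recovered spec=%+v, offerId=%q\n", s, offerId)
}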