diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go
index fc147a336ef..959a95398fb 100644
--- a/contrib/mesos/pkg/executor/executor_test.go
+++ b/contrib/mesos/pkg/executor/executor_test.go
@@ -170,11 +170,10 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
 	}
 
 	pod := NewTestPod(1)
-	podTask, err := podtask.New(api.NewDefaultContext(), "",
-		*pod, &mesosproto.ExecutorInfo{})
+	podTask, err := podtask.New(api.NewDefaultContext(), "", pod)
 	assert.Equal(t, nil, err, "must be able to create a task from a pod")
 
-	taskInfo := podTask.BuildTaskInfo()
+	taskInfo := podTask.BuildTaskInfo(&mesosproto.ExecutorInfo{})
 	data, err := testapi.Default.Codec().Encode(pod)
 	assert.Equal(t, nil, err, "must be able to encode a pod's spec data")
 	taskInfo.Data = data
@@ -417,10 +416,8 @@ func TestExecutorFrameworkMessage(t *testing.T) {
 
 	// set up a pod to then lose
 	pod := NewTestPod(1)
-	podTask, _ := podtask.New(api.NewDefaultContext(), "foo",
-		*pod, &mesosproto.ExecutorInfo{})
-
-	taskInfo := podTask.BuildTaskInfo()
+	podTask, _ := podtask.New(api.NewDefaultContext(), "foo", pod)
+	taskInfo := podTask.BuildTaskInfo(&mesosproto.ExecutorInfo{})
 	data, _ := testapi.Default.Codec().Encode(pod)
 	taskInfo.Data = data
 
diff --git a/contrib/mesos/pkg/scheduler/framework.go b/contrib/mesos/pkg/scheduler/framework.go
index 951523f842c..2b21ab8ff89 100644
--- a/contrib/mesos/pkg/scheduler/framework.go
+++ b/contrib/mesos/pkg/scheduler/framework.go
@@ -24,7 +24,6 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/offers"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
-	"k8s.io/kubernetes/pkg/api"
 )
 
 type MesosFramework struct {
@@ -44,10 +43,6 @@ func (fw *MesosFramework) Tasks() podtask.Registry {
 	return fw.MesosScheduler.taskRegistry
 }
 
-func (fw *MesosFramework) CreatePodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) {
-	return podtask.New(ctx, "", *pod, fw.MesosScheduler.executor)
-}
-
 func (fw *MesosFramework) SlaveHostNameFor(id string) string {
 	return fw.MesosScheduler.slaveHostNames.HostName(id)
 }
@@ -60,7 +55,8 @@ func (fw *MesosFramework) KillTask(taskId string) error {
 
 func (fw *MesosFramework) LaunchTask(task *podtask.T) error {
 	// assume caller is holding scheduler lock
-	taskList := []*mesos.TaskInfo{task.BuildTaskInfo()}
+	ei := fw.MesosScheduler.executor
+	taskList := []*mesos.TaskInfo{task.BuildTaskInfo(ei)}
 	offerIds := []*mesos.OfferID{task.Offer.Details().Id}
 	filters := &mesos.Filters{}
 	_, err := fw.MesosScheduler.driver.LaunchTasks(offerIds, taskList, filters)
diff --git a/contrib/mesos/pkg/scheduler/integration_test.go b/contrib/mesos/pkg/scheduler/integration_test.go
index 4a6c6fe072a..3e5802c2a4a 100644
--- a/contrib/mesos/pkg/scheduler/integration_test.go
+++ b/contrib/mesos/pkg/scheduler/integration_test.go
@@ -486,7 +486,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
 	// create scheduler loop
 	fw := &MesosFramework{MesosScheduler: mesosScheduler}
 	eventObs := NewEventObserver()
-	loop, _ := operations.NewScheduler(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch)
+	loop, podReconciler := operations.NewScheduler(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch)
 	assert.NotNil(loop)
 
 	// create mock mesos scheduler driver
@@ -498,6 +498,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
 		eventObs:      eventObs,
 		loop:          loop,
 		podsListWatch: podsListWatch,
+		podReconciler: podReconciler,
 		scheduler:     mesosScheduler,
 		schedulerProc: schedulerProc,
 		t:             t,
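Reviewer note on the API change above: podtask.New no longer receives an ExecutorInfo; the executor template is supplied only when the TaskInfo is built. A minimal sketch of the new call sequence, assuming the packages this patch touches (launchPod is a hypothetical helper, not part of the change):

package example

import (
	mesosproto "github.com/mesos/mesos-go/mesosproto"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
	"k8s.io/kubernetes/pkg/api"
)

// launchPod builds a Mesos TaskInfo for a pod; the ExecutorInfo is injected
// at TaskInfo-build time instead of at task construction time.
func launchPod(pod *api.Pod, executor *mesosproto.ExecutorInfo) (*mesosproto.TaskInfo, error) {
	task, err := podtask.New(api.NewDefaultContext(), "", pod) // pod is now passed by pointer
	if err != nil {
		return nil, err
	}
	return task.BuildTaskInfo(executor), nil
}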
diff --git a/contrib/mesos/pkg/scheduler/meta/annotations.go b/contrib/mesos/pkg/scheduler/meta/annotations.go
index 7c9ee089380..361b8eca87b 100644
--- a/contrib/mesos/pkg/scheduler/meta/annotations.go
+++ b/contrib/mesos/pkg/scheduler/meta/annotations.go
@@ -25,7 +25,6 @@ const (
 	TaskIdKey                = "k8s.mesosphere.io/taskId"
 	SlaveIdKey               = "k8s.mesosphere.io/slaveId"
 	OfferIdKey               = "k8s.mesosphere.io/offerId"
-	ExecutorIdKey            = "k8s.mesosphere.io/executorId"
 	PortMappingKeyPrefix     = "k8s.mesosphere.io/port_"
 	PortMappingKeyFormat     = PortMappingKeyPrefix + "%s_%d"
 	PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_"
diff --git a/contrib/mesos/pkg/scheduler/operations/algorithm.go b/contrib/mesos/pkg/scheduler/operations/algorithm.go
index 93a21254cd6..59c0bd0b1c6 100644
--- a/contrib/mesos/pkg/scheduler/operations/algorithm.go
+++ b/contrib/mesos/pkg/scheduler/operations/algorithm.go
@@ -72,7 +72,12 @@ func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
 		log.Infof("aborting Schedule, pod has been deleted %+v", pod)
 		return "", merrors.NoSuchPodErr
 	}
-	return k.doSchedule(k.fw.Tasks().Register(k.fw.CreatePodTask(ctx, pod)))
+	podTask, err := podtask.New(ctx, "", pod)
+	if err != nil {
+		log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err)
+		return "", err
+	}
+	return k.doSchedule(k.fw.Tasks().Register(podTask, nil))
 
 	//TODO(jdef) it's possible that the pod state has diverged from what
 	//we knew previously, we should probably update the task.Pod state here
diff --git a/contrib/mesos/pkg/scheduler/operations/deleter_test.go b/contrib/mesos/pkg/scheduler/operations/deleter_test.go
index 639f31e5854..1fec942fca5 100644
--- a/contrib/mesos/pkg/scheduler/operations/deleter_test.go
+++ b/contrib/mesos/pkg/scheduler/operations/deleter_test.go
@@ -19,7 +19,6 @@ package operations
 import (
 	"testing"
 
-	mesos "github.com/mesos/mesos-go/mesosproto"
 	"github.com/stretchr/testify/assert"
 	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
 	merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
@@ -60,7 +59,7 @@ func TestDeleteOne_PendingPod(t *testing.T) {
 		UID:       "foo0",
 		Namespace: api.NamespaceDefault,
 	}}}
-	_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
+	_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
 	if err != nil {
 		t.Fatalf("failed to create task: %v", err)
 	}
@@ -94,7 +93,7 @@ func TestDeleteOne_Running(t *testing.T) {
 		UID:       "foo0",
 		Namespace: api.NamespaceDefault,
 	}}}
-	task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
+	task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
 	if err != nil {
 		t.Fatalf("unexpected error: %v", err)
 	}
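Reviewer note: the Register call sites above (reg.Register(podtask.New(...)) in the tests, k.fw.Tasks().Register(podTask, nil) in Schedule) suggest Registry.Register takes a trailing error, so a constructor's (task, error) tuple can be forwarded in one expression. A self-contained sketch of that inferred pattern with a hypothetical registry type (the real podtask.Registry interface may differ):

package example

import "errors"

type task struct{ id string }

type registry struct{ tasks map[string]*task }

func newRegistry() *registry {
	return &registry{tasks: map[string]*task{}}
}

// Register accepts a trailing error so callers can forward a constructor's
// (task, error) return values directly, e.g. reg.Register(newTask(id)).
func (r *registry) Register(t *task, err error) (*task, error) {
	if err != nil {
		return nil, err // constructor failed; nothing to register
	}
	if t == nil {
		return nil, errors.New("nil task")
	}
	r.tasks[t.id] = t
	return t, nil
}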
diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task.go b/contrib/mesos/pkg/scheduler/podtask/pod_task.go
index ad8a77d42fe..3a76e56a373 100644
--- a/contrib/mesos/pkg/scheduler/podtask/pod_task.go
+++ b/contrib/mesos/pkg/scheduler/podtask/pod_task.go
@@ -18,6 +18,7 @@ package podtask
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/gogo/protobuf/proto"
@@ -62,7 +63,6 @@ type T struct {
 	UpdatedTime time.Time // time of the most recent StatusUpdate we've seen from the mesos master
 
 	podStatus  api.PodStatus
-	executor   *mesos.ExecutorInfo // readonly
 	podKey     string
 	launchTime time.Time
 	bindTime   time.Time
@@ -130,21 +130,49 @@ func generateTaskName(pod *api.Pod) string {
 	return fmt.Sprintf("%s.%s.pods", pod.Name, ns)
 }
 
-func (t *T) BuildTaskInfo() *mesos.TaskInfo {
+func setCommandArgument(ei *mesos.ExecutorInfo, flag, value string, create bool) {
+	argv := []string{}
+	overwrite := false
+	if ei.Command != nil && ei.Command.Arguments != nil {
+		argv = ei.Command.Arguments
+		for i, arg := range argv {
+			if strings.HasPrefix(arg, flag+"=") {
+				overwrite = true
+				argv[i] = flag + "=" + value
+				break
+			}
+		}
+	}
+	if !overwrite && create {
+		argv = append(argv, flag+"="+value)
+		if ei.Command == nil {
+			ei.Command = &mesos.CommandInfo{}
+		}
+		ei.Command.Arguments = argv
+	}
+}
+
+func (t *T) BuildTaskInfo(ei *mesos.ExecutorInfo) *mesos.TaskInfo {
 	info := &mesos.TaskInfo{
 		Name:     proto.String(generateTaskName(&t.Pod)),
 		TaskId:   mutil.NewTaskID(t.ID),
 		SlaveId:  mutil.NewSlaveID(t.Spec.SlaveID),
-		Executor: t.executor,
+		Executor: proto.Clone(ei).(*mesos.ExecutorInfo),
 		Data:     t.Spec.Data,
 		Resources: []*mesos.Resource{
 			mutil.NewScalarResource("cpus", float64(t.Spec.CPU)),
 			mutil.NewScalarResource("mem", float64(t.Spec.Memory)),
 		},
 	}
+
 	if portsResource := rangeResource("ports", t.Spec.Ports); portsResource != nil {
 		info.Resources = append(info.Resources, portsResource)
 	}
+
+	// the hostname of the executor needs to match that of the offer, otherwise
+	// the kubelet node status checker/updater is very unhappy
+	setCommandArgument(info.Executor, "--hostname-override", t.Spec.AssignedSlave, true)
+
 	return info
 }
 
@@ -170,10 +198,7 @@ func (t *T) Has(f FlagType) (exists bool) {
 	return
 }
 
-func New(ctx api.Context, id string, pod api.Pod, executor *mesos.ExecutorInfo) (*T, error) {
-	if executor == nil {
-		return nil, fmt.Errorf("illegal argument: executor was nil")
-	}
+func New(ctx api.Context, id string, pod *api.Pod) (*T, error) {
 	key, err := MakePodKey(ctx, pod.Name)
 	if err != nil {
 		return nil, err
@@ -182,13 +207,12 @@ func New(ctx api.Context, id string, pod api.Pod, executor *mesos.ExecutorInfo)
 		id = "pod." + uuid.NewUUID().String()
 	}
 	task := &T{
-		ID:       id,
-		Pod:      pod,
-		State:    StatePending,
-		podKey:   key,
-		mapper:   MappingTypeForPod(&pod),
-		Flags:    make(map[FlagType]struct{}),
-		executor: proto.Clone(executor).(*mesos.ExecutorInfo),
+		ID:     id,
+		Pod:    *pod,
+		State:  StatePending,
+		podKey: key,
+		mapper: MappingTypeForPod(pod),
+		Flags:  make(map[FlagType]struct{}),
 	}
 	task.CreateTime = time.Now()
 	return task, nil
@@ -198,7 +222,6 @@ func (t *T) SaveRecoveryInfo(dict map[string]string) {
 	dict[annotation.TaskIdKey] = t.ID
 	dict[annotation.SlaveIdKey] = t.Spec.SlaveID
 	dict[annotation.OfferIdKey] = t.Offer.Details().Id.GetValue()
-	dict[annotation.ExecutorIdKey] = t.executor.ExecutorId.GetValue()
 }
 
 // reconstruct a task from metadata stashed in a pod entry. there are limited pod states that
@@ -256,7 +279,6 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
 		annotation.TaskIdKey,
 		annotation.SlaveIdKey,
 		annotation.OfferIdKey,
-		annotation.ExecutorIdKey,
 	} {
 		v, found := pod.Annotations[k]
 		if !found {
@@ -271,10 +293,6 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
 			offerId = v
 		case annotation.TaskIdKey:
 			t.ID = v
-		case annotation.ExecutorIdKey:
-			// this is nowhere near sufficient to re-launch a task, but we really just
-			// want this for tracking
-			t.executor = &mesos.ExecutorInfo{ExecutorId: mutil.NewExecutorID(v)}
 		}
 	}
 	t.Offer = offers.Expired(offerId, t.Spec.AssignedSlave, 0)
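Reviewer note: the new setCommandArgument above is small enough to exercise on its own. A runnable sketch of its two behaviors (overwrite an existing --flag=value argument, or append it when create is true), reproducing the function from the hunk verbatim:

package main

import (
	"fmt"
	"strings"

	mesos "github.com/mesos/mesos-go/mesosproto"
)

// setCommandArgument is copied from the patch: it rewrites an existing
// "flag=value" argument in place, or appends one when create is true,
// allocating ei.Command lazily if needed.
func setCommandArgument(ei *mesos.ExecutorInfo, flag, value string, create bool) {
	argv := []string{}
	overwrite := false
	if ei.Command != nil && ei.Command.Arguments != nil {
		argv = ei.Command.Arguments
		for i, arg := range argv {
			if strings.HasPrefix(arg, flag+"=") {
				overwrite = true
				argv[i] = flag + "=" + value
				break
			}
		}
	}
	if !overwrite && create {
		argv = append(argv, flag+"="+value)
		if ei.Command == nil {
			ei.Command = &mesos.CommandInfo{}
		}
		ei.Command.Arguments = argv
	}
}

func main() {
	ei := &mesos.ExecutorInfo{}

	// create: no Command yet, so one is allocated and the argument appended
	setCommandArgument(ei, "--hostname-override", "slave-1", true)
	fmt.Println(ei.Command.Arguments) // [--hostname-override=slave-1]

	// overwrite: the existing --hostname-override=... argument is replaced
	setCommandArgument(ei, "--hostname-override", "slave-2", true)
	fmt.Println(ei.Command.Arguments) // [--hostname-override=slave-2]
}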
diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
index 710485b1eb1..77371942aeb 100644
--- a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
+++ b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
@@ -35,12 +35,12 @@ const (
 )
 
 func fakePodTask(id string) (*T, error) {
-	return New(api.NewDefaultContext(), "", api.Pod{
+	return New(api.NewDefaultContext(), "", &api.Pod{
 		ObjectMeta: api.ObjectMeta{
 			Name:      id,
 			Namespace: api.NamespaceDefault,
 		},
-	}, &mesos.ExecutorInfo{})
+	})
 }
 
 func TestUnlimitedResources(t *testing.T) {
diff --git a/contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go b/contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go
index 9ff9b8df388..2d223f93b64 100644
--- a/contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go
+++ b/contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go
@@ -51,7 +51,7 @@ func TestDefaultHostPortMatching(t *testing.T) {
 			}},
 		}},
 	}
-	task, err = New(api.NewDefaultContext(), "", *pod, &mesos.ExecutorInfo{})
+	task, err = New(api.NewDefaultContext(), "", pod)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -99,7 +99,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
 			}},
 		}},
 	}
-	task, err = New(api.NewDefaultContext(), "", *pod, &mesos.ExecutorInfo{})
+	task, err = New(api.NewDefaultContext(), "", pod)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -122,7 +122,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
 			}},
 		}},
 	}
-	task, err = New(api.NewDefaultContext(), "", *pod, &mesos.ExecutorInfo{})
+	task, err = New(api.NewDefaultContext(), "", pod)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -143,7 +143,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
 			}},
 		}},
 	}
-	task, err = New(api.NewDefaultContext(), "", *pod, &mesos.ExecutorInfo{})
+	task, err = New(api.NewDefaultContext(), "", pod)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/contrib/mesos/pkg/scheduler/podtask/procurement.go b/contrib/mesos/pkg/scheduler/podtask/procurement.go
index 9a0921047ca..b27193a9647 100644
--- a/contrib/mesos/pkg/scheduler/podtask/procurement.go
+++ b/contrib/mesos/pkg/scheduler/podtask/procurement.go
@@ -17,8 +17,6 @@ limitations under the License.
 package podtask
 
 import (
-	"strings"
-
 	log "github.com/golang/glog"
 	mesos "github.com/mesos/mesos-go/mesosproto"
 	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
 )
@@ -74,31 +72,11 @@ func ValidateProcurement(t *T, offer *mesos.Offer) error {
 	return nil
 }
 
-func setCommandArgument(ei *mesos.ExecutorInfo, flag, value string, create bool) {
-	argv := ei.Command.Arguments
-	overwrite := false
-	for i, arg := range argv {
-		if strings.HasPrefix(arg, flag+"=") {
-			overwrite = true
-			argv[i] = flag + "=" + value
-			break
-		}
-	}
-	if !overwrite && create {
-		ei.Command.Arguments = append(argv, flag+"="+value)
-	}
-}
-
 // NodeProcurement updates t.Spec in preparation for the task to be launched on the
 // slave associated with the offer.
 func NodeProcurement(t *T, offer *mesos.Offer) error {
 	t.Spec.SlaveID = offer.GetSlaveId().GetValue()
 	t.Spec.AssignedSlave = offer.GetHostname()
-
-	// hostname needs of the executor needs to match that of the offer, otherwise
-	// the kubelet node status checker/updater is very unhappy
-	setCommandArgument(t.executor, "--hostname-override", offer.GetHostname(), true)
-
 	return nil
 }
diff --git a/contrib/mesos/pkg/scheduler/types/types.go b/contrib/mesos/pkg/scheduler/types/types.go
index ee4ebd0445e..032101f14b7 100644
--- a/contrib/mesos/pkg/scheduler/types/types.go
+++ b/contrib/mesos/pkg/scheduler/types/types.go
@@ -22,7 +22,6 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/offers"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
-	"k8s.io/kubernetes/pkg/api"
 )
 
 // Framework abstracts everything other components of the scheduler need from
@@ -38,7 +37,4 @@ type Framework interface {
 	// driver calls
 	KillTask(taskId string) error
 	LaunchTask(*podtask.T) error
-
-	// convenience
-	CreatePodTask(api.Context, *api.Pod) (*podtask.T, error)
 }
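Reviewer note: one property worth calling out in the patch is that BuildTaskInfo clones the ExecutorInfo with proto.Clone before setCommandArgument injects the per-task --hostname-override, so the scheduler's shared executor template is never mutated. A small sketch of that behavior, assuming the same gogo/protobuf and mesos-go packages the patch imports:

package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	mesos "github.com/mesos/mesos-go/mesosproto"
	mutil "github.com/mesos/mesos-go/mesosutil"
)

func main() {
	// shared template, as held by MesosScheduler.executor
	template := &mesos.ExecutorInfo{
		ExecutorId: mutil.NewExecutorID("k8s-executor"),
		Command:    &mesos.CommandInfo{Arguments: []string{"--v=2"}},
	}

	// what BuildTaskInfo does per task: deep-copy first, then customize the copy
	perTask := proto.Clone(template).(*mesos.ExecutorInfo)
	perTask.Command.Arguments = append(perTask.Command.Arguments, "--hostname-override=slave-1")

	fmt.Println(template.Command.Arguments) // [--v=2] -- template untouched
	fmt.Println(perTask.Command.Arguments)  // [--v=2 --hostname-override=slave-1]
}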