Remove ExecutorInfo from podtask.T and create it with the TaskInfo

This commit is contained in:
Dr. Stefan Schimanski 2015-11-02 12:26:22 +01:00
parent f793ddffbb
commit 60cc93fff8
11 changed files with 60 additions and 71 deletions

View File

@ -170,11 +170,10 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
} }
pod := NewTestPod(1) pod := NewTestPod(1)
podTask, err := podtask.New(api.NewDefaultContext(), "", podTask, err := podtask.New(api.NewDefaultContext(), "", pod)
*pod, &mesosproto.ExecutorInfo{})
assert.Equal(t, nil, err, "must be able to create a task from a pod") assert.Equal(t, nil, err, "must be able to create a task from a pod")
taskInfo := podTask.BuildTaskInfo() taskInfo := podTask.BuildTaskInfo(&mesosproto.ExecutorInfo{})
data, err := testapi.Default.Codec().Encode(pod) data, err := testapi.Default.Codec().Encode(pod)
assert.Equal(t, nil, err, "must be able to encode a pod's spec data") assert.Equal(t, nil, err, "must be able to encode a pod's spec data")
taskInfo.Data = data taskInfo.Data = data
@ -417,10 +416,8 @@ func TestExecutorFrameworkMessage(t *testing.T) {
// set up a pod to then lose // set up a pod to then lose
pod := NewTestPod(1) pod := NewTestPod(1)
podTask, _ := podtask.New(api.NewDefaultContext(), "foo", podTask, _ := podtask.New(api.NewDefaultContext(), "foo", pod)
*pod, &mesosproto.ExecutorInfo{}) taskInfo := podTask.BuildTaskInfo(&mesosproto.ExecutorInfo{})
taskInfo := podTask.BuildTaskInfo()
data, _ := testapi.Default.Codec().Encode(pod) data, _ := testapi.Default.Codec().Encode(pod)
taskInfo.Data = data taskInfo.Data = data

View File

@ -24,7 +24,6 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/pkg/api"
) )
type MesosFramework struct { type MesosFramework struct {
@ -44,10 +43,6 @@ func (fw *MesosFramework) Tasks() podtask.Registry {
return fw.MesosScheduler.taskRegistry return fw.MesosScheduler.taskRegistry
} }
func (fw *MesosFramework) CreatePodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) {
return podtask.New(ctx, "", *pod, fw.MesosScheduler.executor)
}
func (fw *MesosFramework) SlaveHostNameFor(id string) string { func (fw *MesosFramework) SlaveHostNameFor(id string) string {
return fw.MesosScheduler.slaveHostNames.HostName(id) return fw.MesosScheduler.slaveHostNames.HostName(id)
} }
@ -60,7 +55,8 @@ func (fw *MesosFramework) KillTask(taskId string) error {
func (fw *MesosFramework) LaunchTask(task *podtask.T) error { func (fw *MesosFramework) LaunchTask(task *podtask.T) error {
// assume caller is holding scheduler lock // assume caller is holding scheduler lock
taskList := []*mesos.TaskInfo{task.BuildTaskInfo()} ei := fw.MesosScheduler.executor
taskList := []*mesos.TaskInfo{task.BuildTaskInfo(ei)}
offerIds := []*mesos.OfferID{task.Offer.Details().Id} offerIds := []*mesos.OfferID{task.Offer.Details().Id}
filters := &mesos.Filters{} filters := &mesos.Filters{}
_, err := fw.MesosScheduler.driver.LaunchTasks(offerIds, taskList, filters) _, err := fw.MesosScheduler.driver.LaunchTasks(offerIds, taskList, filters)

View File

@ -486,7 +486,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
// create scheduler loop // create scheduler loop
fw := &MesosFramework{MesosScheduler: mesosScheduler} fw := &MesosFramework{MesosScheduler: mesosScheduler}
eventObs := NewEventObserver() eventObs := NewEventObserver()
loop, _ := operations.NewScheduler(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch) loop, podReconciler := operations.NewScheduler(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch)
assert.NotNil(loop) assert.NotNil(loop)
// create mock mesos scheduler driver // create mock mesos scheduler driver
@ -498,6 +498,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
eventObs: eventObs, eventObs: eventObs,
loop: loop, loop: loop,
podsListWatch: podsListWatch, podsListWatch: podsListWatch,
podReconciler: podReconciler,
scheduler: mesosScheduler, scheduler: mesosScheduler,
schedulerProc: schedulerProc, schedulerProc: schedulerProc,
t: t, t: t,

View File

@ -25,7 +25,6 @@ const (
TaskIdKey = "k8s.mesosphere.io/taskId" TaskIdKey = "k8s.mesosphere.io/taskId"
SlaveIdKey = "k8s.mesosphere.io/slaveId" SlaveIdKey = "k8s.mesosphere.io/slaveId"
OfferIdKey = "k8s.mesosphere.io/offerId" OfferIdKey = "k8s.mesosphere.io/offerId"
ExecutorIdKey = "k8s.mesosphere.io/executorId"
PortMappingKeyPrefix = "k8s.mesosphere.io/port_" PortMappingKeyPrefix = "k8s.mesosphere.io/port_"
PortMappingKeyFormat = PortMappingKeyPrefix + "%s_%d" PortMappingKeyFormat = PortMappingKeyPrefix + "%s_%d"
PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_" PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_"

View File

@ -72,7 +72,12 @@ func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
log.Infof("aborting Schedule, pod has been deleted %+v", pod) log.Infof("aborting Schedule, pod has been deleted %+v", pod)
return "", merrors.NoSuchPodErr return "", merrors.NoSuchPodErr
} }
return k.doSchedule(k.fw.Tasks().Register(k.fw.CreatePodTask(ctx, pod))) podTask, err := podtask.New(ctx, "", pod)
if err != nil {
log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err)
return "", err
}
return k.doSchedule(k.fw.Tasks().Register(podTask, nil))
//TODO(jdef) it's possible that the pod state has diverged from what //TODO(jdef) it's possible that the pod state has diverged from what
//we knew previously, we should probably update the task.Pod state here //we knew previously, we should probably update the task.Pod state here

View File

@ -19,7 +19,6 @@ package operations
import ( import (
"testing" "testing"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
@ -60,7 +59,7 @@ func TestDeleteOne_PendingPod(t *testing.T) {
UID: "foo0", UID: "foo0",
Namespace: api.NamespaceDefault, Namespace: api.NamespaceDefault,
}}} }}}
_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})) _, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
if err != nil { if err != nil {
t.Fatalf("failed to create task: %v", err) t.Fatalf("failed to create task: %v", err)
} }
@ -94,7 +93,7 @@ func TestDeleteOne_Running(t *testing.T) {
UID: "foo0", UID: "foo0",
Namespace: api.NamespaceDefault, Namespace: api.NamespaceDefault,
}}} }}}
task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})) task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }

View File

@ -18,6 +18,7 @@ package podtask
import ( import (
"fmt" "fmt"
"strings"
"time" "time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
@ -62,7 +63,6 @@ type T struct {
UpdatedTime time.Time // time of the most recent StatusUpdate we've seen from the mesos master UpdatedTime time.Time // time of the most recent StatusUpdate we've seen from the mesos master
podStatus api.PodStatus podStatus api.PodStatus
executor *mesos.ExecutorInfo // readonly
podKey string podKey string
launchTime time.Time launchTime time.Time
bindTime time.Time bindTime time.Time
@ -130,21 +130,49 @@ func generateTaskName(pod *api.Pod) string {
return fmt.Sprintf("%s.%s.pods", pod.Name, ns) return fmt.Sprintf("%s.%s.pods", pod.Name, ns)
} }
func (t *T) BuildTaskInfo() *mesos.TaskInfo { func setCommandArgument(ei *mesos.ExecutorInfo, flag, value string, create bool) {
argv := []string{}
overwrite := false
if ei.Command != nil && ei.Command.Arguments != nil {
argv = ei.Command.Arguments
for i, arg := range argv {
if strings.HasPrefix(arg, flag+"=") {
overwrite = true
argv[i] = flag + "=" + value
break
}
}
}
if !overwrite && create {
argv = append(argv, flag+"="+value)
if ei.Command == nil {
ei.Command = &mesos.CommandInfo{}
}
ei.Command.Arguments = argv
}
}
func (t *T) BuildTaskInfo(ei *mesos.ExecutorInfo) *mesos.TaskInfo {
info := &mesos.TaskInfo{ info := &mesos.TaskInfo{
Name: proto.String(generateTaskName(&t.Pod)), Name: proto.String(generateTaskName(&t.Pod)),
TaskId: mutil.NewTaskID(t.ID), TaskId: mutil.NewTaskID(t.ID),
SlaveId: mutil.NewSlaveID(t.Spec.SlaveID), SlaveId: mutil.NewSlaveID(t.Spec.SlaveID),
Executor: t.executor, Executor: proto.Clone(ei).(*mesos.ExecutorInfo),
Data: t.Spec.Data, Data: t.Spec.Data,
Resources: []*mesos.Resource{ Resources: []*mesos.Resource{
mutil.NewScalarResource("cpus", float64(t.Spec.CPU)), mutil.NewScalarResource("cpus", float64(t.Spec.CPU)),
mutil.NewScalarResource("mem", float64(t.Spec.Memory)), mutil.NewScalarResource("mem", float64(t.Spec.Memory)),
}, },
} }
if portsResource := rangeResource("ports", t.Spec.Ports); portsResource != nil { if portsResource := rangeResource("ports", t.Spec.Ports); portsResource != nil {
info.Resources = append(info.Resources, portsResource) info.Resources = append(info.Resources, portsResource)
} }
// hostname needs of the executor needs to match that of the offer, otherwise
// the kubelet node status checker/updater is very unhappy
setCommandArgument(info.Executor, "--hostname-override", t.Spec.AssignedSlave, true)
return info return info
} }
@ -170,10 +198,7 @@ func (t *T) Has(f FlagType) (exists bool) {
return return
} }
func New(ctx api.Context, id string, pod api.Pod, executor *mesos.ExecutorInfo) (*T, error) { func New(ctx api.Context, id string, pod *api.Pod) (*T, error) {
if executor == nil {
return nil, fmt.Errorf("illegal argument: executor was nil")
}
key, err := MakePodKey(ctx, pod.Name) key, err := MakePodKey(ctx, pod.Name)
if err != nil { if err != nil {
return nil, err return nil, err
@ -182,13 +207,12 @@ func New(ctx api.Context, id string, pod api.Pod, executor *mesos.ExecutorInfo)
id = "pod." + uuid.NewUUID().String() id = "pod." + uuid.NewUUID().String()
} }
task := &T{ task := &T{
ID: id, ID: id,
Pod: pod, Pod: *pod,
State: StatePending, State: StatePending,
podKey: key, podKey: key,
mapper: MappingTypeForPod(&pod), mapper: MappingTypeForPod(pod),
Flags: make(map[FlagType]struct{}), Flags: make(map[FlagType]struct{}),
executor: proto.Clone(executor).(*mesos.ExecutorInfo),
} }
task.CreateTime = time.Now() task.CreateTime = time.Now()
return task, nil return task, nil
@ -198,7 +222,6 @@ func (t *T) SaveRecoveryInfo(dict map[string]string) {
dict[annotation.TaskIdKey] = t.ID dict[annotation.TaskIdKey] = t.ID
dict[annotation.SlaveIdKey] = t.Spec.SlaveID dict[annotation.SlaveIdKey] = t.Spec.SlaveID
dict[annotation.OfferIdKey] = t.Offer.Details().Id.GetValue() dict[annotation.OfferIdKey] = t.Offer.Details().Id.GetValue()
dict[annotation.ExecutorIdKey] = t.executor.ExecutorId.GetValue()
} }
// reconstruct a task from metadata stashed in a pod entry. there are limited pod states that // reconstruct a task from metadata stashed in a pod entry. there are limited pod states that
@ -256,7 +279,6 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
annotation.TaskIdKey, annotation.TaskIdKey,
annotation.SlaveIdKey, annotation.SlaveIdKey,
annotation.OfferIdKey, annotation.OfferIdKey,
annotation.ExecutorIdKey,
} { } {
v, found := pod.Annotations[k] v, found := pod.Annotations[k]
if !found { if !found {
@ -271,10 +293,6 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
offerId = v offerId = v
case annotation.TaskIdKey: case annotation.TaskIdKey:
t.ID = v t.ID = v
case annotation.ExecutorIdKey:
// this is nowhere near sufficient to re-launch a task, but we really just
// want this for tracking
t.executor = &mesos.ExecutorInfo{ExecutorId: mutil.NewExecutorID(v)}
} }
} }
t.Offer = offers.Expired(offerId, t.Spec.AssignedSlave, 0) t.Offer = offers.Expired(offerId, t.Spec.AssignedSlave, 0)

View File

@ -35,12 +35,12 @@ const (
) )
func fakePodTask(id string) (*T, error) { func fakePodTask(id string) (*T, error) {
return New(api.NewDefaultContext(), "", api.Pod{ return New(api.NewDefaultContext(), "", &api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: id, Name: id,
Namespace: api.NamespaceDefault, Namespace: api.NamespaceDefault,
}, },
}, &mesos.ExecutorInfo{}) })
} }
func TestUnlimitedResources(t *testing.T) { func TestUnlimitedResources(t *testing.T) {

View File

@ -51,7 +51,7 @@ func TestDefaultHostPortMatching(t *testing.T) {
}}, }},
}}, }},
} }
task, err = New(api.NewDefaultContext(), "", *pod, &mesos.ExecutorInfo{}) task, err = New(api.NewDefaultContext(), "", pod)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -99,7 +99,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}}, }},
}}, }},
} }
task, err = New(api.NewDefaultContext(), "", *pod, &mesos.ExecutorInfo{}) task, err = New(api.NewDefaultContext(), "", pod)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -122,7 +122,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}}, }},
}}, }},
} }
task, err = New(api.NewDefaultContext(), "", *pod, &mesos.ExecutorInfo{}) task, err = New(api.NewDefaultContext(), "", pod)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -143,7 +143,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}}, }},
}}, }},
} }
task, err = New(api.NewDefaultContext(), "", *pod, &mesos.ExecutorInfo{}) task, err = New(api.NewDefaultContext(), "", pod)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -17,8 +17,6 @@ limitations under the License.
package podtask package podtask
import ( import (
"strings"
log "github.com/golang/glog" log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto" mesos "github.com/mesos/mesos-go/mesosproto"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
@ -74,31 +72,11 @@ func ValidateProcurement(t *T, offer *mesos.Offer) error {
return nil return nil
} }
func setCommandArgument(ei *mesos.ExecutorInfo, flag, value string, create bool) {
argv := ei.Command.Arguments
overwrite := false
for i, arg := range argv {
if strings.HasPrefix(arg, flag+"=") {
overwrite = true
argv[i] = flag + "=" + value
break
}
}
if !overwrite && create {
ei.Command.Arguments = append(argv, flag+"="+value)
}
}
// NodeProcurement updates t.Spec in preparation for the task to be launched on the // NodeProcurement updates t.Spec in preparation for the task to be launched on the
// slave associated with the offer. // slave associated with the offer.
func NodeProcurement(t *T, offer *mesos.Offer) error { func NodeProcurement(t *T, offer *mesos.Offer) error {
t.Spec.SlaveID = offer.GetSlaveId().GetValue() t.Spec.SlaveID = offer.GetSlaveId().GetValue()
t.Spec.AssignedSlave = offer.GetHostname() t.Spec.AssignedSlave = offer.GetHostname()
// hostname needs of the executor needs to match that of the offer, otherwise
// the kubelet node status checker/updater is very unhappy
setCommandArgument(t.executor, "--hostname-override", offer.GetHostname(), true)
return nil return nil
} }

View File

@ -22,7 +22,6 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/pkg/api"
) )
// Framework abstracts everything other components of the scheduler need from // Framework abstracts everything other components of the scheduler need from
@ -38,7 +37,4 @@ type Framework interface {
// driver calls // driver calls
KillTask(taskId string) error KillTask(taskId string) error
LaunchTask(*podtask.T) error LaunchTask(*podtask.T) error
// convenience
CreatePodTask(api.Context, *api.Pod) (*podtask.T, error)
} }