Merge pull request #18348 from mesosphere/sur-652-conformance-tests

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-12-15 02:02:50 -08:00
commit b02417bb9b
16 changed files with 130 additions and 93 deletions

View File

@ -23,7 +23,7 @@ mesosmaster1:
- MESOS_QUORUM=1 - MESOS_QUORUM=1
- MESOS_REGISTRY=in_memory - MESOS_REGISTRY=in_memory
- MESOS_WORK_DIR=/var/lib/mesos - MESOS_WORK_DIR=/var/lib/mesos
- MESOS_ROLES=role1 - MESOS_ROLES=public
links: links:
- etcd - etcd
- "ambassador:apiserver" - "ambassador:apiserver"
@ -38,12 +38,13 @@ mesosslave:
- > - >
NAME=$$(cut -f2 -d/ <<<$${MESOSMASTER1_NAME}) && NAME=$$(cut -f2 -d/ <<<$${MESOSMASTER1_NAME}) &&
N=$${NAME##*_} && N=$${NAME##*_} &&
PUBLIC_RESOURCES="$$(if [ $${N} = 2 ]; then echo ";cpus(public):2;mem(public):640;ports(public):[7000-7999]"; fi)" &&
DOCKER_NETWORK_OFFSET=0.0.$${N}.0 DOCKER_NETWORK_OFFSET=0.0.$${N}.0
exec wrapdocker mesos-slave exec wrapdocker mesos-slave
--work_dir="/var/tmp/mesos/$${N}" --work_dir="/var/tmp/mesos/$${N}"
--attributes="rack:$${N};gen:201$${N};role:role$${N}" --attributes="rack:$${N};gen:201$${N}"
--hostname=$$(getent hosts mesosslave | cut -d' ' -f1 | sort -u | tail -1) --hostname=$$(getent hosts mesosslave | cut -d' ' -f1 | sort -u | tail -1)
--resources="cpus:4;mem:1280;disk:25600;ports:[8000-21099];cpus(role$${N}):1;mem(role$${N}):640;disk(role$${N}):25600;ports(role$${N}):[7000-7999]" --resources="cpus:4;mem:1280;disk:25600;ports:[8000-21099]$${PUBLIC_RESOURCES}"
command: [] command: []
environment: environment:
- MESOS_MASTER=mesosmaster1:5050 - MESOS_MASTER=mesosmaster1:5050
@ -144,7 +145,8 @@ scheduler:
--mesos-executor-cpus=1.0 --mesos-executor-cpus=1.0
--mesos-sandbox-overlay=/opt/sandbox-overlay.tar.gz --mesos-sandbox-overlay=/opt/sandbox-overlay.tar.gz
--static-pods-config=/opt/static-pods --static-pods-config=/opt/static-pods
--mesos-roles=*,role1 --mesos-framework-roles=*,public
--mesos-default-pod-roles=*,public
--v=4 --v=4
--executor-logv=4 --executor-logv=4
--profiling=true --profiling=true

View File

@ -175,6 +175,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
pod, pod,
executorinfo, executorinfo,
nil, nil,
nil,
) )
assert.Equal(t, nil, err, "must be able to create a task from a pod") assert.Equal(t, nil, err, "must be able to create a task from a pod")
@ -390,6 +391,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
pod, pod,
executorinfo, executorinfo,
nil, nil,
nil,
) )
podTask.Spec = &podtask.Spec{ podTask.Spec = &podtask.Spec{

View File

@ -36,9 +36,9 @@ func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
} }
switch r.GetName() { switch r.GetName() {
case "cpus": case "cpus":
executorCPU = r.GetScalar().GetValue() executorCPU += r.GetScalar().GetValue()
case "mem": case "mem":
executorMem = r.GetScalar().GetValue() executorMem += r.GetScalar().GetValue()
} }
} }
} }
@ -55,10 +55,15 @@ func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
// We intentionally take the floor of executorCPU because cores are integers // We intentionally take the floor of executorCPU because cores are integers
// and we would loose a complete cpu here if the value is <1. // and we would loose a complete cpu here if the value is <1.
// TODO(sttts): switch to float64 when "Machine Allocables" are implemented // TODO(sttts): switch to float64 when "Machine Allocables" are implemented
ni.Cores = int(r.GetScalar().GetValue() - float64(int(executorCPU))) ni.Cores += int(r.GetScalar().GetValue())
case "mem": case "mem":
ni.Mem = int64(r.GetScalar().GetValue()-executorMem) * 1024 * 1024 ni.Mem += int64(r.GetScalar().GetValue()) * 1024 * 1024
} }
} }
// TODO(sttts): subtract executorCPU/Mem from static pod resources before subtracting them from the capacity
ni.Cores -= int(executorCPU)
ni.Mem -= int64(executorMem) * 1024 * 1024
return ni return ni
} }

View File

@ -46,7 +46,8 @@ type schedulerAlgorithm struct {
podUpdates queue.FIFO podUpdates queue.FIFO
podScheduler podschedulers.PodScheduler podScheduler podschedulers.PodScheduler
prototype *mesosproto.ExecutorInfo prototype *mesosproto.ExecutorInfo
roles []string frameworkRoles []string
defaultPodRoles []string
defaultCpus mresource.CPUShares defaultCpus mresource.CPUShares
defaultMem mresource.MegaBytes defaultMem mresource.MegaBytes
} }
@ -58,7 +59,7 @@ func New(
podUpdates queue.FIFO, podUpdates queue.FIFO,
podScheduler podschedulers.PodScheduler, podScheduler podschedulers.PodScheduler,
prototype *mesosproto.ExecutorInfo, prototype *mesosproto.ExecutorInfo,
roles []string, frameworkRoles, defaultPodRoles []string,
defaultCpus mresource.CPUShares, defaultCpus mresource.CPUShares,
defaultMem mresource.MegaBytes, defaultMem mresource.MegaBytes,
) SchedulerAlgorithm { ) SchedulerAlgorithm {
@ -66,7 +67,8 @@ func New(
sched: sched, sched: sched,
podUpdates: podUpdates, podUpdates: podUpdates,
podScheduler: podScheduler, podScheduler: podScheduler,
roles: roles, frameworkRoles: frameworkRoles,
defaultPodRoles: defaultPodRoles,
prototype: prototype, prototype: prototype,
defaultCpus: defaultCpus, defaultCpus: defaultCpus,
defaultMem: defaultMem, defaultMem: defaultMem,
@ -107,7 +109,7 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
// From here on we can expect that the pod spec of a task has proper limits for CPU and memory. // From here on we can expect that the pod spec of a task has proper limits for CPU and memory.
k.limitPod(pod) k.limitPod(pod)
podTask, err := podtask.New(ctx, "", pod, k.prototype, k.roles) podTask, err := podtask.New(ctx, "", pod, k.prototype, k.frameworkRoles, k.defaultPodRoles)
if err != nil { if err != nil {
log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err) log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err)
return "", err return "", err

View File

@ -67,6 +67,7 @@ func TestDeleteOne_PendingPod(t *testing.T) {
pod.Pod, pod.Pod,
&mesosproto.ExecutorInfo{}, &mesosproto.ExecutorInfo{},
nil, nil,
nil,
) )
if err != nil { if err != nil {
t.Fatalf("failed to create task: %v", err) t.Fatalf("failed to create task: %v", err)
@ -113,6 +114,7 @@ func TestDeleteOne_Running(t *testing.T) {
pod.Pod, pod.Pod,
&mesosproto.ExecutorInfo{}, &mesosproto.ExecutorInfo{},
nil, nil,
nil,
) )
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)

View File

@ -65,7 +65,7 @@ func New(
mux *http.ServeMux, mux *http.ServeMux,
lw *cache.ListWatch, lw *cache.ListWatch,
prototype *mesos.ExecutorInfo, prototype *mesos.ExecutorInfo,
roles []string, frameworkRoles, defaultPodRoles []string,
defaultCpus mresource.CPUShares, defaultCpus mresource.CPUShares,
defaultMem mresource.MegaBytes, defaultMem mresource.MegaBytes,
) scheduler.Scheduler { ) scheduler.Scheduler {
@ -81,7 +81,7 @@ func New(
q := queuer.New(queue.NewDelayFIFO(), podUpdates) q := queuer.New(queue.NewDelayFIFO(), podUpdates)
algorithm := algorithm.New(core, podUpdates, ps, prototype, roles, defaultCpus, defaultMem) algorithm := algorithm.New(core, podUpdates, ps, prototype, frameworkRoles, defaultPodRoles, defaultCpus, defaultMem)
podDeleter := deleter.New(core, q) podDeleter := deleter.New(core, q)

View File

@ -509,6 +509,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
&podsListWatch.ListWatch, &podsListWatch.ListWatch,
ei, ei,
[]string{"*"}, []string{"*"},
[]string{"*"},
mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit, mresource.DefaultDefaultContainerMemLimit,
) )

View File

@ -51,7 +51,7 @@ const (
Deleted = FlagType("deleted") Deleted = FlagType("deleted")
) )
var defaultRoles = []string{"*"} var starRole = []string{"*"}
// A struct that describes a pod task. // A struct that describes a pod task.
type T struct { type T struct {
@ -70,7 +70,8 @@ type T struct {
podStatus api.PodStatus podStatus api.PodStatus
prototype *mesos.ExecutorInfo // readonly prototype *mesos.ExecutorInfo // readonly
allowedRoles []string // roles under which pods are allowed to be launched frameworkRoles []string // Mesos framework roles, pods are allowed to be launched with those
defaultPodRoles []string // roles under which pods are scheduled if none are specified in labels
podKey string podKey string
launchTime time.Time launchTime time.Time
bindTime time.Time bindTime time.Time
@ -168,34 +169,38 @@ func (t *T) Has(f FlagType) (exists bool) {
return return
} }
// Roles returns the valid roles under which this pod task can be scheduled.
// If the pod has roles labels defined they are being used
// else default pod roles are being returned.
func (t *T) Roles() []string { func (t *T) Roles() []string {
var roles []string
if r, ok := t.Pod.ObjectMeta.Labels[annotation.RolesKey]; ok { if r, ok := t.Pod.ObjectMeta.Labels[annotation.RolesKey]; ok {
roles = strings.Split(r, ",") roles := strings.Split(r, ",")
for i, r := range roles { for i, r := range roles {
roles[i] = strings.TrimSpace(r) roles[i] = strings.TrimSpace(r)
} }
roles = filterRoles(roles, not(emptyRole), not(seenRole())) return filterRoles(
} else { roles,
// no roles label defined, not(emptyRole), not(seenRole()), inRoles(t.frameworkRoles...),
// by convention return the first allowed role )
// to be used for launching the pod task
return []string{t.allowedRoles[0]}
} }
return filterRoles(roles, inRoles(t.allowedRoles...)) // no roles label defined, return defaults
return t.defaultPodRoles
} }
func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo, allowedRoles []string) (*T, error) { func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo, frameworkRoles, defaultPodRoles []string) (*T, error) {
if prototype == nil { if prototype == nil {
return nil, fmt.Errorf("illegal argument: executor is nil") return nil, fmt.Errorf("illegal argument: executor is nil")
} }
if len(allowedRoles) == 0 { if len(frameworkRoles) == 0 {
allowedRoles = defaultRoles frameworkRoles = starRole
}
if len(defaultPodRoles) == 0 {
defaultPodRoles = starRole
} }
key, err := MakePodKey(ctx, pod.Name) key, err := MakePodKey(ctx, pod.Name)
@ -215,7 +220,8 @@ func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo
mapper: NewHostPortMapper(pod), mapper: NewHostPortMapper(pod),
Flags: make(map[FlagType]struct{}), Flags: make(map[FlagType]struct{}),
prototype: prototype, prototype: prototype,
allowedRoles: allowedRoles, frameworkRoles: frameworkRoles,
defaultPodRoles: defaultPodRoles,
} }
task.CreateTime = time.Now() task.CreateTime = time.Now()

View File

@ -34,7 +34,7 @@ const (
t_min_mem = 128 t_min_mem = 128
) )
func fakePodTask(id string, roles ...string) *T { func fakePodTask(id string, allowedRoles, defaultRoles []string) *T {
t, _ := New( t, _ := New(
api.NewDefaultContext(), api.NewDefaultContext(),
"", "",
@ -45,7 +45,8 @@ func fakePodTask(id string, roles ...string) *T {
}, },
}, },
&mesos.ExecutorInfo{}, &mesos.ExecutorInfo{},
roles, allowedRoles,
defaultRoles,
) )
return t return t
@ -62,12 +63,12 @@ func TestRoles(t *testing.T) {
{ {
map[string]string{}, map[string]string{},
nil, nil,
defaultRoles, starRole,
}, },
{ {
map[string]string{"other": "label"}, map[string]string{"other": "label"},
nil, nil,
defaultRoles, starRole,
}, },
{ {
map[string]string{meta.RolesKey: ""}, map[string]string{meta.RolesKey: ""},
@ -100,10 +101,10 @@ func TestRoles(t *testing.T) {
{ {
map[string]string{}, map[string]string{},
[]string{"role1"}, []string{"role1"},
[]string{"role1"}, []string{"*"},
}, },
} { } {
task := fakePodTask("test", tt.frameworkRoles...) task := fakePodTask("test", tt.frameworkRoles, starRole)
task.Pod.ObjectMeta.Labels = tt.labels task.Pod.ObjectMeta.Labels = tt.labels
assert.True(reflect.DeepEqual(task.Roles(), tt.want), "test #%d got %#v want %#v", i, task.Roles(), tt.want) assert.True(reflect.DeepEqual(task.Roles(), tt.want), "test #%d got %#v want %#v", i, task.Roles(), tt.want)
} }
@ -127,7 +128,7 @@ func (mr mockRegistry) Invalidate(hostname string) {
func TestEmptyOffer(t *testing.T) { func TestEmptyOffer(t *testing.T) {
t.Parallel() t.Parallel()
task := fakePodTask("foo") task := fakePodTask("foo", nil, nil)
task.Pod.Spec = api.PodSpec{ task.Pod.Spec = api.PodSpec{
Containers: []api.Container{{ Containers: []api.Container{{
@ -156,7 +157,7 @@ func TestEmptyOffer(t *testing.T) {
func TestNoPortsInPodOrOffer(t *testing.T) { func TestNoPortsInPodOrOffer(t *testing.T) {
t.Parallel() t.Parallel()
task := fakePodTask("foo") task := fakePodTask("foo", nil, nil)
task.Pod.Spec = api.PodSpec{ task.Pod.Spec = api.PodSpec{
Containers: []api.Container{{ Containers: []api.Container{{
@ -206,7 +207,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
func TestAcceptOfferPorts(t *testing.T) { func TestAcceptOfferPorts(t *testing.T) {
t.Parallel() t.Parallel()
task := fakePodTask("foo") task := fakePodTask("foo", nil, nil)
pod := &task.Pod pod := &task.Pod
defaultProc := NewDefaultProcurement( defaultProc := NewDefaultProcurement(
@ -376,7 +377,7 @@ func TestNodeSelector(t *testing.T) {
) )
for _, ts := range tests { for _, ts := range tests {
task := fakePodTask("foo") task := fakePodTask("foo", nil, nil)
task.Pod.Spec.NodeSelector = ts.selector task.Pod.Spec.NodeSelector = ts.selector
offer := &mesos.Offer{ offer := &mesos.Offer{
Resources: []*mesos.Resource{ Resources: []*mesos.Resource{

View File

@ -26,7 +26,7 @@ import (
func TestDefaultHostPortMatching(t *testing.T) { func TestDefaultHostPortMatching(t *testing.T) {
t.Parallel() t.Parallel()
task := fakePodTask("foo") task := fakePodTask("foo", nil, nil)
pod := &task.Pod pod := &task.Pod
offer := &mesos.Offer{ offer := &mesos.Offer{
@ -52,7 +52,7 @@ func TestDefaultHostPortMatching(t *testing.T) {
}}, }},
}}, }},
} }
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil) task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -66,7 +66,7 @@ func TestDefaultHostPortMatching(t *testing.T) {
func TestWildcardHostPortMatching(t *testing.T) { func TestWildcardHostPortMatching(t *testing.T) {
t.Parallel() t.Parallel()
task := fakePodTask("foo") task := fakePodTask("foo", nil, nil)
pod := &task.Pod pod := &task.Pod
offer := &mesos.Offer{} offer := &mesos.Offer{}
@ -100,7 +100,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}}, }},
}}, }},
} }
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil) task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -123,7 +123,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}}, }},
}}, }},
} }
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil) task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -144,7 +144,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}}, }},
}}, }},
} }
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil) task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -190,7 +190,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}}, }},
}}, }},
} }
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil) task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -225,12 +225,12 @@ func NewExecutorResourceProcurer(resources []*mesos.Resource, registry executori
wantedCpus := sumResources(filterResources(resources, isScalar, hasName("cpus"))) wantedCpus := sumResources(filterResources(resources, isScalar, hasName("cpus")))
wantedMem := sumResources(filterResources(resources, isScalar, hasName("mem"))) wantedMem := sumResources(filterResources(resources, isScalar, hasName("mem")))
procuredCpu, remaining := procureScalarResources("cpus", wantedCpus, t.allowedRoles, ps.offer.GetResources()) procuredCpu, remaining := procureScalarResources("cpus", wantedCpus, t.frameworkRoles, ps.offer.GetResources())
if procuredCpu == nil { if procuredCpu == nil {
return fmt.Errorf("not enough cpu resources for executor: want=%v", wantedCpus) return fmt.Errorf("not enough cpu resources for executor: want=%v", wantedCpus)
} }
procuredMem, remaining := procureScalarResources("mem", wantedMem, t.allowedRoles, remaining) procuredMem, remaining := procureScalarResources("mem", wantedMem, t.frameworkRoles, remaining)
if procuredMem == nil { if procuredMem == nil {
return fmt.Errorf("not enough mem resources for executor: want=%v", wantedMem) return fmt.Errorf("not enough mem resources for executor: want=%v", wantedMem)
} }

View File

@ -78,6 +78,7 @@ func TestNewPodResourcesProcurement(t *testing.T) {
}, },
executor, executor,
[]string{"*"}, []string{"*"},
[]string{"*"},
) )
procurement := NewPodResourcesProcurement() procurement := NewPodResourcesProcurement()

View File

@ -38,14 +38,14 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
assert.Empty(tasks) assert.Empty(tasks)
// add a task // add a task
a := fakePodTask("a") a := fakePodTask("a", nil, nil)
a_clone, err := registry.Register(a) a_clone, err := registry.Register(a)
assert.NoError(err) assert.NoError(err)
assert.Equal(a_clone.ID, a.ID) assert.Equal(a_clone.ID, a.ID)
assert.Equal(a_clone.podKey, a.podKey) assert.Equal(a_clone.podKey, a.podKey)
// add another task // add another task
b := fakePodTask("b") b := fakePodTask("b", nil, nil)
b_clone, err := registry.Register(b) b_clone, err := registry.Register(b)
assert.NoError(err) assert.NoError(err)
assert.Equal(b_clone.ID, b.ID) assert.Equal(b_clone.ID, b.ID)
@ -106,7 +106,7 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
assertContains(t, a, tasks...) assertContains(t, a, tasks...)
// unregister a task not registered // unregister a task not registered
unregistered_task := fakePodTask("unregistered-task") unregistered_task := fakePodTask("unregistered-task", nil, nil)
registry.Unregister(unregistered_task) registry.Unregister(unregistered_task)
} }
@ -124,7 +124,7 @@ func TestInMemoryRegistry_State(t *testing.T) {
registry := NewInMemoryRegistry() registry := NewInMemoryRegistry()
// add a task // add a task
a := fakePodTask("a") a := fakePodTask("a", nil, nil)
a_clone, err := registry.Register(a) a_clone, err := registry.Register(a)
assert.NoError(err) assert.NoError(err)
assert.Equal(a.State, a_clone.State) assert.Equal(a.State, a_clone.State)
@ -167,7 +167,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
// create registry // create registry
registry := NewInMemoryRegistry() registry := NewInMemoryRegistry()
a := fakePodTask("a") a := fakePodTask("a", nil, nil)
registry.Register(a.Clone()) // here clone a because we change it below registry.Register(a.Clone()) // here clone a because we change it below
// state changes are ignored // state changes are ignored
@ -225,7 +225,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
assert.Error(err) assert.Error(err)
// update unknown task // update unknown task
unknown_task := fakePodTask("unknown-task") unknown_task := fakePodTask("unknown-task", nil, nil)
err = registry.Update(unknown_task) err = registry.Update(unknown_task)
assert.Error(err) assert.Error(err)
@ -256,7 +256,7 @@ func testStateTrace(t *testing.T, transitions []transition) *Registry {
assert := assert.New(t) assert := assert.New(t)
registry := NewInMemoryRegistry() registry := NewInMemoryRegistry()
a := fakePodTask("a") a := fakePodTask("a", nil, nil)
a, _ = registry.Register(a) a, _ = registry.Register(a)
// initial pending state // initial pending state

View File

@ -75,6 +75,7 @@ import (
"k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/master/ports"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util/sets"
// lock to this API version, compilation will fail when this becomes unsupported // lock to this API version, compilation will fail when this becomes unsupported
_ "k8s.io/kubernetes/pkg/api/v1" _ "k8s.io/kubernetes/pkg/api/v1"
@ -83,7 +84,8 @@ import (
const ( const (
defaultMesosMaster = "localhost:5050" defaultMesosMaster = "localhost:5050"
defaultMesosUser = "root" // should have privs to execute docker and iptables commands defaultMesosUser = "root" // should have privs to execute docker and iptables commands
defaultMesosRoles = "*" defaultFrameworkRoles = "*"
defaultPodRoles = "*"
defaultReconcileInterval = 300 // 5m default task reconciliation interval defaultReconcileInterval = 300 // 5m default task reconciliation interval
defaultReconcileCooldown = 15 * time.Second defaultReconcileCooldown = 15 * time.Second
defaultNodeRelistPeriod = 5 * time.Minute defaultNodeRelistPeriod = 5 * time.Minute
@ -106,7 +108,8 @@ type SchedulerServer struct {
proxyPath string proxyPath string
mesosMaster string mesosMaster string
mesosUser string mesosUser string
mesosRoles []string frameworkRoles []string
defaultPodRoles []string
mesosAuthPrincipal string mesosAuthPrincipal string
mesosAuthSecretFile string mesosAuthSecretFile string
mesosCgroupPrefix string mesosCgroupPrefix string
@ -200,7 +203,8 @@ func NewSchedulerServer() *SchedulerServer {
mesosUser: defaultMesosUser, mesosUser: defaultMesosUser,
mesosExecutorCPUs: defaultExecutorCPUs, mesosExecutorCPUs: defaultExecutorCPUs,
mesosExecutorMem: defaultExecutorMem, mesosExecutorMem: defaultExecutorMem,
mesosRoles: strings.Split(defaultMesosRoles, ","), frameworkRoles: strings.Split(defaultFrameworkRoles, ","),
defaultPodRoles: strings.Split(defaultPodRoles, ","),
reconcileInterval: defaultReconcileInterval, reconcileInterval: defaultReconcileInterval,
reconcileCooldown: defaultReconcileCooldown, reconcileCooldown: defaultReconcileCooldown,
checkpoint: true, checkpoint: true,
@ -239,7 +243,8 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.mesosMaster, "mesos-master", s.mesosMaster, "Location of the Mesos master. The format is a comma-delimited list of of hosts like zk://host1:port,host2:port/mesos. If using ZooKeeper, pay particular attention to the leading zk:// and trailing /mesos! If not using ZooKeeper, standard URLs like http://localhost are also acceptable.") fs.StringVar(&s.mesosMaster, "mesos-master", s.mesosMaster, "Location of the Mesos master. The format is a comma-delimited list of of hosts like zk://host1:port,host2:port/mesos. If using ZooKeeper, pay particular attention to the leading zk:// and trailing /mesos! If not using ZooKeeper, standard URLs like http://localhost are also acceptable.")
fs.StringVar(&s.mesosUser, "mesos-user", s.mesosUser, "Mesos user for this framework, defaults to root.") fs.StringVar(&s.mesosUser, "mesos-user", s.mesosUser, "Mesos user for this framework, defaults to root.")
fs.StringSliceVar(&s.mesosRoles, "mesos-roles", s.mesosRoles, "Mesos framework roles. The first role will be used to launch pods having no "+meta.RolesKey+" label.") fs.StringSliceVar(&s.frameworkRoles, "mesos-framework-roles", s.frameworkRoles, "Mesos framework roles that the scheduler receives offers for. Currently only \"*\" and optionally one additional role are supported.")
fs.StringSliceVar(&s.defaultPodRoles, "mesos-default-pod-roles", s.defaultPodRoles, "Roles that will be used to launch pods having no "+meta.RolesKey+" label.")
fs.StringVar(&s.mesosAuthPrincipal, "mesos-authentication-principal", s.mesosAuthPrincipal, "Mesos authentication principal.") fs.StringVar(&s.mesosAuthPrincipal, "mesos-authentication-principal", s.mesosAuthPrincipal, "Mesos authentication principal.")
fs.StringVar(&s.mesosAuthSecretFile, "mesos-authentication-secret-file", s.mesosAuthSecretFile, "Mesos authentication secret file.") fs.StringVar(&s.mesosAuthSecretFile, "mesos-authentication-secret-file", s.mesosAuthSecretFile, "Mesos authentication secret file.")
fs.StringVar(&s.mesosAuthProvider, "mesos-authentication-provider", s.mesosAuthProvider, fmt.Sprintf("Authentication provider to use, default is SASL that supports mechanisms: %+v", mech.ListSupported())) fs.StringVar(&s.mesosAuthProvider, "mesos-authentication-provider", s.mesosAuthProvider, fmt.Sprintf("Authentication provider to use, default is SASL that supports mechanisms: %+v", mech.ListSupported()))
@ -535,10 +540,16 @@ func (s *SchedulerServer) getDriver() (driver bindings.SchedulerDriver) {
} }
func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error { func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error {
if n := len(s.mesosRoles); n == 0 || n > 2 || (n == 2 && s.mesosRoles[0] != "*" && s.mesosRoles[1] != "*") { if n := len(s.frameworkRoles); n == 0 || n > 2 || (n == 2 && s.frameworkRoles[0] != "*" && s.frameworkRoles[1] != "*") {
log.Fatalf(`only one custom role allowed in addition to "*"`) log.Fatalf(`only one custom role allowed in addition to "*"`)
} }
fwSet := sets.NewString(s.frameworkRoles...)
podSet := sets.NewString(s.defaultPodRoles...)
if !fwSet.IsSuperset(podSet) {
log.Fatalf("all default pod roles %q must be included in framework roles %q", s.defaultPodRoles, s.frameworkRoles)
}
// get scheduler low-level config // get scheduler low-level config
sc := schedcfg.CreateDefaultConfig() sc := schedcfg.CreateDefaultConfig()
if s.schedulerConfigFileName != "" { if s.schedulerConfigFileName != "" {
@ -777,7 +788,8 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
s.mux, s.mux,
lw, lw,
eiPrototype, eiPrototype,
s.mesosRoles, s.frameworkRoles,
s.defaultPodRoles,
s.defaultContainerCPULimit, s.defaultContainerCPULimit,
s.defaultContainerMemLimit, s.defaultContainerMemLimit,
) )
@ -890,7 +902,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
// set the framework's role to the first configured non-star role. // set the framework's role to the first configured non-star role.
// once Mesos supports multiple roles simply set the configured mesos roles slice. // once Mesos supports multiple roles simply set the configured mesos roles slice.
for _, role := range s.mesosRoles { for _, role := range s.frameworkRoles {
if role != "*" { if role != "*" {
// mesos currently supports only one role per framework info // mesos currently supports only one role per framework info
// The framework will be offered role's resources as well as * resources // The framework will be offered role's resources as well as * resources

View File

@ -194,7 +194,8 @@ mesos-executor-mem
mesos-launch-grace-period mesos-launch-grace-period
mesos-master mesos-master
mesos-sandbox-overlay mesos-sandbox-overlay
mesos-roles mesos-framework-roles
mesos-default-pod-roles
mesos-user mesos-user
minimum-container-ttl-duration minimum-container-ttl-duration
minion-max-log-age minion-max-log-age

View File

@ -87,7 +87,7 @@ var _ = Describe("Mesos", func() {
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: podName, Name: podName,
Labels: map[string]string{ Labels: map[string]string{
"k8s.mesosphere.io/roles": "role1", "k8s.mesosphere.io/roles": "public",
}, },
}, },
Spec: api.PodSpec{ Spec: api.PodSpec{
@ -106,10 +106,12 @@ var _ = Describe("Mesos", func() {
expectNoError(err) expectNoError(err)
nodeClient := framework.Client.Nodes() nodeClient := framework.Client.Nodes()
role1 := labels.SelectorFromSet(map[string]string{
"k8s.mesosphere.io/attribute-role": "role1", // schedule onto node with rack=2 being assigned to the "public" role
rack2 := labels.SelectorFromSet(map[string]string{
"k8s.mesosphere.io/attribute-rack": "2",
}) })
options := api.ListOptions{LabelSelector: role1} options := api.ListOptions{LabelSelector: rack2}
nodes, err := nodeClient.List(options) nodes, err := nodeClient.List(options)
expectNoError(err) expectNoError(err)