From 67e98fcfa832a0ca8eefe4254e20396a40dc0113 Mon Sep 17 00:00:00 2001
From: Sergiusz Urbaniak
Date: Tue, 8 Dec 2015 10:50:04 +0100
Subject: [PATCH] executor, scheduler: make default pod roles configurable

Currently, if a pod is scheduled with no meta.RolesKey label attached to
it, by convention the first configured Mesos (framework) role is used.
This is quite limiting and also causes e2e tests to fail.

This commit introduces a new configuration option
"--mesos-default-pod-roles", defaulting to "*", which defines the default
pod roles used when the meta.RolesKey pod label is missing.
---
 cluster/mesos/docker/docker-compose.yml          | 10 +--
 contrib/mesos/pkg/executor/executor_test.go      |  2 +
 contrib/mesos/pkg/executor/node.go               | 13 ++--
 .../components/algorithm/algorithm.go            | 34 +++++-----
 .../components/deleter/deleter_test.go           |  2 +
 .../pkg/scheduler/components/scheduler.go        |  4 +-
 .../scheduler/integration/integration_test.go    |  1 +
 .../mesos/pkg/scheduler/podtask/pod_task.go      | 64 ++++++++++---------
 .../pkg/scheduler/podtask/pod_task_test.go       | 21 +++---
 .../scheduler/podtask/port_mapping_test.go       | 14 ++--
 .../pkg/scheduler/podtask/procurement.go         |  4 +-
 .../pkg/scheduler/podtask/procurement_test.go    |  1 +
 .../pkg/scheduler/podtask/registry_test.go       | 14 ++--
 .../mesos/pkg/scheduler/service/service.go       | 26 ++++++--
 hack/verify-flags/known-flags.txt                |  3 +-
 test/e2e/mesos.go                                | 10 +--
 16 files changed, 130 insertions(+), 93 deletions(-)
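Reviewer note (commentary in the git-am-ignored region, not part of the diff):
after this change a pod's effective roles are resolved in two steps: an
explicit "k8s.mesosphere.io/roles" label is split, trimmed, de-duplicated and
filtered against the framework roles; without the label the configured default
pod roles are returned as-is. The self-contained sketch below illustrates that
resolution; resolveRoles is a hypothetical stand-in for podtask.T.Roles()
further down in this patch.

package main

import (
	"fmt"
	"strings"
)

// resolveRoles mirrors the role resolution introduced by this patch:
// a roles label, if present, wins and is filtered against the framework
// roles; without the label the default pod roles are used unchanged.
func resolveRoles(label *string, frameworkRoles, defaultPodRoles []string) []string {
	if label == nil {
		// no roles label defined, return defaults
		return defaultPodRoles
	}

	allowed := make(map[string]bool, len(frameworkRoles))
	for _, r := range frameworkRoles {
		allowed[r] = true
	}

	var roles []string
	seen := map[string]bool{}
	for _, r := range strings.Split(*label, ",") {
		r = strings.TrimSpace(r)
		// drop empty entries, duplicates, and roles the framework does not hold
		if r != "" && !seen[r] && allowed[r] {
			seen[r] = true
			roles = append(roles, r)
		}
	}
	return roles
}

func main() {
	label := "public, *, public"
	fmt.Println(resolveRoles(&label, []string{"*", "public"}, []string{"*"})) // [public *]
	fmt.Println(resolveRoles(nil, []string{"*", "public"}, []string{"*"}))   // [*]
}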
diff --git a/cluster/mesos/docker/docker-compose.yml b/cluster/mesos/docker/docker-compose.yml
index 3e3026242da..f337c8b0233 100644
--- a/cluster/mesos/docker/docker-compose.yml
+++ b/cluster/mesos/docker/docker-compose.yml
@@ -23,7 +23,7 @@ mesosmaster1:
     - MESOS_QUORUM=1
     - MESOS_REGISTRY=in_memory
     - MESOS_WORK_DIR=/var/lib/mesos
-    - MESOS_ROLES=role1
+    - MESOS_ROLES=public
   links:
     - etcd
     - "ambassador:apiserver"
@@ -38,12 +38,13 @@ mesosslave:
     - >
       NAME=$$(cut -f2 -d/ <<<$${MESOSMASTER1_NAME}) &&
       N=$${NAME##*_} &&
+      PUBLIC_RESOURCES="$$(if [ $${N} = 2 ]; then echo ";cpus(public):2;mem(public):640;ports(public):[7000-7999]"; fi)" &&
      DOCKER_NETWORK_OFFSET=0.0.$${N}.0
       exec wrapdocker mesos-slave
       --work_dir="/var/tmp/mesos/$${N}"
-      --attributes="rack:$${N};gen:201$${N};role:role$${N}"
+      --attributes="rack:$${N};gen:201$${N}"
       --hostname=$$(getent hosts mesosslave | cut -d' ' -f1 | sort -u | tail -1)
-      --resources="cpus:4;mem:1280;disk:25600;ports:[8000-21099];cpus(role$${N}):1;mem(role$${N}):640;disk(role$${N}):25600;ports(role$${N}):[7000-7999]"
+      --resources="cpus:4;mem:1280;disk:25600;ports:[8000-21099]$${PUBLIC_RESOURCES}"
   command: []
   environment:
     - MESOS_MASTER=mesosmaster1:5050
@@ -144,7 +145,8 @@ scheduler:
       --mesos-executor-cpus=1.0
       --mesos-sandbox-overlay=/opt/sandbox-overlay.tar.gz
       --static-pods-config=/opt/static-pods
-      --mesos-roles=*,role1
+      --mesos-framework-roles=*,public
+      --mesos-default-pod-roles=*,public
       --v=4
       --executor-logv=4
       --profiling=true
diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go
index 569b933e6ea..024882b6826 100644
--- a/contrib/mesos/pkg/executor/executor_test.go
+++ b/contrib/mesos/pkg/executor/executor_test.go
@@ -175,6 +175,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
 		pod,
 		executorinfo,
 		nil,
+		nil,
 	)
 	assert.Equal(t, nil, err, "must be able to create a task from a pod")
 
@@ -390,6 +391,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
 		pod,
 		executorinfo,
 		nil,
+		nil,
 	)
 
 	podTask.Spec = &podtask.Spec{
diff --git a/contrib/mesos/pkg/executor/node.go b/contrib/mesos/pkg/executor/node.go
index 2169c016e4a..1fe3662492c 100644
--- a/contrib/mesos/pkg/executor/node.go
+++ b/contrib/mesos/pkg/executor/node.go
@@ -36,9 +36,9 @@ func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
 			}
 			switch r.GetName() {
 			case "cpus":
-				executorCPU = r.GetScalar().GetValue()
+				executorCPU += r.GetScalar().GetValue()
 			case "mem":
-				executorMem = r.GetScalar().GetValue()
+				executorMem += r.GetScalar().GetValue()
 			}
 		}
 	}
@@ -55,10 +55,15 @@ func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
 			// We intentionally take the floor of executorCPU because cores are integers
 			// and we would lose a complete cpu here if the value is <1.
 			// TODO(sttts): switch to float64 when "Machine Allocables" are implemented
-			ni.Cores = int(r.GetScalar().GetValue() - float64(int(executorCPU)))
+			ni.Cores += int(r.GetScalar().GetValue())
 		case "mem":
-			ni.Mem = int64(r.GetScalar().GetValue()-executorMem) * 1024 * 1024
+			ni.Mem += int64(r.GetScalar().GetValue()) * 1024 * 1024
 		}
 	}
+
+	// TODO(sttts): subtract executorCPU/Mem from static pod resources before subtracting them from the capacity
+	ni.Cores -= int(executorCPU)
+	ni.Mem -= int64(executorMem) * 1024 * 1024
+
 	return ni
 }
diff --git a/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go b/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go
index 8b58258fecf..2383a6316a6 100644
--- a/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go
+++ b/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go
@@ -42,13 +42,14 @@ type SchedulerAlgorithm interface {
 
 // SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface
 type schedulerAlgorithm struct {
-	sched        scheduler.Scheduler
-	podUpdates   queue.FIFO
-	podScheduler podschedulers.PodScheduler
-	prototype    *mesosproto.ExecutorInfo
-	roles        []string
-	defaultCpus  mresource.CPUShares
-	defaultMem   mresource.MegaBytes
+	sched           scheduler.Scheduler
+	podUpdates      queue.FIFO
+	podScheduler    podschedulers.PodScheduler
+	prototype       *mesosproto.ExecutorInfo
+	frameworkRoles  []string
+	defaultPodRoles []string
+	defaultCpus     mresource.CPUShares
+	defaultMem      mresource.MegaBytes
 }
 
 // New returns a new SchedulerAlgorithm
@@ -58,18 +59,19 @@ func New(
 	sched scheduler.Scheduler,
 	podUpdates queue.FIFO,
 	podScheduler podschedulers.PodScheduler,
 	prototype *mesosproto.ExecutorInfo,
-	roles []string,
+	frameworkRoles, defaultPodRoles []string,
 	defaultCpus mresource.CPUShares,
 	defaultMem mresource.MegaBytes,
 ) SchedulerAlgorithm {
 	return &schedulerAlgorithm{
-		sched:        sched,
-		podUpdates:   podUpdates,
-		podScheduler: podScheduler,
-		roles:        roles,
-		prototype:    prototype,
-		defaultCpus:  defaultCpus,
-		defaultMem:   defaultMem,
+		sched:           sched,
+		podUpdates:      podUpdates,
+		podScheduler:    podScheduler,
+		frameworkRoles:  frameworkRoles,
+		defaultPodRoles: defaultPodRoles,
+		prototype:       prototype,
+		defaultCpus:     defaultCpus,
+		defaultMem:      defaultMem,
 	}
 }
@@ -107,7 +109,7 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
 
 	// From here on we can expect that the pod spec of a task has proper limits for CPU and memory.
 	k.limitPod(pod)
 
-	podTask, err := podtask.New(ctx, "", pod, k.prototype, k.roles)
+	podTask, err := podtask.New(ctx, "", pod, k.prototype, k.frameworkRoles, k.defaultPodRoles)
 	if err != nil {
 		log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err)
 		return "", err
diff --git a/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go b/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go
index fbc1d634b0d..044fdf56452 100644
--- a/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go
+++ b/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go
@@ -67,6 +67,7 @@ func TestDeleteOne_PendingPod(t *testing.T) {
 		pod.Pod,
 		&mesosproto.ExecutorInfo{},
 		nil,
+		nil,
 	)
 	if err != nil {
 		t.Fatalf("failed to create task: %v", err)
@@ -113,6 +114,7 @@ func TestDeleteOne_Running(t *testing.T) {
 		pod.Pod,
 		&mesosproto.ExecutorInfo{},
 		nil,
+		nil,
 	)
 	if err != nil {
 		t.Fatalf("unexpected error: %v", err)
diff --git a/contrib/mesos/pkg/scheduler/components/scheduler.go b/contrib/mesos/pkg/scheduler/components/scheduler.go
index 57a716f0ec5..bbf3247e859 100644
--- a/contrib/mesos/pkg/scheduler/components/scheduler.go
+++ b/contrib/mesos/pkg/scheduler/components/scheduler.go
@@ -65,7 +65,7 @@ func New(
 	mux *http.ServeMux,
 	lw *cache.ListWatch,
 	prototype *mesos.ExecutorInfo,
-	roles []string,
+	frameworkRoles, defaultPodRoles []string,
 	defaultCpus mresource.CPUShares,
 	defaultMem mresource.MegaBytes,
 ) scheduler.Scheduler {
@@ -81,7 +81,7 @@ func New(
 
 	q := queuer.New(queue.NewDelayFIFO(), podUpdates)
 
-	algorithm := algorithm.New(core, podUpdates, ps, prototype, roles, defaultCpus, defaultMem)
+	algorithm := algorithm.New(core, podUpdates, ps, prototype, frameworkRoles, defaultPodRoles, defaultCpus, defaultMem)
 
 	podDeleter := deleter.New(core, q)
 
diff --git a/contrib/mesos/pkg/scheduler/integration/integration_test.go b/contrib/mesos/pkg/scheduler/integration/integration_test.go
index 547992f80ee..1e336a7f828 100644
--- a/contrib/mesos/pkg/scheduler/integration/integration_test.go
+++ b/contrib/mesos/pkg/scheduler/integration/integration_test.go
@@ -509,6 +509,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
 		&podsListWatch.ListWatch,
 		ei,
 		[]string{"*"},
+		[]string{"*"},
 		mresource.DefaultDefaultContainerCPULimit,
 		mresource.DefaultDefaultContainerMemLimit,
 	)
diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task.go b/contrib/mesos/pkg/scheduler/podtask/pod_task.go
index 1be0778824d..30740fed6c4 100644
--- a/contrib/mesos/pkg/scheduler/podtask/pod_task.go
+++ b/contrib/mesos/pkg/scheduler/podtask/pod_task.go
@@ -51,7 +51,7 @@ const (
 	Deleted = FlagType("deleted")
 )
 
-var defaultRoles = []string{"*"}
+var starRole = []string{"*"}
 
 // A struct that describes a pod task.
 type T struct {
@@ -68,13 +68,14 @@ type T struct {
 	CreateTime  time.Time
 	UpdatedTime time.Time // time of the most recent StatusUpdate we've seen from the mesos master
 
-	podStatus    api.PodStatus
-	prototype    *mesos.ExecutorInfo // readonly
-	allowedRoles []string            // roles under which pods are allowed to be launched
-	podKey       string
-	launchTime   time.Time
-	bindTime     time.Time
-	mapper       HostPortMapper
+	podStatus       api.PodStatus
+	prototype       *mesos.ExecutorInfo // readonly
+	frameworkRoles  []string            // Mesos framework roles; pods may be launched with these
+	defaultPodRoles []string            // roles under which pods are scheduled if none are specified in labels
+	podKey          string
+	launchTime      time.Time
+	bindTime        time.Time
+	mapper          HostPortMapper
 }
 
 type Port struct {
@@ -168,34 +169,38 @@ func (t *T) Has(f FlagType) (exists bool) {
 	return
 }
 
+// Roles returns the valid roles under which this pod task can be scheduled.
+// If the pod has a roles label defined, it is used;
+// otherwise the default pod roles are returned.
 func (t *T) Roles() []string {
-	var roles []string
-
 	if r, ok := t.Pod.ObjectMeta.Labels[meta.RolesKey]; ok {
-		roles = strings.Split(r, ",")
+		roles := strings.Split(r, ",")
+
 		for i, r := range roles {
 			roles[i] = strings.TrimSpace(r)
 		}
-		roles = filterRoles(roles, not(emptyRole), not(seenRole()))
-	} else {
-		// no roles label defined,
-		// by convention return the first allowed role
-		// to be used for launching the pod task
-		return []string{t.allowedRoles[0]}
+
+		return filterRoles(
+			roles,
+			not(emptyRole), not(seenRole()), inRoles(t.frameworkRoles...),
+		)
 	}
 
-	return filterRoles(roles, inRoles(t.allowedRoles...))
+	// no roles label defined, return defaults
+	return t.defaultPodRoles
 }
 
-func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo, allowedRoles []string) (*T, error) {
+func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo, frameworkRoles, defaultPodRoles []string) (*T, error) {
 	if prototype == nil {
 		return nil, fmt.Errorf("illegal argument: executor is nil")
 	}
 
-	if len(allowedRoles) == 0 {
-		allowedRoles = defaultRoles
+	if len(frameworkRoles) == 0 {
+		frameworkRoles = starRole
+	}
+
+	if len(defaultPodRoles) == 0 {
+		defaultPodRoles = starRole
 	}
 
 	key, err := MakePodKey(ctx, pod.Name)
@@ -208,14 +213,15 @@ func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo
 	}
 
 	task := &T{
-		ID:           id,
-		Pod:          *pod,
-		State:        StatePending,
-		podKey:       key,
-		mapper:       NewHostPortMapper(pod),
-		Flags:        make(map[FlagType]struct{}),
-		prototype:    prototype,
-		allowedRoles: allowedRoles,
+		ID:              id,
+		Pod:             *pod,
+		State:           StatePending,
+		podKey:          key,
+		mapper:          NewHostPortMapper(pod),
+		Flags:           make(map[FlagType]struct{}),
+		prototype:       prototype,
+		frameworkRoles:  frameworkRoles,
+		defaultPodRoles: defaultPodRoles,
 	}
 
 	task.CreateTime = time.Now()
diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
index 20c0bed269c..92671736240 100644
--- a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
+++ b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
@@ -34,7 +34,7 @@ const (
 	t_min_mem = 128
 )
 
-func fakePodTask(id string, roles ...string) *T {
+func fakePodTask(id string, allowedRoles, defaultRoles []string) *T {
 	t, _ := New(
 		api.NewDefaultContext(),
 		"",
@@ -45,7 +45,8 @@ func fakePodTask(id string, roles ...string) *T {
 			},
 		},
 		&mesos.ExecutorInfo{},
-		roles,
+		allowedRoles,
+		defaultRoles,
 	)
 
 	return t
 }
@@ -62,12 +63,12 @@ func TestRoles(t *testing.T) {
 		{
 			map[string]string{},
 			nil,
-			defaultRoles,
+			starRole,
 		},
 		{
 			map[string]string{"other": "label"},
 			nil,
-			defaultRoles,
+			starRole,
 		},
 		{
 			map[string]string{meta.RolesKey: ""},
@@ -100,10 +101,10 @@
 		{
 			map[string]string{},
 			[]string{"role1"},
-			[]string{"role1"},
+			[]string{"*"},
 		},
 	} {
-		task := fakePodTask("test", tt.frameworkRoles...)
+		task := fakePodTask("test", tt.frameworkRoles, starRole)
 		task.Pod.ObjectMeta.Labels = tt.labels
 		assert.True(reflect.DeepEqual(task.Roles(), tt.want), "test #%d got %#v want %#v", i, task.Roles(), tt.want)
 	}
 }
@@ -127,7 +128,7 @@ func (mr mockRegistry) Invalidate(hostname string) {
 
 func TestEmptyOffer(t *testing.T) {
 	t.Parallel()
-	task := fakePodTask("foo")
+	task := fakePodTask("foo", nil, nil)
 
 	task.Pod.Spec = api.PodSpec{
 		Containers: []api.Container{{
@@ -156,7 +157,7 @@ func TestEmptyOffer(t *testing.T) {
 
 func TestNoPortsInPodOrOffer(t *testing.T) {
 	t.Parallel()
-	task := fakePodTask("foo")
+	task := fakePodTask("foo", nil, nil)
 
 	task.Pod.Spec = api.PodSpec{
 		Containers: []api.Container{{
@@ -206,7 +207,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
 
 func TestAcceptOfferPorts(t *testing.T) {
 	t.Parallel()
-	task := fakePodTask("foo")
+	task := fakePodTask("foo", nil, nil)
 	pod := &task.Pod
 
 	defaultProc := NewDefaultProcurement(
@@ -376,7 +377,7 @@ func TestNodeSelector(t *testing.T) {
 	)
 
 	for _, ts := range tests {
-		task := fakePodTask("foo")
+		task := fakePodTask("foo", nil, nil)
 		task.Pod.Spec.NodeSelector = ts.selector
 		offer := &mesos.Offer{
 			Resources: []*mesos.Resource{
diff --git a/contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go b/contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go
index 60f26c82949..bd9730c1e7a 100644
--- a/contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go
+++ b/contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go
@@ -26,7 +26,7 @@ import (
 
 func TestDefaultHostPortMatching(t *testing.T) {
 	t.Parallel()
-	task := fakePodTask("foo")
+	task := fakePodTask("foo", nil, nil)
 	pod := &task.Pod
 
 	offer := &mesos.Offer{
@@ -52,7 +52,7 @@ func TestDefaultHostPortMatching(t *testing.T) {
 			}},
 		}},
 	}
-	task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil)
+	task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -66,7 +66,7 @@ func TestDefaultHostPortMatching(t *testing.T) {
 
 func TestWildcardHostPortMatching(t *testing.T) {
 	t.Parallel()
-	task := fakePodTask("foo")
+	task := fakePodTask("foo", nil, nil)
 	pod := &task.Pod
 
 	offer := &mesos.Offer{}
@@ -100,7 +100,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
 			}},
 		}},
 	}
-	task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil)
+	task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -123,7 +123,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
 			}},
 		}},
 	}
-	task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil)
+	task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -144,7 +144,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
 			}},
 		}},
 	}
-	task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil)
+	task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -190,7 +190,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
 			}},
 		}},
 	}
-	task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil)
+	task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/contrib/mesos/pkg/scheduler/podtask/procurement.go b/contrib/mesos/pkg/scheduler/podtask/procurement.go
index 5ccadac0f73..c86699332e3 100644
--- a/contrib/mesos/pkg/scheduler/podtask/procurement.go
+++ b/contrib/mesos/pkg/scheduler/podtask/procurement.go
@@ -225,12 +225,12 @@ func NewExecutorResourceProcurer(resources []*mesos.Resource, registry executori
 		wantedCpus := sumResources(filterResources(resources, isScalar, hasName("cpus")))
 		wantedMem := sumResources(filterResources(resources, isScalar, hasName("mem")))
 
-		procuredCpu, remaining := procureScalarResources("cpus", wantedCpus, t.allowedRoles, ps.offer.GetResources())
+		procuredCpu, remaining := procureScalarResources("cpus", wantedCpus, t.frameworkRoles, ps.offer.GetResources())
 		if procuredCpu == nil {
 			return fmt.Errorf("not enough cpu resources for executor: want=%v", wantedCpus)
 		}
 
-		procuredMem, remaining := procureScalarResources("mem", wantedMem, t.allowedRoles, remaining)
+		procuredMem, remaining := procureScalarResources("mem", wantedMem, t.frameworkRoles, remaining)
 		if procuredMem == nil {
 			return fmt.Errorf("not enough mem resources for executor: want=%v", wantedMem)
 		}
diff --git a/contrib/mesos/pkg/scheduler/podtask/procurement_test.go b/contrib/mesos/pkg/scheduler/podtask/procurement_test.go
index 1ef85b43fc5..910493dc505 100644
--- a/contrib/mesos/pkg/scheduler/podtask/procurement_test.go
+++ b/contrib/mesos/pkg/scheduler/podtask/procurement_test.go
@@ -78,6 +78,7 @@ func TestNewPodResourcesProcurement(t *testing.T) {
 		},
 		executor,
 		[]string{"*"},
+		[]string{"*"},
 	)
 
 	procurement := NewPodResourcesProcurement()
diff --git a/contrib/mesos/pkg/scheduler/podtask/registry_test.go b/contrib/mesos/pkg/scheduler/podtask/registry_test.go
index c6a8388479c..d670a9318ae 100644
--- a/contrib/mesos/pkg/scheduler/podtask/registry_test.go
+++ b/contrib/mesos/pkg/scheduler/podtask/registry_test.go
@@ -38,14 +38,14 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
 	assert.Empty(tasks)
 
 	// add a task
-	a := fakePodTask("a")
+	a := fakePodTask("a", nil, nil)
 	a_clone, err := registry.Register(a)
 	assert.NoError(err)
 	assert.Equal(a_clone.ID, a.ID)
 	assert.Equal(a_clone.podKey, a.podKey)
 
 	// add another task
-	b := fakePodTask("b")
+	b := fakePodTask("b", nil, nil)
 	b_clone, err := registry.Register(b)
 	assert.NoError(err)
 	assert.Equal(b_clone.ID, b.ID)
@@ -106,7 +106,7 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
 	assertContains(t, a, tasks...)
 
 	// unregister a task not registered
-	unregistered_task := fakePodTask("unregistered-task")
+	unregistered_task := fakePodTask("unregistered-task", nil, nil)
 	registry.Unregister(unregistered_task)
 }
@@ -124,7 +124,7 @@ func TestInMemoryRegistry_State(t *testing.T) {
 	registry := NewInMemoryRegistry()
 
 	// add a task
-	a := fakePodTask("a")
+	a := fakePodTask("a", nil, nil)
 	a_clone, err := registry.Register(a)
 	assert.NoError(err)
 	assert.Equal(a.State, a_clone.State)
@@ -167,7 +167,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
 	// create registry
 	registry := NewInMemoryRegistry()
 
-	a := fakePodTask("a")
+	a := fakePodTask("a", nil, nil)
 	registry.Register(a.Clone()) // here clone a because we change it below
 
 	// state changes are ignored
@@ -225,7 +225,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
 	assert.Error(err)
 
 	// update unknown task
-	unknown_task := fakePodTask("unknown-task")
+	unknown_task := fakePodTask("unknown-task", nil, nil)
 	err = registry.Update(unknown_task)
 	assert.Error(err)
 
@@ -256,7 +256,7 @@ func testStateTrace(t *testing.T, transitions []transition) *Registry {
 	assert := assert.New(t)
 	registry := NewInMemoryRegistry()
 
-	a := fakePodTask("a")
+	a := fakePodTask("a", nil, nil)
 	a, _ = registry.Register(a)
 
 	// initial pending state
diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go
index 194d3eac08a..0ba7d58f734 100644
--- a/contrib/mesos/pkg/scheduler/service/service.go
+++ b/contrib/mesos/pkg/scheduler/service/service.go
@@ -75,6 +75,7 @@ import (
 	"k8s.io/kubernetes/pkg/healthz"
 	"k8s.io/kubernetes/pkg/master/ports"
 	etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
+	"k8s.io/kubernetes/pkg/util/sets"
 
 	// lock to this API version, compilation will fail when this becomes unsupported
 	_ "k8s.io/kubernetes/pkg/api/v1"
@@ -83,7 +84,8 @@ import (
 const (
 	defaultMesosMaster       = "localhost:5050"
 	defaultMesosUser         = "root" // should have privs to execute docker and iptables commands
-	defaultMesosRoles        = "*"
+	defaultFrameworkRoles    = "*"
+	defaultPodRoles          = "*"
 	defaultReconcileInterval = 300 // 5m default task reconciliation interval
 	defaultReconcileCooldown = 15 * time.Second
 	defaultNodeRelistPeriod  = 5 * time.Minute
@@ -106,7 +108,8 @@ type SchedulerServer struct {
 	proxyPath           string
 	mesosMaster         string
 	mesosUser           string
-	mesosRoles          []string
+	frameworkRoles      []string
+	defaultPodRoles     []string
 	mesosAuthPrincipal  string
 	mesosAuthSecretFile string
 	mesosCgroupPrefix   string
@@ -200,7 +203,8 @@ func NewSchedulerServer() *SchedulerServer {
 		mesosUser:         defaultMesosUser,
 		mesosExecutorCPUs: defaultExecutorCPUs,
 		mesosExecutorMem:  defaultExecutorMem,
-		mesosRoles:        strings.Split(defaultMesosRoles, ","),
+		frameworkRoles:    strings.Split(defaultFrameworkRoles, ","),
+		defaultPodRoles:   strings.Split(defaultPodRoles, ","),
 		reconcileInterval: defaultReconcileInterval,
 		reconcileCooldown: defaultReconcileCooldown,
 		checkpoint:        true,
@@ -239,7 +243,8 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&s.mesosMaster, "mesos-master", s.mesosMaster, "Location of the Mesos master. The format is a comma-delimited list of hosts like zk://host1:port,host2:port/mesos. If using ZooKeeper, pay particular attention to the leading zk:// and trailing /mesos! If not using ZooKeeper, standard URLs like http://localhost are also acceptable.")
 	fs.StringVar(&s.mesosUser, "mesos-user", s.mesosUser, "Mesos user for this framework, defaults to root.")
-	fs.StringSliceVar(&s.mesosRoles, "mesos-roles", s.mesosRoles, "Mesos framework roles. The first role will be used to launch pods having no "+meta.RolesKey+" label.")
+	fs.StringSliceVar(&s.frameworkRoles, "mesos-framework-roles", s.frameworkRoles, "Mesos framework roles that the scheduler receives offers for. Currently only \"*\" and optionally one additional role are supported.")
+	fs.StringSliceVar(&s.defaultPodRoles, "mesos-default-pod-roles", s.defaultPodRoles, "Roles that will be used to launch pods having no "+meta.RolesKey+" label.")
 	fs.StringVar(&s.mesosAuthPrincipal, "mesos-authentication-principal", s.mesosAuthPrincipal, "Mesos authentication principal.")
 	fs.StringVar(&s.mesosAuthSecretFile, "mesos-authentication-secret-file", s.mesosAuthSecretFile, "Mesos authentication secret file.")
 	fs.StringVar(&s.mesosAuthProvider, "mesos-authentication-provider", s.mesosAuthProvider, fmt.Sprintf("Authentication provider to use, default is SASL that supports mechanisms: %+v", mech.ListSupported()))
@@ -535,10 +540,16 @@ func (s *SchedulerServer) getDriver() (driver bindings.SchedulerDriver) {
 }
 
 func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error {
-	if n := len(s.mesosRoles); n == 0 || n > 2 || (n == 2 && s.mesosRoles[0] != "*" && s.mesosRoles[1] != "*") {
+	if n := len(s.frameworkRoles); n == 0 || n > 2 || (n == 2 && s.frameworkRoles[0] != "*" && s.frameworkRoles[1] != "*") {
 		log.Fatalf(`only one custom role allowed in addition to "*"`)
 	}
 
+	fwSet := sets.NewString(s.frameworkRoles...)
+	podSet := sets.NewString(s.defaultPodRoles...)
+	if !fwSet.IsSuperset(podSet) {
+		log.Fatalf("all default pod roles %q must be included in framework roles %q", s.defaultPodRoles, s.frameworkRoles)
+	}
+
 	// get scheduler low-level config
 	sc := schedcfg.CreateDefaultConfig()
 	if s.schedulerConfigFileName != "" {
@@ -777,7 +788,8 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
 		s.mux,
 		lw,
 		eiPrototype,
-		s.mesosRoles,
+		s.frameworkRoles,
+		s.defaultPodRoles,
 		s.defaultContainerCPULimit,
 		s.defaultContainerMemLimit,
 	)
@@ -890,7 +902,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
 
 	// set the framework's role to the first configured non-star role.
 	// once Mesos supports multiple roles, simply set the configured Mesos roles slice.
-	for _, role := range s.mesosRoles {
+	for _, role := range s.frameworkRoles {
 		if role != "*" {
 			// mesos currently supports only one role per framework info
 			// The framework will be offered role's resources as well as * resources
diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt
index 386cd95fcf6..4af4cc1c8ca 100644
--- a/hack/verify-flags/known-flags.txt
+++ b/hack/verify-flags/known-flags.txt
@@ -194,7 +194,8 @@ mesos-executor-mem
 mesos-launch-grace-period
 mesos-master
 mesos-sandbox-overlay
-mesos-roles
+mesos-framework-roles
+mesos-default-pod-roles
 mesos-user
 minimum-container-ttl-duration
 minion-max-log-age
diff --git a/test/e2e/mesos.go b/test/e2e/mesos.go
index d8dd94d2c1b..f2b97096d4d 100644
--- a/test/e2e/mesos.go
+++ b/test/e2e/mesos.go
@@ -87,7 +87,7 @@ var _ = Describe("Mesos", func() {
 			ObjectMeta: api.ObjectMeta{
 				Name: podName,
 				Labels: map[string]string{
-					"k8s.mesosphere.io/roles": "role1",
+					"k8s.mesosphere.io/roles": "public",
 				},
 			},
 			Spec: api.PodSpec{
@@ -106,10 +106,12 @@ var _ = Describe("Mesos", func() {
 		expectNoError(err)
 
 		nodeClient := framework.Client.Nodes()
-		role1 := labels.SelectorFromSet(map[string]string{
-			"k8s.mesosphere.io/attribute-role": "role1",
+
+		// schedule onto a node with rack=2, which is assigned to the "public" role
+		rack2 := labels.SelectorFromSet(map[string]string{
+			"k8s.mesosphere.io/attribute-rack": "2",
 		})
-		options := api.ListOptions{LabelSelector: role1}
+		options := api.ListOptions{LabelSelector: rack2}
 		nodes, err := nodeClient.List(options)
 		expectNoError(err)
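
Usage note (commentary after the diff, not part of the patch): the default pod
roles must form a subset of the framework roles. For example,
--mesos-framework-roles=*,public together with --mesos-default-pod-roles=*,public
is accepted, while --mesos-default-pod-roles=public,other makes the scheduler
exit at startup. A minimal sketch of that superset check follows; validatePodRoles
is a hypothetical stand-in for the sets-based check added to SchedulerServer.Run
above.

package main

import "fmt"

// validatePodRoles mirrors the startup check added in SchedulerServer.Run:
// every default pod role must also be one of the framework roles.
func validatePodRoles(frameworkRoles, defaultPodRoles []string) error {
	fw := make(map[string]bool, len(frameworkRoles))
	for _, r := range frameworkRoles {
		fw[r] = true
	}
	for _, r := range defaultPodRoles {
		if !fw[r] {
			return fmt.Errorf("default pod role %q is not included in framework roles %v", r, frameworkRoles)
		}
	}
	return nil
}

func main() {
	fmt.Println(validatePodRoles([]string{"*", "public"}, []string{"*", "public"})) // <nil>
	fmt.Println(validatePodRoles([]string{"*"}, []string{"public"}))                // error
}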