From a496e8dd20b1c9d83f963cfbff1b0bdd50153a27 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Thu, 17 Sep 2015 11:05:10 +0200 Subject: [PATCH 1/4] Expose HistoricalFIFO's pop with cancel channel --- contrib/mesos/pkg/queue/historical.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/contrib/mesos/pkg/queue/historical.go b/contrib/mesos/pkg/queue/historical.go index a9021c14b4d..9ef42eaf80b 100644 --- a/contrib/mesos/pkg/queue/historical.go +++ b/contrib/mesos/pkg/queue/historical.go @@ -222,7 +222,7 @@ func (f *HistoricalFIFO) Poll(id string, t EventType) bool { func (q *HistoricalFIFO) Await(timeout time.Duration) interface{} { cancel := make(chan struct{}) ch := make(chan interface{}, 1) - go func() { ch <- q.pop(cancel) }() + go func() { ch <- q.CancelablePop(cancel) }() select { case <-time.After(timeout): close(cancel) @@ -232,10 +232,10 @@ func (q *HistoricalFIFO) Await(timeout time.Duration) interface{} { } } func (f *HistoricalFIFO) Pop() interface{} { - return f.pop(nil) + return f.CancelablePop(nil) } -func (f *HistoricalFIFO) pop(cancel chan struct{}) interface{} { +func (f *HistoricalFIFO) CancelablePop(cancel <-chan struct{}) interface{} { popEvent := (Entry)(nil) defer func() { f.carrier(popEvent) @@ -383,7 +383,7 @@ func (f *HistoricalFIFO) merge(id string, obj UniqueCopyable) (notifications []E // NewHistorical returns a Store which can be used to queue up items to // process. If a non-nil Mux is provided, then modifications to the // the FIFO are delivered on a channel specific to this fifo. -func NewHistorical(ch chan<- Entry) FIFO { +func NewHistorical(ch chan<- Entry) *HistoricalFIFO { carrier := dead if ch != nil { carrier = func(msg Entry) { From 4d4ebe9f180b792d1b2c46d5fc4bf98841243384 Mon Sep 17 00:00:00 2001 From: "Dr. 
Stefan Schimanski" Date: Fri, 11 Sep 2015 10:40:46 +0200 Subject: [PATCH 2/4] Add Mesos slave attributes as node labels - pre-create node api objects from the scheduler when offers arrive - decline offers until nodes a registered - turn slave attributes as k8s.mesosphere.io/attribute-* labels - update labels from executor Register/Reregister - watch nodes in scheduler to make non-Mesos labels available for NodeSelector matching - add unit tests for label predicate - add e2e test to check that slave attributes really end up as node labels --- contrib/mesos/pkg/executor/executor.go | 16 ++ contrib/mesos/pkg/node/doc.go | 18 ++ contrib/mesos/pkg/node/node.go | 160 ++++++++++++++++++ contrib/mesos/pkg/node/registrator.go | 148 ++++++++++++++++ contrib/mesos/pkg/scheduler/fcfs.go | 19 ++- contrib/mesos/pkg/scheduler/plugin.go | 6 +- contrib/mesos/pkg/scheduler/plugin_test.go | 15 +- .../mesos/pkg/scheduler/podtask/minimal.go | 3 +- .../pkg/scheduler/podtask/pod_task_test.go | 83 ++++++--- .../mesos/pkg/scheduler/podtask/predicate.go | 36 ++-- contrib/mesos/pkg/scheduler/scheduler.go | 24 ++- contrib/mesos/pkg/scheduler/scheduler_test.go | 46 ++++- .../mesos/pkg/scheduler/service/service.go | 26 ++- test/e2e/mesos.go | 53 ++++++ 14 files changed, 605 insertions(+), 48 deletions(-) create mode 100644 contrib/mesos/pkg/node/doc.go create mode 100644 contrib/mesos/pkg/node/node.go create mode 100644 contrib/mesos/pkg/node/registrator.go create mode 100644 test/e2e/mesos.go diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 5668a79e201..1577c3a7883 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -33,6 +33,7 @@ import ( mutil "github.com/mesos/mesos-go/mesosutil" "k8s.io/kubernetes/contrib/mesos/pkg/archive" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" + "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" @@ -225,6 +226,13 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, k.staticPodsConfig = executorInfo.Data } + if slaveInfo != nil { + _, err := node.CreateOrUpdate(k.client, slaveInfo.GetHostname(), node.SlaveAttributesToLabels(slaveInfo.Attributes)) + if err != nil { + log.Errorf("cannot update node labels: %v", err) + } + } + k.initialRegistration.Do(k.onInitialRegistration) } @@ -239,11 +247,19 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI log.Errorf("failed to reregister/transition to a connected state") } + if slaveInfo != nil { + _, err := node.CreateOrUpdate(k.client, slaveInfo.GetHostname(), node.SlaveAttributesToLabels(slaveInfo.Attributes)) + if err != nil { + log.Errorf("cannot update node labels: %v", err) + } + } + k.initialRegistration.Do(k.onInitialRegistration) } func (k *KubernetesExecutor) onInitialRegistration() { defer close(k.initialRegComplete) + // emit an empty update to allow the mesos "source" to be marked as seen k.updateChan <- kubelet.PodUpdate{ Pods: []*api.Pod{}, diff --git a/contrib/mesos/pkg/node/doc.go b/contrib/mesos/pkg/node/doc.go new file mode 100644 index 00000000000..45b77f59617 --- /dev/null +++ b/contrib/mesos/pkg/node/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package node provides utilities to create and update nodes +package node diff --git a/contrib/mesos/pkg/node/node.go b/contrib/mesos/pkg/node/node.go new file mode 100644 index 00000000000..d98ff7dc0f5 --- /dev/null +++ b/contrib/mesos/pkg/node/node.go @@ -0,0 +1,160 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "encoding/json" + "fmt" + "reflect" + "strconv" + "strings" + + log "github.com/golang/glog" + mesos "github.com/mesos/mesos-go/mesosproto" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/util/validation" +) + +const ( + labelPrefix = "k8s.mesosphere.io/attribute-" +) + +// Create creates a new node api object with the given hostname and labels +func Create(client *client.Client, hostName string, labels map[string]string) (*api.Node, error) { + n := api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: hostName, + Labels: map[string]string{"kubernetes.io/hostname": hostName}, + }, + Spec: api.NodeSpec{ + ExternalID: hostName, + }, + Status: api.NodeStatus{ + Phase: api.NodePending, + }, + } + for k, v := range labels { + n.Labels[k] = v + } + + // try to create + return client.Nodes().Create(&n) +} + +// Update updates an existing node api object with new labels +func Update(client *client.Client, n *api.Node, labels map[string]string) (*api.Node, error) { + patch := struct { + Metadata struct { + Labels map[string]string `json:"labels"` + } `json:"metadata"` + }{} + patch.Metadata.Labels = map[string]string{} + for k, v := range n.Labels { + if !IsSlaveAttributeLabel(k) { + patch.Metadata.Labels[k] = v + } + } + for k, v := range labels { + patch.Metadata.Labels[k] = v + } + patchJson, _ := json.Marshal(patch) + log.V(4).Infof("Patching labels of node %q: %v", n.Name, string(patchJson)) + err := client.Patch(api.MergePatchType).RequestURI(n.SelfLink).Body(patchJson).Do().Error() + if err != nil { + return nil, fmt.Errorf("error updating labels of node %q: %v", n.Name, err) + } + + newNode, err := api.Scheme.DeepCopy(n) + if err != nil { + return nil, err + } + newNode.(*api.Node).Labels = patch.Metadata.Labels + + return newNode.(*api.Node), nil +} + +// CreateOrUpdate tries to create a node api object or updates an already existing one +func CreateOrUpdate(client *client.Client, hostName string, labels map[string]string) (*api.Node, error) { + n, err := Create(client, hostName, labels) + if err == nil { + return n, nil + } + if !errors.IsAlreadyExists(err) { + return nil, fmt.Errorf("unable to register %q with 
the apiserver: %v", hostName, err) + } + + // fall back to update an old node with new labels + n, err = client.Nodes().Get(hostName) + if err != nil { + return nil, fmt.Errorf("error getting node %q: %v", hostName, err) + } + if n == nil { + return nil, fmt.Errorf("no node instance returned for %q", hostName) + } + return Update(client, n, labels) +} + +// IsSlaveAttributeLabel returns true iff the given label is derived from a slave attribute +func IsSlaveAttributeLabel(l string) bool { + return strings.HasPrefix(l, labelPrefix) +} + +// IsUpToDate returns true iff the node's slave labels match the given attributes labels +func IsUpToDate(n *api.Node, labels map[string]string) bool { + slaveLabels := map[string]string{} + for k, v := range n.Labels { + if IsSlaveAttributeLabel(k) { + slaveLabels[k] = v + } + } + return reflect.DeepEqual(slaveLabels, labels) +} + +// SlaveAttributesToLabels converts slave attributes into string key/value labels +func SlaveAttributesToLabels(attrs []*mesos.Attribute) map[string]string { + l := map[string]string{} + for _, a := range attrs { + if a == nil { + continue + } + + var v string + k := labelPrefix + a.GetName() + + switch a.GetType() { + case mesos.Value_TEXT: + v = a.GetText().GetValue() + case mesos.Value_SCALAR: + v = strconv.FormatFloat(a.GetScalar().GetValue(), 'G', -1, 64) + } + + if !validation.IsQualifiedName(k) { + log.V(3).Infof("ignoring invalid node label name %q", k) + continue + } + + if !validation.IsValidLabelValue(v) { + log.V(3).Infof("ignoring invalid node label %s value: %q", k, v) + continue + } + + l[k] = v + } + return l +} diff --git a/contrib/mesos/pkg/node/registrator.go b/contrib/mesos/pkg/node/registrator.go new file mode 100644 index 00000000000..3e7a18fb660 --- /dev/null +++ b/contrib/mesos/pkg/node/registrator.go @@ -0,0 +1,148 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "fmt" + "time" + + log "github.com/golang/glog" + "k8s.io/kubernetes/contrib/mesos/pkg/queue" + "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + client "k8s.io/kubernetes/pkg/client/unversioned" +) + +type Registrator interface { + // Register checks whether the node is registered with the given labels. If it + // is not, it is created or updated on the apiserver. If an the node was up-to-date, + // false is returned. + Register(hostName string, labels map[string]string) (bool, error) + + // Start the registration loop and return immediately. 
+ Run(terminate <-chan struct{}) error +} + +type registration struct { + hostName string + labels map[string]string +} + +func (r *registration) Copy() queue.Copyable { + return &registration{ + hostName: r.hostName, + labels: r.labels, // labels are never changed, no need to clone + } +} + +func (r *registration) GetUID() string { + return r.hostName +} + +func (r *registration) Value() queue.UniqueCopyable { + return r +} + +type LookupFunc func(hostName string) *api.Node + +type clientRegistrator struct { + lookupNode LookupFunc + client *client.Client + queue *queue.HistoricalFIFO +} + +func NewRegistrator(client *client.Client, lookupNode LookupFunc) *clientRegistrator { + return &clientRegistrator{ + lookupNode: lookupNode, + client: client, + queue: queue.NewHistorical(nil), + } +} + +func (r *clientRegistrator) Run(terminate <-chan struct{}) error { + loop := func() { + RegistrationLoop: + for { + obj := r.queue.CancelablePop(terminate) + if obj == nil { + break RegistrationLoop + } + select { + case <-terminate: + break RegistrationLoop + default: + } + + rg := obj.(*registration) + n, needsUpdate := r.updateNecessary(rg.hostName, rg.labels) + if !needsUpdate { + continue + } + + if n == nil { + log.V(2).Infof("creating node %s with labels %v", rg.hostName, rg.labels) + _, err := CreateOrUpdate(r.client, rg.hostName, rg.labels) + if err != nil { + log.Errorf("error creating the node %s: %v", rg.hostName, err) + } + } else { + log.V(2).Infof("updating node %s with labels %v", rg.hostName, rg.labels) + _, err := Update(r.client, n, rg.labels) + if err != nil && errors.IsNotFound(err) { + // last chance when our store was out of date + _, err = Create(r.client, rg.hostName, rg.labels) + } + if err != nil { + log.Errorf("error updating the node %s: %v", rg.hostName, err) + } + } + } + } + go runtime.Until(loop, time.Second, terminate) + + return nil +} + +func (r *clientRegistrator) Register(hostName string, labels map[string]string) (bool, error) { + _, needsUpdate := r.updateNecessary(hostName, labels) + + if needsUpdate { + log.V(5).Infof("queuing registration for node %s with labels %v", hostName, labels) + err := r.queue.Update(&registration{ + hostName: hostName, + labels: labels, + }) + if err != nil { + return false, fmt.Errorf("cannot register node %s: %v", hostName, err) + } + return true, nil + } + + return false, nil +} + +// updateNecessary retrieves the node with the given hostname and checks whether the given +// labels would mean any update to the node. The unmodified node is returned, plus +// true iff an update is necessary.
+func (r *clientRegistrator) updateNecessary(hostName string, labels map[string]string) (*api.Node, bool) { + if r.lookupNode == nil { + return nil, true + } + n := r.lookupNode(hostName) + return n, n == nil || !IsUpToDate(n, labels) +} diff --git a/contrib/mesos/pkg/scheduler/fcfs.go b/contrib/mesos/pkg/scheduler/fcfs.go index d2bca5a8e3e..9b9c93a5146 100644 --- a/contrib/mesos/pkg/scheduler/fcfs.go +++ b/contrib/mesos/pkg/scheduler/fcfs.go @@ -21,6 +21,7 @@ import ( log "github.com/golang/glog" + "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" ) @@ -53,10 +54,11 @@ func NewAllocationStrategy(fitPredicate podtask.FitPredicate, procurement podtas type fcfsPodScheduler struct { AllocationStrategy + lookupNode node.LookupFunc } -func NewFCFSPodScheduler(as AllocationStrategy) PodScheduler { - return &fcfsPodScheduler{as} +func NewFCFSPodScheduler(as AllocationStrategy, lookupNode node.LookupFunc) PodScheduler { + return &fcfsPodScheduler{as, lookupNode} } // A first-come-first-serve scheduler: acquires the first offer that can support the task @@ -68,7 +70,18 @@ func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, unused SlaveIndex, t if offer == nil { return false, fmt.Errorf("nil offer while scheduling task %v", task.ID) } - if fps.FitPredicate()(task, offer) { + + // check that the node actually exists. Offers for unregistered nodes are + // declined, so the case n==nil can only happen when the node object was + // deleted after the offer came in. + nodeName := offer.GetHostname() + n := fps.lookupNode(nodeName) + if n == nil { + log.V(3).Infof("ignoring offer for node %s because node went away", nodeName) + return false, nil + } + + if fps.FitPredicate()(task, offer, n) { if p.Acquire() { acceptedOffer = p log.V(3).Infof("Pod %s accepted offer %v", podName, offer.Id.GetValue()) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index 3283597421b..aedbd2efe13 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -548,7 +548,11 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) defer k.api.Unlock() switch task, state := k.api.tasks().Get(task.ID); state { case podtask.StatePending: - return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer) + // Assess fitness of pod with the current offer. The scheduler normally + // "backs off" when it can't find an offer that matches up with a pod. + // The backoff period for a pod can terminate sooner if an offer becomes + // available that matches up.
+ return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer, nil) default: // no point in continuing to check for matching offers return true diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index 3290340108b..5223f364c2c 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -393,14 +393,21 @@ func TestPlugin_LifeCycle(t *testing.T) { executor.Data = []byte{0, 1, 2} // create scheduler + nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) as := NewAllocationStrategy( podtask.DefaultPredicate, podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)) testScheduler := New(Config{ - Executor: executor, - Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Default.Version()}), - Scheduler: NewFCFSPodScheduler(as), - Schedcfg: *schedcfg.CreateDefaultConfig(), + Executor: executor, + Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Default.Version()}), + Scheduler: NewFCFSPodScheduler(as, func(node string) *api.Node { + obj, _, _ := nodeStore.GetByKey(node) + if obj == nil { + return nil + } + return obj.(*api.Node) + }), + Schedcfg: *schedcfg.CreateDefaultConfig(), }) assert.NotNil(testScheduler.client, "client is nil") diff --git a/contrib/mesos/pkg/scheduler/podtask/minimal.go b/contrib/mesos/pkg/scheduler/podtask/minimal.go index 842455e5919..ef4c0ef716d 100644 --- a/contrib/mesos/pkg/scheduler/podtask/minimal.go +++ b/contrib/mesos/pkg/scheduler/podtask/minimal.go @@ -19,6 +19,7 @@ package podtask import ( log "github.com/golang/glog" mesos "github.com/mesos/mesos-go/mesosproto" + "k8s.io/kubernetes/pkg/api" ) // bogus numbers that we use to make sure that there's some set of minimal offered resources on the slave @@ -43,7 +44,7 @@ var ( }).Procure ) -func MinimalPodResourcesPredicate(t *T, offer *mesos.Offer) bool { +func MinimalPodResourcesPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool { var ( offeredCpus float64 offeredMem float64 diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go index 9b18e283fae..74bcb290346 100644 --- a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go +++ b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go @@ -146,10 +146,10 @@ func TestEmptyOffer(t *testing.T) { mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit) - if ok := DefaultPredicate(task, nil); ok { + if ok := DefaultPredicate(task, nil, nil); ok { t.Fatalf("accepted nil offer") } - if ok := DefaultPredicate(task, &mesos.Offer{}); ok { + if ok := DefaultPredicate(task, &mesos.Offer{}, nil); ok { t.Fatalf("accepted empty offer") } } @@ -176,7 +176,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) { mutil.NewScalarResource("mem", 0.001), }, } - if ok := DefaultPredicate(task, offer); ok { + if ok := DefaultPredicate(task, offer, nil); ok { t.Fatalf("accepted offer %v:", offer) } @@ -186,7 +186,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) { mutil.NewScalarResource("mem", t_min_mem), }, } - if ok := DefaultPredicate(task, offer); !ok { + if ok := DefaultPredicate(task, offer, nil); !ok { t.Fatalf("did not accepted offer %v:", offer) } } @@ -203,7 +203,7 @@ func TestAcceptOfferPorts(t *testing.T) { rangeResource("ports", []uint64{1, 1}), }, } - if ok := 
DefaultPredicate(task, offer); !ok { + if ok := DefaultPredicate(task, offer, nil); !ok { t.Fatalf("did not accepted offer %v:", offer) } @@ -218,17 +218,17 @@ func TestAcceptOfferPorts(t *testing.T) { mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit) - if ok := DefaultPredicate(task, offer); ok { + if ok := DefaultPredicate(task, offer, nil); ok { t.Fatalf("accepted offer %v:", offer) } pod.Spec.Containers[0].Ports[0].HostPort = 1 - if ok := DefaultPredicate(task, offer); !ok { + if ok := DefaultPredicate(task, offer, nil); !ok { t.Fatalf("did not accepted offer %v:", offer) } pod.Spec.Containers[0].Ports[0].HostPort = 0 - if ok := DefaultPredicate(task, offer); !ok { + if ok := DefaultPredicate(task, offer, nil); !ok { t.Fatalf("did not accepted offer %v:", offer) } @@ -236,12 +236,12 @@ func TestAcceptOfferPorts(t *testing.T) { mutil.NewScalarResource("cpus", t_min_cpu), mutil.NewScalarResource("mem", t_min_mem), } - if ok := DefaultPredicate(task, offer); ok { + if ok := DefaultPredicate(task, offer, nil); ok { t.Fatalf("accepted offer %v:", offer) } pod.Spec.Containers[0].Ports[0].HostPort = 1 - if ok := DefaultPredicate(task, offer); ok { + if ok := DefaultPredicate(task, offer, nil); ok { t.Fatalf("accepted offer %v:", offer) } } @@ -270,21 +270,61 @@ func TestGeneratePodName(t *testing.T) { func TestNodeSelector(t *testing.T) { t.Parallel() - sel1 := map[string]string{"rack": "a"} - sel2 := map[string]string{"rack": "a", "gen": "2014"} + sel1 := map[string]string{"k8s.mesosphere.io/attribute-rack": "a"} + sel2 := map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-gen": "2014"} + sel3 := map[string]string{"kubernetes.io/hostname": "node1"} + sel4 := map[string]string{"kubernetes.io/hostname": "node2"} + sel5 := map[string]string{"k8s.mesosphere.io/attribute-old": "42"} + sel6 := map[string]string{"some.other/label": "43"} + + newNode := func(hostName string, l map[string]string) *api.Node { + nodeLabels := map[string]string{"kubernetes.io/hostname": hostName} + if l != nil { + for k, v := range l { + nodeLabels[k] = v + } + } + return &api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: hostName, + Labels: nodeLabels, + }, + Spec: api.NodeSpec{ + ExternalID: hostName, + }, + } + } + node1 := newNode("node1", nil) + node2 := newNode("node2", nil) + node3 := newNode("node3", map[string]string{ + "k8s.mesosphere.io/attribute-old": "42", + "k8s.mesosphere.io/attribute-gen": "2015", + "some.other/label": "43", + }) tests := []struct { selector map[string]string attrs []*mesos.Attribute + node *api.Node ok bool + desc string }{ - {sel1, []*mesos.Attribute{newTextAttribute("rack", "a")}, true}, - {sel1, []*mesos.Attribute{newTextAttribute("rack", "b")}, false}, - {sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, true}, - {sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newScalarAttribute("num", 42.0)}, true}, - {sel1, []*mesos.Attribute{newScalarAttribute("rack", 42.0)}, false}, - {sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, true}, - {sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2015")}, false}, + {sel1, []*mesos.Attribute{newTextAttribute("rack", "a")}, node1, true, "label value matches"}, + {sel1, []*mesos.Attribute{newTextAttribute("rack", "b")}, node1, false, "label value does not match"}, + {sel1, 
[]*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, node1, true, "required labels match"}, + {sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newScalarAttribute("num", 42.0)}, node1, true, "scalar label matches"}, + {sel1, []*mesos.Attribute{newScalarAttribute("rack", 42.0)}, node1, false, "scalar label does not match"}, + {sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, node1, true, "all labels match"}, + {sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2015")}, node1, false, "one label does not match"}, + + {sel3, []*mesos.Attribute{}, node1, true, "hostname label matches"}, + {sel4, []*mesos.Attribute{}, node1, false, "hostname label does not match"}, + {sel4, []*mesos.Attribute{}, node2, true, "hostname label does not match"}, + + {sel5, []*mesos.Attribute{}, node3, false, "old slave attribute is removed"}, + {sel6, []*mesos.Attribute{}, node1, false, "non-slave attribute does not match"}, + {sel6, []*mesos.Attribute{}, node3, true, "non-slave attribute matches"}, + {sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, node3, true, "old slave attributes are overwritten"}, } for _, ts := range tests { @@ -296,9 +336,10 @@ func TestNodeSelector(t *testing.T) { mutil.NewScalarResource("mem", t_min_mem), }, Attributes: ts.attrs, + Hostname: &ts.node.Name, } - if got, want := DefaultPredicate(task, offer), ts.ok; got != want { - t.Fatalf("expected acceptance of offer %v for selector %v to be %v, got %v:", want, got, ts.attrs, ts.selector) + if got, want := DefaultPredicate(task, offer, ts.node), ts.ok; got != want { + t.Fatalf("expected acceptance of offer for selector %v to be %v, got %v: %q", ts.selector, want, got, ts.desc) } } } diff --git a/contrib/mesos/pkg/scheduler/podtask/predicate.go b/contrib/mesos/pkg/scheduler/podtask/predicate.go index 4b46cbb4900..cafdb2654ba 100644 --- a/contrib/mesos/pkg/scheduler/podtask/predicate.go +++ b/contrib/mesos/pkg/scheduler/podtask/predicate.go @@ -19,7 +19,9 @@ package podtask import ( log "github.com/golang/glog" mesos "github.com/mesos/mesos-go/mesosproto" + "k8s.io/kubernetes/contrib/mesos/pkg/node" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/labels" ) @@ -31,25 +33,25 @@ var DefaultPredicate = RequireAllPredicate([]FitPredicate{ }).Fit // FitPredicate implementations determine if the given task "fits" into offered Mesos resources. -// Neither the task or offer should be modified. -type FitPredicate func(*T, *mesos.Offer) bool +// Neither the task or offer should be modified. Note that the node can be nil. 
+type FitPredicate func(*T, *mesos.Offer, *api.Node) bool type RequireAllPredicate []FitPredicate -func (f RequireAllPredicate) Fit(t *T, offer *mesos.Offer) bool { +func (f RequireAllPredicate) Fit(t *T, offer *mesos.Offer, n *api.Node) bool { for _, p := range f { - if !p(t, offer) { + if !p(t, offer, n) { return false } } return true } -func ValidationPredicate(t *T, offer *mesos.Offer) bool { +func ValidationPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool { return t != nil && offer != nil } -func NodeSelectorPredicate(t *T, offer *mesos.Offer) bool { +func NodeSelectorPredicate(t *T, offer *mesos.Offer, n *api.Node) bool { // if the user has specified a target host, make sure this offer is for that host if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName { return false @@ -57,21 +59,29 @@ func NodeSelectorPredicate(t *T, offer *mesos.Offer) bool { // check the NodeSelector if len(t.Pod.Spec.NodeSelector) > 0 { - slaveLabels := map[string]string{} - for _, a := range offer.Attributes { - if a.GetType() == mesos.Value_TEXT { - slaveLabels[a.GetName()] = a.GetText().GetValue() + l := map[string]string{ + "kubernetes.io/hostname": offer.GetHostname(), + } + if n != nil && n.Labels != nil { + for k, v := range n.Labels { + if !node.IsSlaveAttributeLabel(k) { + l[k] = v + } } } + for k, v := range node.SlaveAttributesToLabels(offer.Attributes) { + l[k] = v + } + selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector) - if !selector.Matches(labels.Set(slaveLabels)) { + if !selector.Matches(labels.Set(l)) { return false } } return true } -func PortsPredicate(t *T, offer *mesos.Offer) bool { +func PortsPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool { // check ports if _, err := t.mapper.Generate(t, offer); err != nil { log.V(3).Info(err) @@ -80,7 +90,7 @@ func PortsPredicate(t *T, offer *mesos.Offer) bool { return true } -func PodFitsResourcesPredicate(t *T, offer *mesos.Offer) bool { +func PodFitsResourcesPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool { // find offered cpu and mem var ( offeredCpus mresource.CPUShares diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index afa8e79fe82..1d15790772c 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -31,6 +31,7 @@ import ( bindings "github.com/mesos/mesos-go/scheduler" execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" + "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/contrib/mesos/pkg/offers" offerMetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/proc" @@ -82,6 +83,7 @@ type KubernetesScheduler struct { etcdClient tools.EtcdClient failoverTimeout float64 // in seconds reconcileInterval int64 + nodeRegistrator node.Registrator // Mesos context. 
@@ -116,6 +118,7 @@ type Config struct { FailoverTimeout float64 ReconcileInterval int64 ReconcileCooldown time.Duration + LookupNode node.LookupFunc } // New creates a new KubernetesScheduler @@ -131,16 +134,23 @@ func New(config Config) *KubernetesScheduler { etcdClient: config.EtcdClient, failoverTimeout: config.FailoverTimeout, reconcileInterval: config.ReconcileInterval, + nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode), offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { - // filter the offers: the executor IDs must not identify a kubelet- - // executor with a group that doesn't match ours + // the node must be registered and have up-to-date labels + n := config.LookupNode(o.GetHostname()) + if n == nil || !node.IsUpToDate(n, node.SlaveAttributesToLabels(o.GetAttributes())) { + return false + } + + // the executor IDs must not identify a kubelet-executor with a group that doesn't match ours for _, eid := range o.GetExecutorIds() { execuid := uid.Parse(eid.GetValue()) if execuid.Name() == execcfg.DefaultInfoID && execuid.Group() != k.executorGroup { return false } } + return true }, DeclineOffer: func(id string) <-chan error { @@ -183,6 +193,7 @@ func (k *KubernetesScheduler) Init(electedMaster proc.Process, pl PluginInterfac k.plugin = pl k.offers.Init(k.terminate) k.InstallDebugHandlers(mux) + k.nodeRegistrator.Run(k.terminate) return k.recoverTasks() } @@ -323,6 +334,15 @@ func (k *KubernetesScheduler) ResourceOffers(driver bindings.SchedulerDriver, of for _, offer := range offers { slaveId := offer.GetSlaveId().GetValue() k.slaveHostNames.Register(slaveId, offer.GetHostname()) + + // create the node api object if it does not exist yet + if k.nodeRegistrator != nil { + labels := node.SlaveAttributesToLabels(offer.GetAttributes()) + _, err := k.nodeRegistrator.Register(offer.GetHostname(), labels) + if err != nil { + log.Error(err) + } + } } } diff --git a/contrib/mesos/pkg/scheduler/scheduler_test.go b/contrib/mesos/pkg/scheduler/scheduler_test.go index 9d4e6b9d0d3..d2312e1c899 100644 --- a/contrib/mesos/pkg/scheduler/scheduler_test.go +++ b/contrib/mesos/pkg/scheduler/scheduler_test.go @@ -17,6 +17,7 @@ limitations under the License.
package scheduler import ( + "reflect" "testing" mesos "github.com/mesos/mesos-go/mesosproto" @@ -27,6 +28,8 @@ import ( schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" ) //get number of non-expired offers from offer registry @@ -42,10 +45,47 @@ func getNumberOffers(os offers.Registry) int { return walked } -//test adding of ressource offer, should be added to offer registry and slavesf +type mockRegistrator struct { + store cache.Store +} + +func (r *mockRegistrator) Run(terminate <-chan struct{}) error { + return nil +} + +func (r *mockRegistrator) Register(hostName string, labels map[string]string) (bool, error) { + obj, _, err := r.store.GetByKey(hostName) + if err != nil { + return false, err + } + if obj == nil { + return true, r.store.Add(&api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: hostName, + Labels: labels, + }, + Spec: api.NodeSpec{ + ExternalID: hostName, + }, + Status: api.NodeStatus{ + Phase: api.NodePending, + }, + }) + } else { + n := obj.(*api.Node) + if reflect.DeepEqual(n.Labels, labels) { + return false, nil + } + n.Labels = labels + return true, r.store.Update(n) + } +} + +//test adding of resource offer, should be added to offer registry and slaves func TestResourceOffer_Add(t *testing.T) { assert := assert.New(t) + registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)} testScheduler := &KubernetesScheduler{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { @@ -59,7 +99,8 @@ func TestResourceOffer_Add(t *testing.T) { TTL: schedcfg.DefaultOfferTTL, ListenerDelay: schedcfg.DefaultListenerDelay, }), - slaveHostNames: slave.NewRegistry(), + slaveHostNames: slave.NewRegistry(), + nodeRegistrator: registrator, } hostname := "h1" @@ -67,6 +108,7 @@ func TestResourceOffer_Add(t *testing.T) { offer1 := &mesos.Offer{Id: offerID1, Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)} offers1 := []*mesos.Offer{offer1} testScheduler.ResourceOffers(nil, offers1) + assert.Equal(1, len(registrator.store.List())) assert.Equal(1, getNumberOffers(testScheduler.offers)) //check slave hostname diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index c4c84a9e794..d1c588c73e8 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -63,8 +63,10 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth" + "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/master/ports" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" @@ -76,6 +78,7 @@ const ( defaultMesosUser = "root" // should have privs to execute docker and iptables commands defaultReconcileInterval = 300 // 5m default task reconciliation interval defaultReconcileCooldown = 15 * time.Second + defaultNodeRelistPeriod = 5 * time.Minute defaultFrameworkName = "Kubernetes" defaultExecutorCPUs = mresource.CPUShares(0.25) // initial CPU allocated for executor defaultExecutorMem = mresource.MegaBytes(128.0) // initial memory allocated for executor @@ -145,6 +148,7 @@ type SchedulerServer struct { DockerCfgPath string
ContainPodResources bool AccountForPodResources bool + nodeRelistPeriod time.Duration executable string // path to the binary running this service client *client.Client @@ -192,6 +196,7 @@ func NewSchedulerServer() *SchedulerServer { KubeletSyncFrequency: 10 * time.Second, ContainPodResources: true, AccountForPodResources: true, + nodeRelistPeriod: defaultNodeRelistPeriod, } // cache this for later use. also useful in case the original binary gets deleted, e.g. // during upgrades, development deployments, etc. @@ -245,6 +250,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB") fs.BoolVar(&s.ContainPodResources, "contain-pod-resources", s.ContainPodResources, "Reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.") fs.BoolVar(&s.AccountForPodResources, "account-for-pod-resources", s.AccountForPodResources, "Allocate pod CPU and memory resources from offers (Default: true)") + fs.DurationVar(&s.nodeRelistPeriod, "node-monitor-period", s.nodeRelistPeriod, "Period between relisting of all nodes from the apiserver.") fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned minion and executor processes.") fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.") @@ -678,7 +684,24 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config podtask.DefaultMinimalProcurement) } - fcfs := scheduler.NewFCFSPodScheduler(as) + // mirror all nodes into the nodeStore + nodesClient, err := s.createAPIServerClient() + if err != nil { + log.Fatalf("Cannot create client to watch nodes: %v", err) + } + nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + nodeLW := cache.NewListWatchFromClient(nodesClient, "nodes", api.NamespaceAll, fields.Everything()) + cache.NewReflector(nodeLW, &api.Node{}, nodeStore, s.nodeRelistPeriod).Run() + + lookupNode := func(hostName string) *api.Node { + n, _, _ := nodeStore.GetByKey(hostName) // ignore the error and return nil in that case + if n == nil { + return nil + } + return n.(*api.Node) + } + + fcfs := scheduler.NewFCFSPodScheduler(as, lookupNode) mesosPodScheduler := scheduler.New(scheduler.Config{ Schedcfg: *sc, Executor: executor, @@ -688,6 +711,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config FailoverTimeout: s.FailoverTimeout, ReconcileInterval: s.ReconcileInterval, ReconcileCooldown: s.ReconcileCooldown, + LookupNode: lookupNode, }) masterUri := s.MesosMaster diff --git a/test/e2e/mesos.go b/test/e2e/mesos.go new file mode 100644 index 00000000000..eeb8e2ea6d6 --- /dev/null +++ b/test/e2e/mesos.go @@ -0,0 +1,53 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package e2e + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/labels" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/kubernetes/pkg/fields" +) + +var _ = Describe("Mesos", func() { + framework := NewFramework("pods") + + BeforeEach(func() { + SkipUnlessProviderIs("mesos/docker") + }) + + It("applies slave attributes as labels", func() { + nodeClient := framework.Client.Nodes() + + rackA := labels.SelectorFromSet(map[string]string{"k8s.mesosphere.io/attribute-rack": "1"}) + nodes, err := nodeClient.List(rackA, fields.Everything()) + if err != nil { + Failf("Failed to query for node: %v", err) + } + Expect(len(nodes.Items)).To(Equal(1)) + + var addr string + for _, a := range nodes.Items[0].Status.Addresses { + if a.Type == api.NodeInternalIP { + addr = a.Address + } + } + Expect(addr).NotTo(Equal("")) + }) +}) From 6f2a1742abe0d9808498ae0673bb56ef4de922de Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 11 Sep 2015 22:48:37 +0200 Subject: [PATCH 3/4] Add positive e2e test for NodeSelector --- test/e2e/scheduler_predicates.go | 75 +++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/test/e2e/scheduler_predicates.go b/test/e2e/scheduler_predicates.go index a690c7c8d5b..3415881f7f3 100644 --- a/test/e2e/scheduler_predicates.go +++ b/test/e2e/scheduler_predicates.go @@ -334,7 +334,7 @@ var _ = Describe("SchedulerPredicates", func() { // Test Nodes does not have any label, hence it should be impossible to schedule Pod with // nonempty Selector set. - It("validates that NodeSelector is respected.", func() { + It("validates that NodeSelector is respected if not matching", func() { By("Trying to schedule Pod with nonempty NodeSelector.") podName := "restricted-pod" @@ -371,4 +371,77 @@ var _ = Describe("SchedulerPredicates", func() { verifyResult(c, podName, ns, currentlyDeadPods) cleanupPods(c, ns) }) + + It("validates that NodeSelector is respected if matching", func() { + // launch a pod to find a node which can launch a pod. We intentionally do + // not just take the node list and choose the first of them. Depending on the + // cluster and the scheduler it might be that a "normal" pod cannot be + // scheduled onto it.
+ By("Trying to launch a pod without a label to get a node which can launch it.") + podName := "without-label" + _, err := c.Pods(ns).Create(&api.Pod{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Pod", + }, + ObjectMeta: api.ObjectMeta{ + Name: podName, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: podName, + Image: "gcr.io/google_containers/pause:go", + }, + }, + }, + }) + expectNoError(err) + expectNoError(waitForPodRunningInNamespace(c, podName, ns)) + pod, err := c.Pods(ns).Get(podName) + expectNoError(err) + + nodeName := pod.Spec.NodeName + err = c.Pods(ns).Delete(podName, api.NewDeleteOptions(0)) + expectNoError(err) + + By("Trying to apply a random label on the found node.") + k := fmt.Sprintf("kubernetes.io/e2e-%s", string(util.NewUUID())) + v := "42" + patch := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, k, v) + err = c.Patch(api.MergePatchType).Resource("nodes").Name(nodeName).Body([]byte(patch)).Do().Error() + expectNoError(err) + + node, err := c.Nodes().Get(nodeName) + expectNoError(err) + Expect(node.Labels[k]).To(Equal(v)) + + By("Trying to relaunch the pod, now with labels.") + labelPodName := "with-labels" + _, err = c.Pods(ns).Create(&api.Pod{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Pod", + }, + ObjectMeta: api.ObjectMeta{ + Name: labelPodName, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: labelPodName, + Image: "gcr.io/google_containers/pause:go", + }, + }, + NodeSelector: map[string]string{ + "kubernetes.io/hostname": nodeName, + k: v, + }, + }, + }) + expectNoError(err) + defer c.Pods(ns).Delete(labelPodName, api.NewDeleteOptions(0)) + expectNoError(waitForPodRunningInNamespace(c, labelPodName, ns)) + labelPod, err := c.Pods(ns).Get(labelPodName) + expectNoError(err) + Expect(labelPod.Spec.NodeName).To(Equal(nodeName)) + }) }) From 112f80fa4a30999d2e8457a817b48debe23a9cc9 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 25 Sep 2015 17:15:58 +0200 Subject: [PATCH 4/4] Simplify FitPredicate for NodeSelector We can assume that n != nil holds, because otherwise offers are skipped. 
--- .../pkg/scheduler/podtask/pod_task_test.go | 64 +++++++++---------- .../mesos/pkg/scheduler/podtask/predicate.go | 18 +----- 2 files changed, 34 insertions(+), 48 deletions(-) diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go index 74bcb290346..6f15565098c 100644 --- a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go +++ b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go @@ -19,14 +19,14 @@ package podtask import ( "testing" + "github.com/gogo/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" mutil "github.com/mesos/mesos-go/mesosutil" + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/contrib/mesos/pkg/node" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" - - "github.com/gogo/protobuf/proto" - "github.com/stretchr/testify/assert" ) const ( @@ -270,13 +270,6 @@ func TestGeneratePodName(t *testing.T) { func TestNodeSelector(t *testing.T) { t.Parallel() - sel1 := map[string]string{"k8s.mesosphere.io/attribute-rack": "a"} - sel2 := map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-gen": "2014"} - sel3 := map[string]string{"kubernetes.io/hostname": "node1"} - sel4 := map[string]string{"kubernetes.io/hostname": "node2"} - sel5 := map[string]string{"k8s.mesosphere.io/attribute-old": "42"} - sel6 := map[string]string{"some.other/label": "43"} - newNode := func(hostName string, l map[string]string) *api.Node { nodeLabels := map[string]string{"kubernetes.io/hostname": hostName} if l != nil { @@ -294,37 +287,43 @@ func TestNodeSelector(t *testing.T) { }, } } - node1 := newNode("node1", nil) - node2 := newNode("node2", nil) - node3 := newNode("node3", map[string]string{ - "k8s.mesosphere.io/attribute-old": "42", - "k8s.mesosphere.io/attribute-gen": "2015", - "some.other/label": "43", + node1 := newNode("node1", node.SlaveAttributesToLabels([]*mesos.Attribute{ + newTextAttribute("rack", "a"), + newTextAttribute("gen", "2014"), + newScalarAttribute("num", 42.0), + })) + node2 := newNode("node2", node.SlaveAttributesToLabels([]*mesos.Attribute{ + newTextAttribute("rack", "b"), + newTextAttribute("gen", "2015"), + newScalarAttribute("num", 0.0), + })) + labels3 := node.SlaveAttributesToLabels([]*mesos.Attribute{ + newTextAttribute("rack", "c"), + newTextAttribute("gen", "2015"), + newScalarAttribute("old", 42), }) + labels3["some.other/label"] = "43" + node3 := newNode("node3", labels3) tests := []struct { selector map[string]string - attrs []*mesos.Attribute node *api.Node ok bool desc string }{ - {sel1, []*mesos.Attribute{newTextAttribute("rack", "a")}, node1, true, "label value matches"}, - {sel1, []*mesos.Attribute{newTextAttribute("rack", "b")}, node1, false, "label value does not match"}, - {sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, node1, true, "required labels match"}, - {sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newScalarAttribute("num", 42.0)}, node1, true, "scalar label matches"}, - {sel1, []*mesos.Attribute{newScalarAttribute("rack", 42.0)}, node1, false, "scalar label does not match"}, - {sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, node1, true, "all labels match"}, - {sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2015")}, node1, false, "one label does not match"}, + {map[string]string{"k8s.mesosphere.io/attribute-rack": "a"}, 
node1, true, "label value matches"}, + {map[string]string{"k8s.mesosphere.io/attribute-rack": "b"}, node1, false, "label value does not match"}, + {map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-gen": "2014"}, node1, true, "multiple required labels match"}, + {map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-gen": "2015"}, node1, false, "one label does not match"}, + {map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-num": "42"}, node1, true, "scalar label matches"}, + {map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-num": "43"}, node1, false, "scalar label does not match"}, - {sel3, []*mesos.Attribute{}, node1, true, "hostname label matches"}, - {sel4, []*mesos.Attribute{}, node1, false, "hostname label does not match"}, - {sel4, []*mesos.Attribute{}, node2, true, "hostname label does not match"}, + {map[string]string{"kubernetes.io/hostname": "node1"}, node1, true, "hostname label matches"}, + {map[string]string{"kubernetes.io/hostname": "node2"}, node1, false, "hostname label does not match"}, + {map[string]string{"kubernetes.io/hostname": "node2"}, node2, true, "hostname label matches"}, - {sel5, []*mesos.Attribute{}, node3, false, "old slave attribute is removed"}, - {sel6, []*mesos.Attribute{}, node1, false, "non-slave attribute does not match"}, - {sel6, []*mesos.Attribute{}, node3, true, "non-slave attribute matches"}, - {sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, node3, true, "old slave attributes are overwritten"}, + {map[string]string{"some.other/label": "43"}, node1, false, "non-slave attribute does not match"}, + {map[string]string{"some.other/label": "43"}, node3, true, "non-slave attribute matches"}, } for _, ts := range tests { @@ -335,8 +334,7 @@ func TestNodeSelector(t *testing.T) { mutil.NewScalarResource("cpus", t_min_cpu), mutil.NewScalarResource("mem", t_min_mem), }, - Attributes: ts.attrs, - Hostname: &ts.node.Name, + Hostname: &ts.node.Name, } if got, want := DefaultPredicate(task, offer, ts.node), ts.ok; got != want { t.Fatalf("expected acceptance of offer for selector %v to be %v, got %v: %q", ts.selector, want, got, ts.desc) diff --git a/contrib/mesos/pkg/scheduler/podtask/predicate.go b/contrib/mesos/pkg/scheduler/podtask/predicate.go index cafdb2654ba..25667e3d2c1 100644 --- a/contrib/mesos/pkg/scheduler/podtask/predicate.go +++ b/contrib/mesos/pkg/scheduler/podtask/predicate.go @@ -19,7 +19,6 @@ package podtask import ( log "github.com/golang/glog" mesos "github.com/mesos/mesos-go/mesosproto" - "k8s.io/kubernetes/contrib/mesos/pkg/node" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/labels" @@ -59,22 +58,11 @@ func NodeSelectorPredicate(t *T, offer *mesos.Offer, n *api.Node) bool { // check the NodeSelector if len(t.Pod.Spec.NodeSelector) > 0 { - l := map[string]string{ - "kubernetes.io/hostname": offer.GetHostname(), + if n.Labels == nil { + return false } - if n != nil && n.Labels != nil { - for k, v := range n.Labels { - if !node.IsSlaveAttributeLabel(k) { - l[k] = v - } - } - } - for k, v := range node.SlaveAttributesToLabels(offer.Attributes) { - l[k] = v - } - selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector) - if !selector.Matches(labels.Set(l)) { + if !selector.Matches(labels.Set(n.Labels)) { return false } }