Add Mesos slave attributes as node labels

- pre-create node api objects from the scheduler when offers arrive
- decline offers until nodes are registered
- turn slave attributes into k8s.mesosphere.io/attribute-* labels
- update labels from executor Register/Reregister
- watch nodes in scheduler to make non-Mesos labels available for NodeSelector matching (see the sketch after this list)
- add unit tests for label predicate
- add e2e test to check that slave attributes really end up as node labels
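
As an illustrative, hedged sketch (the pod name, image, and rack value below are made up for this example, not taken from the commit): once slave attributes surface as node labels, an ordinary NodeSelector is enough to pin a pod to slaves carrying a given attribute.

package example

import "k8s.io/kubernetes/pkg/api"

// rackPinnedPod is a hypothetical pod that should only be scheduled onto
// Mesos slaves carrying the attribute rack=a, via the derived node label.
func rackPinnedPod() *api.Pod {
	return &api.Pod{
		ObjectMeta: api.ObjectMeta{Name: "rack-pinned-pod"},
		Spec: api.PodSpec{
			Containers:   []api.Container{{Name: "main", Image: "nginx"}},
			NodeSelector: map[string]string{"k8s.mesosphere.io/attribute-rack": "a"},
		},
	}
}
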
Dr. Stefan Schimanski 2015-09-11 10:40:46 +02:00
parent a496e8dd20
commit 4d4ebe9f18
14 changed files with 605 additions and 48 deletions

View File

@ -33,6 +33,7 @@ import (
mutil "github.com/mesos/mesos-go/mesosutil"
"k8s.io/kubernetes/contrib/mesos/pkg/archive"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
@ -225,6 +226,13 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
k.staticPodsConfig = executorInfo.Data
}
if slaveInfo != nil {
_, err := node.CreateOrUpdate(k.client, slaveInfo.GetHostname(), node.SlaveAttributesToLabels(slaveInfo.Attributes))
if err != nil {
log.Errorf("cannot update node labels: %v", err)
}
}
k.initialRegistration.Do(k.onInitialRegistration)
}
@ -239,11 +247,19 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI
log.Errorf("failed to reregister/transition to a connected state")
}
if slaveInfo != nil {
_, err := node.CreateOrUpdate(k.client, slaveInfo.GetHostname(), node.SlaveAttributesToLabels(slaveInfo.Attributes))
if err != nil {
log.Errorf("cannot update node labels: %v", err)
}
}
k.initialRegistration.Do(k.onInitialRegistration)
}
func (k *KubernetesExecutor) onInitialRegistration() {
defer close(k.initialRegComplete)
// emit an empty update to allow the mesos "source" to be marked as seen
k.updateChan <- kubelet.PodUpdate{
Pods: []*api.Pod{},

View File

@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package node provides utilities to create and update nodes
package node

View File

@ -0,0 +1,160 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util/validation"
)
const (
labelPrefix = "k8s.mesosphere.io/attribute-"
)
// Create creates a new node api object with the given hostname and labels
func Create(client *client.Client, hostName string, labels map[string]string) (*api.Node, error) {
n := api.Node{
ObjectMeta: api.ObjectMeta{
Name: hostName,
Labels: map[string]string{"kubernetes.io/hostname": hostName},
},
Spec: api.NodeSpec{
ExternalID: hostName,
},
Status: api.NodeStatus{
Phase: api.NodePending,
},
}
for k, v := range labels {
n.Labels[k] = v
}
// try to create
return client.Nodes().Create(&n)
}
// Update updates an existing node api object with new labels
func Update(client *client.Client, n *api.Node, labels map[string]string) (*api.Node, error) {
patch := struct {
Metadata struct {
Labels map[string]string `json:"labels"`
} `json:"metadata"`
}{}
patch.Metadata.Labels = map[string]string{}
for k, v := range n.Labels {
if !IsSlaveAttributeLabel(k) {
patch.Metadata.Labels[k] = v
}
}
for k, v := range labels {
patch.Metadata.Labels[k] = v
}
patchJson, _ := json.Marshal(patch)
log.V(4).Infof("Patching labels of node %q: %v", n.Name, string(patchJson))
err := client.Patch(api.MergePatchType).RequestURI(n.SelfLink).Body(patchJson).Do().Error()
if err != nil {
return nil, fmt.Errorf("error updating labels of node %q: %v", n.Name, err)
}
newNode, err := api.Scheme.DeepCopy(n)
if err != nil {
return nil, err
}
newNode.(*api.Node).Labels = patch.Metadata.Labels
return newNode.(*api.Node), nil
}
// CreateOrUpdate tries to create a node api object or updates an already existing one
func CreateOrUpdate(client *client.Client, hostName string, labels map[string]string) (*api.Node, error) {
n, err := Create(client, hostName, labels)
if err == nil {
return n, nil
}
if !errors.IsAlreadyExists(err) {
return nil, fmt.Errorf("unable to register %q with the apiserver: %v", hostName, err)
}
// fall back to update an old node with new labels
n, err = client.Nodes().Get(hostName)
if err != nil {
return nil, fmt.Errorf("error getting node %q: %v", hostName, err)
}
if n == nil {
return nil, fmt.Errorf("no node instance returned for %q", hostName)
}
return Update(client, n, labels)
}
// IsSlaveAttributeLabel returns true iff the given label is derived from a slave attribute
func IsSlaveAttributeLabel(l string) bool {
return strings.HasPrefix(l, labelPrefix)
}
// IsUpToDate returns true iff the node's slave labels match the given attribute labels
func IsUpToDate(n *api.Node, labels map[string]string) bool {
slaveLabels := map[string]string{}
for k, v := range n.Labels {
if IsSlaveAttributeLabel(k) {
slaveLabels[k] = v
}
}
return reflect.DeepEqual(slaveLabels, labels)
}
// SlaveAttributesToLabels converts slave attributes into string key/value labels
func SlaveAttributesToLabels(attrs []*mesos.Attribute) map[string]string {
l := map[string]string{}
for _, a := range attrs {
if a == nil {
continue
}
var v string
k := labelPrefix + a.GetName()
switch a.GetType() {
case mesos.Value_TEXT:
v = a.GetText().GetValue()
case mesos.Value_SCALAR:
v = strconv.FormatFloat(a.GetScalar().GetValue(), 'G', -1, 64)
}
if !validation.IsQualifiedName(k) {
log.V(3).Infof("ignoring invalid node label name %q", k)
continue
}
if !validation.IsValidLabelValue(v) {
log.V(3).Infof("ignoring invalid node label %s value: %q", k, v)
continue
}
l[k] = v
}
return l
}
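
A hedged usage sketch of the helpers above (the package name and hostname handling are illustrative; the signatures of SlaveAttributesToLabels and CreateOrUpdate are the ones defined in this file). This is essentially what the executor does on (re-)registration.

package example

import (
	mesos "github.com/mesos/mesos-go/mesosproto"
	"k8s.io/kubernetes/contrib/mesos/pkg/node"
	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
)

// registerSlaveNode derives labels from a slave's Mesos attributes (e.g. from a
// SlaveInfo or an Offer) and makes sure a matching Node API object exists,
// creating it or patching its labels as needed.
func registerSlaveNode(c *client.Client, hostname string, attrs []*mesos.Attribute) (*api.Node, error) {
	labels := node.SlaveAttributesToLabels(attrs) // e.g. "rack" -> "k8s.mesosphere.io/attribute-rack"
	return node.CreateOrUpdate(c, hostname, labels)
}
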

View File

@ -0,0 +1,148 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"fmt"
"time"
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
)
type Registrator interface {
// Register checks whether the node is registered with the given labels. If it
// is not, it is created or updated on the apiserver. If the node was already
// false is returned.
Register(hostName string, labels map[string]string) (bool, error)
// Run starts the registration loop and returns immediately.
Run(terminate <-chan struct{}) error
}
type registration struct {
hostName string
labels map[string]string
}
func (r *registration) Copy() queue.Copyable {
return &registration{
hostName: r.hostName,
labels: r.labels, // labels are never changed, no need to clone
}
}
func (r *registration) GetUID() string {
return r.hostName
}
func (r *registration) Value() queue.UniqueCopyable {
return r
}
type LookupFunc func(hostName string) *api.Node
type clientRegistrator struct {
lookupNode LookupFunc
client *client.Client
queue *queue.HistoricalFIFO
}
func NewRegistrator(client *client.Client, lookupNode LookupFunc) *clientRegistrator {
return &clientRegistrator{
lookupNode: lookupNode,
client: client,
queue: queue.NewHistorical(nil),
}
}
func (r *clientRegistrator) Run(terminate <-chan struct{}) error {
loop := func() {
RegistrationLoop:
for {
obj := r.queue.CancelablePop(terminate)
if obj == nil {
break RegistrationLoop
}
select {
case <-terminate:
break RegistrationLoop
default:
}
rg := obj.(*registration)
n, needsUpdate := r.updateNecessary(rg.hostName, rg.labels)
if !needsUpdate {
continue
}
if n == nil {
log.V(2).Infof("creating node %s with labels %v", rg.hostName, rg.labels)
_, err := CreateOrUpdate(r.client, rg.hostName, rg.labels)
if err != nil {
log.Errorf("error creating the node %s: %v", rg.hostName, rg.labels)
}
} else {
log.V(2).Infof("updating node %s with labels %v", rg.hostName, rg.labels)
_, err := Update(r.client, n, rg.labels)
if err != nil && errors.IsNotFound(err) {
// last chance when our store was out of date
_, err = Create(r.client, rg.hostName, rg.labels)
}
if err != nil {
log.Errorf("error updating the node %s: %v", rg.hostName, rg.labels)
}
}
}
}
go runtime.Until(loop, time.Second, terminate)
return nil
}
func (r *clientRegistrator) Register(hostName string, labels map[string]string) (bool, error) {
_, needsUpdate := r.updateNecessary(hostName, labels)
if needsUpdate {
log.V(5).Infof("queuing registration for node %s with labels %v", hostName, labels)
err := r.queue.Update(&registration{
hostName: hostName,
labels: labels,
})
if err != nil {
return false, fmt.Errorf("cannot register node %s: %v", hostName, err)
}
return true, nil
}
return false, nil
}
// updateNecessary retrieves the node with the given hostname and checks whether the given
// labels would mean any update to the node. The unmodified node is returned, plus
// true iff an update is necessary.
func (r *clientRegistrator) updateNecessary(hostName string, labels map[string]string) (*api.Node, bool) {
if r.lookupNode == nil {
return nil, true
}
n := r.lookupNode(hostName)
return n, n == nil || !IsUpToDate(n, labels)
}
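
A hedged wiring sketch for the registrator above (the stub lookup and the surrounding function are illustrative; in the real scheduler the lookup is backed by a node cache filled by a reflector): Register queues work, Run drains the queue against the apiserver.

package example

import (
	"k8s.io/kubernetes/contrib/mesos/pkg/node"
	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
)

// startRegistrator wires a Registrator with a stub node lookup (always nil, so
// every Register call is treated as needing an update) and starts its loop.
func startRegistrator(c *client.Client, terminate <-chan struct{}) node.Registrator {
	lookupNode := func(hostName string) *api.Node { return nil } // stub cache lookup
	r := node.NewRegistrator(c, lookupNode)
	r.Run(terminate) // returns immediately; registrations are processed until terminate closes
	return r
}

A subsequent r.Register("slave-1", labels) then returns true when a create/update was queued and false when the cached node already carries the given labels.
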

View File

@ -21,6 +21,7 @@ import (
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)
@ -53,10 +54,11 @@ func NewAllocationStrategy(fitPredicate podtask.FitPredicate, procurement podtas
type fcfsPodScheduler struct {
AllocationStrategy
lookupNode node.LookupFunc
}
func NewFCFSPodScheduler(as AllocationStrategy) PodScheduler {
return &fcfsPodScheduler{as}
func NewFCFSPodScheduler(as AllocationStrategy, lookupNode node.LookupFunc) PodScheduler {
return &fcfsPodScheduler{as, lookupNode}
}
// A first-come-first-serve scheduler: acquires the first offer that can support the task
@ -68,7 +70,18 @@ func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, unused SlaveIndex, t
if offer == nil {
return false, fmt.Errorf("nil offer while scheduling task %v", task.ID)
}
if fps.FitPredicate()(task, offer) {
// Check that the node actually exists. Offers for unregistered nodes are
// declined, so the case n == nil can only happen when the node object was
// deleted after the offer came in.
nodeName := offer.GetHostname()
n := fps.lookupNode(nodeName)
if n == nil {
log.V(3).Infof("ignoring offer for node %s because node went away", nodeName)
return false, nil
}
if fps.FitPredicate()(task, offer, n) {
if p.Acquire() {
acceptedOffer = p
log.V(3).Infof("Pod %s accepted offer %v", podName, offer.Id.GetValue())

View File

@ -548,7 +548,11 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
defer k.api.Unlock()
switch task, state := k.api.tasks().Get(task.ID); state {
case podtask.StatePending:
return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer)
// Assess fitness of pod with the current offer. The scheduler normally
// "backs off" when it can't find an offer that matches up with a pod.
// The backoff period for a pod can terminate sooner if an offer becomes
// available that matches up.
return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer, nil)
default:
// no point in continuing to check for matching offers
return true

View File

@ -393,14 +393,21 @@ func TestPlugin_LifeCycle(t *testing.T) {
executor.Data = []byte{0, 1, 2}
// create scheduler
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
as := NewAllocationStrategy(
podtask.DefaultPredicate,
podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit))
testScheduler := New(Config{
Executor: executor,
Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Default.Version()}),
Scheduler: NewFCFSPodScheduler(as),
Schedcfg: *schedcfg.CreateDefaultConfig(),
Executor: executor,
Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Default.Version()}),
Scheduler: NewFCFSPodScheduler(as, func(node string) *api.Node {
obj, _, _ := nodeStore.GetByKey(node)
if obj == nil {
return nil
}
return obj.(*api.Node)
}),
Schedcfg: *schedcfg.CreateDefaultConfig(),
})
assert.NotNil(testScheduler.client, "client is nil")

View File

@ -19,6 +19,7 @@ package podtask
import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/pkg/api"
)
// bogus numbers that we use to make sure that there's some set of minimal offered resources on the slave
@ -43,7 +44,7 @@ var (
}).Procure
)
func MinimalPodResourcesPredicate(t *T, offer *mesos.Offer) bool {
func MinimalPodResourcesPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
var (
offeredCpus float64
offeredMem float64

View File

@ -146,10 +146,10 @@ func TestEmptyOffer(t *testing.T) {
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
if ok := DefaultPredicate(task, nil); ok {
if ok := DefaultPredicate(task, nil, nil); ok {
t.Fatalf("accepted nil offer")
}
if ok := DefaultPredicate(task, &mesos.Offer{}); ok {
if ok := DefaultPredicate(task, &mesos.Offer{}, nil); ok {
t.Fatalf("accepted empty offer")
}
}
@ -176,7 +176,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", 0.001),
},
}
if ok := DefaultPredicate(task, offer); ok {
if ok := DefaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
@ -186,7 +186,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", t_min_mem),
},
}
if ok := DefaultPredicate(task, offer); !ok {
if ok := DefaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
}
@ -203,7 +203,7 @@ func TestAcceptOfferPorts(t *testing.T) {
rangeResource("ports", []uint64{1, 1}),
},
}
if ok := DefaultPredicate(task, offer); !ok {
if ok := DefaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
@ -218,17 +218,17 @@ func TestAcceptOfferPorts(t *testing.T) {
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
if ok := DefaultPredicate(task, offer); ok {
if ok := DefaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := DefaultPredicate(task, offer); !ok {
if ok := DefaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 0
if ok := DefaultPredicate(task, offer); !ok {
if ok := DefaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
@ -236,12 +236,12 @@ func TestAcceptOfferPorts(t *testing.T) {
mutil.NewScalarResource("cpus", t_min_cpu),
mutil.NewScalarResource("mem", t_min_mem),
}
if ok := DefaultPredicate(task, offer); ok {
if ok := DefaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := DefaultPredicate(task, offer); ok {
if ok := DefaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
}
@ -270,21 +270,61 @@ func TestGeneratePodName(t *testing.T) {
func TestNodeSelector(t *testing.T) {
t.Parallel()
sel1 := map[string]string{"rack": "a"}
sel2 := map[string]string{"rack": "a", "gen": "2014"}
sel1 := map[string]string{"k8s.mesosphere.io/attribute-rack": "a"}
sel2 := map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-gen": "2014"}
sel3 := map[string]string{"kubernetes.io/hostname": "node1"}
sel4 := map[string]string{"kubernetes.io/hostname": "node2"}
sel5 := map[string]string{"k8s.mesosphere.io/attribute-old": "42"}
sel6 := map[string]string{"some.other/label": "43"}
newNode := func(hostName string, l map[string]string) *api.Node {
nodeLabels := map[string]string{"kubernetes.io/hostname": hostName}
if l != nil {
for k, v := range l {
nodeLabels[k] = v
}
}
return &api.Node{
ObjectMeta: api.ObjectMeta{
Name: hostName,
Labels: nodeLabels,
},
Spec: api.NodeSpec{
ExternalID: hostName,
},
}
}
node1 := newNode("node1", nil)
node2 := newNode("node2", nil)
node3 := newNode("node3", map[string]string{
"k8s.mesosphere.io/attribute-old": "42",
"k8s.mesosphere.io/attribute-gen": "2015",
"some.other/label": "43",
})
tests := []struct {
selector map[string]string
attrs []*mesos.Attribute
node *api.Node
ok bool
desc string
}{
{sel1, []*mesos.Attribute{newTextAttribute("rack", "a")}, true},
{sel1, []*mesos.Attribute{newTextAttribute("rack", "b")}, false},
{sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, true},
{sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newScalarAttribute("num", 42.0)}, true},
{sel1, []*mesos.Attribute{newScalarAttribute("rack", 42.0)}, false},
{sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, true},
{sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2015")}, false},
{sel1, []*mesos.Attribute{newTextAttribute("rack", "a")}, node1, true, "label value matches"},
{sel1, []*mesos.Attribute{newTextAttribute("rack", "b")}, node1, false, "label value does not match"},
{sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, node1, true, "required labels match"},
{sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newScalarAttribute("num", 42.0)}, node1, true, "scalar label matches"},
{sel1, []*mesos.Attribute{newScalarAttribute("rack", 42.0)}, node1, false, "scalar label does not match"},
{sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, node1, true, "all labels match"},
{sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2015")}, node1, false, "one label does not match"},
{sel3, []*mesos.Attribute{}, node1, true, "hostname label matches"},
{sel4, []*mesos.Attribute{}, node1, false, "hostname label does not match"},
{sel4, []*mesos.Attribute{}, node2, true, "hostname label matches other node"},
{sel5, []*mesos.Attribute{}, node3, false, "old slave attribute is removed"},
{sel6, []*mesos.Attribute{}, node1, false, "non-slave attribute does not match"},
{sel6, []*mesos.Attribute{}, node3, true, "non-slave attribute matches"},
{sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, node3, true, "old slave attributes are overwritten"},
}
for _, ts := range tests {
@ -296,9 +336,10 @@ func TestNodeSelector(t *testing.T) {
mutil.NewScalarResource("mem", t_min_mem),
},
Attributes: ts.attrs,
Hostname: &ts.node.Name,
}
if got, want := DefaultPredicate(task, offer), ts.ok; got != want {
t.Fatalf("expected acceptance of offer %v for selector %v to be %v, got %v:", want, got, ts.attrs, ts.selector)
if got, want := DefaultPredicate(task, offer, ts.node), ts.ok; got != want {
t.Fatalf("expected acceptance of offer for selector %v to be %v, got %v: %q", ts.selector, want, got, ts.desc)
}
}
}

View File

@ -19,7 +19,9 @@ package podtask
import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
)
@ -31,25 +33,25 @@ var DefaultPredicate = RequireAllPredicate([]FitPredicate{
}).Fit
// FitPredicate implementations determine if the given task "fits" into offered Mesos resources.
// Neither the task or offer should be modified.
type FitPredicate func(*T, *mesos.Offer) bool
// Neither the task nor the offer should be modified. Note that the node can be nil.
type FitPredicate func(*T, *mesos.Offer, *api.Node) bool
type RequireAllPredicate []FitPredicate
func (f RequireAllPredicate) Fit(t *T, offer *mesos.Offer) bool {
func (f RequireAllPredicate) Fit(t *T, offer *mesos.Offer, n *api.Node) bool {
for _, p := range f {
if !p(t, offer) {
if !p(t, offer, n) {
return false
}
}
return true
}
func ValidationPredicate(t *T, offer *mesos.Offer) bool {
func ValidationPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
return t != nil && offer != nil
}
func NodeSelectorPredicate(t *T, offer *mesos.Offer) bool {
func NodeSelectorPredicate(t *T, offer *mesos.Offer, n *api.Node) bool {
// if the user has specified a target host, make sure this offer is for that host
if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName {
return false
@ -57,21 +59,29 @@ func NodeSelectorPredicate(t *T, offer *mesos.Offer) bool {
// check the NodeSelector
if len(t.Pod.Spec.NodeSelector) > 0 {
slaveLabels := map[string]string{}
for _, a := range offer.Attributes {
if a.GetType() == mesos.Value_TEXT {
slaveLabels[a.GetName()] = a.GetText().GetValue()
l := map[string]string{
"kubernetes.io/hostname": offer.GetHostname(),
}
if n != nil && n.Labels != nil {
for k, v := range n.Labels {
if !node.IsSlaveAttributeLabel(k) {
l[k] = v
}
}
}
for k, v := range node.SlaveAttributesToLabels(offer.Attributes) {
l[k] = v
}
selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector)
if !selector.Matches(labels.Set(slaveLabels)) {
if !selector.Matches(labels.Set(l)) {
return false
}
}
return true
}
func PortsPredicate(t *T, offer *mesos.Offer) bool {
func PortsPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
// check ports
if _, err := t.mapper.Generate(t, offer); err != nil {
log.V(3).Info(err)
@ -80,7 +90,7 @@ func PortsPredicate(t *T, offer *mesos.Offer) bool {
return true
}
func PodFitsResourcesPredicate(t *T, offer *mesos.Offer) bool {
func PodFitsResourcesPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
// find offered cpu and mem
var (
offeredCpus mresource.CPUShares

View File

@ -31,6 +31,7 @@ import (
bindings "github.com/mesos/mesos-go/scheduler"
execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
offerMetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/proc"
@ -82,6 +83,7 @@ type KubernetesScheduler struct {
etcdClient tools.EtcdClient
failoverTimeout float64 // in seconds
reconcileInterval int64
nodeRegistrator node.Registrator
// Mesos context.
@ -116,6 +118,7 @@ type Config struct {
FailoverTimeout float64
ReconcileInterval int64
ReconcileCooldown time.Duration
LookupNode node.LookupFunc
}
// New creates a new KubernetesScheduler
@ -131,16 +134,23 @@ func New(config Config) *KubernetesScheduler {
etcdClient: config.EtcdClient,
failoverTimeout: config.FailoverTimeout,
reconcileInterval: config.ReconcileInterval,
nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode),
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
// filter the offers: the executor IDs must not identify a kubelet-
// executor with a group that doesn't match ours
// the node must be registered and have up-to-date labels
n := config.LookupNode(o.GetHostname())
if n == nil || !node.IsUpToDate(n, node.SlaveAttributesToLabels(o.GetAttributes())) {
return false
}
// the executor IDs must not identify a kubelet-executor with a group that doesn't match ours
for _, eid := range o.GetExecutorIds() {
execuid := uid.Parse(eid.GetValue())
if execuid.Name() == execcfg.DefaultInfoID && execuid.Group() != k.executorGroup {
return false
}
}
return true
},
DeclineOffer: func(id string) <-chan error {
@ -183,6 +193,7 @@ func (k *KubernetesScheduler) Init(electedMaster proc.Process, pl PluginInterfac
k.plugin = pl
k.offers.Init(k.terminate)
k.InstallDebugHandlers(mux)
k.nodeRegistrator.Run(k.terminate)
return k.recoverTasks()
}
@ -323,6 +334,15 @@ func (k *KubernetesScheduler) ResourceOffers(driver bindings.SchedulerDriver, of
for _, offer := range offers {
slaveId := offer.GetSlaveId().GetValue()
k.slaveHostNames.Register(slaveId, offer.GetHostname())
// create the node api object if it does not already exist
if k.nodeRegistrator != nil {
labels := node.SlaveAttributesToLabels(offer.GetAttributes())
_, err := k.nodeRegistrator.Register(offer.GetHostname(), labels)
if err != nil {
log.Error(err)
}
}
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package scheduler
import (
"reflect"
"testing"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -27,6 +28,8 @@ import (
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
)
//get number of non-expired offers from offer registry
@ -42,10 +45,47 @@ func getNumberOffers(os offers.Registry) int {
return walked
}
//test adding of ressource offer, should be added to offer registry and slavesf
type mockRegistrator struct {
store cache.Store
}
func (r *mockRegistrator) Run(terminate <-chan struct{}) error {
return nil
}
func (r *mockRegistrator) Register(hostName string, labels map[string]string) (bool, error) {
obj, _, err := r.store.GetByKey(hostName)
if err != nil {
return false, err
}
if obj == nil {
return true, r.store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{
Name: hostName,
Labels: labels,
},
Spec: api.NodeSpec{
ExternalID: hostName,
},
Status: api.NodeStatus{
Phase: api.NodePending,
},
})
} else {
n := obj.(*api.Node)
if reflect.DeepEqual(n.Labels, labels) {
return false, nil
}
n.Labels = labels
return true, r.store.Update(n)
}
}
// test adding of resource offer, should be added to offer registry and slaves
func TestResourceOffer_Add(t *testing.T) {
assert := assert.New(t)
registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)}
testScheduler := &KubernetesScheduler{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
@ -59,7 +99,8 @@ func TestResourceOffer_Add(t *testing.T) {
TTL: schedcfg.DefaultOfferTTL,
ListenerDelay: schedcfg.DefaultListenerDelay,
}),
slaveHostNames: slave.NewRegistry(),
slaveHostNames: slave.NewRegistry(),
nodeRegistrator: registrator,
}
hostname := "h1"
@ -67,6 +108,7 @@ func TestResourceOffer_Add(t *testing.T) {
offer1 := &mesos.Offer{Id: offerID1, Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
offers1 := []*mesos.Offer{offer1}
testScheduler.ResourceOffers(nil, offers1)
assert.Equal(1, len(registrator.store.List()))
assert.Equal(1, getNumberOffers(testScheduler.offers))
//check slave hostname

View File

@ -63,8 +63,10 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/master/ports"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -76,6 +78,7 @@ const (
defaultMesosUser = "root" // should have privs to execute docker and iptables commands
defaultReconcileInterval = 300 // 5m default task reconciliation interval
defaultReconcileCooldown = 15 * time.Second
defaultNodeRelistPeriod = 5 * time.Minute
defaultFrameworkName = "Kubernetes"
defaultExecutorCPUs = mresource.CPUShares(0.25) // initial CPU allocated for executor
defaultExecutorMem = mresource.MegaBytes(128.0) // initial memory allocated for executor
@ -145,6 +148,7 @@ type SchedulerServer struct {
DockerCfgPath string
ContainPodResources bool
AccountForPodResources bool
nodeRelistPeriod time.Duration
executable string // path to the binary running this service
client *client.Client
@ -192,6 +196,7 @@ func NewSchedulerServer() *SchedulerServer {
KubeletSyncFrequency: 10 * time.Second,
ContainPodResources: true,
AccountForPodResources: true,
nodeRelistPeriod: defaultNodeRelistPeriod,
}
// cache this for later use. also useful in case the original binary gets deleted, e.g.
// during upgrades, development deployments, etc.
@ -245,6 +250,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB")
fs.BoolVar(&s.ContainPodResources, "contain-pod-resources", s.ContainPodResources, "Reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.")
fs.BoolVar(&s.AccountForPodResources, "account-for-pod-resources", s.AccountForPodResources, "Allocate pod CPU and memory resources from offers (Default: true)")
fs.DurationVar(&s.nodeRelistPeriod, "node-monitor-period", s.nodeRelistPeriod, "Period between relisting of all nodes from the apiserver.")
fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned minion and executor processes.")
fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.")
@ -678,7 +684,24 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
podtask.DefaultMinimalProcurement)
}
fcfs := scheduler.NewFCFSPodScheduler(as)
// mirror all nodes into the nodeStore
nodesClient, err := s.createAPIServerClient()
if err != nil {
log.Fatalf("Cannot create client to watch nodes: %v", err)
}
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
nodeLW := cache.NewListWatchFromClient(nodesClient, "nodes", api.NamespaceAll, fields.Everything())
cache.NewReflector(nodeLW, &api.Node{}, nodeStore, s.nodeRelistPeriod).Run()
lookupNode := func(hostName string) *api.Node {
n, _, _ := nodeStore.GetByKey(hostName) // ignore the error; a failed lookup simply yields nil
if n == nil {
return nil
}
return n.(*api.Node)
}
fcfs := scheduler.NewFCFSPodScheduler(as, lookupNode)
mesosPodScheduler := scheduler.New(scheduler.Config{
Schedcfg: *sc,
Executor: executor,
@ -688,6 +711,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
FailoverTimeout: s.FailoverTimeout,
ReconcileInterval: s.ReconcileInterval,
ReconcileCooldown: s.ReconcileCooldown,
LookupNode: lookupNode,
})
masterUri := s.MesosMaster

test/e2e/mesos.go Normal file
View File

@ -0,0 +1,53 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/fields"
)
var _ = Describe("Mesos", func() {
framework := NewFramework("pods")
BeforeEach(func() {
SkipUnlessProviderIs("mesos/docker")
})
It("applies slave attributes as labels", func() {
nodeClient := framework.Client.Nodes()
rackA := labels.SelectorFromSet(map[string]string{"k8s.mesosphere.io/attribute-rack": "1"})
nodes, err := nodeClient.List(rackA, fields.Everything())
if err != nil {
Failf("Failed to query for node: %v", err)
}
Expect(len(nodes.Items)).To(Equal(1))
var addr string
for _, a := range nodes.Items[0].Status.Addresses {
if a.Type == api.NodeInternalIP {
addr = a.Address
}
}
Expect(len(addr)).NotTo(Equal(""))
})
})