Merge pull request #13857 from mesosphere/node-labels

Auto commit by PR queue bot
k8s-merge-robot 2015-09-28 22:10:51 -07:00
commit c807bea089
16 changed files with 675 additions and 59 deletions

View File

@ -33,6 +33,7 @@ import (
mutil "github.com/mesos/mesos-go/mesosutil"
"k8s.io/kubernetes/contrib/mesos/pkg/archive"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
@ -225,6 +226,13 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
k.staticPodsConfig = executorInfo.Data
}
if slaveInfo != nil {
_, err := node.CreateOrUpdate(k.client, slaveInfo.GetHostname(), node.SlaveAttributesToLabels(slaveInfo.Attributes))
if err != nil {
log.Errorf("cannot update node labels: %v", err)
}
}
k.initialRegistration.Do(k.onInitialRegistration)
}
@ -239,11 +247,19 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI
log.Errorf("failed to reregister/transition to a connected state")
}
if slaveInfo != nil {
_, err := node.CreateOrUpdate(k.client, slaveInfo.GetHostname(), node.SlaveAttributesToLabels(slaveInfo.Attributes))
if err != nil {
log.Errorf("cannot update node labels: %v", err)
}
}
k.initialRegistration.Do(k.onInitialRegistration)
}
func (k *KubernetesExecutor) onInitialRegistration() {
defer close(k.initialRegComplete)
// emit an empty update to allow the mesos "source" to be marked as seen
k.updateChan <- kubelet.PodUpdate{
Pods: []*api.Pod{},

View File

@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package node provides utilities to create and update nodes
package node

View File

@ -0,0 +1,160 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util/validation"
)
const (
labelPrefix = "k8s.mesosphere.io/attribute-"
)
// Create creates a new node api object with the given hostname and labels
func Create(client *client.Client, hostName string, labels map[string]string) (*api.Node, error) {
n := api.Node{
ObjectMeta: api.ObjectMeta{
Name: hostName,
Labels: map[string]string{"kubernetes.io/hostname": hostName},
},
Spec: api.NodeSpec{
ExternalID: hostName,
},
Status: api.NodeStatus{
Phase: api.NodePending,
},
}
for k, v := range labels {
n.Labels[k] = v
}
// try to create
return client.Nodes().Create(&n)
}
// Update updates an existing node api object with new labels
func Update(client *client.Client, n *api.Node, labels map[string]string) (*api.Node, error) {
patch := struct {
Metadata struct {
Labels map[string]string `json:"labels"`
} `json:"metadata"`
}{}
patch.Metadata.Labels = map[string]string{}
for k, v := range n.Labels {
if !IsSlaveAttributeLabel(k) {
patch.Metadata.Labels[k] = v
}
}
for k, v := range labels {
patch.Metadata.Labels[k] = v
}
patchJson, _ := json.Marshal(patch)
log.V(4).Infof("Patching labels of node %q: %v", n.Name, string(patchJson))
err := client.Patch(api.MergePatchType).RequestURI(n.SelfLink).Body(patchJson).Do().Error()
if err != nil {
return nil, fmt.Errorf("error updating labels of node %q: %v", n.Name, err)
}
newNode, err := api.Scheme.DeepCopy(n)
if err != nil {
return nil, err
}
newNode.(*api.Node).Labels = patch.Metadata.Labels
return newNode.(*api.Node), nil
}
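To make the merge-patch behavior above concrete, here is a purely illustrative example (node name and labels are hypothetical, not taken from this change): non-slave labels such as the hostname label are carried over, while slave-attribute labels are replaced wholesale.

// Illustrative only: for a node "h1" currently labeled
//   kubernetes.io/hostname=h1, k8s.mesosphere.io/attribute-rack=a
// an Update with labels {"k8s.mesosphere.io/attribute-rack": "b"} sends roughly:
const exampleMergePatch = `{"metadata":{"labels":{"kubernetes.io/hostname":"h1","k8s.mesosphere.io/attribute-rack":"b"}}}`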
// CreateOrUpdate tries to create a node api object or updates an already existing one
func CreateOrUpdate(client *client.Client, hostName string, labels map[string]string) (*api.Node, error) {
n, err := Create(client, hostName, labels)
if err == nil {
return n, nil
}
if !errors.IsAlreadyExists(err) {
return nil, fmt.Errorf("unable to register %q with the apiserver: %v", hostName, err)
}
// fall back to updating an existing node with the new labels
n, err = client.Nodes().Get(hostName)
if err != nil {
return nil, fmt.Errorf("error getting node %q: %v", hostName, err)
}
if n == nil {
return nil, fmt.Errorf("no node instance returned for %q", hostName)
}
return Update(client, n, labels)
}
// IsSlaveAttributeLabel returns true iff the given label is derived from a slave attribute
func IsSlaveAttributeLabel(l string) bool {
return strings.HasPrefix(l, labelPrefix)
}
// IsUpToDate returns true iff the node's slave-attribute labels match the given labels
func IsUpToDate(n *api.Node, labels map[string]string) bool {
slaveLabels := map[string]string{}
for k, v := range n.Labels {
if IsSlaveAttributeLabel(k) {
slaveLabels[k] = v
}
}
return reflect.DeepEqual(slaveLabels, labels)
}
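A quick illustration of the comparison (hypothetical node, in-package style): only labels carrying the k8s.mesosphere.io/attribute- prefix take part; everything else on the node is ignored.

n := &api.Node{ObjectMeta: api.ObjectMeta{Labels: map[string]string{
    "kubernetes.io/hostname":           "h1",
    "k8s.mesosphere.io/attribute-rack": "a",
}}}
_ = IsUpToDate(n, map[string]string{"k8s.mesosphere.io/attribute-rack": "a"}) // true
_ = IsUpToDate(n, map[string]string{"k8s.mesosphere.io/attribute-rack": "b"}) // false: attribute changed
_ = IsUpToDate(n, map[string]string{})                                        // false: stale slave label still present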
// SlaveAttributesToLabels converts slave attributes into string key/value labels
func SlaveAttributesToLabels(attrs []*mesos.Attribute) map[string]string {
l := map[string]string{}
for _, a := range attrs {
if a == nil {
continue
}
var v string
k := labelPrefix + a.GetName()
switch a.GetType() {
case mesos.Value_TEXT:
v = a.GetText().GetValue()
case mesos.Value_SCALAR:
v = strconv.FormatFloat(a.GetScalar().GetValue(), 'G', -1, 64)
}
if !validation.IsQualifiedName(k) {
log.V(3).Infof("ignoring invalid node label name %q", k)
continue
}
if !validation.IsValidLabelValue(v) {
log.V(3).Infof("ignoring invalid node label %s value: %q", k, v)
continue
}
l[k] = v
}
return l
}
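As a rough, self-contained sketch of how these helpers fit together (hostname, client wiring, and attribute values are all hypothetical): a TEXT attribute rack=a and a SCALAR attribute num=42 become the labels k8s.mesosphere.io/attribute-rack=a and k8s.mesosphere.io/attribute-num=42, which could then be fed to CreateOrUpdate.

package main

import (
    "fmt"

    "github.com/gogo/protobuf/proto"
    mesos "github.com/mesos/mesos-go/mesosproto"

    "k8s.io/kubernetes/contrib/mesos/pkg/node"
)

func main() {
    attrs := []*mesos.Attribute{
        {
            Name: proto.String("rack"),
            Type: mesos.Value_TEXT.Enum(),
            Text: &mesos.Value_Text{Value: proto.String("a")},
        },
        {
            Name:   proto.String("num"),
            Type:   mesos.Value_SCALAR.Enum(),
            Scalar: &mesos.Value_Scalar{Value: proto.Float64(42)},
        },
    }
    labels := node.SlaveAttributesToLabels(attrs)
    fmt.Println(labels) // => k8s.mesosphere.io/attribute-rack=a, k8s.mesosphere.io/attribute-num=42

    // With a configured *client.Client kc (assumed), the node object could then be
    // created or patched the way the executor does on (re-)registration:
    //   _, err := node.CreateOrUpdate(kc, "slave-1.example.com", labels)
}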

View File

@ -0,0 +1,148 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"fmt"
"time"
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
)
type Registrator interface {
// Register checks whether the node is registered with the given labels. If it
// is not, it is created or updated on the apiserver. If the node was already
// up-to-date, false is returned.
Register(hostName string, labels map[string]string) (bool, error)
// Run starts the registration loop and returns immediately.
Run(terminate <-chan struct{}) error
}
type registration struct {
hostName string
labels map[string]string
}
func (r *registration) Copy() queue.Copyable {
return &registration{
hostName: r.hostName,
labels: r.labels, // labels are never changed, no need to clone
}
}
func (r *registration) GetUID() string {
return r.hostName
}
func (r *registration) Value() queue.UniqueCopyable {
return r
}
type LookupFunc func(hostName string) *api.Node
type clientRegistrator struct {
lookupNode LookupFunc
client *client.Client
queue *queue.HistoricalFIFO
}
func NewRegistrator(client *client.Client, lookupNode LookupFunc) *clientRegistrator {
return &clientRegistrator{
lookupNode: lookupNode,
client: client,
queue: queue.NewHistorical(nil),
}
}
func (r *clientRegistrator) Run(terminate <-chan struct{}) error {
loop := func() {
RegistrationLoop:
for {
obj := r.queue.CancelablePop(terminate)
if obj == nil {
break RegistrationLoop
}
select {
case <-terminate:
break RegistrationLoop
default:
}
rg := obj.(*registration)
n, needsUpdate := r.updateNecessary(rg.hostName, rg.labels)
if !needsUpdate {
continue
}
if n == nil {
log.V(2).Infof("creating node %s with labels %v", rg.hostName, rg.labels)
_, err := CreateOrUpdate(r.client, rg.hostName, rg.labels)
if err != nil {
log.Errorf("error creating the node %s: %v", rg.hostName, rg.labels)
}
} else {
log.V(2).Infof("updating node %s with labels %v", rg.hostName, rg.labels)
_, err := Update(r.client, n, rg.labels)
if err != nil && errors.IsNotFound(err) {
// last chance when our store was out of date
_, err = Create(r.client, rg.hostName, rg.labels)
}
if err != nil {
log.Errorf("error updating the node %s: %v", rg.hostName, rg.labels)
}
}
}
}
go runtime.Until(loop, time.Second, terminate)
return nil
}
func (r *clientRegistrator) Register(hostName string, labels map[string]string) (bool, error) {
_, needsUpdate := r.updateNecessary(hostName, labels)
if needsUpdate {
log.V(5).Infof("queuing registration for node %s with labels %v", hostName, labels)
err := r.queue.Update(&registration{
hostName: hostName,
labels: labels,
})
if err != nil {
return false, fmt.Errorf("cannot register node %s: %v", hostName, err)
}
return true, nil
}
return false, nil
}
// updateNecessary retrieves the node with the given hostname and checks whether the given
// labels would mean any update to the node. The unmodified node is returned, plus
// true iff an update is necessary.
func (r *clientRegistrator) updateNecessary(hostName string, labels map[string]string) (*api.Node, bool) {
if r.lookupNode == nil {
return nil, true
}
n := r.lookupNode(hostName)
return n, n == nil || !IsUpToDate(n, labels)
}
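A hedged wiring sketch (not part of this diff; kubeClient, the cache import, and the hostname are assumed): the scheduler backs the lookup with a cache.Store that mirrors the apiserver, starts the registrator once, and then calls Register for every offer it sees.

nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) // k8s.io/kubernetes/pkg/client/cache
lookupNode := func(hostName string) *api.Node {
    obj, _, _ := nodeStore.GetByKey(hostName) // nil means the node is not (yet) known
    if obj == nil {
        return nil
    }
    return obj.(*api.Node)
}

registrator := NewRegistrator(kubeClient, lookupNode)
terminate := make(chan struct{})
_ = registrator.Run(terminate) // spawns the background loop and returns immediately

// Register returns false when the cached node already carries exactly these slave labels.
if queued, err := registrator.Register("slave-1.example.com", map[string]string{
    "k8s.mesosphere.io/attribute-rack": "a",
}); err != nil {
    log.Errorf("registration failed: %v", err)
} else if queued {
    log.V(2).Infof("registration queued")
}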

View File

@ -222,7 +222,7 @@ func (f *HistoricalFIFO) Poll(id string, t EventType) bool {
func (q *HistoricalFIFO) Await(timeout time.Duration) interface{} {
cancel := make(chan struct{})
ch := make(chan interface{}, 1)
go func() { ch <- q.pop(cancel) }()
go func() { ch <- q.CancelablePop(cancel) }()
select {
case <-time.After(timeout):
close(cancel)
@ -232,10 +232,10 @@ func (q *HistoricalFIFO) Await(timeout time.Duration) interface{} {
}
}
func (f *HistoricalFIFO) Pop() interface{} {
return f.pop(nil)
return f.CancelablePop(nil)
}
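Caller-side sketch (illustrative; q is assumed to be a *HistoricalFIFO and the time import is assumed): the new cancel channel lets a consumer abandon a blocking pop, which is what the node registrator's terminate channel relies on.

cancel := make(chan struct{})
go func() {
    <-time.After(3 * time.Second) // or: a shutdown/terminate signal
    close(cancel)
}()
if obj := q.CancelablePop(cancel); obj != nil {
    // an entry was popped before the cancel fired
} else {
    // canceled: nothing was popped
}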
func (f *HistoricalFIFO) pop(cancel chan struct{}) interface{} {
func (f *HistoricalFIFO) CancelablePop(cancel <-chan struct{}) interface{} {
popEvent := (Entry)(nil)
defer func() {
f.carrier(popEvent)
@ -383,7 +383,7 @@ func (f *HistoricalFIFO) merge(id string, obj UniqueCopyable) (notifications []E
// NewHistorical returns a *HistoricalFIFO which can be used to queue up items
// to process. If a non-nil Mux is provided, then modifications to the FIFO are
// delivered on a channel specific to this fifo.
func NewHistorical(ch chan<- Entry) FIFO {
func NewHistorical(ch chan<- Entry) *HistoricalFIFO {
carrier := dead
if ch != nil {
carrier = func(msg Entry) {

View File

@ -21,6 +21,7 @@ import (
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)
@ -53,10 +54,11 @@ func NewAllocationStrategy(fitPredicate podtask.FitPredicate, procurement podtas
type fcfsPodScheduler struct {
AllocationStrategy
lookupNode node.LookupFunc
}
func NewFCFSPodScheduler(as AllocationStrategy) PodScheduler {
return &fcfsPodScheduler{as}
func NewFCFSPodScheduler(as AllocationStrategy, lookupNode node.LookupFunc) PodScheduler {
return &fcfsPodScheduler{as, lookupNode}
}
// A first-come-first-serve scheduler: acquires the first offer that can support the task
@ -68,7 +70,18 @@ func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, unused SlaveIndex, t
if offer == nil {
return false, fmt.Errorf("nil offer while scheduling task %v", task.ID)
}
if fps.FitPredicate()(task, offer) {
// Check that the node actually exists: offers from unknown nodes are declined,
// so the n==nil case can only happen when the node object was deleted after
// the offer came in.
nodeName := offer.GetHostname()
n := fps.lookupNode(nodeName)
if n == nil {
log.V(3).Infof("ignoring offer for node %s because node went away", nodeName)
return false, nil
}
if fps.FitPredicate()(task, offer, n) {
if p.Acquire() {
acceptedOffer = p
log.V(3).Infof("Pod %s accepted offer %v", podName, offer.Id.GetValue())

View File

@ -543,7 +543,11 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
defer k.api.Unlock()
switch task, state := k.api.tasks().Get(task.ID); state {
case podtask.StatePending:
return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer)
// Assess fitness of pod with the current offer. The scheduler normally
// "backs off" when it can't find an offer that matches up with a pod.
// The backoff period for a pod can terminate sooner if an offer becomes
// available that matches up.
return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer, nil)
default:
// no point in continuing to check for matching offers
return true

View File

@ -393,14 +393,21 @@ func TestPlugin_LifeCycle(t *testing.T) {
executor.Data = []byte{0, 1, 2}
// create scheduler
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
as := NewAllocationStrategy(
podtask.DefaultPredicate,
podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit))
testScheduler := New(Config{
Executor: executor,
Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Default.Version()}),
Scheduler: NewFCFSPodScheduler(as),
Schedcfg: *schedcfg.CreateDefaultConfig(),
Executor: executor,
Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Default.Version()}),
Scheduler: NewFCFSPodScheduler(as, func(node string) *api.Node {
obj, _, _ := nodeStore.GetByKey(node)
if obj == nil {
return nil
}
return obj.(*api.Node)
}),
Schedcfg: *schedcfg.CreateDefaultConfig(),
})
assert.NotNil(testScheduler.client, "client is nil")

View File

@ -19,6 +19,7 @@ package podtask
import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/pkg/api"
)
// bogus numbers that we use to make sure that there's some set of minimal offered resources on the slave
@ -43,7 +44,7 @@ var (
}).Procure
)
func MinimalPodResourcesPredicate(t *T, offer *mesos.Offer) bool {
func MinimalPodResourcesPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
var (
offeredCpus float64
offeredMem float64

View File

@ -19,14 +19,14 @@ package podtask
import (
"testing"
"github.com/gogo/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
mutil "github.com/mesos/mesos-go/mesosutil"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
)
const (
@ -146,10 +146,10 @@ func TestEmptyOffer(t *testing.T) {
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
if ok := DefaultPredicate(task, nil); ok {
if ok := DefaultPredicate(task, nil, nil); ok {
t.Fatalf("accepted nil offer")
}
if ok := DefaultPredicate(task, &mesos.Offer{}); ok {
if ok := DefaultPredicate(task, &mesos.Offer{}, nil); ok {
t.Fatalf("accepted empty offer")
}
}
@ -176,7 +176,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", 0.001),
},
}
if ok := DefaultPredicate(task, offer); ok {
if ok := DefaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
@ -186,7 +186,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", t_min_mem),
},
}
if ok := DefaultPredicate(task, offer); !ok {
if ok := DefaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
}
@ -203,7 +203,7 @@ func TestAcceptOfferPorts(t *testing.T) {
rangeResource("ports", []uint64{1, 1}),
},
}
if ok := DefaultPredicate(task, offer); !ok {
if ok := DefaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
@ -218,17 +218,17 @@ func TestAcceptOfferPorts(t *testing.T) {
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
if ok := DefaultPredicate(task, offer); ok {
if ok := DefaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := DefaultPredicate(task, offer); !ok {
if ok := DefaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 0
if ok := DefaultPredicate(task, offer); !ok {
if ok := DefaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
@ -236,12 +236,12 @@ func TestAcceptOfferPorts(t *testing.T) {
mutil.NewScalarResource("cpus", t_min_cpu),
mutil.NewScalarResource("mem", t_min_mem),
}
if ok := DefaultPredicate(task, offer); ok {
if ok := DefaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := DefaultPredicate(task, offer); ok {
if ok := DefaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
}
@ -270,21 +270,60 @@ func TestGeneratePodName(t *testing.T) {
func TestNodeSelector(t *testing.T) {
t.Parallel()
sel1 := map[string]string{"rack": "a"}
sel2 := map[string]string{"rack": "a", "gen": "2014"}
newNode := func(hostName string, l map[string]string) *api.Node {
nodeLabels := map[string]string{"kubernetes.io/hostname": hostName}
if l != nil {
for k, v := range l {
nodeLabels[k] = v
}
}
return &api.Node{
ObjectMeta: api.ObjectMeta{
Name: hostName,
Labels: nodeLabels,
},
Spec: api.NodeSpec{
ExternalID: hostName,
},
}
}
node1 := newNode("node1", node.SlaveAttributesToLabels([]*mesos.Attribute{
newTextAttribute("rack", "a"),
newTextAttribute("gen", "2014"),
newScalarAttribute("num", 42.0),
}))
node2 := newNode("node2", node.SlaveAttributesToLabels([]*mesos.Attribute{
newTextAttribute("rack", "b"),
newTextAttribute("gen", "2015"),
newScalarAttribute("num", 0.0),
}))
labels3 := node.SlaveAttributesToLabels([]*mesos.Attribute{
newTextAttribute("rack", "c"),
newTextAttribute("gen", "2015"),
newScalarAttribute("old", 42),
})
labels3["some.other/label"] = "43"
node3 := newNode("node3", labels3)
tests := []struct {
selector map[string]string
attrs []*mesos.Attribute
node *api.Node
ok bool
desc string
}{
{sel1, []*mesos.Attribute{newTextAttribute("rack", "a")}, true},
{sel1, []*mesos.Attribute{newTextAttribute("rack", "b")}, false},
{sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, true},
{sel1, []*mesos.Attribute{newTextAttribute("rack", "a"), newScalarAttribute("num", 42.0)}, true},
{sel1, []*mesos.Attribute{newScalarAttribute("rack", 42.0)}, false},
{sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2014")}, true},
{sel2, []*mesos.Attribute{newTextAttribute("rack", "a"), newTextAttribute("gen", "2015")}, false},
{map[string]string{"k8s.mesosphere.io/attribute-rack": "a"}, node1, true, "label value matches"},
{map[string]string{"k8s.mesosphere.io/attribute-rack": "b"}, node1, false, "label value does not match"},
{map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-gen": "2014"}, node1, true, "multiple required labels match"},
{map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-gen": "2015"}, node1, false, "one label does not match"},
{map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-num": "42"}, node1, true, "scalar label matches"},
{map[string]string{"k8s.mesosphere.io/attribute-rack": "a", "k8s.mesosphere.io/attribute-num": "43"}, node1, false, "scalar label does not match"},
{map[string]string{"kubernetes.io/hostname": "node1"}, node1, true, "hostname label matches"},
{map[string]string{"kubernetes.io/hostname": "node2"}, node1, false, "hostname label does not match"},
{map[string]string{"kubernetes.io/hostname": "node2"}, node2, true, "hostname label matches"},
{map[string]string{"some.other/label": "43"}, node1, false, "non-slave attribute does not match"},
{map[string]string{"some.other/label": "43"}, node3, true, "non-slave attribute matches"},
}
for _, ts := range tests {
@ -295,10 +334,10 @@ func TestNodeSelector(t *testing.T) {
mutil.NewScalarResource("cpus", t_min_cpu),
mutil.NewScalarResource("mem", t_min_mem),
},
Attributes: ts.attrs,
Hostname: &ts.node.Name,
}
if got, want := DefaultPredicate(task, offer), ts.ok; got != want {
t.Fatalf("expected acceptance of offer %v for selector %v to be %v, got %v:", want, got, ts.attrs, ts.selector)
if got, want := DefaultPredicate(task, offer, ts.node), ts.ok; got != want {
t.Fatalf("expected acceptance of offer for selector %v to be %v, got %v: %q", ts.selector, want, got, ts.desc)
}
}
}

View File

@ -20,6 +20,7 @@ import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
)
@ -31,25 +32,25 @@ var DefaultPredicate = RequireAllPredicate([]FitPredicate{
}).Fit
// FitPredicate implementations determine if the given task "fits" into offered Mesos resources.
// Neither the task or offer should be modified.
type FitPredicate func(*T, *mesos.Offer) bool
// Neither the task nor the offer should be modified. Note that the node can be nil.
type FitPredicate func(*T, *mesos.Offer, *api.Node) bool
type RequireAllPredicate []FitPredicate
func (f RequireAllPredicate) Fit(t *T, offer *mesos.Offer) bool {
func (f RequireAllPredicate) Fit(t *T, offer *mesos.Offer, n *api.Node) bool {
for _, p := range f {
if !p(t, offer) {
if !p(t, offer, n) {
return false
}
}
return true
}
func ValidationPredicate(t *T, offer *mesos.Offer) bool {
func ValidationPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
return t != nil && offer != nil
}
func NodeSelectorPredicate(t *T, offer *mesos.Offer) bool {
func NodeSelectorPredicate(t *T, offer *mesos.Offer, n *api.Node) bool {
// if the user has specified a target host, make sure this offer is for that host
if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName {
return false
@ -57,21 +58,18 @@ func NodeSelectorPredicate(t *T, offer *mesos.Offer) bool {
// check the NodeSelector
if len(t.Pod.Spec.NodeSelector) > 0 {
slaveLabels := map[string]string{}
for _, a := range offer.Attributes {
if a.GetType() == mesos.Value_TEXT {
slaveLabels[a.GetName()] = a.GetText().GetValue()
}
if n == nil || n.Labels == nil {
return false
}
selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector)
if !selector.Matches(labels.Set(slaveLabels)) {
if !selector.Matches(labels.Set(n.Labels)) {
return false
}
}
return true
}
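Illustrative only, not part of this change: with the extra *api.Node parameter, a custom predicate can gate directly on node labels and be composed with the stock predicates (guarding against a nil node, which the FitPredicate comment above allows for).

var rackAOnlyPredicate = RequireAllPredicate([]FitPredicate{
    DefaultPredicate,
    func(t *T, offer *mesos.Offer, n *api.Node) bool {
        // n may be nil (e.g. when fitness is re-checked during backoff), so guard it
        return n != nil && n.Labels["k8s.mesosphere.io/attribute-rack"] == "a"
    },
}).Fit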
func PortsPredicate(t *T, offer *mesos.Offer) bool {
func PortsPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
// check ports
if _, err := t.mapper.Generate(t, offer); err != nil {
log.V(3).Info(err)
@ -80,7 +78,7 @@ func PortsPredicate(t *T, offer *mesos.Offer) bool {
return true
}
func PodFitsResourcesPredicate(t *T, offer *mesos.Offer) bool {
func PodFitsResourcesPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
// find offered cpu and mem
var (
offeredCpus mresource.CPUShares

View File

@ -31,6 +31,7 @@ import (
bindings "github.com/mesos/mesos-go/scheduler"
execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
offerMetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/proc"
@ -82,6 +83,7 @@ type KubernetesScheduler struct {
etcdClient tools.EtcdClient
failoverTimeout float64 // in seconds
reconcileInterval int64
nodeRegistrator node.Registrator
// Mesos context.
@ -116,6 +118,7 @@ type Config struct {
FailoverTimeout float64
ReconcileInterval int64
ReconcileCooldown time.Duration
LookupNode node.LookupFunc
}
// New creates a new KubernetesScheduler
@ -131,16 +134,23 @@ func New(config Config) *KubernetesScheduler {
etcdClient: config.EtcdClient,
failoverTimeout: config.FailoverTimeout,
reconcileInterval: config.ReconcileInterval,
nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode),
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
// filter the offers: the executor IDs must not identify a kubelet-
// executor with a group that doesn't match ours
// the node must be registered and have up-to-date labels
n := config.LookupNode(o.GetHostname())
if n == nil || !node.IsUpToDate(n, node.SlaveAttributesToLabels(o.GetAttributes())) {
return false
}
// the executor IDs must not identify a kubelet-executor with a group that doesn't match ours
for _, eid := range o.GetExecutorIds() {
execuid := uid.Parse(eid.GetValue())
if execuid.Name() == execcfg.DefaultInfoID && execuid.Group() != k.executorGroup {
return false
}
}
return true
},
DeclineOffer: func(id string) <-chan error {
@ -183,6 +193,7 @@ func (k *KubernetesScheduler) Init(electedMaster proc.Process, pl PluginInterfac
k.plugin = pl
k.offers.Init(k.terminate)
k.InstallDebugHandlers(mux)
k.nodeRegistrator.Run(k.terminate)
return k.recoverTasks()
}
@ -323,6 +334,15 @@ func (k *KubernetesScheduler) ResourceOffers(driver bindings.SchedulerDriver, of
for _, offer := range offers {
slaveId := offer.GetSlaveId().GetValue()
k.slaveHostNames.Register(slaveId, offer.GetHostname())
// create the node API object if it does not exist yet
if k.nodeRegistrator != nil {
labels := node.SlaveAttributesToLabels(offer.GetAttributes())
_, err := k.nodeRegistrator.Register(offer.GetHostname(), labels)
if err != nil {
log.Error(err)
}
}
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package scheduler
import (
"reflect"
"testing"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -27,6 +28,8 @@ import (
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
)
//get number of non-expired offers from offer registry
@ -42,10 +45,47 @@ func getNumberOffers(os offers.Registry) int {
return walked
}
type mockRegistrator struct {
store cache.Store
}
func (r *mockRegistrator) Run(terminate <-chan struct{}) error {
return nil
}
func (r *mockRegistrator) Register(hostName string, labels map[string]string) (bool, error) {
obj, _, err := r.store.GetByKey(hostName)
if err != nil {
return false, err
}
if obj == nil {
return true, r.store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{
Name: hostName,
Labels: labels,
},
Spec: api.NodeSpec{
ExternalID: hostName,
},
Status: api.NodeStatus{
Phase: api.NodePending,
},
})
} else {
n := obj.(*api.Node)
if reflect.DeepEqual(n.Labels, labels) {
return false, nil
}
n.Labels = labels
return true, r.store.Update(n)
}
}
//test adding of resource offer, should be added to offer registry and slaves
func TestResourceOffer_Add(t *testing.T) {
assert := assert.New(t)
registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)}
testScheduler := &KubernetesScheduler{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
@ -59,7 +99,8 @@ func TestResourceOffer_Add(t *testing.T) {
TTL: schedcfg.DefaultOfferTTL,
ListenerDelay: schedcfg.DefaultListenerDelay,
}),
slaveHostNames: slave.NewRegistry(),
slaveHostNames: slave.NewRegistry(),
nodeRegistrator: registrator,
}
hostname := "h1"
@ -67,6 +108,7 @@ func TestResourceOffer_Add(t *testing.T) {
offer1 := &mesos.Offer{Id: offerID1, Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
offers1 := []*mesos.Offer{offer1}
testScheduler.ResourceOffers(nil, offers1)
assert.Equal(1, len(registrator.store.List()))
assert.Equal(1, getNumberOffers(testScheduler.offers))
//check slave hostname

View File

@ -63,8 +63,10 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/master/ports"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -76,6 +78,7 @@ const (
defaultMesosUser = "root" // should have privs to execute docker and iptables commands
defaultReconcileInterval = 300 // 5m default task reconciliation interval
defaultReconcileCooldown = 15 * time.Second
defaultNodeRelistPeriod = 5 * time.Minute
defaultFrameworkName = "Kubernetes"
defaultExecutorCPUs = mresource.CPUShares(0.25) // initial CPU allocated for executor
defaultExecutorMem = mresource.MegaBytes(128.0) // initial memory allocated for executor
@ -145,6 +148,7 @@ type SchedulerServer struct {
DockerCfgPath string
ContainPodResources bool
AccountForPodResources bool
nodeRelistPeriod time.Duration
executable string // path to the binary running this service
client *client.Client
@ -192,6 +196,7 @@ func NewSchedulerServer() *SchedulerServer {
KubeletSyncFrequency: 10 * time.Second,
ContainPodResources: true,
AccountForPodResources: true,
nodeRelistPeriod: defaultNodeRelistPeriod,
}
// cache this for later use. also useful in case the original binary gets deleted, e.g.
// during upgrades, development deployments, etc.
@ -245,6 +250,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB")
fs.BoolVar(&s.ContainPodResources, "contain-pod-resources", s.ContainPodResources, "Reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.")
fs.BoolVar(&s.AccountForPodResources, "account-for-pod-resources", s.AccountForPodResources, "Allocate pod CPU and memory resources from offers (Default: true)")
fs.DurationVar(&s.nodeRelistPeriod, "node-monitor-period", s.nodeRelistPeriod, "Period between relisting of all nodes from the apiserver.")
fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned minion and executor processes.")
fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.")
@ -678,7 +684,24 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
podtask.DefaultMinimalProcurement)
}
fcfs := scheduler.NewFCFSPodScheduler(as)
// mirror all nodes into the nodeStore
nodesClient, err := s.createAPIServerClient()
if err != nil {
log.Fatalf("Cannot create client to watch nodes: %v", err)
}
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
nodeLW := cache.NewListWatchFromClient(nodesClient, "nodes", api.NamespaceAll, fields.Everything())
cache.NewReflector(nodeLW, &api.Node{}, nodeStore, s.nodeRelistPeriod).Run()
lookupNode := func(hostName string) *api.Node {
n, _, _ := nodeStore.GetByKey(hostName) // ignore the error and return nil in that case
if n == nil {
return nil
}
return n.(*api.Node)
}
fcfs := scheduler.NewFCFSPodScheduler(as, lookupNode)
mesosPodScheduler := scheduler.New(scheduler.Config{
Schedcfg: *sc,
Executor: executor,
@ -688,6 +711,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
FailoverTimeout: s.FailoverTimeout,
ReconcileInterval: s.ReconcileInterval,
ReconcileCooldown: s.ReconcileCooldown,
LookupNode: lookupNode,
})
masterUri := s.MesosMaster

test/e2e/mesos.go (new file, 53 lines)
View File

@ -0,0 +1,53 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/fields"
)
var _ = Describe("Mesos", func() {
framework := NewFramework("pods")
BeforeEach(func() {
SkipUnlessProviderIs("mesos/docker")
})
It("applies slave attributes as labels", func() {
nodeClient := framework.Client.Nodes()
rackA := labels.SelectorFromSet(map[string]string{"k8s.mesosphere.io/attribute-rack": "1"})
nodes, err := nodeClient.List(rackA, fields.Everything())
if err != nil {
Failf("Failed to query for node: %v", err)
}
Expect(len(nodes.Items)).To(Equal(1))
var addr string
for _, a := range nodes.Items[0].Status.Addresses {
if a.Type == api.NodeInternalIP {
addr = a.Address
}
}
Expect(addr).NotTo(Equal(""))
})
})

View File

@ -334,7 +334,7 @@ var _ = Describe("SchedulerPredicates", func() {
// Test nodes do not have any labels, hence it should be impossible to schedule a Pod
// with a nonempty NodeSelector set.
It("validates that NodeSelector is respected.", func() {
It("validates that NodeSelector is respected if not matching", func() {
By("Trying to schedule Pod with nonempty NodeSelector.")
podName := "restricted-pod"
@ -371,4 +371,77 @@ var _ = Describe("SchedulerPredicates", func() {
verifyResult(c, podName, ns, currentlyDeadPods)
cleanupPods(c, ns)
})
It("validates that NodeSelector is respected if matching.", func() {
// Launch a pod to find a node which can launch a pod. We intentionally do
// not just take the node list and choose the first of them, because depending
// on the cluster and the scheduler a "normal" pod might not be schedulable
// onto an arbitrary node.
By("Trying to launch a pod without a label to get a node which can launch it.")
podName := "without-label"
_, err := c.Pods(ns).Create(&api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
},
ObjectMeta: api.ObjectMeta{
Name: podName,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: podName,
Image: "gcr.io/google_containers/pause:go",
},
},
},
})
expectNoError(err)
expectNoError(waitForPodRunningInNamespace(c, podName, ns))
pod, err := c.Pods(ns).Get(podName)
expectNoError(err)
nodeName := pod.Spec.NodeName
err = c.Pods(ns).Delete(podName, api.NewDeleteOptions(0))
expectNoError(err)
By("Trying to apply a random label on the found node.")
k := fmt.Sprintf("kubernetes.io/e2e-%s", string(util.NewUUID()))
v := "42"
patch := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, k, v)
err = c.Patch(api.MergePatchType).Resource("nodes").Name(nodeName).Body([]byte(patch)).Do().Error()
expectNoError(err)
node, err := c.Nodes().Get(nodeName)
expectNoError(err)
Expect(node.Labels[k]).To(Equal(v))
By("Trying to relaunch the pod, now with labels.")
labelPodName := "with-labels"
_, err = c.Pods(ns).Create(&api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
},
ObjectMeta: api.ObjectMeta{
Name: labelPodName,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: labelPodName,
Image: "gcr.io/google_containers/pause:go",
},
},
NodeSelector: map[string]string{
"kubernetes.io/hostname": nodeName,
k: v,
},
},
})
expectNoError(err)
defer c.Pods(ns).Delete(labelPodName, api.NewDeleteOptions(0))
expectNoError(waitForPodRunningInNamespace(c, labelPodName, ns))
labelPod, err := c.Pods(ns).Get(labelPodName)
expectNoError(err)
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
})
})