Merge pull request #909 from lavalamp/scheduler3

Scheduler plugin v1
This commit is contained in:
Clayton Coleman 2014-08-21 19:04:51 -04:00
commit 1c68247954
24 changed files with 1310 additions and 104 deletions

View File

@ -0,0 +1,6 @@
{% set daemon_args = "$DAEMON_ARGS" %}
{% if grains['os_family'] == 'RedHat' %}
{% set daemon_args = "" %}
{% endif %}
{% set master="-master=127.0.0.1:8080" %}
DAEMON_ARGS="{{daemon_args}} {{master}}"

View File

@ -0,0 +1,91 @@
{% set root = '/var/src/scheduler' %}
{% set package = 'github.com/GoogleCloudPlatform/kubernetes' %}
{% set package_dir = root + '/src/' + package %}
{% if grains['os_family'] == 'RedHat' %}
{% set environment_file = '/etc/sysconfig/scheduler' %}
{% else %}
{% set environment_file = '/etc/default/scheduler' %}
{% endif %}
{{ package_dir }}:
file.recurse:
- source: salt://scheduler/go
- user: root
{% if grains['os_family'] == 'RedHat' %}
- group: root
{% else %}
- group: staff
{% endif %}
- dir_mode: 775
- file_mode: 664
- makedirs: True
- recurse:
- user
- group
- mode
{{ environment_file }}:
file.managed:
- source: salt://scheduler/default
- template: jinja
- user: root
- group: root
- mode: 644
scheduler-build:
cmd.run:
- cwd: {{ root }}
- names:
- go build {{ package }}/plugin/cmd/scheduler
- env:
- PATH: {{ grains['path'] }}:/usr/local/bin
- GOPATH: {{ root }}:{{ package_dir }}/Godeps/_workspace
- require:
- file: {{ package_dir }}
/usr/local/bin/scheduler:
file.symlink:
- target: {{ root }}/scheduler
- watch:
- cmd: scheduler-build
{% if grains['os_family'] == 'RedHat' %}
/usr/lib/systemd/system/scheduler.service:
file.managed:
- source: salt://scheduler/scheduler.service
- user: root
- group: root
{% else %}
/etc/init.d/scheduler:
file.managed:
- source: salt://scheduler/initd
- user: root
- group: root
- mode: 755
{% endif %}
scheduler:
group.present:
- system: True
user.present:
- system: True
- gid_from_name: True
- shell: /sbin/nologin
- home: /var/scheduler
- require:
- group: scheduler
service.running:
- enable: True
- watch:
- cmd: scheduler-build
- file: /usr/local/bin/scheduler
- file: {{ environment_file }}
{% if grains['os_family'] != 'RedHat' %}
- file: /etc/init.d/scheduler
{% endif %}

View File

@ -0,0 +1,120 @@
#!/bin/bash
#
### BEGIN INIT INFO
# Provides: scheduler
# Required-Start: $local_fs $network $syslog
# Required-Stop:
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: The Kubernetes scheduler plugin
# Description:
# The Kubernetes scheduler plugin is responsible for scheduling pods on
# available minions.
### END INIT INFO
# PATH should only include /usr/* if it runs after the mountnfs.sh script
PATH=/sbin:/usr/sbin:/bin:/usr/bin
DESC="The Kubernetes scheduler plugin"
NAME=scheduler
DAEMON=/usr/local/bin/scheduler
DAEMON_ARGS=" --master=127.0.0.1:8080"
DAEMON_LOG_FILE=/var/log/$NAME.log
PIDFILE=/var/run/$NAME.pid
SCRIPTNAME=/etc/init.d/$NAME
DAEMON_USER=scheduler
# Exit if the package is not installed
[ -x "$DAEMON" ] || exit 0
# Read configuration variable file if it is present
[ -r /etc/default/$NAME ] && . /etc/default/$NAME
# Define LSB log_* functions.
# Depend on lsb-base (>= 3.2-14) to ensure that this file is present
# and status_of_proc is working.
. /lib/lsb/init-functions
#
# Function that starts the daemon/service
#
do_start()
{
# Return
# 0 if daemon has been started
# 1 if daemon was already running
# 2 if daemon could not be started
start-stop-daemon --start --quiet --background --no-close \
--make-pidfile --pidfile $PIDFILE \
--exec $DAEMON -c $DAEMON_USER --test > /dev/null \
|| return 1
start-stop-daemon --start --quiet --background --no-close \
--make-pidfile --pidfile $PIDFILE \
--exec $DAEMON -c $DAEMON_USER -- \
$DAEMON_ARGS >> $DAEMON_LOG_FILE 2>&1 \
|| return 2
}
#
# Function that stops the daemon/service
#
do_stop()
{
# Return
# 0 if daemon has been stopped
# 1 if daemon was already stopped
# 2 if daemon could not be stopped
# other if a failure occurred
start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE --exec $DAEMON
RETVAL="$?"
[ "$RETVAL" = 2 ] && return 2
# Many daemons don't delete their pidfiles when they exit.
rm -f $PIDFILE
return "$RETVAL"
}
case "$1" in
start)
log_daemon_msg "Starting $DESC" "$NAME"
do_start
case "$?" in
0|1) log_end_msg 0 || exit 0 ;;
2) verblog_end_msg 1 || exit 1 ;;
esac
;;
stop)
log_daemon_msg "Stopping $DESC" "$NAME"
do_stop
case "$?" in
0|1) log_end_msg 0 ;;
2) exit 1 ;;
esac
;;
status)
status_of_proc -p $PIDFILE "$DAEMON" "$NAME" && exit 0 || exit $?
;;
restart|force-reload)
log_daemon_msg "Restarting $DESC" "$NAME"
do_stop
case "$?" in
0|1)
do_start
case "$?" in
0) log_end_msg 0 ;;
1) log_end_msg 1 ;; # Old process is still running
*) log_end_msg 1 ;; # Failed to start
esac
;;
*)
# Failed to stop
log_end_msg 1
;;
esac
;;
*)
echo "Usage: $SCRIPTNAME {start|stop|status|restart|force-reload}" >&2
exit 3
;;
esac

View File

@ -0,0 +1,11 @@
[Unit]
Description=Kubernetes Scheduler Plugin
Documentation=https://github.com/GoogleCloudPlatform/kubernetes
[Service]
Type=simple
EnvironmentFile=-/etc/sysconfig/scheduler
ExecStart=/usr/local/bin/scheduler "$DAEMON_ARGS"
[Install]
WantedBy=multi-user.target

View File

@ -17,4 +17,5 @@ base:
- etcd
- apiserver
- controller-manager
- scheduler
- nginx

View File

@ -33,7 +33,7 @@ cd "${KUBE_REPO_ROOT}"
if [[ $# == 0 ]]; then
# Update $@ with the default list of targets to build.
set -- cmd/proxy cmd/apiserver cmd/controller-manager cmd/kubelet cmd/kubecfg
set -- cmd/proxy cmd/apiserver cmd/controller-manager cmd/kubelet cmd/kubecfg plugin/cmd/scheduler
fi
binaries=()

View File

@ -18,6 +18,8 @@ package cache
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// FIFO receives adds and updates from a Reflector, and puts them in a queue for
@ -33,30 +35,30 @@ type FIFO struct {
}
// Add inserts an item, and puts it in the queue.
func (f *FIFO) Add(ID string, obj interface{}) {
func (f *FIFO) Add(id string, obj interface{}) {
f.lock.Lock()
defer f.lock.Unlock()
f.items[ID] = obj
f.queue = append(f.queue, ID)
f.items[id] = obj
f.queue = append(f.queue, id)
f.cond.Broadcast()
}
// Update updates an item, and adds it to the queue.
func (f *FIFO) Update(ID string, obj interface{}) {
func (f *FIFO) Update(id string, obj interface{}) {
f.lock.Lock()
defer f.lock.Unlock()
f.items[ID] = obj
f.queue = append(f.queue, ID)
f.items[id] = obj
f.queue = append(f.queue, id)
f.cond.Broadcast()
}
// Delete removes an item. It doesn't add it to the queue, because
// this implementation assumes the consumer only cares about the objects,
// not the order in which they were created/added.
func (f *FIFO) Delete(ID string, obj interface{}) {
func (f *FIFO) Delete(id string) {
f.lock.Lock()
defer f.lock.Unlock()
delete(f.items, ID)
delete(f.items, id)
}
// List returns a list of all the items.
@ -70,11 +72,24 @@ func (f *FIFO) List() []interface{} {
return list
}
// Contains returns a util.StringSet containing all IDs of stored the items.
// This is a snapshot of a moment in time, and one should keep in mind that
// other go routines can add or remove items after you call this.
func (c *FIFO) Contains() util.StringSet {
c.lock.RLock()
defer c.lock.RUnlock()
set := util.StringSet{}
for id := range c.items {
set.Insert(id)
}
return set
}
// Get returns the requested item, or sets exists=false.
func (f *FIFO) Get(ID string) (item interface{}, exists bool) {
func (f *FIFO) Get(id string) (item interface{}, exists bool) {
f.lock.RLock()
defer f.lock.RUnlock()
item, exists = f.items[ID]
item, exists = f.items[id]
return item, exists
}

81
pkg/client/cache/poller.go vendored Normal file
View File

@ -0,0 +1,81 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
// Enumerator should be able to return the list of objects to be synced with
// one object at a time.
type Enumerator interface {
Len() int
Get(index int) (ID string, object interface{})
}
// GetFunc should return an enumerator that you wish the Poller to proccess.
type GetFunc func() (Enumerator, error)
// Poller is like Reflector, but it periodically polls instead of watching.
// This is intended to be a workaround for api objects that don't yet support
// watching.
type Poller struct {
getFunc GetFunc
period time.Duration
store Store
}
// NewPoller constructs a new poller. Note that polling probably doesn't make much
// sense to use along with the FIFO queue. The returned Poller will call getFunc and
// sync the objects in 'store' with the returned Enumerator, waiting 'period' between
// each call. It probably only makes sense to use a poller if you're treating the
// store as read-only.
func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller {
return &Poller{
getFunc: getFunc,
period: period,
store: store,
}
}
// Run begins polling. It starts a goroutine and returns immediately.
func (p *Poller) Run() {
go util.Forever(func() {
e, err := p.getFunc()
if err != nil {
glog.Errorf("failed to list: %v", err)
return
}
p.sync(e)
}, p.period)
}
func (p *Poller) sync(e Enumerator) {
current := p.store.Contains()
for i := 0; i < e.Len(); i++ {
id, object := e.Get(i)
p.store.Update(id, object)
current.Delete(id)
}
// Delete all the objects not found.
for id := range current {
p.store.Delete(id)
}
}

119
pkg/client/cache/poller_test.go vendored Normal file
View File

@ -0,0 +1,119 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"errors"
"reflect"
"testing"
"time"
)
type testPair struct {
id string
obj interface{}
}
type testEnumerator []testPair
func (t testEnumerator) Len() int { return len(t) }
func (t testEnumerator) Get(i int) (string, interface{}) {
return t[i].id, t[i].obj
}
func TestPoller_sync(t *testing.T) {
table := []struct {
// each step simulates the list that a getFunc would receive.
steps [][]testPair
}{
{
steps: [][]testPair{
{
{"foo", "foo1"},
{"bar", "bar1"},
{"baz", "baz1"},
{"qux", "qux1"},
}, {
{"foo", "foo2"},
{"bar", "bar2"},
{"qux", "qux2"},
}, {
{"bar", "bar3"},
{"baz", "baz2"},
{"qux", "qux3"},
}, {
{"qux", "qux4"},
}, {
{"foo", "foo3"},
},
},
},
}
for testCase, item := range table {
s := NewStore()
// This is a unit test for the sync function, hence the nil getFunc.
p := NewPoller(nil, 0, s)
for line, pairs := range item.steps {
p.sync(testEnumerator(pairs))
ids := s.Contains()
for _, pair := range pairs {
if !ids.Has(pair.id) {
t.Errorf("%v, %v: expected to find entry for %v, but did not.", testCase, line, pair.id)
continue
}
found, ok := s.Get(pair.id)
if !ok {
t.Errorf("%v, %v: unexpected absent entry for %v", testCase, line, pair.id)
continue
}
if e, a := pair.obj, found; !reflect.DeepEqual(e, a) {
t.Errorf("%v, %v: expected %v, got %v for %v", testCase, line, e, a, pair.id)
}
}
if e, a := len(pairs), len(ids); e != a {
t.Errorf("%v, %v: expected len %v, got %v", testCase, line, e, a)
}
}
}
}
func TestPoller_Run(t *testing.T) {
s := NewStore()
const count = 10
var called = 0
done := make(chan struct{})
NewPoller(func() (Enumerator, error) {
called++
if called == count {
close(done)
}
// test both error and regular returns.
if called&1 == 0 {
return testEnumerator{}, nil
}
return nil, errors.New("transient error")
}, time.Millisecond, s).Run()
// The test here is that we get called at least count times.
<-done
// We never added anything, verify that.
if e, a := 0, len(s.Contains()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

View File

@ -21,62 +21,56 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
type Store interface {
Add(ID string, obj interface{})
Update(ID string, obj interface{})
Delete(ID string, obj interface{})
List() []interface{}
Get(ID string) (item interface{}, exists bool)
}
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
kubeClient *client.Client
resource string
// The type of object we expect to place in the store.
expectedType reflect.Type
store Store
// The destination to sync up with the watch source
store Store
// watchFactory is called to initiate watches.
watchFactory WatchFactory
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration
}
// WatchFactory should begin a watch at the specified version.
type WatchFactory func(resourceVersion uint64) (watch.Interface, error)
// NewReflector makes a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType.
// TODO: define a query so you only locally cache a subset of items.
func NewReflector(resource string, kubeClient *client.Client, expectedType interface{}, store Store) *Reflector {
func NewReflector(watchFactory WatchFactory, expectedType interface{}, store Store) *Reflector {
gc := &Reflector{
resource: resource,
kubeClient: kubeClient,
watchFactory: watchFactory,
store: store,
expectedType: reflect.TypeOf(expectedType),
period: time.Second,
}
return gc
}
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run starts a goroutine and returns immediately.
func (gc *Reflector) Run() {
var resourceVersion uint64
go util.Forever(func() {
w, err := gc.startWatch()
w, err := gc.watchFactory(resourceVersion)
if err != nil {
glog.Errorf("failed to watch %v: %v", gc.resource, err)
glog.Errorf("failed to watch %v: %v", gc.expectedType, err)
return
}
gc.watchHandler(w)
}, 5*time.Second)
gc.watchHandler(w, &resourceVersion)
}, gc.period)
}
func (gc *Reflector) startWatch() (watch.Interface, error) {
return gc.kubeClient.Get().Path(gc.resource).Path("watch").Watch()
}
func (gc *Reflector) watchHandler(w watch.Interface) {
// watchHandler watches w and keeps *resourceVersion up to date.
func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
for {
event, ok := <-w.ResultChan()
if !ok {
@ -98,9 +92,13 @@ func (gc *Reflector) watchHandler(w watch.Interface) {
case watch.Modified:
gc.store.Update(jsonBase.ID(), event.Object)
case watch.Deleted:
gc.store.Delete(jsonBase.ID(), event.Object)
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
gc.store.Delete(jsonBase.ID())
default:
glog.Errorf("unable to understand watch event %#v", event)
}
*resourceVersion = jsonBase.ResourceVersion() + 1
}
}

View File

@ -17,29 +17,27 @@ limitations under the License.
package cache
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func TestReflector_watchHandler(t *testing.T) {
s := NewStore()
g := NewReflector("foo", nil, &api.Pod{}, s)
g := NewReflector(nil, &api.Pod{}, s)
fw := watch.NewFake()
s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}})
go func() {
fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}})
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz"}})
fw.Add(&api.Service{JSONBase: api.JSONBase{ID: "rejected"}})
fw.Delete(&api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}})
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 32}})
fw.Stop()
}()
g.watchHandler(fw)
var resumeRV uint64
g.watchHandler(fw, &resumeRV)
table := []struct {
ID string
@ -49,7 +47,7 @@ func TestReflector_watchHandler(t *testing.T) {
{"foo", 0, false},
{"rejected", 0, false},
{"bar", 55, true},
{"baz", 0, true},
{"baz", 32, true},
}
for _, item := range table {
obj, exists := s.Get(item.ID)
@ -63,32 +61,62 @@ func TestReflector_watchHandler(t *testing.T) {
t.Errorf("%v: expected %v, got %v", item.ID, e, a)
}
}
// RV should stay 1 higher than the last id we see.
if e, a := uint64(33), resumeRV; e != a {
t.Errorf("expected %v, got %v", e, a)
}
}
func TestReflector_startWatch(t *testing.T) {
table := []struct{ resource, path string }{
{"pods", "/api/v1beta1/pods/watch"},
{"services", "/api/v1beta1/services/watch"},
}
for _, testItem := range table {
got := make(chan struct{})
srv := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
if req.URL.Path == testItem.path {
close(got)
return
}
t.Errorf("unexpected path %v", req.URL.Path)
}))
s := NewStore()
c := client.New(srv.URL, nil)
g := NewReflector(testItem.resource, c, &api.Pod{}, s)
_, err := g.startWatch()
// We're just checking that it watches the right path.
if err == nil {
t.Errorf("unexpected non-error")
func TestReflector_Run(t *testing.T) {
createdFakes := make(chan *watch.FakeWatcher)
// Expect our starter to get called at the beginning of the watch with 0, and again with 3 when we
// inject an error at 2.
expectedRVs := []uint64{0, 3}
watchStarter := func(rv uint64) (watch.Interface, error) {
fw := watch.NewFake()
if e, a := expectedRVs[0], rv; e != a {
t.Errorf("Expected rv %v, but got %v", e, a)
}
<-got
expectedRVs = expectedRVs[1:]
// channel is not buffered because the for loop below needs to block. But
// we don't want to block here, so report the new fake via a go routine.
go func() { createdFakes <- fw }()
return fw, nil
}
s := NewFIFO()
r := NewReflector(watchStarter, &api.Pod{}, s)
r.period = 0
r.Run()
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
var fw *watch.FakeWatcher
for i, id := range ids {
if fw == nil {
fw = <-createdFakes
}
sendingRV := uint64(i + 1)
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: sendingRV}})
if sendingRV == 2 {
// Inject a failure.
fw.Stop()
fw = nil
}
}
// Verify we received the right ids with the right resource versions.
for i, id := range ids {
pod := s.Pop().(*api.Pod)
if e, a := id, pod.ID; e != a {
t.Errorf("%v: Expected %v, got %v", i, e, a)
}
if e, a := uint64(i+1), pod.ResourceVersion; e != a {
t.Errorf("%v: Expected %v, got %v", i, e, a)
}
}
if len(expectedRVs) != 0 {
t.Error("called watchStarter an unexpected number of times")
}
}

View File

@ -18,32 +18,47 @@ package cache
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
type Store interface {
Add(id string, obj interface{})
Update(id string, obj interface{})
Delete(id string)
List() []interface{}
Contains() util.StringSet
Get(id string) (item interface{}, exists bool)
}
type cache struct {
lock sync.RWMutex
items map[string]interface{}
}
// Add inserts an item into the cache.
func (c *cache) Add(ID string, obj interface{}) {
func (c *cache) Add(id string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
c.items[ID] = obj
c.items[id] = obj
}
// Update sets an item in the cache to its updated state.
func (c *cache) Update(ID string, obj interface{}) {
func (c *cache) Update(id string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
c.items[ID] = obj
c.items[id] = obj
}
// Delete removes an item from the cache.
func (c *cache) Delete(ID string, obj interface{}) {
func (c *cache) Delete(id string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.items, ID)
delete(c.items, id)
}
// List returns a list of all the items.
@ -58,12 +73,25 @@ func (c *cache) List() []interface{} {
return list
}
// Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(ID string) (item interface{}, exists bool) {
// Contains returns a util.StringSet containing all IDs of stored the items.
// This is a snapshot of a moment in time, and one should keep in mind that
// other go routines can add or remove items after you call this.
func (c *cache) Contains() util.StringSet {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[ID]
set := util.StringSet{}
for id := range c.items {
set.Insert(id)
}
return set
}
// Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(id string) (item interface{}, exists bool) {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[id]
return item, exists
}

View File

@ -40,10 +40,12 @@ func doTestStore(t *testing.T, store Store) {
t.Errorf("expected %v, got %v", e, a)
}
}
store.Delete("foo", "qux")
store.Delete("foo")
if _, ok := store.Get("foo"); ok {
t.Errorf("found deleted item??")
}
// Test List
store.Add("a", "b")
store.Add("c", "d")
store.Add("e", "e")
@ -57,6 +59,15 @@ func doTestStore(t *testing.T, store Store) {
if len(found) != 3 {
t.Errorf("extra items")
}
// Check that ID list is correct.
ids := store.Contains()
if !ids.HasAll("a", "c", "e") {
t.Errorf("missing items")
}
if len(ids) != 3 {
t.Errorf("extra items")
}
}
func TestCache(t *testing.T) {

View File

@ -343,13 +343,17 @@ func (c *testClient) Validate(t *testing.T, received interface{}, err error) {
}
requestBody := body(c.Request.Body, c.Request.RawBody)
actualQuery := c.handler.RequestReceived.URL.Query()
// We check the query manually, so blank it out so that FakeHandler.ValidateRequest
// won't check it.
c.handler.RequestReceived.URL.RawQuery = ""
c.handler.ValidateRequest(t, makeURL(c.Request.Path), c.Request.Method, requestBody)
for key, values := range c.Request.Query {
validator, ok := c.QueryValidator[key]
if !ok {
validator = func(a, b string) bool { return a == b }
}
observed := c.handler.RequestReceived.URL.Query().Get(key)
observed := actualQuery.Get(key)
if !validator(values[0], observed) {
t.Errorf("Unexpected query arg for key: %s. Expected %s, Received %s", key, values[0], observed)
}

View File

@ -62,10 +62,7 @@ func TestDoRequestNewWay(t *testing.T) {
} else if !reflect.DeepEqual(obj, expectedObj) {
t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
}
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &reqBody)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo", "POST", &reqBody)
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived)
}
@ -88,7 +85,7 @@ func TestDoRequestNewWayReader(t *testing.T) {
Path("foo/bar").
Path("baz").
SelectorParam("labels", labels.Set{"name": "foo"}.AsSelector()).
Sync(false).
Sync(true).
Timeout(time.Second).
Body(bytes.NewBuffer(reqBodyExpected)).
Do().Get()
@ -102,10 +99,7 @@ func TestDoRequestNewWayReader(t *testing.T) {
t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo&sync=true&timeout=1s", "POST", &tmpStr)
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived)
}
@ -141,10 +135,7 @@ func TestDoRequestNewWayObj(t *testing.T) {
t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo", "POST", &tmpStr)
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived)
}
@ -194,10 +185,7 @@ func TestDoRequestNewWayFile(t *testing.T) {
t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo", "POST", &tmpStr)
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived)
}

View File

@ -36,6 +36,7 @@ func (f FakeMinionLister) List() ([]string, error) {
// PodLister interface represents anything that can list pods for a scheduler
type PodLister interface {
// TODO: make this exactly the same as client's ListPods() method...
ListPods(labels.Selector) ([]api.Pod, error)
}

View File

@ -57,6 +57,7 @@ func (s *RandomFitScheduler) Schedule(pod api.Pod, minionLister MinionLister) (s
return "", err
}
machineToPods := map[string][]api.Pod{}
// TODO: perform more targeted query...
pods, err := s.podLister.ListPods(labels.Everything())
if err != nil {
return "", err

View File

@ -19,6 +19,8 @@ package util
import (
"io/ioutil"
"net/http"
"net/url"
"reflect"
)
// TestInterface is a simple interface providing Errorf, to make injection for
@ -57,8 +59,15 @@ func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Requ
// ValidateRequest verifies that FakeHandler received a request with expected path, method, and body.
func (f FakeHandler) ValidateRequest(t TestInterface, expectedPath, expectedMethod string, body *string) {
if f.RequestReceived.URL.Path != expectedPath {
t.Errorf("Unexpected request path for request %#v, received: %q, expected: %q", f.RequestReceived, f.RequestReceived.URL.Path, expectedPath)
expectURL, err := url.Parse(expectedPath)
if err != nil {
t.Errorf("Couldn't parse %v as a URL.", expectedPath)
}
if f.RequestReceived.URL.Path != expectURL.Path {
t.Errorf("Unexpected request path for request %#v, received: %q, expected: %q", f.RequestReceived, f.RequestReceived.URL.Path, expectURL.Path)
}
if e, a := expectURL.Query(), f.RequestReceived.URL.Query(); !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected query for request %#v, received: %q, expected: %q", f.RequestReceived, a, e)
}
if f.RequestReceived.Method != expectedMethod {
t.Errorf("Unexpected method: %q, expected: %q", f.RequestReceived.Method, expectedMethod)

View File

@ -0,0 +1,49 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
verflag "github.com/GoogleCloudPlatform/kubernetes/pkg/version/flag"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)
var (
master = flag.String("master", "", "The address of the Kubernetes API server")
)
func main() {
flag.Parse()
util.InitLogs()
defer util.FlushLogs()
verflag.PrintAndExitIfRequested()
// TODO: security story for plugins!
kubeClient := client.New("http://"+*master, nil)
configFactory := &factory.ConfigFactory{Client: kubeClient}
config := configFactory.Create()
s := scheduler.New(config)
s.Run()
select {}
}

View File

@ -0,0 +1,197 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package factory can set up a scheduler. This code is here instead of
// plugin/cmd/scheduler for both testability and reuse.
package factory
import (
"math/rand"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
"github.com/golang/glog"
)
// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
Client *client.Client
}
// Create creates a scheduler and all support functions.
func (factory *ConfigFactory) Create() *scheduler.Config {
// Watch and queue pods that need scheduling.
podQueue := cache.NewFIFO()
cache.NewReflector(factory.createUnassignedPodWatch, &api.Pod{}, podQueue).Run()
// Watch and cache all running pods. Scheduler needs to find all pods
// so it knows where it's safe to place a pod. Cache this locally.
podCache := cache.NewStore()
cache.NewReflector(factory.createAssignedPodWatch, &api.Pod{}, podCache).Run()
// Watch minions.
// Minions may be listed frequently, so provide a local up-to-date cache.
minionCache := cache.NewStore()
if false {
// Disable this code until minions support watches.
cache.NewReflector(factory.createMinionWatch, &api.Minion{}, minionCache).Run()
} else {
cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run()
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := algorithm.NewRandomFitScheduler(
&storeToPodLister{podCache}, r)
return &scheduler.Config{
MinionLister: &storeToMinionLister{minionCache},
Algorithm: algo,
Binder: &binder{factory.Client},
NextPod: func() *api.Pod {
return podQueue.Pop().(*api.Pod)
},
Error: factory.makeDefaultErrorFunc(podQueue),
}
}
// createUnassignedPodWatch starts a watch that finds all pods that need to be
// scheduled.
func (factory *ConfigFactory) createUnassignedPodWatch(resourceVersion uint64) (watch.Interface, error) {
return factory.Client.
Get().
Path("watch").
Path("pods").
SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()).
UintParam("resourceVersion", resourceVersion).
Watch()
}
// createUnassignedPodWatch starts a watch that finds all pods that are
// already scheduled.
func (factory *ConfigFactory) createAssignedPodWatch(resourceVersion uint64) (watch.Interface, error) {
return factory.Client.
Get().
Path("watch").
Path("pods").
ParseSelectorParam("fields", "DesiredState.Host!=").
UintParam("resourceVersion", resourceVersion).
Watch()
}
// createMinionWatch starts a watch that gets all changes to minions.
func (factory *ConfigFactory) createMinionWatch(resourceVersion uint64) (watch.Interface, error) {
return factory.Client.
Get().
Path("watch").
Path("minions").
UintParam("resourceVersion", resourceVersion).
Watch()
}
// pollMinions lists all minions and returns an enumerator for cache.Poller.
func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
list := &api.MinionList{}
err := factory.Client.Get().Path("minions").Do().Into(list)
if err != nil {
return nil, err
}
return &minionEnumerator{list}, nil
}
func (factory *ConfigFactory) makeDefaultErrorFunc(podQueue *cache.FIFO) func(pod *api.Pod, err error) {
return func(pod *api.Pod, err error) {
glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err)
// Retry asynchronously.
// Note that this is extremely rudimentary and we need a more real error handling path.
go func() {
defer util.HandleCrash()
podID := pod.ID
// Get the pod again; it may have changed/been scheduled already.
pod = &api.Pod{}
err := factory.Client.Get().Path("pods").Path(podID).Do().Into(pod)
if err != nil {
glog.Errorf("Error getting pod %v for retry: %v; abandoning", podID, err)
return
}
if pod.DesiredState.Host == "" {
podQueue.Add(pod.ID, pod)
}
}()
}
}
// storeToMinionLister turns a store into a minion lister. The store must contain (only) minions.
type storeToMinionLister struct {
cache.Store
}
func (s *storeToMinionLister) List() (machines []string, err error) {
for _, m := range s.Store.List() {
machines = append(machines, m.(*api.Minion).ID)
}
return machines, nil
}
// storeToPodLister turns a store into a pod lister. The store must contain (only) pods.
type storeToPodLister struct {
cache.Store
}
func (s *storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) {
for _, m := range s.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, *pod)
}
}
return pods, nil
}
// minionEnumerator allows a cache.Poller to enumerate items in an api.PodList
type minionEnumerator struct {
*api.MinionList
}
// Returns the number of items in the pod list.
func (me *minionEnumerator) Len() int {
if me.MinionList == nil {
return 0
}
return len(me.Items)
}
// Returns the item (and ID) with the particular index.
func (me *minionEnumerator) Get(index int) (string, interface{}) {
return me.Items[index].ID, &me.Items[index]
}
type binder struct {
*client.Client
}
// Bind just does a POST binding RPC.
func (b *binder) Bind(binding *api.Binding) error {
return b.Post().Path("bindings").Body(binding).Do().Error()
}

View File

@ -0,0 +1,255 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func TestCreate(t *testing.T) {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
factory := ConfigFactory{client.New(server.URL, nil)}
factory.Create()
}
func TestCreateWatches(t *testing.T) {
factory := ConfigFactory{nil}
table := []struct {
rv uint64
location string
watchFactory func(rv uint64) (watch.Interface, error)
}{
// Minion watch
{
rv: 0,
location: "/api/v1beta1/watch/minions?resourceVersion=0",
watchFactory: factory.createMinionWatch,
}, {
rv: 42,
location: "/api/v1beta1/watch/minions?resourceVersion=42",
watchFactory: factory.createMinionWatch,
},
// Assigned pod watches
{
rv: 0,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0",
watchFactory: factory.createAssignedPodWatch,
}, {
rv: 42,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42",
watchFactory: factory.createAssignedPodWatch,
},
// Unassigned pod watches
{
rv: 0,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0",
watchFactory: factory.createUnassignedPodWatch,
}, {
rv: 42,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42",
watchFactory: factory.createUnassignedPodWatch,
},
}
for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
factory.Client = client.New(server.URL, nil)
// This test merely tests that the correct request is made.
item.watchFactory(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil)
}
}
func TestPollMinions(t *testing.T) {
table := []struct {
minions []api.Minion
}{
{
minions: []api.Minion{
{JSONBase: api.JSONBase{ID: "foo"}},
{JSONBase: api.JSONBase{ID: "bar"}},
},
},
}
for _, item := range table {
ml := &api.MinionList{Items: item.minions}
handler := util.FakeHandler{
StatusCode: 200,
ResponseBody: api.EncodeOrDie(ml),
T: t,
}
server := httptest.NewServer(&handler)
cf := ConfigFactory{client.New(server.URL, nil)}
ce, err := cf.pollMinions()
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
handler.ValidateRequest(t, "/api/v1beta1/minions", "GET", nil)
if e, a := len(item.minions), ce.Len(); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
}
func TestDefaultErrorFunc(t *testing.T) {
testPod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
handler := util.FakeHandler{
StatusCode: 200,
ResponseBody: api.EncodeOrDie(testPod),
T: t,
}
server := httptest.NewServer(&handler)
factory := ConfigFactory{client.New(server.URL, nil)}
queue := cache.NewFIFO()
errFunc := factory.makeDefaultErrorFunc(queue)
errFunc(testPod, nil)
for {
// This is a terrible way to do this but I plan on replacing this
// whole error handling system in the future. The test will time
// out if something doesn't work.
time.Sleep(10 * time.Millisecond)
got, exists := queue.Get("foo")
if !exists {
continue
}
handler.ValidateRequest(t, "/api/v1beta1/pods/foo", "GET", nil)
if e, a := testPod, got; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
break
}
}
func TestStoreToMinionLister(t *testing.T) {
store := cache.NewStore()
ids := util.NewStringSet("foo", "bar", "baz")
for id := range ids {
store.Add(id, &api.Minion{JSONBase: api.JSONBase{ID: id}})
}
sml := storeToMinionLister{store}
got, err := sml.List()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !ids.HasAll(got...) || len(got) != len(ids) {
t.Errorf("Expected %v, got %v", ids, got)
}
}
func TestStoreToPodLister(t *testing.T) {
store := cache.NewStore()
ids := []string{"foo", "bar", "baz"}
for _, id := range ids {
store.Add(id, &api.Pod{
JSONBase: api.JSONBase{ID: id},
Labels: map[string]string{"name": id},
})
}
spl := storeToPodLister{store}
for _, id := range ids {
got, err := spl.ListPods(labels.Set{"name": id}.AsSelector())
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
if e, a := 1, len(got); e != a {
t.Errorf("Expected %v, got %v", e, a)
continue
}
if e, a := id, got[0].ID; e != a {
t.Errorf("Expected %v, got %v", e, a)
continue
}
}
}
func TestMinionEnumerator(t *testing.T) {
testList := &api.MinionList{
Items: []api.Minion{
{JSONBase: api.JSONBase{ID: "foo"}},
{JSONBase: api.JSONBase{ID: "bar"}},
{JSONBase: api.JSONBase{ID: "baz"}},
},
}
me := minionEnumerator{testList}
if e, a := 3, me.Len(); e != a {
t.Fatalf("expected %v, got %v", e, a)
}
for i := range testList.Items {
gotID, gotObj := me.Get(i)
if e, a := testList.Items[i].ID, gotID; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := &testList.Items[i], gotObj; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %v#", e, a)
}
}
}
func TestBind(t *testing.T) {
table := []struct {
binding *api.Binding
}{
{binding: &api.Binding{PodID: "foo", Host: "foohost.kubernetes.mydomain.com"}},
}
for _, item := range table {
handler := util.FakeHandler{
StatusCode: 200,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
b := binder{client.New(server.URL, nil)}
err := b.Bind(item.binding)
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
expectedBody := api.EncodeOrDie(item.binding)
handler.ValidateRequest(t, "/api/v1beta1/bindings", "POST", &expectedBody)
}
}

View File

@ -0,0 +1,80 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
// TODO: move everything from pkg/scheduler into this package. Remove references from registry.
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// Binder knows how to write a binding.
type Binder interface {
Bind(binding *api.Binding) error
}
// Scheduler watches for new unscheduled pods. It attempts to find
// minions that they fit on and writes bindings back to the api server.
type Scheduler struct {
config *Config
}
type Config struct {
MinionLister scheduler.MinionLister
Algorithm scheduler.Scheduler
Binder Binder
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *api.Pod
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*api.Pod, error)
}
// New returns a new scheduler.
func New(c *Config) *Scheduler {
s := &Scheduler{
config: c,
}
return s
}
// Run begins watching and scheduling. It starts a goroutine and returns immediately.
func (s *Scheduler) Run() {
go util.Forever(s.scheduleOne, 0)
}
func (s *Scheduler) scheduleOne() {
pod := s.config.NextPod()
dest, err := s.config.Algorithm.Schedule(*pod, s.config.MinionLister)
if err != nil {
s.config.Error(pod, err)
return
}
b := &api.Binding{
PodID: pod.ID,
Host: dest,
}
if err := s.config.Binder.Bind(b); err != nil {
s.config.Error(pod, err)
}
}

View File

@ -0,0 +1,110 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"errors"
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
)
type fakeBinder struct {
b func(binding *api.Binding) error
}
func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) }
func podWithID(id string) *api.Pod {
return &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
}
type mockScheduler struct {
machine string
err error
}
func (es mockScheduler) Schedule(pod api.Pod, ml scheduler.MinionLister) (string, error) {
return es.machine, es.err
}
func TestScheduler(t *testing.T) {
errS := errors.New("scheduler")
errB := errors.New("binder")
table := []struct {
injectBindError error
sendPod *api.Pod
algo scheduler.Scheduler
expectErrorPod *api.Pod
expectError error
expectBind *api.Binding
}{
{
sendPod: podWithID("foo"),
algo: mockScheduler{"machine1", nil},
expectBind: &api.Binding{PodID: "foo", Host: "machine1"},
}, {
sendPod: podWithID("foo"),
algo: mockScheduler{"machine1", errS},
expectError: errS,
expectErrorPod: podWithID("foo"),
}, {
sendPod: podWithID("foo"),
algo: mockScheduler{"machine1", nil},
expectBind: &api.Binding{PodID: "foo", Host: "machine1"},
injectBindError: errB,
expectError: errB,
expectErrorPod: podWithID("foo"),
},
}
for i, item := range table {
var gotError error
var gotPod *api.Pod
var gotBinding *api.Binding
c := &Config{
MinionLister: scheduler.FakeMinionLister{"machine1"},
Algorithm: item.algo,
Binder: fakeBinder{func(b *api.Binding) error {
gotBinding = b
return item.injectBindError
}},
Error: func(p *api.Pod, err error) {
gotPod = p
gotError = err
},
NextPod: func() *api.Pod {
return item.sendPod
},
}
s := New(c)
s.scheduleOne()
if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) {
t.Errorf("%v: error pod: wanted %v, got %v", i, e, a)
}
if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) {
t.Errorf("%v: error: wanted %v, got %v", i, e, a)
}
if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) {
t.Errorf("%v: error: wanted %v, got %v", i, e, a)
}
}
}

View File

@ -37,6 +37,9 @@ cp -R --preserve=mode $RELEASE_BASE/src/go/* /srv/salt/kube-proxy/go
mkdir -p /srv/salt/controller-manager/go
cp -R --preserve=mode $RELEASE_BASE/src/go/* /srv/salt/controller-manager/go
mkdir -p /srv/salt/scheduler/go
cp -R --preserve=mode $RELEASE_BASE/src/go/* /srv/salt/scheduler/go
mkdir -p /srv/salt/kubelet/go
cp -R --preserve=mode $RELEASE_BASE/src/go/* /srv/salt/kubelet/go