Use etcd as backend for minion registry.

This commit is contained in:
Deyuan Deng 2014-09-08 23:02:21 -04:00
parent a6c748451d
commit 4a35325f29
17 changed files with 305 additions and 244 deletions

View File

@ -111,7 +111,13 @@ func makeMinionRegistry(c *Config) minion.Registry {
}
}
if minionRegistry == nil {
minionRegistry = minion.NewRegistry(c.Minions, c.NodeResources)
minionRegistry = etcd.NewRegistry(c.EtcdHelper, nil)
for _, minionID := range c.Minions {
minionRegistry.InsertMinion(nil, &api.Minion{
JSONBase: api.JSONBase{ID: minionID},
NodeResources: c.NodeResources,
})
}
}
if c.HealthCheckMinions {
minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{})

View File

@ -34,8 +34,7 @@ import (
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
// kubelet (and vice versa)
// Registry implements PodRegistry, ControllerRegistry and ServiceRegistry
// with backed by etcd.
// Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegitry, backed by etcd.
type Registry struct {
tools.EtcdHelper
manifestFactory pod.ManifestFactory
@ -382,3 +381,40 @@ func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector,
}
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}
func makeMinionKey(minionID string) string {
return "/registry/minions/" + minionID
}
func (r *Registry) ListMinions(ctx api.Context) (*api.MinionList, error) {
minions := &api.MinionList{}
err := r.ExtractList("/registry/minions", &minions.Items, &minions.ResourceVersion)
return minions, err
}
func (r *Registry) InsertMinion(ctx api.Context, minion *api.Minion) error {
err := r.CreateObj(makeMinionKey(minion.ID), minion, 0)
return etcderr.InterpretCreateError(err, "minion", minion.ID)
}
func (r *Registry) ContainsMinion(ctx api.Context, minionID string) (bool, error) {
var minion api.Minion
key := makeMinionKey(minionID)
err := r.ExtractObj(key, &minion, false)
if err == nil {
return true, nil
} else if tools.IsEtcdNotFound(err) {
return false, nil
} else {
return false, etcderr.InterpretGetError(err, "minion", minion.ID)
}
}
func (r *Registry) DeleteMinion(ctx api.Context, minionID string) error {
key := makeMinionKey(minionID)
err := r.Delete(key, true)
if err != nil {
return etcderr.InterpretDeleteError(err, "minion", minionID)
}
return nil
}

View File

@ -1019,6 +1019,121 @@ func TestEtcdWatchEndpointsBadSelector(t *testing.T) {
}
}
func TestEtcdListMinions(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
key := "/registry/minions"
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Minion{
JSONBase: api.JSONBase{ID: "foo"},
}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Minion{
JSONBase: api.JSONBase{ID: "bar"},
}),
},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
minions, err := registry.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(minions.Items) != 2 || minions.Items[0].ID != "foo" || minions.Items[1].ID != "bar" {
t.Errorf("Unexpected minion list: %#v", minions)
}
}
func TestEtcdInsertMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.InsertMinion(ctx, &api.Minion{
JSONBase: api.JSONBase{ID: "foo"},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
resp, err := fakeClient.Get("/registry/minions/foo", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
var minion api.Minion
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &minion)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if minion.ID != "foo" {
t.Errorf("Unexpected minion: %#v %s", minion, resp.Node.Value)
}
}
func TestEtcdContainsMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/minions/foo", runtime.EncodeOrDie(latest.Codec, &api.Minion{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient)
contains, err := registry.ContainsMinion(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if contains == false {
t.Errorf("Expected true, but got false")
}
}
func TestEtcdContainsMinionNotFound(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Data["/registry/minions/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
contains, err := registry.ContainsMinion(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if contains == true {
t.Errorf("Expected false, but got true")
}
}
func TestEtcdDeleteMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.DeleteMinion(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
}
key := "/registry/minions/foo"
if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
}
// TODO We need a test for the compare and swap behavior. This basically requires two things:
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
// channel, this will enable us to orchestrate the flow of etcd requests in the test.

View File

@ -44,7 +44,7 @@ type CachingRegistry struct {
}
func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error) {
list, err := delegate.List()
list, err := delegate.ListMinions(nil)
if err != nil {
return nil, err
}
@ -57,9 +57,9 @@ func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error)
}, nil
}
func (r *CachingRegistry) Contains(nodeID string) (bool, error) {
func (r *CachingRegistry) ContainsMinion(ctx api.Context, minionID string) (bool, error) {
if r.expired() {
if err := r.refresh(false); err != nil {
if err := r.refresh(ctx, false); err != nil {
return false, err
}
}
@ -67,30 +67,30 @@ func (r *CachingRegistry) Contains(nodeID string) (bool, error) {
r.lock.RLock()
defer r.lock.RUnlock()
for _, node := range r.nodes.Items {
if node.ID == nodeID {
if node.ID == minionID {
return true, nil
}
}
return false, nil
}
func (r *CachingRegistry) Delete(minion string) error {
if err := r.delegate.Delete(minion); err != nil {
func (r *CachingRegistry) DeleteMinion(ctx api.Context, minionID string) error {
if err := r.delegate.DeleteMinion(ctx, minionID); err != nil {
return err
}
return r.refresh(true)
return r.refresh(ctx, true)
}
func (r *CachingRegistry) Insert(minion string) error {
if err := r.delegate.Insert(minion); err != nil {
func (r *CachingRegistry) InsertMinion(ctx api.Context, minion *api.Minion) error {
if err := r.delegate.InsertMinion(ctx, minion); err != nil {
return err
}
return r.refresh(true)
return r.refresh(ctx, true)
}
func (r *CachingRegistry) List() (*api.MinionList, error) {
func (r *CachingRegistry) ListMinions(ctx api.Context) (*api.MinionList, error) {
if r.expired() {
if err := r.refresh(false); err != nil {
if err := r.refresh(ctx, false); err != nil {
return r.nodes, err
}
}
@ -105,12 +105,12 @@ func (r *CachingRegistry) expired() bool {
// refresh updates the current store. It double checks expired under lock with the assumption
// of optimistic concurrency with the other functions.
func (r *CachingRegistry) refresh(force bool) error {
func (r *CachingRegistry) refresh(ctx api.Context, force bool) error {
r.lock.Lock()
defer r.lock.Unlock()
if force || r.expired() {
var err error
r.nodes, err = r.delegate.List()
r.nodes, err = r.delegate.ListMinions(ctx)
time := r.clock.Now()
atomic.SwapInt64(&r.lastUpdate, time.Unix())
return err

View File

@ -21,6 +21,7 @@ import (
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
@ -33,11 +34,12 @@ func (f *fakeClock) Now() time.Time {
}
func TestCachingHit(t *testing.T) {
ctx := api.NewContext()
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
@ -45,7 +47,7 @@ func TestCachingHit(t *testing.T) {
lastUpdate: fakeClock.Now().Unix(),
nodes: expected,
}
list, err := cache.List()
list, err := cache.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -55,11 +57,12 @@ func TestCachingHit(t *testing.T) {
}
func TestCachingMiss(t *testing.T) {
ctx := api.NewContext()
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
@ -68,7 +71,7 @@ func TestCachingMiss(t *testing.T) {
nodes: expected,
}
fakeClock.now = time.Unix(3, 0)
list, err := cache.List()
list, err := cache.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -78,11 +81,12 @@ func TestCachingMiss(t *testing.T) {
}
func TestCachingInsert(t *testing.T) {
ctx := api.NewContext()
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
@ -90,11 +94,13 @@ func TestCachingInsert(t *testing.T) {
lastUpdate: fakeClock.Now().Unix(),
nodes: expected,
}
err := cache.Insert("foo")
err := cache.InsertMinion(ctx, &api.Minion{
JSONBase: api.JSONBase{ID: "foo"},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
list, err := cache.List()
list, err := cache.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -104,11 +110,12 @@ func TestCachingInsert(t *testing.T) {
}
func TestCachingDelete(t *testing.T) {
ctx := api.NewContext()
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
@ -116,11 +123,11 @@ func TestCachingDelete(t *testing.T) {
lastUpdate: fakeClock.Now().Unix(),
nodes: expected,
}
err := cache.Delete("m2")
err := cache.DeleteMinion(ctx, "m2")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
list, err := cache.List()
list, err := cache.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -37,28 +37,29 @@ func NewCloudRegistry(cloud cloudprovider.Interface, matchRE string, staticResou
}, nil
}
func (r *CloudRegistry) Contains(nodeID string) (bool, error) {
instances, err := r.List()
func (r *CloudRegistry) ContainsMinion(ctx api.Context, minionID string) (bool, error) {
instances, err := r.ListMinions(ctx)
if err != nil {
return false, err
}
for _, node := range instances.Items {
if node.ID == nodeID {
if node.ID == minionID {
return true, nil
}
}
return false, nil
}
func (r CloudRegistry) Delete(minion string) error {
func (r CloudRegistry) DeleteMinion(ctx api.Context, minionID string) error {
return fmt.Errorf("unsupported")
}
func (r CloudRegistry) Insert(minion string) error {
func (r CloudRegistry) InsertMinion(ctx api.Context, minion *api.Minion) error {
return fmt.Errorf("unsupported")
}
func (r *CloudRegistry) List() (*api.MinionList, error) {
func (r *CloudRegistry) ListMinions(ctx api.Context) (*api.MinionList, error) {
instances, ok := r.cloud.Instances()
if !ok {
return nil, fmt.Errorf("cloud doesn't support instances")

View File

@ -20,11 +20,13 @@ import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
func TestCloudList(t *testing.T) {
ctx := api.NewContext()
instances := []string{"m1", "m2"}
fakeCloud := fake_cloud.FakeCloud{
Machines: instances,
@ -34,17 +36,18 @@ func TestCloudList(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
list, err := registry.List()
list, err := registry.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, registrytest.MakeMinionList(instances)) {
if !reflect.DeepEqual(list, registrytest.MakeMinionList(instances, api.NodeResources{})) {
t.Errorf("Unexpected inequality: %#v, %#v", list, instances)
}
}
func TestCloudContains(t *testing.T) {
ctx := api.NewContext()
instances := []string{"m1", "m2"}
fakeCloud := fake_cloud.FakeCloud{
Machines: instances,
@ -54,7 +57,7 @@ func TestCloudContains(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
contains, err := registry.Contains("m1")
contains, err := registry.ContainsMinion(ctx, "m1")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -63,7 +66,7 @@ func TestCloudContains(t *testing.T) {
t.Errorf("Unexpected !contains")
}
contains, err = registry.Contains("m100")
contains, err = registry.ContainsMinion(ctx, "m100")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -74,6 +77,7 @@ func TestCloudContains(t *testing.T) {
}
func TestCloudListRegexp(t *testing.T) {
ctx := api.NewContext()
instances := []string{"m1", "m2", "n1", "n2"}
fakeCloud := fake_cloud.FakeCloud{
Machines: instances,
@ -83,12 +87,12 @@ func TestCloudListRegexp(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
list, err := registry.List()
list, err := registry.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
expectedList := registrytest.MakeMinionList([]string{"m1", "m2"})
expectedList := registrytest.MakeMinionList([]string{"m1", "m2"}, api.NodeResources{})
if !reflect.DeepEqual(list, expectedList) {
t.Errorf("Unexpected inequality: %#v, %#v", list, expectedList)
}

View File

@ -40,8 +40,8 @@ func NewHealthyRegistry(delegate Registry, client *http.Client) Registry {
}
}
func (r *HealthyRegistry) Contains(minion string) (bool, error) {
contains, err := r.delegate.Contains(minion)
func (r *HealthyRegistry) ContainsMinion(ctx api.Context, minion string) (bool, error) {
contains, err := r.delegate.ContainsMinion(ctx, minion)
if err != nil {
return false, err
}
@ -58,17 +58,17 @@ func (r *HealthyRegistry) Contains(minion string) (bool, error) {
return true, nil
}
func (r *HealthyRegistry) Delete(minion string) error {
return r.delegate.Delete(minion)
func (r *HealthyRegistry) DeleteMinion(ctx api.Context, minionID string) error {
return r.delegate.DeleteMinion(ctx, minionID)
}
func (r *HealthyRegistry) Insert(minion string) error {
return r.delegate.Insert(minion)
func (r *HealthyRegistry) InsertMinion(ctx api.Context, minion *api.Minion) error {
return r.delegate.InsertMinion(ctx, minion)
}
func (r *HealthyRegistry) List() (currentMinions *api.MinionList, err error) {
func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.MinionList, err error) {
result := &api.MinionList{}
list, err := r.delegate.List()
list, err := r.delegate.ListMinions(ctx)
if err != nil {
return result, err
}

View File

@ -23,6 +23,7 @@ import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
@ -40,30 +41,33 @@ func (alwaysYes) Get(url string) (*http.Response, error) {
}
func TestBasicDelegation(t *testing.T) {
mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"})
ctx := api.NewContext()
mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{})
healthy := HealthyRegistry{
delegate: mockMinionRegistry,
client: alwaysYes{},
}
list, err := healthy.List()
list, err := healthy.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, &mockMinionRegistry.Minions) {
t.Errorf("Expected %v, Got %v", mockMinionRegistry.Minions, list)
}
err = healthy.Insert("foo")
err = healthy.InsertMinion(ctx, &api.Minion{
JSONBase: api.JSONBase{ID: "foo"},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
ok, err := healthy.Contains("m1")
ok, err := healthy.ContainsMinion(ctx, "m1")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !ok {
t.Errorf("Unexpected absence of 'm1'")
}
ok, err = healthy.Contains("m5")
ok, err = healthy.ContainsMinion(ctx, "m5")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -85,21 +89,22 @@ func (n *notMinion) Get(url string) (*http.Response, error) {
}
func TestFiltering(t *testing.T) {
mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"})
ctx := api.NewContext()
mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{})
healthy := HealthyRegistry{
delegate: mockMinionRegistry,
client: &notMinion{minion: "m1"},
port: 10250,
}
expected := []string{"m2", "m3"}
list, err := healthy.List()
list, err := healthy.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, registrytest.MakeMinionList(expected)) {
if !reflect.DeepEqual(list, registrytest.MakeMinionList(expected, api.NodeResources{})) {
t.Errorf("Expected %v, Got %v", expected, list)
}
ok, err := healthy.Contains("m1")
ok, err := healthy.ContainsMinion(ctx, "m1")
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -1,88 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minion
import (
"fmt"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
var ErrDoesNotExist = fmt.Errorf("The requested resource does not exist.")
// Registry keeps track of a set of minions. Safe for concurrent reading/writing.
type Registry interface {
List() (currentMinions *api.MinionList, err error)
Insert(minion string) error
Delete(minion string) error
Contains(minion string) (bool, error)
}
// NewRegistry initializes a minion registry with a list of minions.
func NewRegistry(minions []string, nodeResources api.NodeResources) Registry {
m := &minionList{
minions: util.StringSet{},
nodeResources: nodeResources,
}
for _, minion := range minions {
m.minions.Insert(minion)
}
return m
}
type minionList struct {
minions util.StringSet
lock sync.Mutex
nodeResources api.NodeResources
}
func (m *minionList) Contains(minion string) (bool, error) {
m.lock.Lock()
defer m.lock.Unlock()
return m.minions.Has(minion), nil
}
func (m *minionList) Delete(minion string) error {
m.lock.Lock()
defer m.lock.Unlock()
m.minions.Delete(minion)
return nil
}
func (m *minionList) Insert(newMinion string) error {
m.lock.Lock()
defer m.lock.Unlock()
m.minions.Insert(newMinion)
return nil
}
func (m *minionList) List() (currentMinions *api.MinionList, err error) {
m.lock.Lock()
defer m.lock.Unlock()
minions := []api.Minion{}
for minion := range m.minions {
minions = append(minions, api.Minion{
TypeMeta: api.TypeMeta{ID: minion},
NodeResources: m.nodeResources,
})
}
return &api.MinionList{
Items: minions,
}, nil
}

View File

@ -1,64 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minion
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func TestRegistry(t *testing.T) {
m := NewRegistry([]string{"foo", "bar"}, api.NodeResources{})
if has, err := m.Contains("foo"); !has || err != nil {
t.Errorf("missing expected object")
}
if has, err := m.Contains("bar"); !has || err != nil {
t.Errorf("missing expected object")
}
if has, err := m.Contains("baz"); has || err != nil {
t.Errorf("has unexpected object")
}
if err := m.Insert("baz"); err != nil {
t.Errorf("insert failed")
}
if has, err := m.Contains("baz"); !has || err != nil {
t.Errorf("insert didn't actually insert")
}
if err := m.Delete("bar"); err != nil {
t.Errorf("delete failed")
}
if has, err := m.Contains("bar"); has || err != nil {
t.Errorf("delete didn't actually delete")
}
list, err := m.List()
if err != nil {
t.Errorf("got error calling List")
}
if len(list.Items) != 2 || !contains(list, "foo") || !contains(list, "baz") {
t.Errorf("unexpected %v", list)
}
}
func contains(nodes *api.MinionList, nodeID string) bool {
for _, node := range nodes.Items {
if node.ID == nodeID {
return true
}
}
return false
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minion
import "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
// MinionRegistry is an interface for things that know how to store minions.
type Registry interface {
ListMinions(ctx api.Context) (*api.MinionList, error)
InsertMinion(ctx api.Context, minion *api.Minion) error
ContainsMinion(ctx api.Context, minionID string) (bool, error)
DeleteMinion(ctx api.Context, minionID string) error
}

View File

@ -38,6 +38,8 @@ func NewREST(m Registry) *REST {
}
}
var ErrDoesNotExist = fmt.Errorf("The requested resource does not exist.")
func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) {
minion, ok := obj.(*api.Minion)
if !ok {
@ -50,11 +52,11 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
minion.CreationTimestamp = util.Now()
return apiserver.MakeAsync(func() (runtime.Object, error) {
err := rs.registry.Insert(minion.ID)
err := rs.registry.InsertMinion(ctx, minion)
if err != nil {
return nil, err
}
contains, err := rs.registry.Contains(minion.ID)
contains, err := rs.registry.ContainsMinion(ctx, minion.ID)
if err != nil {
return nil, err
}
@ -66,7 +68,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
}
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
exists, err := rs.registry.Contains(id)
exists, err := rs.registry.ContainsMinion(ctx, id)
if !exists {
return nil, ErrDoesNotExist
}
@ -74,12 +76,12 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error
return nil, err
}
return apiserver.MakeAsync(func() (runtime.Object, error) {
return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(id)
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteMinion(ctx, id)
}), nil
}
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
exists, err := rs.registry.Contains(id)
exists, err := rs.registry.ContainsMinion(ctx, id)
if !exists {
return nil, ErrDoesNotExist
}
@ -87,10 +89,10 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
}
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
return rs.registry.List()
return rs.registry.ListMinions(ctx)
}
func (*REST) New() runtime.Object {
func (rs *REST) New() runtime.Object {
return &api.Minion{}
}

View File

@ -21,11 +21,11 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
func TestMinionREST(t *testing.T) {
m := NewRegistry([]string{"foo", "bar"}, api.NodeResources{})
ms := NewREST(m)
ms := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
ctx := api.NewContext()
if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Minion).ID != "foo" {
t.Errorf("missing expected object")
@ -72,9 +72,9 @@ func TestMinionREST(t *testing.T) {
}
expect := []api.Minion{
{
TypeMeta: api.TypeMeta{ID: "baz"},
}, {
TypeMeta: api.TypeMeta{ID: "foo"},
}, {
TypeMeta: api.TypeMeta{ID: "baz"},
},
}
nodeList := list.(*api.MinionList)
@ -82,3 +82,12 @@ func TestMinionREST(t *testing.T) {
t.Errorf("Unexpected list value: %#v", list)
}
}
func contains(nodes *api.MinionList, nodeID string) bool {
for _, node := range nodes.Items {
if node.ID == nodeID {
return true
}
}
return false
}

View File

@ -29,53 +29,55 @@ type MinionRegistry struct {
sync.Mutex
}
func MakeMinionList(minions []string) *api.MinionList {
func MakeMinionList(minions []string, nodeResources api.NodeResources) *api.MinionList {
list := api.MinionList{
Items: make([]api.Minion, len(minions)),
}
for i := range minions {
list.Items[i].ID = minions[i]
list.Items[i].NodeResources = nodeResources
}
return &list
}
func NewMinionRegistry(minions []string) *MinionRegistry {
func NewMinionRegistry(minions []string, nodeResources api.NodeResources) *MinionRegistry {
return &MinionRegistry{
Minions: *MakeMinionList(minions),
Minions: *MakeMinionList(minions, nodeResources),
}
}
func (r *MinionRegistry) List() (*api.MinionList, error) {
func (r *MinionRegistry) ListMinions(ctx api.Context) (*api.MinionList, error) {
r.Lock()
defer r.Unlock()
return &r.Minions, r.Err
}
func (r *MinionRegistry) Insert(minion string) error {
func (r *MinionRegistry) InsertMinion(ctx api.Context, minion *api.Minion) error {
r.Lock()
defer r.Unlock()
r.Minion = minion
r.Minions.Items = append(r.Minions.Items, api.Minion{TypeMeta: api.TypeMeta{ID: minion}})
r.Minion = minion.ID
r.Minions.Items = append(r.Minions.Items, *minion)
return r.Err
}
func (r *MinionRegistry) Contains(nodeID string) (bool, error) {
func (r *MinionRegistry) ContainsMinion(ctx api.Context, minionID string) (bool, error) {
r.Lock()
defer r.Unlock()
for _, node := range r.Minions.Items {
if node.ID == nodeID {
if node.ID == minionID {
return true, r.Err
}
}
return false, r.Err
}
func (r *MinionRegistry) Delete(minion string) error {
func (r *MinionRegistry) DeleteMinion(ctx api.Context, minionID string) error {
r.Lock()
defer r.Unlock()
var newList []api.Minion
for _, node := range r.Minions.Items {
if node.ID != minion {
if node.ID != minionID {
newList = append(newList, api.Minion{TypeMeta: api.TypeMeta{ID: node.ID}})
}
}

View File

@ -76,7 +76,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
if !ok {
return nil, fmt.Errorf("The cloud provider does not support zone enumeration.")
}
hosts, err := rs.machines.List()
hosts, err := rs.machines.ListMinions(ctx)
if err != nil {
return nil, err
}

View File

@ -26,7 +26,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
@ -34,7 +33,7 @@ func TestServiceRegistryCreate(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
svc := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
@ -156,7 +155,7 @@ func TestServiceRegistryExternalService(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
svc := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
@ -183,7 +182,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
Err: fmt.Errorf("test error"),
}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
svc := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
@ -206,7 +205,7 @@ func TestServiceRegistryDelete(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
svc := &api.Service{
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -227,7 +226,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
svc := &api.Service{
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -314,7 +313,7 @@ func TestServiceRegistryGet(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
registry.CreateService(ctx, &api.Service{
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -334,7 +333,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}}
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
registry.CreateService(ctx, &api.Service{
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -363,7 +362,7 @@ func TestServiceRegistryList(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
registry.CreateService(ctx, &api.Service{
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},