diff --git a/pkg/master/master.go b/pkg/master/master.go index 439f29b5b60..895454df8e5 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -111,7 +111,13 @@ func makeMinionRegistry(c *Config) minion.Registry { } } if minionRegistry == nil { - minionRegistry = minion.NewRegistry(c.Minions, c.NodeResources) + minionRegistry = etcd.NewRegistry(c.EtcdHelper, nil) + for _, minionID := range c.Minions { + minionRegistry.CreateMinion(nil, &api.Minion{ + TypeMeta: api.TypeMeta{ID: minionID}, + NodeResources: c.NodeResources, + }) + } } if c.HealthCheckMinions { minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{}) diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 68701c0a95f..9f131e85c93 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -34,8 +34,7 @@ import ( // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into // kubelet (and vice versa) -// Registry implements PodRegistry, ControllerRegistry and ServiceRegistry -// with backed by etcd. +// Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegistry, backed by etcd. type Registry struct { tools.EtcdHelper manifestFactory pod.ManifestFactory @@ -382,3 +381,41 @@ func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, } return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") } + +func makeMinionKey(minionID string) string { + return "/registry/minions/" + minionID +} + +func (r *Registry) ListMinions(ctx api.Context) (*api.MinionList, error) { + minions := &api.MinionList{} + err := r.ExtractList("/registry/minions", &minions.Items, &minions.ResourceVersion) + return minions, err +} + +func (r *Registry) CreateMinion(ctx api.Context, minion *api.Minion) error { + // TODO: Add some validations. + err := r.CreateObj(makeMinionKey(minion.ID), minion, 0) + return etcderr.InterpretCreateError(err, "minion", minion.ID) +} + +func (r *Registry) ContainsMinion(ctx api.Context, minionID string) (bool, error) { + var minion api.Minion + key := makeMinionKey(minionID) + err := r.ExtractObj(key, &minion, false) + if err == nil { + return true, nil + } else if tools.IsEtcdNotFound(err) { + return false, nil + } else { + return false, etcderr.InterpretGetError(err, "minion", minion.ID) + } +} + +func (r *Registry) DeleteMinion(ctx api.Context, minionID string) error { + key := makeMinionKey(minionID) + err := r.Delete(key, true) + if err != nil { + return etcderr.InterpretDeleteError(err, "minion", minionID) + } + return nil +} diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 0a2a260a524..afdcaedfaa3 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -1019,6 +1019,121 @@ func TestEtcdWatchEndpointsBadSelector(t *testing.T) { } } +func TestEtcdListMinions(t *testing.T) { + ctx := api.NewContext() + fakeClient := tools.NewFakeEtcdClient(t) + key := "/registry/minions" + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Minion{ + TypeMeta: api.TypeMeta{ID: "foo"}, + }), + }, + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Minion{ + TypeMeta: api.TypeMeta{ID: "bar"}, + }), + }, + }, + }, + }, + E: nil, + } + registry := NewTestEtcdRegistry(fakeClient) + minions, err := registry.ListMinions(ctx) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if len(minions.Items) != 2 || minions.Items[0].ID != "foo" || minions.Items[1].ID != "bar" { + t.Errorf("Unexpected minion list: %#v", minions) + } +} + +func TestEtcdCreateMinion(t *testing.T) { + ctx := api.NewContext() + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient) + err := registry.CreateMinion(ctx, &api.Minion{ + TypeMeta: api.TypeMeta{ID: "foo"}, + }) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + resp, err := fakeClient.Get("/registry/minions/foo", false, false) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + var minion api.Minion + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &minion) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if minion.ID != "foo" { + t.Errorf("Unexpected minion: %#v %s", minion, resp.Node.Value) + } +} + +func TestEtcdContainsMinion(t *testing.T) { + ctx := api.NewContext() + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.Set("/registry/minions/foo", runtime.EncodeOrDie(latest.Codec, &api.Minion{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) + registry := NewTestEtcdRegistry(fakeClient) + contains, err := registry.ContainsMinion(ctx, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if contains == false { + t.Errorf("Expected true, but got false") + } +} + +func TestEtcdContainsMinionNotFound(t *testing.T) { + ctx := api.NewContext() + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.Data["/registry/minions/foo"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: tools.EtcdErrorNotFound, + } + registry := NewTestEtcdRegistry(fakeClient) + contains, err := registry.ContainsMinion(ctx, "foo") + + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if contains == true { + t.Errorf("Expected false, but got true") + } +} + +func TestEtcdDeleteMinion(t *testing.T) { + ctx := api.NewContext() + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient) + err := registry.DeleteMinion(ctx, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if len(fakeClient.DeletedKeys) != 1 { + t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) + } + key := "/registry/minions/foo" + if fakeClient.DeletedKeys[0] != key { + t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) + } +} + // TODO We need a test for the compare and swap behavior. This basically requires two things: // 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that // channel, this will enable us to orchestrate the flow of etcd requests in the test. diff --git a/pkg/registry/minion/caching_registry.go b/pkg/registry/minion/caching_registry.go index 604b67171bd..ce0a09dac4b 100644 --- a/pkg/registry/minion/caching_registry.go +++ b/pkg/registry/minion/caching_registry.go @@ -44,7 +44,7 @@ type CachingRegistry struct { } func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error) { - list, err := delegate.List() + list, err := delegate.ListMinions(nil) if err != nil { return nil, err } @@ -57,9 +57,9 @@ func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error) }, nil } -func (r *CachingRegistry) Contains(nodeID string) (bool, error) { +func (r *CachingRegistry) ContainsMinion(ctx api.Context, nodeID string) (bool, error) { if r.expired() { - if err := r.refresh(false); err != nil { + if err := r.refresh(ctx, false); err != nil { return false, err } } @@ -74,23 +74,23 @@ func (r *CachingRegistry) Contains(nodeID string) (bool, error) { return false, nil } -func (r *CachingRegistry) Delete(minion string) error { - if err := r.delegate.Delete(minion); err != nil { +func (r *CachingRegistry) DeleteMinion(ctx api.Context, nodeID string) error { + if err := r.delegate.DeleteMinion(ctx, nodeID); err != nil { return err } - return r.refresh(true) + return r.refresh(ctx, true) } -func (r *CachingRegistry) Insert(minion string) error { - if err := r.delegate.Insert(minion); err != nil { +func (r *CachingRegistry) CreateMinion(ctx api.Context, minion *api.Minion) error { + if err := r.delegate.CreateMinion(ctx, minion); err != nil { return err } - return r.refresh(true) + return r.refresh(ctx, true) } -func (r *CachingRegistry) List() (*api.MinionList, error) { +func (r *CachingRegistry) ListMinions(ctx api.Context) (*api.MinionList, error) { if r.expired() { - if err := r.refresh(false); err != nil { + if err := r.refresh(ctx, false); err != nil { return r.nodes, err } } @@ -105,12 +105,12 @@ func (r *CachingRegistry) expired() bool { // refresh updates the current store. It double checks expired under lock with the assumption // of optimistic concurrency with the other functions. -func (r *CachingRegistry) refresh(force bool) error { +func (r *CachingRegistry) refresh(ctx api.Context, force bool) error { r.lock.Lock() defer r.lock.Unlock() if force || r.expired() { var err error - r.nodes, err = r.delegate.List() + r.nodes, err = r.delegate.ListMinions(ctx) time := r.clock.Now() atomic.SwapInt64(&r.lastUpdate, time.Unix()) return err diff --git a/pkg/registry/minion/caching_registry_test.go b/pkg/registry/minion/caching_registry_test.go index ddef3cb6b9f..f2d16c607c3 100644 --- a/pkg/registry/minion/caching_registry_test.go +++ b/pkg/registry/minion/caching_registry_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) @@ -33,11 +34,12 @@ func (f *fakeClock) Now() time.Time { } func TestCachingHit(t *testing.T) { + ctx := api.NewContext() fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) - expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{}) + expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{}) cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, @@ -45,7 +47,7 @@ func TestCachingHit(t *testing.T) { lastUpdate: fakeClock.Now().Unix(), nodes: expected, } - list, err := cache.List() + list, err := cache.ListMinions(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -55,11 +57,12 @@ func TestCachingHit(t *testing.T) { } func TestCachingMiss(t *testing.T) { + ctx := api.NewContext() fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) - expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{}) + expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{}) cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, @@ -68,7 +71,7 @@ func TestCachingMiss(t *testing.T) { nodes: expected, } fakeClock.now = time.Unix(3, 0) - list, err := cache.List() + list, err := cache.ListMinions(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -78,11 +81,12 @@ func TestCachingMiss(t *testing.T) { } func TestCachingInsert(t *testing.T) { + ctx := api.NewContext() fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) - expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{}) + expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{}) cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, @@ -90,11 +94,13 @@ func TestCachingInsert(t *testing.T) { lastUpdate: fakeClock.Now().Unix(), nodes: expected, } - err := cache.Insert("foo") + err := cache.CreateMinion(ctx, &api.Minion{ + TypeMeta: api.TypeMeta{ID: "foo"}, + }) if err != nil { t.Errorf("unexpected error: %v", err) } - list, err := cache.List() + list, err := cache.ListMinions(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -104,11 +110,12 @@ func TestCachingInsert(t *testing.T) { } func TestCachingDelete(t *testing.T) { + ctx := api.NewContext() fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) - expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{}) + expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{}) cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, @@ -116,11 +123,11 @@ func TestCachingDelete(t *testing.T) { lastUpdate: fakeClock.Now().Unix(), nodes: expected, } - err := cache.Delete("m2") + err := cache.DeleteMinion(ctx, "m2") if err != nil { t.Errorf("unexpected error: %v", err) } - list, err := cache.List() + list, err := cache.ListMinions(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/registry/minion/cloud_registry.go b/pkg/registry/minion/cloud_registry.go index 61ffede3475..6197b7f71d4 100644 --- a/pkg/registry/minion/cloud_registry.go +++ b/pkg/registry/minion/cloud_registry.go @@ -37,8 +37,9 @@ func NewCloudRegistry(cloud cloudprovider.Interface, matchRE string, staticResou }, nil } -func (r *CloudRegistry) Contains(nodeID string) (bool, error) { - instances, err := r.List() +func (r *CloudRegistry) ContainsMinion(ctx api.Context, nodeID string) (bool, error) { + instances, err := r.ListMinions(ctx) + if err != nil { return false, err } @@ -50,15 +51,15 @@ func (r *CloudRegistry) Contains(nodeID string) (bool, error) { return false, nil } -func (r CloudRegistry) Delete(minion string) error { +func (r CloudRegistry) DeleteMinion(ctx api.Context, nodeID string) error { return fmt.Errorf("unsupported") } -func (r CloudRegistry) Insert(minion string) error { +func (r CloudRegistry) CreateMinion(ctx api.Context, minion *api.Minion) error { return fmt.Errorf("unsupported") } -func (r *CloudRegistry) List() (*api.MinionList, error) { +func (r *CloudRegistry) ListMinions(ctx api.Context) (*api.MinionList, error) { instances, ok := r.cloud.Instances() if !ok { return nil, fmt.Errorf("cloud doesn't support instances") diff --git a/pkg/registry/minion/cloud_registry_test.go b/pkg/registry/minion/cloud_registry_test.go index 457b8c5f3af..e0fecfc66a3 100644 --- a/pkg/registry/minion/cloud_registry_test.go +++ b/pkg/registry/minion/cloud_registry_test.go @@ -20,11 +20,13 @@ import ( "reflect" "testing" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) func TestCloudList(t *testing.T) { + ctx := api.NewContext() instances := []string{"m1", "m2"} fakeCloud := fake_cloud.FakeCloud{ Machines: instances, @@ -34,17 +36,18 @@ func TestCloudList(t *testing.T) { t.Errorf("unexpected error: %v", err) } - list, err := registry.List() + list, err := registry.ListMinions(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(list, registrytest.MakeMinionList(instances)) { + if !reflect.DeepEqual(list, registrytest.MakeMinionList(instances, api.NodeResources{})) { t.Errorf("Unexpected inequality: %#v, %#v", list, instances) } } func TestCloudContains(t *testing.T) { + ctx := api.NewContext() instances := []string{"m1", "m2"} fakeCloud := fake_cloud.FakeCloud{ Machines: instances, @@ -54,7 +57,7 @@ func TestCloudContains(t *testing.T) { t.Errorf("unexpected error: %v", err) } - contains, err := registry.Contains("m1") + contains, err := registry.ContainsMinion(ctx, "m1") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -63,7 +66,7 @@ func TestCloudContains(t *testing.T) { t.Errorf("Unexpected !contains") } - contains, err = registry.Contains("m100") + contains, err = registry.ContainsMinion(ctx, "m100") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -74,6 +77,7 @@ func TestCloudContains(t *testing.T) { } func TestCloudListRegexp(t *testing.T) { + ctx := api.NewContext() instances := []string{"m1", "m2", "n1", "n2"} fakeCloud := fake_cloud.FakeCloud{ Machines: instances, @@ -83,12 +87,12 @@ func TestCloudListRegexp(t *testing.T) { t.Errorf("unexpected error: %v", err) } - list, err := registry.List() + list, err := registry.ListMinions(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } - expectedList := registrytest.MakeMinionList([]string{"m1", "m2"}) + expectedList := registrytest.MakeMinionList([]string{"m1", "m2"}, api.NodeResources{}) if !reflect.DeepEqual(list, expectedList) { t.Errorf("Unexpected inequality: %#v, %#v", list, expectedList) } diff --git a/pkg/registry/minion/doc.go b/pkg/registry/minion/doc.go index b821ced0b48..2d0ed95cc82 100644 --- a/pkg/registry/minion/doc.go +++ b/pkg/registry/minion/doc.go @@ -14,6 +14,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package minion provides Registry interface and implementation -// for storing Minions. +// Package minion provides Registry interface and implementation for storing Minions. package minion diff --git a/pkg/registry/minion/healthy_registry.go b/pkg/registry/minion/healthy_registry.go index 28dfbaf4e04..92fb24ba0c2 100644 --- a/pkg/registry/minion/healthy_registry.go +++ b/pkg/registry/minion/healthy_registry.go @@ -40,8 +40,8 @@ func NewHealthyRegistry(delegate Registry, client *http.Client) Registry { } } -func (r *HealthyRegistry) Contains(minion string) (bool, error) { - contains, err := r.delegate.Contains(minion) +func (r *HealthyRegistry) ContainsMinion(ctx api.Context, minion string) (bool, error) { + contains, err := r.delegate.ContainsMinion(ctx, minion) if err != nil { return false, err } @@ -58,17 +58,17 @@ func (r *HealthyRegistry) Contains(minion string) (bool, error) { return true, nil } -func (r *HealthyRegistry) Delete(minion string) error { - return r.delegate.Delete(minion) +func (r *HealthyRegistry) DeleteMinion(ctx api.Context, minionID string) error { + return r.delegate.DeleteMinion(ctx, minionID) } -func (r *HealthyRegistry) Insert(minion string) error { - return r.delegate.Insert(minion) +func (r *HealthyRegistry) CreateMinion(ctx api.Context, minion *api.Minion) error { + return r.delegate.CreateMinion(ctx, minion) } -func (r *HealthyRegistry) List() (currentMinions *api.MinionList, err error) { +func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.MinionList, err error) { result := &api.MinionList{} - list, err := r.delegate.List() + list, err := r.delegate.ListMinions(ctx) if err != nil { return result, err } diff --git a/pkg/registry/minion/healthy_registry_test.go b/pkg/registry/minion/healthy_registry_test.go index b8635018e17..0ac9076743d 100644 --- a/pkg/registry/minion/healthy_registry_test.go +++ b/pkg/registry/minion/healthy_registry_test.go @@ -23,6 +23,7 @@ import ( "reflect" "testing" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) @@ -40,30 +41,33 @@ func (alwaysYes) Get(url string) (*http.Response, error) { } func TestBasicDelegation(t *testing.T) { - mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}) + ctx := api.NewContext() + mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{}) healthy := HealthyRegistry{ delegate: mockMinionRegistry, client: alwaysYes{}, } - list, err := healthy.List() + list, err := healthy.ListMinions(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } if !reflect.DeepEqual(list, &mockMinionRegistry.Minions) { t.Errorf("Expected %v, Got %v", mockMinionRegistry.Minions, list) } - err = healthy.Insert("foo") + err = healthy.CreateMinion(ctx, &api.Minion{ + TypeMeta: api.TypeMeta{ID: "foo"}, + }) if err != nil { t.Errorf("unexpected error: %v", err) } - ok, err := healthy.Contains("m1") + ok, err := healthy.ContainsMinion(ctx, "m1") if err != nil { t.Errorf("unexpected error: %v", err) } if !ok { t.Errorf("Unexpected absence of 'm1'") } - ok, err = healthy.Contains("m5") + ok, err = healthy.ContainsMinion(ctx, "m5") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -85,21 +89,22 @@ func (n *notMinion) Get(url string) (*http.Response, error) { } func TestFiltering(t *testing.T) { - mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}) + ctx := api.NewContext() + mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{}) healthy := HealthyRegistry{ delegate: mockMinionRegistry, client: ¬Minion{minion: "m1"}, port: 10250, } expected := []string{"m2", "m3"} - list, err := healthy.List() + list, err := healthy.ListMinions(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(list, registrytest.MakeMinionList(expected)) { + if !reflect.DeepEqual(list, registrytest.MakeMinionList(expected, api.NodeResources{})) { t.Errorf("Expected %v, Got %v", expected, list) } - ok, err := healthy.Contains("m1") + ok, err := healthy.ContainsMinion(ctx, "m1") if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/registry/minion/minion.go b/pkg/registry/minion/minion.go deleted file mode 100644 index 9f4ce9e15da..00000000000 --- a/pkg/registry/minion/minion.go +++ /dev/null @@ -1,88 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package minion - -import ( - "fmt" - "sync" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" -) - -var ErrDoesNotExist = fmt.Errorf("The requested resource does not exist.") - -// Registry keeps track of a set of minions. Safe for concurrent reading/writing. -type Registry interface { - List() (currentMinions *api.MinionList, err error) - Insert(minion string) error - Delete(minion string) error - Contains(minion string) (bool, error) -} - -// NewRegistry initializes a minion registry with a list of minions. -func NewRegistry(minions []string, nodeResources api.NodeResources) Registry { - m := &minionList{ - minions: util.StringSet{}, - nodeResources: nodeResources, - } - for _, minion := range minions { - m.minions.Insert(minion) - } - return m -} - -type minionList struct { - minions util.StringSet - lock sync.Mutex - nodeResources api.NodeResources -} - -func (m *minionList) Contains(minion string) (bool, error) { - m.lock.Lock() - defer m.lock.Unlock() - return m.minions.Has(minion), nil -} - -func (m *minionList) Delete(minion string) error { - m.lock.Lock() - defer m.lock.Unlock() - m.minions.Delete(minion) - return nil -} - -func (m *minionList) Insert(newMinion string) error { - m.lock.Lock() - defer m.lock.Unlock() - m.minions.Insert(newMinion) - return nil -} - -func (m *minionList) List() (currentMinions *api.MinionList, err error) { - m.lock.Lock() - defer m.lock.Unlock() - minions := []api.Minion{} - for minion := range m.minions { - minions = append(minions, api.Minion{ - TypeMeta: api.TypeMeta{ID: minion}, - NodeResources: m.nodeResources, - }) - } - return &api.MinionList{ - Items: minions, - }, nil -} diff --git a/pkg/registry/minion/minion_test.go b/pkg/registry/minion/minion_test.go deleted file mode 100644 index d4df16730f3..00000000000 --- a/pkg/registry/minion/minion_test.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package minion - -import ( - "testing" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" -) - -func TestRegistry(t *testing.T) { - m := NewRegistry([]string{"foo", "bar"}, api.NodeResources{}) - if has, err := m.Contains("foo"); !has || err != nil { - t.Errorf("missing expected object") - } - if has, err := m.Contains("bar"); !has || err != nil { - t.Errorf("missing expected object") - } - if has, err := m.Contains("baz"); has || err != nil { - t.Errorf("has unexpected object") - } - if err := m.Insert("baz"); err != nil { - t.Errorf("insert failed") - } - if has, err := m.Contains("baz"); !has || err != nil { - t.Errorf("insert didn't actually insert") - } - if err := m.Delete("bar"); err != nil { - t.Errorf("delete failed") - } - if has, err := m.Contains("bar"); has || err != nil { - t.Errorf("delete didn't actually delete") - } - list, err := m.List() - if err != nil { - t.Errorf("got error calling List") - } - if len(list.Items) != 2 || !contains(list, "foo") || !contains(list, "baz") { - t.Errorf("unexpected %v", list) - } -} - -func contains(nodes *api.MinionList, nodeID string) bool { - for _, node := range nodes.Items { - if node.ID == nodeID { - return true - } - } - return false -} diff --git a/pkg/registry/minion/registry.go b/pkg/registry/minion/registry.go new file mode 100644 index 00000000000..129f8d98a9e --- /dev/null +++ b/pkg/registry/minion/registry.go @@ -0,0 +1,27 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package minion + +import "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + +// MinionRegistry is an interface for things that know how to store minions. +type Registry interface { + ListMinions(ctx api.Context) (*api.MinionList, error) + CreateMinion(ctx api.Context, minion *api.Minion) error + ContainsMinion(ctx api.Context, minionID string) (bool, error) + DeleteMinion(ctx api.Context, minionID string) error +} diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index 3e47217b301..e16d2dc5254 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -38,6 +38,8 @@ func NewREST(m Registry) *REST { } } +var ErrDoesNotExist = fmt.Errorf("The requested resource does not exist.") + func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) { minion, ok := obj.(*api.Minion) if !ok { @@ -50,11 +52,11 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje minion.CreationTimestamp = util.Now() return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Insert(minion.ID) + err := rs.registry.CreateMinion(ctx, minion) if err != nil { return nil, err } - contains, err := rs.registry.Contains(minion.ID) + contains, err := rs.registry.ContainsMinion(ctx, minion.ID) if err != nil { return nil, err } @@ -66,7 +68,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje } func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) { - exists, err := rs.registry.Contains(id) + exists, err := rs.registry.ContainsMinion(ctx, id) if !exists { return nil, ErrDoesNotExist } @@ -74,12 +76,12 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error return nil, err } return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(id) + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteMinion(ctx, id) }), nil } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { - exists, err := rs.registry.Contains(id) + exists, err := rs.registry.ContainsMinion(ctx, id) if !exists { return nil, ErrDoesNotExist } @@ -87,10 +89,10 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { } func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { - return rs.registry.List() + return rs.registry.ListMinions(ctx) } -func (*REST) New() runtime.Object { +func (rs *REST) New() runtime.Object { return &api.Minion{} } diff --git a/pkg/registry/minion/rest_test.go b/pkg/registry/minion/rest_test.go index c295455cb46..ad95be27d65 100644 --- a/pkg/registry/minion/rest_test.go +++ b/pkg/registry/minion/rest_test.go @@ -21,11 +21,11 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) func TestMinionREST(t *testing.T) { - m := NewRegistry([]string{"foo", "bar"}, api.NodeResources{}) - ms := NewREST(m) + ms := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) ctx := api.NewContext() if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Minion).ID != "foo" { t.Errorf("missing expected object") @@ -72,9 +72,9 @@ func TestMinionREST(t *testing.T) { } expect := []api.Minion{ { - TypeMeta: api.TypeMeta{ID: "baz"}, - }, { TypeMeta: api.TypeMeta{ID: "foo"}, + }, { + TypeMeta: api.TypeMeta{ID: "baz"}, }, } nodeList := list.(*api.MinionList) @@ -82,3 +82,12 @@ func TestMinionREST(t *testing.T) { t.Errorf("Unexpected list value: %#v", list) } } + +func contains(nodes *api.MinionList, nodeID string) bool { + for _, node := range nodes.Items { + if node.ID == nodeID { + return true + } + } + return false +} diff --git a/pkg/registry/registrytest/minion.go b/pkg/registry/registrytest/minion.go index 09187883d82..1cf756cb619 100644 --- a/pkg/registry/registrytest/minion.go +++ b/pkg/registry/registrytest/minion.go @@ -29,53 +29,55 @@ type MinionRegistry struct { sync.Mutex } -func MakeMinionList(minions []string) *api.MinionList { +func MakeMinionList(minions []string, nodeResources api.NodeResources) *api.MinionList { list := api.MinionList{ Items: make([]api.Minion, len(minions)), } for i := range minions { list.Items[i].ID = minions[i] + list.Items[i].NodeResources = nodeResources } return &list } -func NewMinionRegistry(minions []string) *MinionRegistry { +func NewMinionRegistry(minions []string, nodeResources api.NodeResources) *MinionRegistry { return &MinionRegistry{ - Minions: *MakeMinionList(minions), + Minions: *MakeMinionList(minions, nodeResources), } } -func (r *MinionRegistry) List() (*api.MinionList, error) { +func (r *MinionRegistry) ListMinions(ctx api.Context) (*api.MinionList, error) { r.Lock() defer r.Unlock() return &r.Minions, r.Err } -func (r *MinionRegistry) Insert(minion string) error { +func (r *MinionRegistry) CreateMinion(ctx api.Context, minion *api.Minion) error { r.Lock() defer r.Unlock() - r.Minion = minion - r.Minions.Items = append(r.Minions.Items, api.Minion{TypeMeta: api.TypeMeta{ID: minion}}) + r.Minion = minion.ID + r.Minions.Items = append(r.Minions.Items, *minion) return r.Err } -func (r *MinionRegistry) Contains(nodeID string) (bool, error) { +func (r *MinionRegistry) ContainsMinion(ctx api.Context, minionID string) (bool, error) { r.Lock() defer r.Unlock() for _, node := range r.Minions.Items { - if node.ID == nodeID { + if node.ID == minionID { return true, r.Err } } return false, r.Err } -func (r *MinionRegistry) Delete(minion string) error { +func (r *MinionRegistry) DeleteMinion(ctx api.Context, minionID string) error { r.Lock() defer r.Unlock() var newList []api.Minion for _, node := range r.Minions.Items { - if node.ID != minion { + + if node.ID != minionID { newList = append(newList, api.Minion{TypeMeta: api.TypeMeta{ID: node.ID}}) } } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 1b7b8186b1b..0b607e02cf3 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -76,7 +76,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje if !ok { return nil, fmt.Errorf("The cloud provider does not support zone enumeration.") } - hosts, err := rs.machines.List() + hosts, err := rs.machines.ListMinions(ctx) if err != nil { return nil, err } diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 517db889a16..9aecceff5ea 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -26,7 +26,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) @@ -34,7 +33,7 @@ func TestServiceRegistryCreate(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) svc := &api.Service{ Port: 6502, TypeMeta: api.TypeMeta{ID: "foo"}, @@ -156,7 +155,7 @@ func TestServiceRegistryExternalService(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) svc := &api.Service{ Port: 6502, TypeMeta: api.TypeMeta{ID: "foo"}, @@ -183,7 +182,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { Err: fmt.Errorf("test error"), } machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) svc := &api.Service{ Port: 6502, TypeMeta: api.TypeMeta{ID: "foo"}, @@ -206,7 +205,7 @@ func TestServiceRegistryDelete(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) svc := &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -227,7 +226,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) svc := &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -314,7 +313,7 @@ func TestServiceRegistryGet(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) registry.CreateService(ctx, &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -334,7 +333,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}} fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) registry.CreateService(ctx, &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -363,7 +362,7 @@ func TestServiceRegistryList(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) registry.CreateService(ctx, &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"},