diff --git a/pkg/api/rest/types.go b/pkg/api/rest/types.go index 308e2ec3ec1..fe839781d71 100644 --- a/pkg/api/rest/types.go +++ b/pkg/api/rest/types.go @@ -79,31 +79,3 @@ func (svcStrategy) AllowCreateOnUpdate() bool { func (svcStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.ValidationErrorList { return validation.ValidateServiceUpdate(old.(*api.Service), obj.(*api.Service)) } - -// nodeStrategy implements behavior for nodes -// TODO: move to a node specific package. -type nodeStrategy struct { - runtime.ObjectTyper - api.NameGenerator -} - -// Nodes is the default logic that applies when creating and updating Node -// objects. -var Nodes RESTCreateStrategy = nodeStrategy{api.Scheme, api.SimpleNameGenerator} - -// NamespaceScoped is false for nodes. -func (nodeStrategy) NamespaceScoped() bool { - return false -} - -// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation. -func (nodeStrategy) ResetBeforeCreate(obj runtime.Object) { - _ = obj.(*api.Node) - // Nodes allow *all* fields, including status, to be set. -} - -// Validate validates a new node. -func (nodeStrategy) Validate(obj runtime.Object) fielderrors.ValidationErrorList { - node := obj.(*api.Node) - return validation.ValidateMinion(node) -} diff --git a/pkg/master/master.go b/pkg/master/master.go index d285baeb700..81ec2b7ca59 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -49,6 +49,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/event" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/limitrange" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + nodeetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace" namespaceetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" @@ -362,13 +363,12 @@ func (m *Master) init(c *Config) { endpointsStorage := endpointsetcd.NewStorage(c.EtcdHelper) m.endpointRegistry = endpoint.NewRegistry(endpointsStorage) + nodeStorage := nodeetcd.NewStorage(c.EtcdHelper, c.KubeletClient) + m.nodeRegistry = minion.NewRegistry(nodeStorage) + // TODO: split me up into distinct storage registries registry := etcd.NewRegistry(c.EtcdHelper, podRegistry, m.endpointRegistry) - m.serviceRegistry = registry - m.nodeRegistry = registry - - nodeStorage := minion.NewStorage(m.nodeRegistry, c.KubeletClient) controllerStorage := controlleretcd.NewREST(c.EtcdHelper) diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index eb6dd4c49b5..08f0f169f5c 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -38,8 +38,6 @@ const ( ControllerPath string = "/registry/controllers" // ServicePath is the path to service resources in etcd ServicePath string = "/registry/services/specs" - // NodePath is the path to node resources in etcd - NodePath string = "/registry/minions" ) // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into @@ -280,65 +278,3 @@ func (r *Registry) WatchServices(ctx api.Context, label labels.Selector, field f } return nil, fmt.Errorf("only the 'name' and default (everything) field selectors are supported") } - -func makeNodeKey(nodeID string) string { - return NodePath + "/" + nodeID -} - -func makeNodeListKey() string { - return NodePath -} - -func (r *Registry) ListMinions(ctx api.Context) (*api.NodeList, error) { - minions := &api.NodeList{} - err := r.ExtractToList(makeNodeListKey(), minions) - return minions, err -} - -func (r *Registry) CreateMinion(ctx api.Context, minion *api.Node) error { - // TODO: Add some validations. - err := r.CreateObj(makeNodeKey(minion.Name), minion, nil, 0) - return etcderr.InterpretCreateError(err, "minion", minion.Name) -} - -func (r *Registry) UpdateMinion(ctx api.Context, minion *api.Node) error { - // TODO: Add some validations. - err := r.SetObj(makeNodeKey(minion.Name), minion, nil, 0) - return etcderr.InterpretUpdateError(err, "minion", minion.Name) -} - -func (r *Registry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) { - var minion api.Node - key := makeNodeKey(minionID) - err := r.ExtractObj(key, &minion, false) - if err != nil { - return nil, etcderr.InterpretGetError(err, "minion", minionID) - } - return &minion, nil -} - -func (r *Registry) DeleteMinion(ctx api.Context, minionID string) error { - key := makeNodeKey(minionID) - err := r.Delete(key, true) - if err != nil { - return etcderr.InterpretDeleteError(err, "minion", minionID) - } - return nil -} - -func (r *Registry) WatchMinions(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - version, err := tools.ParseWatchResourceVersion(resourceVersion, "node") - if err != nil { - return nil, err - } - key := makeNodeListKey() - return r.WatchList(key, version, func(obj runtime.Object) bool { - minionObj, ok := obj.(*api.Node) - if !ok { - // Must be an error: return true to propagate to upper level. - return true - } - // TODO: Add support for filtering based on field, once NodeStatus is defined. - return label.Matches(labels.Set(minionObj.Labels)) - }) -} diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 47ef8010145..66014ba645c 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -708,225 +708,6 @@ func TestEtcdWatchEndpointsAcrossNamespaces(t *testing.T) { watching.Stop() } -func TestEtcdListMinions(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - key := "/registry/minions" - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Nodes: []*etcd.Node{ - { - Value: runtime.EncodeOrDie(latest.Codec, &api.Node{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - }), - }, - { - Value: runtime.EncodeOrDie(latest.Codec, &api.Node{ - ObjectMeta: api.ObjectMeta{Name: "bar"}, - }), - }, - }, - }, - }, - E: nil, - } - registry := NewTestEtcdRegistry(fakeClient) - minions, err := registry.ListMinions(ctx) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if len(minions.Items) != 2 || minions.Items[0].Name != "foo" || minions.Items[1].Name != "bar" { - t.Errorf("Unexpected minion list: %#v", minions) - } -} - -func TestEtcdCreateMinion(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreateMinion(ctx, &api.Node{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - }) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - resp, err := fakeClient.Get("/registry/minions/foo", false, false) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - var minion api.Node - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &minion) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if minion.Name != "foo" { - t.Errorf("Unexpected minion: %#v %s", minion, resp.Node.Value) - } -} - -func TestEtcdGetMinion(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Set("/registry/minions/foo", runtime.EncodeOrDie(latest.Codec, &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) - registry := NewTestEtcdRegistry(fakeClient) - minion, err := registry.GetMinion(ctx, "foo") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if minion.Name != "foo" { - t.Errorf("Unexpected minion: %#v", minion) - } -} - -func TestEtcdGetMinionNotFound(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Data["/registry/minions/foo"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: tools.EtcdErrorNotFound, - } - registry := NewTestEtcdRegistry(fakeClient) - _, err := registry.GetMinion(ctx, "foo") - - if !errors.IsNotFound(err) { - t.Errorf("Unexpected error returned: %#v", err) - } -} - -func TestEtcdDeleteMinion(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - key := "/registry/minions/foo" - fakeClient.Set("/registry/minions/foo", runtime.EncodeOrDie(latest.Codec, &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) - - err := registry.DeleteMinion(ctx, "foo") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if len(fakeClient.DeletedKeys) != 1 { - t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) - } - if fakeClient.DeletedKeys[0] != key { - t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) - } -} - -func TestEtcdWatchMinion(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - watching, err := registry.WatchMinions(ctx, - labels.Everything(), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - -func TestEtcdWatchMinionsMatch(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - watching, err := registry.WatchMinions(ctx, - labels.SelectorFromSet(labels.Set{"name": "foo"}), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - node := &api.Node{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Labels: map[string]string{ - "name": "foo", - }, - }, - } - nodeBytes, _ := latest.Codec.Encode(node) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(nodeBytes), - }, - } - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - case <-time.After(time.Millisecond * 100): - t.Error("unexpected timeout from result channel") - } - watching.Stop() -} - -func TestEtcdWatchMinionsNotMatch(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - watching, err := registry.WatchMinions(ctx, - labels.SelectorFromSet(labels.Set{"name": "foo"}), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - node := &api.Node{ - ObjectMeta: api.ObjectMeta{ - Name: "bar", - Labels: map[string]string{ - "name": "bar", - }, - }, - } - nodeBytes, _ := latest.Codec.Encode(node) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(nodeBytes), - }, - } - - select { - case <-watching.ResultChan(): - t.Error("unexpected result from result channel") - case <-time.After(time.Millisecond * 100): - // expected case - } -} - // TODO We need a test for the compare and swap behavior. This basically requires two things: // 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that // channel, this will enable us to orchestrate the flow of etcd requests in the test. diff --git a/pkg/registry/minion/etcd/etcd.go b/pkg/registry/minion/etcd/etcd.go new file mode 100644 index 00000000000..192e9da2356 --- /dev/null +++ b/pkg/registry/minion/etcd/etcd.go @@ -0,0 +1,75 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "net/http" + "net/url" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" +) + +type REST struct { + *etcdgeneric.Etcd + connection client.ConnectionInfoGetter +} + +// NewStorage returns a RESTStorage object that will work against nodes. +func NewStorage(h tools.EtcdHelper, connection client.ConnectionInfoGetter) *REST { + prefix := "/registry/minions" + store := &etcdgeneric.Etcd{ + NewFunc: func() runtime.Object { return &api.Node{} }, + NewListFunc: func() runtime.Object { return &api.NodeList{} }, + KeyRootFunc: func(ctx api.Context) string { + return prefix + }, + KeyFunc: func(ctx api.Context, name string) (string, error) { + return prefix + "/" + name, nil + }, + ObjectNameFunc: func(obj runtime.Object) (string, error) { + return obj.(*api.Node).Name, nil + }, + PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { + return minion.MatchNode(label, field) + }, + EndpointName: "minion", + + CreateStrategy: minion.Strategy, + UpdateStrategy: minion.Strategy, + + Helper: h, + } + + return &REST{store, connection} +} + +// Implement Redirector. +var _ = rest.Redirector(&REST{}) + +// ResourceLocation returns a URL to which one can send traffic for the specified minion. +func (r *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { + return minion.ResourceLocation(r, r.connection, ctx, id) +} diff --git a/pkg/registry/minion/etcd/etcd_test.go b/pkg/registry/minion/etcd/etcd_test.go new file mode 100644 index 00000000000..eabf55781a6 --- /dev/null +++ b/pkg/registry/minion/etcd/etcd_test.go @@ -0,0 +1,364 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "net/http" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + + "github.com/coreos/go-etcd/etcd" +) + +type fakeConnectionInfoGetter struct { +} + +func (fakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) { + return "http", 12345, nil, nil +} + +func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { + fakeEtcdClient := tools.NewFakeEtcdClient(t) + fakeEtcdClient.TestIndex = true + helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec) + return fakeEtcdClient, helper +} + +func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) { + fakeEtcdClient, h := newHelper(t) + storage := NewStorage(h, fakeConnectionInfoGetter{}) + return storage, fakeEtcdClient +} + +func validNewNode() *api.Node { + return &api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Labels: map[string]string{ + "name": "foo", + }, + }, + } +} + +func validChangedNode() *api.Node { + node := validNewNode() + node.ResourceVersion = "1" + return node +} + +func TestCreate(t *testing.T) { + storage, fakeEtcdClient := newStorage(t) + test := resttest.New(t, storage, fakeEtcdClient.SetError) + node := validNewNode() + node.ObjectMeta = api.ObjectMeta{} + test.TestCreate( + // valid + node, + // invalid + &api.Node{ + ObjectMeta: api.ObjectMeta{Name: "_-a123-a_"}, + }, + ) +} + +func TestDelete(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeEtcdClient := newStorage(t) + test := resttest.New(t, storage, fakeEtcdClient.SetError) + + node := validChangedNode() + key, _ := storage.KeyFunc(ctx, node.Name) + createFn := func() runtime.Object { + fakeEtcdClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(latest.Codec, node), + ModifiedIndex: 1, + }, + }, + } + return node + } + gracefulSetFn := func() bool { + if fakeEtcdClient.Data[key].R.Node == nil { + return false + } + return fakeEtcdClient.Data[key].R.Node.TTL == 30 + } + test.TestDeleteNoGraceful(createFn, gracefulSetFn) +} + +func TestEtcdListNodes(t *testing.T) { + ctx := api.NewContext() + storage, fakeClient := newStorage(t) + key := storage.KeyRootFunc(ctx) + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Node{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + }), + }, + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Node{ + ObjectMeta: api.ObjectMeta{Name: "bar"}, + }), + }, + }, + }, + }, + E: nil, + } + nodesObj, err := storage.List(ctx, labels.Everything(), fields.Everything()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + nodes := nodesObj.(*api.NodeList) + if len(nodes.Items) != 2 || nodes.Items[0].Name != "foo" || nodes.Items[1].Name != "bar" { + t.Errorf("Unexpected nodes list: %#v", nodes) + } +} + +func TestEtcdListNodesMatch(t *testing.T) { + ctx := api.NewContext() + storage, fakeClient := newStorage(t) + key := storage.KeyRootFunc(ctx) + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Node{ + ObjectMeta: api.ObjectMeta{Name: "foo", + Labels: map[string]string{ + "name": "foo", + }, + }, + }), + }, + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Node{ + ObjectMeta: api.ObjectMeta{Name: "bar", + Labels: map[string]string{ + "name": "bar", + }, + }, + }), + }, + }, + }, + }, + E: nil, + } + label := labels.SelectorFromSet(labels.Set{"name": "bar"}) + nodesObj, err := storage.List(ctx, label, fields.Everything()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + nodes := nodesObj.(*api.NodeList) + if len(nodes.Items) != 1 || nodes.Items[0].Name != "bar" { + t.Errorf("Unexpected nodes list: %#v", nodes) + } +} + +func TestEtcdGetNode(t *testing.T) { + ctx := api.NewContext() + storage, fakeClient := newStorage(t) + node := validNewNode() + key, _ := storage.KeyFunc(ctx, node.Name) + + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, node), 0) + nodeObj, err := storage.Get(ctx, node.Name) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + got := nodeObj.(*api.Node) + + node.ObjectMeta.ResourceVersion = got.ObjectMeta.ResourceVersion + if e, a := node, got; !api.Semantic.DeepEqual(*e, *a) { + t.Errorf("Unexpected node: %#v, expected %#v", e, a) + } +} + +func TestEtcdUpdateEndpoints(t *testing.T) { + ctx := api.NewContext() + storage, fakeClient := newStorage(t) + node := validChangedNode() + + key, _ := storage.KeyFunc(ctx, node.Name) + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, validNewNode()), 0) + + _, _, err := storage.Update(ctx, node) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + response, err := fakeClient.Get(key, false, false) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + var nodeOut api.Node + err = latest.Codec.DecodeInto([]byte(response.Node.Value), &nodeOut) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + node.ObjectMeta.ResourceVersion = nodeOut.ObjectMeta.ResourceVersion + if !api.Semantic.DeepEqual(node, &nodeOut) { + t.Errorf("Unexpected node: %#v, expected %#v", &nodeOut, node) + } +} + +func TestEtcdGetNodeNotFound(t *testing.T) { + ctx := api.NewContext() + storage, fakeClient := newStorage(t) + fakeClient.Data["/registry/minions/foo"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: tools.EtcdErrorNotFound, + } + _, err := storage.Get(ctx, "foo") + + if !errors.IsNotFound(err) { + t.Errorf("Unexpected error returned: %#v", err) + } +} + +func TestEtcdDeleteNode(t *testing.T) { + ctx := api.NewContext() + storage, fakeClient := newStorage(t) + node := validNewNode() + key, _ := storage.KeyFunc(ctx, node.Name) + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, node), 0) + _, err := storage.Delete(ctx, node.Name, nil) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if len(fakeClient.DeletedKeys) != 1 { + t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) + } + if fakeClient.DeletedKeys[0] != key { + t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) + } +} + +func TestEtcdWatchNode(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + watching, err := storage.Watch(ctx, + labels.Everything(), + fields.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + default: + } + fakeClient.WatchInjectError <- nil + if _, ok := <-watching.ResultChan(); ok { + t.Errorf("watching channel should be closed") + } + watching.Stop() +} + +func TestEtcdWatchNodesMatch(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + node := validNewNode() + + watching, err := storage.Watch(ctx, + labels.SelectorFromSet(labels.Set{"name": node.Name}), + fields.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + nodeBytes, _ := latest.Codec.Encode(node) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(nodeBytes), + }, + } + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + case <-time.After(time.Millisecond * 100): + t.Error("unexpected timeout from result channel") + } + watching.Stop() +} + +func TestEtcdWatchNodesNotMatch(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + node := validNewNode() + + watching, err := storage.Watch(ctx, + labels.SelectorFromSet(labels.Set{"name": "bar"}), + fields.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + nodeBytes, _ := latest.Codec.Encode(node) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(nodeBytes), + }, + } + + select { + case <-watching.ResultChan(): + t.Error("unexpected result from result channel") + case <-time.After(time.Millisecond * 100): + // expected case + } +} diff --git a/pkg/registry/minion/registry.go b/pkg/registry/minion/registry.go index 94a36f748a8..f4438332da8 100644 --- a/pkg/registry/minion/registry.go +++ b/pkg/registry/minion/registry.go @@ -18,12 +18,13 @@ package minion import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) -// MinionRegistry is an interface for things that know how to store minions. +// Registry is an interface for things that know how to store node. type Registry interface { ListMinions(ctx api.Context) (*api.NodeList, error) CreateMinion(ctx api.Context, minion *api.Node) error @@ -32,3 +33,50 @@ type Registry interface { DeleteMinion(ctx api.Context, minionID string) error WatchMinions(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) } + +// storage puts strong typing around storage calls +type storage struct { + rest.StandardStorage +} + +// NewRegistry returns a new Registry interface for the given Storage. Any mismatched +// types will panic. +func NewRegistry(s rest.StandardStorage) Registry { + return &storage{s} +} + +func (s *storage) ListMinions(ctx api.Context) (*api.NodeList, error) { + obj, err := s.List(ctx, labels.Everything(), fields.Everything()) + if err != nil { + return nil, err + } + + return obj.(*api.NodeList), nil +} + +func (s *storage) CreateMinion(ctx api.Context, node *api.Node) error { + _, err := s.Create(ctx, node) + return err +} + +func (s *storage) UpdateMinion(ctx api.Context, node *api.Node) error { + _, _, err := s.Update(ctx, node) + return err +} + +func (s *storage) WatchMinions(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { + return s.Watch(ctx, label, field, resourceVersion) +} + +func (s *storage) GetMinion(ctx api.Context, name string) (*api.Node, error) { + obj, err := s.Get(ctx, name) + if err != nil { + return nil, err + } + return obj.(*api.Node), nil +} + +func (s *storage) DeleteMinion(ctx api.Context, name string) error { + _, err := s.Delete(ctx, name, nil) + return err +} diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index ff3dd791fcd..418ae7b00a7 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -17,7 +17,6 @@ limitations under the License. package minion import ( - "errors" "fmt" "net" "net/http" @@ -25,135 +24,79 @@ import ( "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors" ) -// REST adapts minion into apiserver's RESTStorage model. -type REST struct { - registry Registry - connection client.ConnectionInfoGetter +// nodeStrategy implements behavior for nodes +type nodeStrategy struct { + runtime.ObjectTyper + api.NameGenerator } -// NewStorage returns a new rest.Storage implementation for minion. -func NewStorage(m Registry, connection client.ConnectionInfoGetter) *REST { - return &REST{ - registry: m, - connection: connection, - } +// Nodes is the default logic that applies when creating and updating Node +// objects. +var Strategy = nodeStrategy{api.Scheme, api.SimpleNameGenerator} + +// NamespaceScoped is false for nodes. +func (nodeStrategy) NamespaceScoped() bool { + return false } -var ErrDoesNotExist = errors.New("The requested resource does not exist.") - -// Create satisfies the RESTStorage interface. -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { - minion, ok := obj.(*api.Node) - if !ok { - return nil, fmt.Errorf("not a minion: %#v", obj) - } - - if err := rest.BeforeCreate(rest.Nodes, ctx, obj); err != nil { - return nil, err - } - - if err := rs.registry.CreateMinion(ctx, minion); err != nil { - err = rest.CheckGeneratedNameError(rest.Nodes, err, minion) - return nil, err - } - return minion, nil +// AllowCreateOnUpdate is false for nodes. +func (nodeStrategy) AllowCreateOnUpdate() bool { + return false } -// Delete satisfies the RESTStorage interface. -func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { - minion, err := rs.registry.GetMinion(ctx, id) - if minion == nil { - return nil, ErrDoesNotExist - } - if err != nil { - return nil, err - } - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteMinion(ctx, id) +// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation. +func (nodeStrategy) ResetBeforeCreate(obj runtime.Object) { + _ = obj.(*api.Node) + // Nodes allow *all* fields, including status, to be set. } -// Get satisfies the RESTStorage interface. -func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { - minion, err := rs.registry.GetMinion(ctx, id) - if err != nil { - return minion, err - } - if minion == nil { - return nil, ErrDoesNotExist - } - return minion, err +// Validate validates a new node. +func (nodeStrategy) Validate(obj runtime.Object) fielderrors.ValidationErrorList { + node := obj.(*api.Node) + return validation.ValidateMinion(node) } -// List satisfies the RESTStorage interface. -func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) { - return rs.registry.ListMinions(ctx) +// ValidateUpdate is the default update validation for an end user. +func (nodeStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.ValidationErrorList { + return validation.ValidateMinionUpdate(old.(*api.Node), obj.(*api.Node)) } -func (rs *REST) New() runtime.Object { - return &api.Node{} +// ResourceGetter is an interface for retrieving resources by ResourceLocation. +type ResourceGetter interface { + Get(api.Context, string) (runtime.Object, error) } -func (*REST) NewList() runtime.Object { - return &api.NodeList{} +// MatchNode returns a generic matcher for a given label and field selector. +func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher { + return generic.MatcherFunc(func(obj runtime.Object) (bool, error) { + nodeObj, ok := obj.(*api.Node) + if !ok { + return false, fmt.Errorf("not a node") + } + // TODO: Add support for filtering based on field, once NodeStatus is defined. + return label.Matches(labels.Set(nodeObj.Labels)), nil + }) } -// Update satisfies the RESTStorage interface. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { - minion, ok := obj.(*api.Node) - if !ok { - return nil, false, fmt.Errorf("not a minion: %#v", obj) - } - // This is hacky, but minions don't really have a namespace, but kubectl currently automatically - // stuffs one in there. Fix it here temporarily until we fix kubectl - if minion.Namespace == api.NamespaceDefault { - minion.Namespace = api.NamespaceNone - } - // Clear out the self link, if specified, since it's not in the registry either. - minion.SelfLink = "" - - oldMinion, err := rs.registry.GetMinion(ctx, minion.Name) - if err != nil { - return nil, false, err - } - - if errs := validation.ValidateMinionUpdate(oldMinion, minion); len(errs) > 0 { - return nil, false, kerrors.NewInvalid("minion", minion.Name, errs) - } - - if err := rs.registry.UpdateMinion(ctx, minion); err != nil { - return nil, false, err - } - out, err := rs.registry.GetMinion(ctx, minion.Name) - return out, false, err -} - -// Watch returns Minions events via a watch.Interface. -// It implements rest.Watcher. -func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return rs.registry.WatchMinions(ctx, label, field, resourceVersion) -} - -// Implement Redirector. -var _ = rest.Redirector(&REST{}) - -// ResourceLocation returns a URL to which one can send traffic for the specified minion. -func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { - minion, err := rs.registry.GetMinion(ctx, id) +// ResourceLocation returns a URL to which one can send traffic for the specified node. +func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { + nodeObj, err := getter.Get(ctx, id) if err != nil { return nil, nil, err } - host := minion.Name + node := nodeObj.(*api.Node) + host := node.Name - scheme, port, transport, err := rs.connection.GetConnectionInfo(host) + scheme, port, transport, err := connection.GetConnectionInfo(host) if err != nil { return nil, nil, err } diff --git a/pkg/registry/minion/rest_test.go b/pkg/registry/minion/rest_test.go deleted file mode 100644 index 14b23a0a15a..00000000000 --- a/pkg/registry/minion/rest_test.go +++ /dev/null @@ -1,183 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package minion - -import ( - "net/http" - "testing" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" -) - -type FakeConnectionInfoGetter struct { -} - -func (FakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) { - return "http", 12345, nil, nil -} - -func TestMinionRegistryREST(t *testing.T) { - ms := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{}) - ctx := api.NewContext() - if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Node).Name != "foo" { - t.Errorf("missing expected object") - } - if obj, err := ms.Get(ctx, "bar"); err != nil || obj.(*api.Node).Name != "bar" { - t.Errorf("missing expected object") - } - if _, err := ms.Get(ctx, "baz"); !errors.IsNotFound(err) { - t.Errorf("has unexpected error: %v", err) - } - - obj, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "baz"}}) - if err != nil { - t.Fatalf("insert failed: %v", err) - } - if !api.HasObjectMetaSystemFieldValues(&obj.(*api.Node).ObjectMeta) { - t.Errorf("storage did not populate object meta field values") - } - if m, ok := obj.(*api.Node); !ok || m.Name != "baz" { - t.Errorf("insert return value was weird: %#v", obj) - } - if obj, err := ms.Get(ctx, "baz"); err != nil || obj.(*api.Node).Name != "baz" { - t.Errorf("insert didn't actually insert") - } - - obj, err = ms.Delete(ctx, "bar") - if err != nil { - t.Fatalf("delete failed") - } - if s, ok := obj.(*api.Status); !ok || s.Status != api.StatusSuccess { - t.Errorf("delete return value was weird: %#v", obj) - } - if _, err := ms.Get(ctx, "bar"); !errors.IsNotFound(err) { - t.Errorf("delete didn't actually delete: %v", err) - } - - _, err = ms.Delete(ctx, "bar") - if err != ErrDoesNotExist { - t.Fatalf("delete returned wrong error") - } - - list, err := ms.List(ctx, labels.Everything(), fields.Everything()) - if err != nil { - t.Errorf("got error calling List") - } - expect := []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "foo"}, - }, { - ObjectMeta: api.ObjectMeta{Name: "baz"}, - }, - } - nodeList := list.(*api.NodeList) - if len(expect) != len(nodeList.Items) || !contains(nodeList, "foo") || !contains(nodeList, "baz") { - t.Errorf("Unexpected list value: %#v", list) - } -} - -func TestMinionRegistryValidUpdate(t *testing.T) { - storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{}) - ctx := api.NewContext() - obj, err := storage.Get(ctx, "foo") - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - minion, ok := obj.(*api.Node) - if !ok { - t.Fatalf("Object is not a minion: %#v", obj) - } - minion.Labels = map[string]string{ - "foo": "bar", - "baz": "home", - } - if _, _, err = storage.Update(ctx, minion); err != nil { - t.Errorf("Unexpected error: %v", err) - } -} - -var ( - validSelector = map[string]string{"a": "b"} - invalidSelector = map[string]string{"NoUppercaseOrSpecialCharsLike=Equals": "b"} -) - -func TestMinionRegistryValidatesCreate(t *testing.T) { - storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{}) - ctx := api.NewContext() - failureCases := map[string]api.Node{ - "zero-length Name": { - ObjectMeta: api.ObjectMeta{ - Name: "", - Labels: validSelector, - }, - Status: api.NodeStatus{ - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "something"}, - }, - }, - }, - "invalid-labels": { - ObjectMeta: api.ObjectMeta{ - Name: "abc-123", - Labels: invalidSelector, - }, - }, - } - for _, failureCase := range failureCases { - c, err := storage.Create(ctx, &failureCase) - if c != nil { - t.Errorf("Expected nil object") - } - if !errors.IsInvalid(err) { - t.Errorf("Expected to get an invalid resource error, got %v", err) - } - } -} - -func contains(nodes *api.NodeList, nodeID string) bool { - for _, node := range nodes.Items { - if node.Name == nodeID { - return true - } - } - return false -} - -func TestCreate(t *testing.T) { - registry := registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}) - test := resttest.New(t, NewStorage(registry, FakeConnectionInfoGetter{}), registry.SetError).ClusterScope() - test.TestCreate( - // valid - &api.Node{ - Status: api.NodeStatus{ - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "something"}, - }, - }, - }, - // invalid - &api.Node{ - ObjectMeta: api.ObjectMeta{ - Labels: invalidSelector, - }, - }) -}