port minion registry to generic etcd

This commit is contained in:
Masahiro Sano 2015-03-13 23:49:38 +09:00
parent 2ad3af246a
commit c49af0b7cb
9 changed files with 538 additions and 602 deletions

View File

@ -79,31 +79,3 @@ func (svcStrategy) AllowCreateOnUpdate() bool {
func (svcStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.ValidationErrorList {
return validation.ValidateServiceUpdate(old.(*api.Service), obj.(*api.Service))
}
// nodeStrategy implements behavior for nodes
// TODO: move to a node specific package.
type nodeStrategy struct {
runtime.ObjectTyper
api.NameGenerator
}
// Nodes is the default logic that applies when creating and updating Node
// objects.
var Nodes RESTCreateStrategy = nodeStrategy{api.Scheme, api.SimpleNameGenerator}
// NamespaceScoped is false for nodes.
func (nodeStrategy) NamespaceScoped() bool {
return false
}
// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation.
func (nodeStrategy) ResetBeforeCreate(obj runtime.Object) {
_ = obj.(*api.Node)
// Nodes allow *all* fields, including status, to be set.
}
// Validate validates a new node.
func (nodeStrategy) Validate(obj runtime.Object) fielderrors.ValidationErrorList {
node := obj.(*api.Node)
return validation.ValidateMinion(node)
}

View File

@ -49,6 +49,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/event"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/limitrange"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
nodeetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace"
namespaceetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@ -362,13 +363,12 @@ func (m *Master) init(c *Config) {
endpointsStorage := endpointsetcd.NewStorage(c.EtcdHelper)
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
nodeStorage := nodeetcd.NewStorage(c.EtcdHelper, c.KubeletClient)
m.nodeRegistry = minion.NewRegistry(nodeStorage)
// TODO: split me up into distinct storage registries
registry := etcd.NewRegistry(c.EtcdHelper, podRegistry, m.endpointRegistry)
m.serviceRegistry = registry
m.nodeRegistry = registry
nodeStorage := minion.NewStorage(m.nodeRegistry, c.KubeletClient)
controllerStorage := controlleretcd.NewREST(c.EtcdHelper)

View File

@ -38,8 +38,6 @@ const (
ControllerPath string = "/registry/controllers"
// ServicePath is the path to service resources in etcd
ServicePath string = "/registry/services/specs"
// NodePath is the path to node resources in etcd
NodePath string = "/registry/minions"
)
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
@ -280,65 +278,3 @@ func (r *Registry) WatchServices(ctx api.Context, label labels.Selector, field f
}
return nil, fmt.Errorf("only the 'name' and default (everything) field selectors are supported")
}
func makeNodeKey(nodeID string) string {
return NodePath + "/" + nodeID
}
func makeNodeListKey() string {
return NodePath
}
func (r *Registry) ListMinions(ctx api.Context) (*api.NodeList, error) {
minions := &api.NodeList{}
err := r.ExtractToList(makeNodeListKey(), minions)
return minions, err
}
func (r *Registry) CreateMinion(ctx api.Context, minion *api.Node) error {
// TODO: Add some validations.
err := r.CreateObj(makeNodeKey(minion.Name), minion, nil, 0)
return etcderr.InterpretCreateError(err, "minion", minion.Name)
}
func (r *Registry) UpdateMinion(ctx api.Context, minion *api.Node) error {
// TODO: Add some validations.
err := r.SetObj(makeNodeKey(minion.Name), minion, nil, 0)
return etcderr.InterpretUpdateError(err, "minion", minion.Name)
}
func (r *Registry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) {
var minion api.Node
key := makeNodeKey(minionID)
err := r.ExtractObj(key, &minion, false)
if err != nil {
return nil, etcderr.InterpretGetError(err, "minion", minionID)
}
return &minion, nil
}
func (r *Registry) DeleteMinion(ctx api.Context, minionID string) error {
key := makeNodeKey(minionID)
err := r.Delete(key, true)
if err != nil {
return etcderr.InterpretDeleteError(err, "minion", minionID)
}
return nil
}
func (r *Registry) WatchMinions(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, "node")
if err != nil {
return nil, err
}
key := makeNodeListKey()
return r.WatchList(key, version, func(obj runtime.Object) bool {
minionObj, ok := obj.(*api.Node)
if !ok {
// Must be an error: return true to propagate to upper level.
return true
}
// TODO: Add support for filtering based on field, once NodeStatus is defined.
return label.Matches(labels.Set(minionObj.Labels))
})
}

View File

@ -708,225 +708,6 @@ func TestEtcdWatchEndpointsAcrossNamespaces(t *testing.T) {
watching.Stop()
}
func TestEtcdListMinions(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
key := "/registry/minions"
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Node{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Node{
ObjectMeta: api.ObjectMeta{Name: "bar"},
}),
},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
minions, err := registry.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(minions.Items) != 2 || minions.Items[0].Name != "foo" || minions.Items[1].Name != "bar" {
t.Errorf("Unexpected minion list: %#v", minions)
}
}
func TestEtcdCreateMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreateMinion(ctx, &api.Node{
ObjectMeta: api.ObjectMeta{Name: "foo"},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
resp, err := fakeClient.Get("/registry/minions/foo", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
var minion api.Node
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &minion)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if minion.Name != "foo" {
t.Errorf("Unexpected minion: %#v %s", minion, resp.Node.Value)
}
}
func TestEtcdGetMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/minions/foo", runtime.EncodeOrDie(latest.Codec, &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient)
minion, err := registry.GetMinion(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if minion.Name != "foo" {
t.Errorf("Unexpected minion: %#v", minion)
}
}
func TestEtcdGetMinionNotFound(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Data["/registry/minions/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
_, err := registry.GetMinion(ctx, "foo")
if !errors.IsNotFound(err) {
t.Errorf("Unexpected error returned: %#v", err)
}
}
func TestEtcdDeleteMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
key := "/registry/minions/foo"
fakeClient.Set("/registry/minions/foo", runtime.EncodeOrDie(latest.Codec, &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
err := registry.DeleteMinion(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
}
if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
}
func TestEtcdWatchMinion(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchMinions(ctx,
labels.Everything(),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchMinionsMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchMinions(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: map[string]string{
"name": "foo",
},
},
}
nodeBytes, _ := latest.Codec.Encode(node)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(nodeBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
func TestEtcdWatchMinionsNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchMinions(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
nodeBytes, _ := latest.Codec.Encode(node)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(nodeBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}
// TODO We need a test for the compare and swap behavior. This basically requires two things:
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
// channel, this will enable us to orchestrate the flow of etcd requests in the test.

View File

@ -0,0 +1,75 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"net/http"
"net/url"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
)
type REST struct {
*etcdgeneric.Etcd
connection client.ConnectionInfoGetter
}
// NewStorage returns a RESTStorage object that will work against nodes.
func NewStorage(h tools.EtcdHelper, connection client.ConnectionInfoGetter) *REST {
prefix := "/registry/minions"
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Node{} },
NewListFunc: func() runtime.Object { return &api.NodeList{} },
KeyRootFunc: func(ctx api.Context) string {
return prefix
},
KeyFunc: func(ctx api.Context, name string) (string, error) {
return prefix + "/" + name, nil
},
ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*api.Node).Name, nil
},
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
return minion.MatchNode(label, field)
},
EndpointName: "minion",
CreateStrategy: minion.Strategy,
UpdateStrategy: minion.Strategy,
Helper: h,
}
return &REST{store, connection}
}
// Implement Redirector.
var _ = rest.Redirector(&REST{})
// ResourceLocation returns a URL to which one can send traffic for the specified minion.
func (r *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
return minion.ResourceLocation(r, r.connection, ctx, id)
}

View File

@ -0,0 +1,364 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"net/http"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/coreos/go-etcd/etcd"
)
type fakeConnectionInfoGetter struct {
}
func (fakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
return "http", 12345, nil, nil
}
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeEtcdClient.TestIndex = true
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
return fakeEtcdClient, helper
}
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
fakeEtcdClient, h := newHelper(t)
storage := NewStorage(h, fakeConnectionInfoGetter{})
return storage, fakeEtcdClient
}
func validNewNode() *api.Node {
return &api.Node{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: map[string]string{
"name": "foo",
},
},
}
}
func validChangedNode() *api.Node {
node := validNewNode()
node.ResourceVersion = "1"
return node
}
func TestCreate(t *testing.T) {
storage, fakeEtcdClient := newStorage(t)
test := resttest.New(t, storage, fakeEtcdClient.SetError)
node := validNewNode()
node.ObjectMeta = api.ObjectMeta{}
test.TestCreate(
// valid
node,
// invalid
&api.Node{
ObjectMeta: api.ObjectMeta{Name: "_-a123-a_"},
},
)
}
func TestDelete(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeEtcdClient := newStorage(t)
test := resttest.New(t, storage, fakeEtcdClient.SetError)
node := validChangedNode()
key, _ := storage.KeyFunc(ctx, node.Name)
createFn := func() runtime.Object {
fakeEtcdClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(latest.Codec, node),
ModifiedIndex: 1,
},
},
}
return node
}
gracefulSetFn := func() bool {
if fakeEtcdClient.Data[key].R.Node == nil {
return false
}
return fakeEtcdClient.Data[key].R.Node.TTL == 30
}
test.TestDeleteNoGraceful(createFn, gracefulSetFn)
}
func TestEtcdListNodes(t *testing.T) {
ctx := api.NewContext()
storage, fakeClient := newStorage(t)
key := storage.KeyRootFunc(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Node{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Node{
ObjectMeta: api.ObjectMeta{Name: "bar"},
}),
},
},
},
},
E: nil,
}
nodesObj, err := storage.List(ctx, labels.Everything(), fields.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
nodes := nodesObj.(*api.NodeList)
if len(nodes.Items) != 2 || nodes.Items[0].Name != "foo" || nodes.Items[1].Name != "bar" {
t.Errorf("Unexpected nodes list: %#v", nodes)
}
}
func TestEtcdListNodesMatch(t *testing.T) {
ctx := api.NewContext()
storage, fakeClient := newStorage(t)
key := storage.KeyRootFunc(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Node{
ObjectMeta: api.ObjectMeta{Name: "foo",
Labels: map[string]string{
"name": "foo",
},
},
}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Node{
ObjectMeta: api.ObjectMeta{Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}),
},
},
},
},
E: nil,
}
label := labels.SelectorFromSet(labels.Set{"name": "bar"})
nodesObj, err := storage.List(ctx, label, fields.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
nodes := nodesObj.(*api.NodeList)
if len(nodes.Items) != 1 || nodes.Items[0].Name != "bar" {
t.Errorf("Unexpected nodes list: %#v", nodes)
}
}
func TestEtcdGetNode(t *testing.T) {
ctx := api.NewContext()
storage, fakeClient := newStorage(t)
node := validNewNode()
key, _ := storage.KeyFunc(ctx, node.Name)
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, node), 0)
nodeObj, err := storage.Get(ctx, node.Name)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
got := nodeObj.(*api.Node)
node.ObjectMeta.ResourceVersion = got.ObjectMeta.ResourceVersion
if e, a := node, got; !api.Semantic.DeepEqual(*e, *a) {
t.Errorf("Unexpected node: %#v, expected %#v", e, a)
}
}
func TestEtcdUpdateEndpoints(t *testing.T) {
ctx := api.NewContext()
storage, fakeClient := newStorage(t)
node := validChangedNode()
key, _ := storage.KeyFunc(ctx, node.Name)
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, validNewNode()), 0)
_, _, err := storage.Update(ctx, node)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
response, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var nodeOut api.Node
err = latest.Codec.DecodeInto([]byte(response.Node.Value), &nodeOut)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
node.ObjectMeta.ResourceVersion = nodeOut.ObjectMeta.ResourceVersion
if !api.Semantic.DeepEqual(node, &nodeOut) {
t.Errorf("Unexpected node: %#v, expected %#v", &nodeOut, node)
}
}
func TestEtcdGetNodeNotFound(t *testing.T) {
ctx := api.NewContext()
storage, fakeClient := newStorage(t)
fakeClient.Data["/registry/minions/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
_, err := storage.Get(ctx, "foo")
if !errors.IsNotFound(err) {
t.Errorf("Unexpected error returned: %#v", err)
}
}
func TestEtcdDeleteNode(t *testing.T) {
ctx := api.NewContext()
storage, fakeClient := newStorage(t)
node := validNewNode()
key, _ := storage.KeyFunc(ctx, node.Name)
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, node), 0)
_, err := storage.Delete(ctx, node.Name, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
}
if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
}
func TestEtcdWatchNode(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
watching, err := storage.Watch(ctx,
labels.Everything(),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchNodesMatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
node := validNewNode()
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": node.Name}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
nodeBytes, _ := latest.Codec.Encode(node)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(nodeBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
func TestEtcdWatchNodesNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
node := validNewNode()
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": "bar"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
nodeBytes, _ := latest.Codec.Encode(node)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(nodeBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}

View File

@ -18,12 +18,13 @@ package minion
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// MinionRegistry is an interface for things that know how to store minions.
// Registry is an interface for things that know how to store node.
type Registry interface {
ListMinions(ctx api.Context) (*api.NodeList, error)
CreateMinion(ctx api.Context, minion *api.Node) error
@ -32,3 +33,50 @@ type Registry interface {
DeleteMinion(ctx api.Context, minionID string) error
WatchMinions(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
}
// storage puts strong typing around storage calls
type storage struct {
rest.StandardStorage
}
// NewRegistry returns a new Registry interface for the given Storage. Any mismatched
// types will panic.
func NewRegistry(s rest.StandardStorage) Registry {
return &storage{s}
}
func (s *storage) ListMinions(ctx api.Context) (*api.NodeList, error) {
obj, err := s.List(ctx, labels.Everything(), fields.Everything())
if err != nil {
return nil, err
}
return obj.(*api.NodeList), nil
}
func (s *storage) CreateMinion(ctx api.Context, node *api.Node) error {
_, err := s.Create(ctx, node)
return err
}
func (s *storage) UpdateMinion(ctx api.Context, node *api.Node) error {
_, _, err := s.Update(ctx, node)
return err
}
func (s *storage) WatchMinions(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return s.Watch(ctx, label, field, resourceVersion)
}
func (s *storage) GetMinion(ctx api.Context, name string) (*api.Node, error) {
obj, err := s.Get(ctx, name)
if err != nil {
return nil, err
}
return obj.(*api.Node), nil
}
func (s *storage) DeleteMinion(ctx api.Context, name string) error {
_, err := s.Delete(ctx, name, nil)
return err
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package minion
import (
"errors"
"fmt"
"net"
"net/http"
@ -25,135 +24,79 @@ import (
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
)
// REST adapts minion into apiserver's RESTStorage model.
type REST struct {
registry Registry
connection client.ConnectionInfoGetter
// nodeStrategy implements behavior for nodes
type nodeStrategy struct {
runtime.ObjectTyper
api.NameGenerator
}
// NewStorage returns a new rest.Storage implementation for minion.
func NewStorage(m Registry, connection client.ConnectionInfoGetter) *REST {
return &REST{
registry: m,
connection: connection,
}
// Nodes is the default logic that applies when creating and updating Node
// objects.
var Strategy = nodeStrategy{api.Scheme, api.SimpleNameGenerator}
// NamespaceScoped is false for nodes.
func (nodeStrategy) NamespaceScoped() bool {
return false
}
var ErrDoesNotExist = errors.New("The requested resource does not exist.")
// Create satisfies the RESTStorage interface.
func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
minion, ok := obj.(*api.Node)
if !ok {
return nil, fmt.Errorf("not a minion: %#v", obj)
}
if err := rest.BeforeCreate(rest.Nodes, ctx, obj); err != nil {
return nil, err
}
if err := rs.registry.CreateMinion(ctx, minion); err != nil {
err = rest.CheckGeneratedNameError(rest.Nodes, err, minion)
return nil, err
}
return minion, nil
// AllowCreateOnUpdate is false for nodes.
func (nodeStrategy) AllowCreateOnUpdate() bool {
return false
}
// Delete satisfies the RESTStorage interface.
func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
minion, err := rs.registry.GetMinion(ctx, id)
if minion == nil {
return nil, ErrDoesNotExist
}
if err != nil {
return nil, err
}
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteMinion(ctx, id)
// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation.
func (nodeStrategy) ResetBeforeCreate(obj runtime.Object) {
_ = obj.(*api.Node)
// Nodes allow *all* fields, including status, to be set.
}
// Get satisfies the RESTStorage interface.
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
minion, err := rs.registry.GetMinion(ctx, id)
if err != nil {
return minion, err
}
if minion == nil {
return nil, ErrDoesNotExist
}
return minion, err
// Validate validates a new node.
func (nodeStrategy) Validate(obj runtime.Object) fielderrors.ValidationErrorList {
node := obj.(*api.Node)
return validation.ValidateMinion(node)
}
// List satisfies the RESTStorage interface.
func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) {
return rs.registry.ListMinions(ctx)
// ValidateUpdate is the default update validation for an end user.
func (nodeStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.ValidationErrorList {
return validation.ValidateMinionUpdate(old.(*api.Node), obj.(*api.Node))
}
func (rs *REST) New() runtime.Object {
return &api.Node{}
// ResourceGetter is an interface for retrieving resources by ResourceLocation.
type ResourceGetter interface {
Get(api.Context, string) (runtime.Object, error)
}
func (*REST) NewList() runtime.Object {
return &api.NodeList{}
// MatchNode returns a generic matcher for a given label and field selector.
func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {
nodeObj, ok := obj.(*api.Node)
if !ok {
return false, fmt.Errorf("not a node")
}
// TODO: Add support for filtering based on field, once NodeStatus is defined.
return label.Matches(labels.Set(nodeObj.Labels)), nil
})
}
// Update satisfies the RESTStorage interface.
func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
minion, ok := obj.(*api.Node)
if !ok {
return nil, false, fmt.Errorf("not a minion: %#v", obj)
}
// This is hacky, but minions don't really have a namespace, but kubectl currently automatically
// stuffs one in there. Fix it here temporarily until we fix kubectl
if minion.Namespace == api.NamespaceDefault {
minion.Namespace = api.NamespaceNone
}
// Clear out the self link, if specified, since it's not in the registry either.
minion.SelfLink = ""
oldMinion, err := rs.registry.GetMinion(ctx, minion.Name)
if err != nil {
return nil, false, err
}
if errs := validation.ValidateMinionUpdate(oldMinion, minion); len(errs) > 0 {
return nil, false, kerrors.NewInvalid("minion", minion.Name, errs)
}
if err := rs.registry.UpdateMinion(ctx, minion); err != nil {
return nil, false, err
}
out, err := rs.registry.GetMinion(ctx, minion.Name)
return out, false, err
}
// Watch returns Minions events via a watch.Interface.
// It implements rest.Watcher.
func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.WatchMinions(ctx, label, field, resourceVersion)
}
// Implement Redirector.
var _ = rest.Redirector(&REST{})
// ResourceLocation returns a URL to which one can send traffic for the specified minion.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
minion, err := rs.registry.GetMinion(ctx, id)
// ResourceLocation returns a URL to which one can send traffic for the specified node.
func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
nodeObj, err := getter.Get(ctx, id)
if err != nil {
return nil, nil, err
}
host := minion.Name
node := nodeObj.(*api.Node)
host := node.Name
scheme, port, transport, err := rs.connection.GetConnectionInfo(host)
scheme, port, transport, err := connection.GetConnectionInfo(host)
if err != nil {
return nil, nil, err
}

View File

@ -1,183 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minion
import (
"net/http"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
type FakeConnectionInfoGetter struct {
}
func (FakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
return "http", 12345, nil, nil
}
func TestMinionRegistryREST(t *testing.T) {
ms := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{})
ctx := api.NewContext()
if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Node).Name != "foo" {
t.Errorf("missing expected object")
}
if obj, err := ms.Get(ctx, "bar"); err != nil || obj.(*api.Node).Name != "bar" {
t.Errorf("missing expected object")
}
if _, err := ms.Get(ctx, "baz"); !errors.IsNotFound(err) {
t.Errorf("has unexpected error: %v", err)
}
obj, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "baz"}})
if err != nil {
t.Fatalf("insert failed: %v", err)
}
if !api.HasObjectMetaSystemFieldValues(&obj.(*api.Node).ObjectMeta) {
t.Errorf("storage did not populate object meta field values")
}
if m, ok := obj.(*api.Node); !ok || m.Name != "baz" {
t.Errorf("insert return value was weird: %#v", obj)
}
if obj, err := ms.Get(ctx, "baz"); err != nil || obj.(*api.Node).Name != "baz" {
t.Errorf("insert didn't actually insert")
}
obj, err = ms.Delete(ctx, "bar")
if err != nil {
t.Fatalf("delete failed")
}
if s, ok := obj.(*api.Status); !ok || s.Status != api.StatusSuccess {
t.Errorf("delete return value was weird: %#v", obj)
}
if _, err := ms.Get(ctx, "bar"); !errors.IsNotFound(err) {
t.Errorf("delete didn't actually delete: %v", err)
}
_, err = ms.Delete(ctx, "bar")
if err != ErrDoesNotExist {
t.Fatalf("delete returned wrong error")
}
list, err := ms.List(ctx, labels.Everything(), fields.Everything())
if err != nil {
t.Errorf("got error calling List")
}
expect := []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}, {
ObjectMeta: api.ObjectMeta{Name: "baz"},
},
}
nodeList := list.(*api.NodeList)
if len(expect) != len(nodeList.Items) || !contains(nodeList, "foo") || !contains(nodeList, "baz") {
t.Errorf("Unexpected list value: %#v", list)
}
}
func TestMinionRegistryValidUpdate(t *testing.T) {
storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{})
ctx := api.NewContext()
obj, err := storage.Get(ctx, "foo")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
minion, ok := obj.(*api.Node)
if !ok {
t.Fatalf("Object is not a minion: %#v", obj)
}
minion.Labels = map[string]string{
"foo": "bar",
"baz": "home",
}
if _, _, err = storage.Update(ctx, minion); err != nil {
t.Errorf("Unexpected error: %v", err)
}
}
var (
validSelector = map[string]string{"a": "b"}
invalidSelector = map[string]string{"NoUppercaseOrSpecialCharsLike=Equals": "b"}
)
func TestMinionRegistryValidatesCreate(t *testing.T) {
storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{})
ctx := api.NewContext()
failureCases := map[string]api.Node{
"zero-length Name": {
ObjectMeta: api.ObjectMeta{
Name: "",
Labels: validSelector,
},
Status: api.NodeStatus{
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "something"},
},
},
},
"invalid-labels": {
ObjectMeta: api.ObjectMeta{
Name: "abc-123",
Labels: invalidSelector,
},
},
}
for _, failureCase := range failureCases {
c, err := storage.Create(ctx, &failureCase)
if c != nil {
t.Errorf("Expected nil object")
}
if !errors.IsInvalid(err) {
t.Errorf("Expected to get an invalid resource error, got %v", err)
}
}
}
func contains(nodes *api.NodeList, nodeID string) bool {
for _, node := range nodes.Items {
if node.Name == nodeID {
return true
}
}
return false
}
func TestCreate(t *testing.T) {
registry := registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})
test := resttest.New(t, NewStorage(registry, FakeConnectionInfoGetter{}), registry.SetError).ClusterScope()
test.TestCreate(
// valid
&api.Node{
Status: api.NodeStatus{
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "something"},
},
},
},
// invalid
&api.Node{
ObjectMeta: api.ObjectMeta{
Labels: invalidSelector,
},
})
}