diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index bd6f57efb4d..58779203848 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -42,12 +42,15 @@ const ( ServicePath string = "/registry/services/specs" // ServiceEndpointPath is the path to service endpoints resources in etcd ServiceEndpointPath string = "/registry/services/endpoints" + // NodePath is the path to node resources in etcd + NodePath string = "/registry/minions" ) // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into // kubelet (and vice versa) -// Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegistry, backed by etcd. +// Registry implements BindingRegistry, ControllerRegistry, EndpointRegistry, +// MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd. type Registry struct { tools.EtcdHelper boundPodFactory pod.BoundPodFactory @@ -570,31 +573,35 @@ func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") } -func makeMinionKey(minionID string) string { - return "/registry/minions/" + minionID +func makeNodeKey(nodeID string) string { + return NodePath + "/" + nodeID +} + +func makeNodeListKey() string { + return NodePath } func (r *Registry) ListMinions(ctx api.Context) (*api.NodeList, error) { minions := &api.NodeList{} - err := r.ExtractToList("/registry/minions", minions) + err := r.ExtractToList(makeNodeListKey(), minions) return minions, err } func (r *Registry) CreateMinion(ctx api.Context, minion *api.Node) error { // TODO: Add some validations. - err := r.CreateObj(makeMinionKey(minion.Name), minion, 0) + err := r.CreateObj(makeNodeKey(minion.Name), minion, 0) return etcderr.InterpretCreateError(err, "minion", minion.Name) } func (r *Registry) UpdateMinion(ctx api.Context, minion *api.Node) error { // TODO: Add some validations. - err := r.SetObj(makeMinionKey(minion.Name), minion) + err := r.SetObj(makeNodeKey(minion.Name), minion) return etcderr.InterpretUpdateError(err, "minion", minion.Name) } func (r *Registry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) { var minion api.Node - key := makeMinionKey(minionID) + key := makeNodeKey(minionID) err := r.ExtractObj(key, &minion, false) if err != nil { return nil, etcderr.InterpretGetError(err, "minion", minion.Name) @@ -603,10 +610,27 @@ func (r *Registry) GetMinion(ctx api.Context, minionID string) (*api.Node, error } func (r *Registry) DeleteMinion(ctx api.Context, minionID string) error { - key := makeMinionKey(minionID) + key := makeNodeKey(minionID) err := r.Delete(key, true) if err != nil { return etcderr.InterpretDeleteError(err, "minion", minionID) } return nil } + +func (r *Registry) WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + version, err := tools.ParseWatchResourceVersion(resourceVersion, "node") + if err != nil { + return nil, err + } + key := makeNodeListKey() + return r.WatchList(key, version, func(obj runtime.Object) bool { + minionObj, ok := obj.(*api.Node) + if !ok { + // Must be an error: return true to propagate to upper level. + return true + } + // TODO: Add support for filtering based on field, once NodeStatus is defined. + return label.Matches(labels.Set(minionObj.Labels)) + }) +} diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index d23c2186e79..6326b683df3 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -1681,6 +1681,112 @@ func TestEtcdDeleteMinion(t *testing.T) { } } +func TestEtcdWatchMinion(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient) + watching, err := registry.WatchMinions(ctx, + labels.Everything(), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + default: + } + fakeClient.WatchInjectError <- nil + if _, ok := <-watching.ResultChan(); ok { + t.Errorf("watching channel should be closed") + } + watching.Stop() +} + +func TestEtcdWatchMinionsMatch(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient) + watching, err := registry.WatchMinions(ctx, + labels.SelectorFromSet(labels.Set{"name": "foo"}), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + node := &api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Labels: map[string]string{ + "name": "foo", + }, + }, + } + nodeBytes, _ := latest.Codec.Encode(node) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(nodeBytes), + }, + } + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + case <-time.After(time.Millisecond * 100): + t.Error("unexpected timeout from result channel") + } + watching.Stop() +} + +func TestEtcdWatchMinionsNotMatch(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient) + watching, err := registry.WatchMinions(ctx, + labels.SelectorFromSet(labels.Set{"name": "foo"}), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + node := &api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Labels: map[string]string{ + "name": "bar", + }, + }, + } + nodeBytes, _ := latest.Codec.Encode(node) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(nodeBytes), + }, + } + + select { + case <-watching.ResultChan(): + t.Error("unexpected result from result channel") + case <-time.After(time.Millisecond * 100): + // expected case + } +} + // TODO We need a test for the compare and swap behavior. This basically requires two things: // 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that // channel, this will enable us to orchestrate the flow of etcd requests in the test. diff --git a/pkg/registry/minion/healthy_registry.go b/pkg/registry/minion/healthy_registry.go index a64bd9ac1c2..b7d87e5b4d8 100644 --- a/pkg/registry/minion/healthy_registry.go +++ b/pkg/registry/minion/healthy_registry.go @@ -20,6 +20,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -86,3 +88,7 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Node } return result, nil } + +func (r *HealthyRegistry) WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + return r.delegate.WatchMinions(ctx, label, field, resourceVersion) +} diff --git a/pkg/registry/minion/registry.go b/pkg/registry/minion/registry.go index ceea50b6aef..c9ce4670d16 100644 --- a/pkg/registry/minion/registry.go +++ b/pkg/registry/minion/registry.go @@ -16,7 +16,11 @@ limitations under the License. package minion -import "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) // MinionRegistry is an interface for things that know how to store minions. type Registry interface { @@ -25,4 +29,5 @@ type Registry interface { UpdateMinion(ctx api.Context, minion *api.Node) error GetMinion(ctx api.Context, minionID string) (*api.Node, error) DeleteMinion(ctx api.Context, minionID string) error + WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) } diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index 3a2e52901f6..2ae642c4dba 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -29,14 +29,15 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) -// REST implements the RESTStorage interface, backed by a MinionRegistry. +// REST adapts minion into apiserver's RESTStorage model. type REST struct { registry Registry } -// NewREST returns a new REST. +// NewREST returns a new apiserver.RESTStorage implementation for minion. func NewREST(m Registry) *REST { return &REST{ registry: m, @@ -46,6 +47,7 @@ func NewREST(m Registry) *REST { var ErrDoesNotExist = errors.New("The requested resource does not exist.") var ErrNotHealty = errors.New("The requested minion is not healthy.") +// Create satisfies the RESTStorage interface. func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { minion, ok := obj.(*api.Node) if !ok { @@ -67,6 +69,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE }), nil } +// Delete satisfies the RESTStorage interface. func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { minion, err := rs.registry.GetMinion(ctx, id) if minion == nil { @@ -80,6 +83,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, }), nil } +// Get satisfies the RESTStorage interface. func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { minion, err := rs.registry.GetMinion(ctx, id) if minion == nil { @@ -88,6 +92,7 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { return minion, err } +// List satisfies the RESTStorage interface. func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { return rs.registry.ListMinions(ctx) } @@ -96,6 +101,7 @@ func (rs *REST) New() runtime.Object { return &api.Node{} } +// Update satisfies the RESTStorage interface. func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { minion, ok := obj.(*api.Node) if !ok { @@ -121,8 +127,10 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE }), nil } -func (rs *REST) toApiMinion(name string) *api.Node { - return &api.Node{ObjectMeta: api.ObjectMeta{Name: name}} +// Watch returns Minions events via a watch.Interface. +// It implements apiserver.ResourceWatcher. +func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + return rs.registry.WatchMinions(ctx, label, field, resourceVersion) } // ResourceLocation returns a URL to which one can send traffic for the specified minion. diff --git a/pkg/registry/minion/rest_test.go b/pkg/registry/minion/rest_test.go index 6f4d39e2867..16946e801bb 100644 --- a/pkg/registry/minion/rest_test.go +++ b/pkg/registry/minion/rest_test.go @@ -25,7 +25,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) -func TestMinionREST(t *testing.T) { +func TestMinionRegistryREST(t *testing.T) { ms := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) ctx := api.NewContext() if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Node).Name != "foo" { @@ -87,7 +87,7 @@ func TestMinionREST(t *testing.T) { } } -func TestMinionStorageWithHealthCheck(t *testing.T) { +func TestMinionRegistryHealthCheck(t *testing.T) { minionRegistry := registrytest.NewMinionRegistry([]string{}, api.NodeResources{}) minionHealthRegistry := HealthyRegistry{ delegate: minionRegistry, @@ -119,7 +119,7 @@ func contains(nodes *api.NodeList, nodeID string) bool { return false } -func TestMinionStorageInvalidUpdate(t *testing.T) { +func TestMinionRegistryInvalidUpdate(t *testing.T) { storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) ctx := api.NewContext() obj, err := storage.Get(ctx, "foo") @@ -136,7 +136,7 @@ func TestMinionStorageInvalidUpdate(t *testing.T) { } } -func TestMinionStorageValidUpdate(t *testing.T) { +func TestMinionRegistryValidUpdate(t *testing.T) { storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) ctx := api.NewContext() obj, err := storage.Get(ctx, "foo") @@ -156,7 +156,7 @@ func TestMinionStorageValidUpdate(t *testing.T) { } } -func TestMinionStorageValidatesCreate(t *testing.T) { +func TestMinionRegistryValidatesCreate(t *testing.T) { storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) ctx := api.NewContext() validSelector := map[string]string{"a": "b"} diff --git a/pkg/registry/registrytest/minion.go b/pkg/registry/registrytest/minion.go index 7504826918d..c0d1b91c344 100644 --- a/pkg/registry/registrytest/minion.go +++ b/pkg/registry/registrytest/minion.go @@ -20,15 +20,20 @@ import ( "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) +// MinionRegistry implements minion.Registry interface. type MinionRegistry struct { Err error Minion string Minions api.NodeList + sync.Mutex } +// MakeMinionList constructs api.MinionList from list of minion names and a NodeResource. func MakeMinionList(minions []string, nodeResources api.NodeResources) *api.NodeList { list := api.NodeList{ Items: make([]api.Node, len(minions)), @@ -95,3 +100,7 @@ func (r *MinionRegistry) DeleteMinion(ctx api.Context, minionID string) error { r.Minions.Items = newList return r.Err } + +func (r *MinionRegistry) WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + return nil, r.Err +} diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index 25533dae67f..6d60d30f62b 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -65,7 +65,6 @@ func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api. func (r *PodRegistry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return r.broadcaster.Watch(), nil - } func (r *PodRegistry) GetPod(ctx api.Context, podId string) (*api.Pod, error) {