diff --git a/pkg/registry/controller/registry.go b/pkg/registry/controller/registry.go index b432ecbc1cc..9943eb4f007 100644 --- a/pkg/registry/controller/registry.go +++ b/pkg/registry/controller/registry.go @@ -18,13 +18,14 @@ package controller import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // Registry is an interface for things that know how to store ReplicationControllers. type Registry interface { ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) - WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) + WatchControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error) CreateController(ctx api.Context, controller *api.ReplicationController) error UpdateController(ctx api.Context, controller *api.ReplicationController) error diff --git a/pkg/registry/controller/rest.go b/pkg/registry/controller/rest.go index 4aba1803402..1ae3f3648b3 100644 --- a/pkg/registry/controller/rest.go +++ b/pkg/registry/controller/rest.go @@ -146,41 +146,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE // Watch returns ReplicationController events via a watch.Interface. // It implements apiserver.ResourceWatcher. func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { - if !field.Empty() { - return nil, fmt.Errorf("no field selector implemented for controllers") - } - incoming, err := rs.registry.WatchControllers(ctx, resourceVersion) - if err != nil { - return nil, err - } - // TODO(lavalamp): remove watch.Filter, which is broken. Implement consistent way of filtering. - // TODO(lavalamp): this watch method needs a test. - return watch.Filter(incoming, func(e watch.Event) (watch.Event, bool) { - controller, ok := e.Object.(*api.ReplicationController) - if !ok { - // must be an error event-- pass it on - return e, true - } - match := label.Matches(labels.Set(controller.Labels)) - if match { - rs.fillCurrentState(ctx, controller) - } - return e, match - }), nil -} - -func (rs *REST) waitForController(ctx api.Context, controller *api.ReplicationController) (runtime.Object, error) { - for { - pods, err := rs.podLister.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector()) - if err != nil { - return controller, err - } - if len(pods.Items) == controller.Spec.Replicas { - break - } - time.Sleep(rs.pollPeriod) - } - return controller, nil + return rs.registry.WatchControllers(ctx, label, field, resourceVersion) } func (rs *REST) fillCurrentState(ctx api.Context, controller *api.ReplicationController) error { diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index faf8557118a..bb815ac905c 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -122,20 +122,20 @@ func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool } // WatchPods begins watching for new, changed, or deleted pods. -func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) { +func (r *Registry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod") if err != nil { return nil, err } key := makePodListKey(ctx) return r.WatchList(key, version, func(obj runtime.Object) bool { - switch t := obj.(type) { - case *api.Pod: - return filter(t) - default: - // Must be an error + podObj, ok := obj.(*api.Pod) + if !ok { + // Must be an error: return true to propagate to upper level. return true } + fields := pod.PodToSelectableFields(podObj) + return label.Matches(labels.Set(podObj.Labels)) && field.Matches(fields) }) } @@ -327,13 +327,28 @@ func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerL } // WatchControllers begins watching for new, changed, or deleted controllers. -func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) { +func (r *Registry) WatchControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + if !field.Empty() { + return nil, fmt.Errorf("field selectors are not supported on replication controllers") + } version, err := tools.ParseWatchResourceVersion(resourceVersion, "replicationControllers") if err != nil { return nil, err } key := makeControllerListKey(ctx) - return r.WatchList(key, version, tools.Everything) + return r.WatchList(key, version, func(obj runtime.Object) bool { + controller, ok := obj.(*api.ReplicationController) + if !ok { + // Must be an error: return true to propagate to upper level. + return true + } + match := label.Matches(labels.Set(controller.Labels)) + if match { + pods, _ := r.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector()) + controller.Status.Replicas = len(pods.Items) + } + return match + }) } // makeControllerListKey constructs etcd paths to controller directories enforcing namespace rules. diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 18f52035502..fe45ad7bb44 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -21,6 +21,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -705,6 +706,112 @@ func TestEtcdListPods(t *testing.T) { } } +func TestEtcdWatchPods(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient) + watching, err := registry.WatchPods(ctx, + labels.Everything(), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + default: + } + fakeClient.WatchInjectError <- nil + if _, ok := <-watching.ResultChan(); ok { + t.Errorf("watching channel should be closed") + } + watching.Stop() +} + +func TestEtcdWatchPodsMatch(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient) + watching, err := registry.WatchPods(ctx, + labels.SelectorFromSet(labels.Set{"name": "foo"}), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Labels: map[string]string{ + "name": "foo", + }, + }, + } + podBytes, _ := latest.Codec.Encode(pod) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(podBytes), + }, + } + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + case <-time.After(time.Millisecond * 100): + t.Error("unexpected timeout from result channel") + } + watching.Stop() +} + +func TestEtcdWatchPodsNotMatch(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient) + watching, err := registry.WatchPods(ctx, + labels.SelectorFromSet(labels.Set{"name": "foo"}), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Labels: map[string]string{ + "name": "bar", + }, + }, + } + podBytes, _ := latest.Codec.Encode(pod) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(podBytes), + }, + } + + select { + case <-watching.ResultChan(): + t.Error("unexpected result from result channel") + case <-time.After(time.Millisecond * 100): + // expected case + } +} + func TestEtcdListControllersNotFound(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) ctx := api.NewDefaultContext() @@ -934,6 +1041,114 @@ func TestEtcdUpdateController(t *testing.T) { } } +func TestEtcdWatchController(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient) + watching, err := registry.WatchControllers(ctx, + labels.Everything(), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + default: + } + fakeClient.WatchInjectError <- nil + if _, ok := <-watching.ResultChan(); ok { + t.Errorf("watching channel should be closed") + } + watching.Stop() +} + +func TestEtcdWatchControllersMatch(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.ExpectNotFoundGet(makePodListKey(ctx)) + registry := NewTestEtcdRegistry(fakeClient) + watching, err := registry.WatchControllers(ctx, + labels.SelectorFromSet(labels.Set{"name": "foo"}), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + controller := &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Labels: map[string]string{ + "name": "foo", + }, + }, + } + controllerBytes, _ := latest.Codec.Encode(controller) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(controllerBytes), + }, + } + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + case <-time.After(time.Millisecond * 100): + t.Error("unexpected timeout from result channel") + } + watching.Stop() +} + +func TestEtcdWatchControllersNotMatch(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.ExpectNotFoundGet(makePodListKey(ctx)) + registry := NewTestEtcdRegistry(fakeClient) + watching, err := registry.WatchControllers(ctx, + labels.SelectorFromSet(labels.Set{"name": "foo"}), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + controller := &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Labels: map[string]string{ + "name": "bar", + }, + }, + } + controllerBytes, _ := latest.Codec.Encode(controller) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(controllerBytes), + }, + } + + select { + case <-watching.ResultChan(): + t.Error("unexpected result from result channel") + case <-time.After(time.Millisecond * 100): + // expected case + } +} + func TestEtcdListServices(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index ae630fed1bc..4fdaafdb5b4 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -29,7 +29,7 @@ type Registry interface { // ListPodsPredicate obtains a list of pods for which filter returns true. ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) // Watch for new/changed/deleted pods - WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) + WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) // Get a specific pod GetPod(ctx api.Context, podID string) (*api.Pod, error) // Create a pod based on a specification. diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index c3c14b60067..a1b70ae2e54 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -137,7 +137,7 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { return pod, err } -func (rs *REST) podToSelectableFields(pod *api.Pod) labels.Set { +func PodToSelectableFields(pod *api.Pod) labels.Set { // TODO we are populating both Status and DesiredState because selectors are not aware of API versions // see https://github.com/GoogleCloudPlatform/kubernetes/pull/2503 @@ -158,7 +158,7 @@ func (rs *REST) podToSelectableFields(pod *api.Pod) labels.Set { // ListPods & WatchPods. func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool { return func(pod *api.Pod) bool { - fields := rs.podToSelectableFields(pod) + fields := PodToSelectableFields(pod) return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) } } @@ -184,7 +184,8 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj // Watch begins watching for new, changed, or deleted pods. func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { - return rs.registry.WatchPods(ctx, resourceVersion, rs.filterFunc(label, field)) + // TODO: Add pod status to watch command + return rs.registry.WatchPods(ctx, label, field, resourceVersion) } func (*REST) New() runtime.Object { diff --git a/pkg/registry/registrytest/controller.go b/pkg/registry/registrytest/controller.go index 041834b20f8..e0476574042 100644 --- a/pkg/registry/registrytest/controller.go +++ b/pkg/registry/registrytest/controller.go @@ -18,6 +18,7 @@ package registrytest import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -47,6 +48,6 @@ func (r *ControllerRegistry) DeleteController(ctx api.Context, ID string) error return r.Err } -func (r *ControllerRegistry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) { +func (r *ControllerRegistry) WatchControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return nil, r.Err } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index 72046e1f0dc..25533dae67f 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -63,9 +63,9 @@ func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api. }) } -func (r *PodRegistry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) { - // TODO: wire filter down into the broadcaster; it needs access to current and previous state :( +func (r *PodRegistry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return r.broadcaster.Watch(), nil + } func (r *PodRegistry) GetPod(ctx api.Context, podId string) (*api.Pod, error) {