diff --git a/pkg/registry/controller/registry.go b/pkg/registry/controller/registry.go index 0765b2188bf..f88f503b4ff 100644 --- a/pkg/registry/controller/registry.go +++ b/pkg/registry/controller/registry.go @@ -18,14 +18,13 @@ package controller import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // Registry is an interface for things that know how to store ReplicationControllers. type Registry interface { ListControllers() ([]api.ReplicationController, error) - WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchControllers(resourceVersion uint64) (watch.Interface, error) GetController(controllerID string) (*api.ReplicationController, error) CreateController(controller api.ReplicationController) error UpdateController(controller api.ReplicationController) error diff --git a/pkg/registry/controller/storage.go b/pkg/registry/controller/storage.go index 5b1b7316f1c..6eb90f14b51 100644 --- a/pkg/registry/controller/storage.go +++ b/pkg/registry/controller/storage.go @@ -131,7 +131,17 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { // Watch returns ReplicationController events via a watch.Interface. // It implements apiserver.ResourceWatcher. func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return rs.registry.WatchControllers(label, field, resourceVersion) + if !field.Empty() { + return nil, fmt.Errorf("no field selector implemented for controllers") + } + incoming, err := rs.registry.WatchControllers(resourceVersion) + if err != nil { + return nil, err + } + return watch.Filter(incoming, func(e watch.Event) (watch.Event, bool) { + repController := e.Object.(*api.ReplicationController) + return e, label.Matches(labels.Set(repController.Labels)) + }), nil } func (rs *RegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) { diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 707668a1432..bf6b70f9bc8 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -78,16 +78,8 @@ func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { } // WatchPods begins watching for new, changed, or deleted pods. -func (r *Registry) WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return r.WatchList("/registry/pods", resourceVersion, func(obj interface{}) bool { - pod := obj.(*api.Pod) - fields := labels.Set{ - "ID": pod.ID, - "CurrentState.Status": string(pod.CurrentState.Status), - "CurrentState.Host": pod.CurrentState.Host, - } - return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) - }) +func (r *Registry) WatchPods(resourceVersion uint64) (watch.Interface, error) { + return r.WatchList("/registry/pods", resourceVersion, tools.Everything) } // GetPod gets a specific pod specified by its ID. @@ -222,13 +214,8 @@ func (r *Registry) ListControllers() ([]api.ReplicationController, error) { } // WatchControllers begins watching for new, changed, or deleted controllers. -func (r *Registry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - if !field.Empty() { - return nil, fmt.Errorf("no field selector implemented for controllers") - } - return r.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool { - return label.Matches(labels.Set(obj.(*api.ReplicationController).Labels)) - }) +func (r *Registry) WatchControllers(resourceVersion uint64) (watch.Interface, error) { + return r.WatchList("/registry/controllers", resourceVersion, tools.Everything) } func makeControllerKey(id string) string { diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 8e027119991..0a2f0822be7 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -27,7 +27,7 @@ type Registry interface { // ListPods obtains a list of pods that match selector. ListPods(selector labels.Selector) ([]api.Pod, error) // Watch for new/changed/deleted pods - WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchPods(resourceVersion uint64) (watch.Interface, error) // Get a specific pod GetPod(podID string) (*api.Pod, error) // Create a pod based on a specification, schedule it onto a specific machine. diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index ea665874f53..85f64eea2fd 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -125,7 +125,19 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { // Watch begins watching for new, changed, or deleted pods. func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return rs.registry.WatchPods(label, field, resourceVersion) + source, err := rs.registry.WatchPods(resourceVersion) + if err != nil { + return nil, err + } + return watch.Filter(source, func(e watch.Event) (watch.Event, bool) { + pod := e.Object.(*api.Pod) + fields := labels.Set{ + "ID": pod.ID, + "DesiredState.Status": string(pod.CurrentState.Status), + "DesiredState.Host": pod.CurrentState.Host, + } + return e, label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) + }), nil } func (rs RegistryStorage) New() interface{} { diff --git a/pkg/registry/pod/storage_test.go b/pkg/registry/pod/storage_test.go index d3e4d5cc6ce..dfa76274942 100644 --- a/pkg/registry/pod/storage_test.go +++ b/pkg/registry/pod/storage_test.go @@ -55,9 +55,8 @@ func expectPod(t *testing.T, ch <-chan interface{}) (*api.Pod, bool) { } func TestCreatePodRegistryError(t *testing.T) { - podRegistry := ®istrytest.PodRegistry{ - Err: fmt.Errorf("test error"), - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ scheduler: ®istrytest.Scheduler{}, registry: podRegistry, @@ -96,12 +95,11 @@ func TestCreatePodSchedulerError(t *testing.T) { } func TestCreatePodSetsIds(t *testing.T) { - mockRegistry := ®istrytest.PodRegistryStorage{ - PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ scheduler: ®istrytest.Scheduler{Machine: "test"}, - registry: mockRegistry, + registry: podRegistry, } desiredState := api.PodState{ Manifest: api.ContainerManifest{ @@ -113,26 +111,25 @@ func TestCreatePodSetsIds(t *testing.T) { if err != nil { t.Errorf("Expected %#v, Got %#v", nil, err) } - expectApiStatusError(t, ch, mockRegistry.Err.Error()) + expectApiStatusError(t, ch, podRegistry.Err.Error()) - if len(mockRegistry.PodRegistry.Pod.ID) == 0 { + if len(podRegistry.Pod.ID) == 0 { t.Errorf("Expected pod ID to be set, Got %#v", pod) } - if mockRegistry.PodRegistry.Pod.DesiredState.Manifest.ID != mockRegistry.PodRegistry.Pod.ID { + if podRegistry.Pod.DesiredState.Manifest.ID != podRegistry.Pod.ID { t.Errorf("Expected manifest ID to be equal to pod ID, Got %#v", pod) } } func TestListPodsError(t *testing.T) { - mockRegistry := registrytest.PodRegistry{ - Err: fmt.Errorf("test error"), - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, } pods, err := storage.List(labels.Everything()) - if err != mockRegistry.Err { - t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err) + if err != podRegistry.Err { + t.Errorf("Expected %#v, Got %#v", podRegistry.Err, err) } if len(pods.(api.PodList).Items) != 0 { t.Errorf("Unexpected non-zero pod list: %#v", pods) @@ -140,9 +137,9 @@ func TestListPodsError(t *testing.T) { } func TestListEmptyPodList(t *testing.T) { - mockRegistry := registrytest.PodRegistry{} + podRegistry := registrytest.NewPodRegistry(nil) storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, } pods, err := storage.List(labels.Everything()) if err != nil { @@ -155,22 +152,21 @@ func TestListEmptyPodList(t *testing.T) { } func TestListPodList(t *testing.T) { - mockRegistry := registrytest.PodRegistry{ - Pods: []api.Pod{ - { - JSONBase: api.JSONBase{ - ID: "foo", - }, + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pods = []api.Pod{ + { + JSONBase: api.JSONBase{ + ID: "foo", }, - { - JSONBase: api.JSONBase{ - ID: "bar", - }, + }, + { + JSONBase: api.JSONBase{ + ID: "bar", }, }, } storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, } podsObj, err := storage.List(labels.Everything()) pods := podsObj.(api.PodList) @@ -190,9 +186,9 @@ func TestListPodList(t *testing.T) { } func TestPodDecode(t *testing.T) { - mockRegistry := registrytest.PodRegistry{} + podRegistry := registrytest.NewPodRegistry(nil) storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, } expected := &api.Pod{ JSONBase: api.JSONBase{ @@ -215,13 +211,10 @@ func TestPodDecode(t *testing.T) { } func TestGetPod(t *testing.T) { - mockRegistry := registrytest.PodRegistry{ - Pod: &api.Pod{ - JSONBase: api.JSONBase{ID: "foo"}, - }, - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pod = &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, } obj, err := storage.Get("foo") pod := obj.(*api.Pod) @@ -229,20 +222,17 @@ func TestGetPod(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(*mockRegistry.Pod, *pod) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.Pod, *pod) + if e, a := podRegistry.Pod, pod; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a) } } func TestGetPodCloud(t *testing.T) { fakeCloud := &cloudprovider.FakeCloud{} - mockRegistry := registrytest.PodRegistry{ - Pod: &api.Pod{ - JSONBase: api.JSONBase{ID: "foo"}, - }, - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pod = &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, cloudProvider: fakeCloud, } obj, err := storage.Get("foo") @@ -251,8 +241,8 @@ func TestGetPodCloud(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(*mockRegistry.Pod, *pod) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.Pod, *pod) + if e, a := podRegistry.Pod, pod; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a) } if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "ip-address" { t.Errorf("Unexpected calls: %#v", fakeCloud.Calls) @@ -354,12 +344,11 @@ func TestMakePodStatus(t *testing.T) { } func TestPodStorageValidatesCreate(t *testing.T) { - mockRegistry := ®istrytest.PodRegistryStorage{ - PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ scheduler: ®istrytest.Scheduler{Machine: "test"}, - registry: mockRegistry, + registry: podRegistry, } pod := &api.Pod{} c, err := storage.Create(pod) @@ -372,12 +361,11 @@ func TestPodStorageValidatesCreate(t *testing.T) { } func TestPodStorageValidatesUpdate(t *testing.T) { - mockRegistry := ®istrytest.PodRegistryStorage{ - PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ scheduler: ®istrytest.Scheduler{Machine: "test"}, - registry: mockRegistry, + registry: podRegistry, } pod := &api.Pod{} c, err := storage.Update(pod) @@ -390,16 +378,15 @@ func TestPodStorageValidatesUpdate(t *testing.T) { } func TestCreatePod(t *testing.T) { - mockRegistry := registrytest.PodRegistry{ - Pod: &api.Pod{ - JSONBase: api.JSONBase{ID: "foo"}, - CurrentState: api.PodState{ - Host: "machine", - }, + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pod = &api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + CurrentState: api.PodState{ + Host: "machine", }, } storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, podPollPeriod: time.Millisecond * 100, scheduler: scheduler.MakeRoundRobinScheduler(), minionLister: minion.NewRegistry([]string{"machine"}), @@ -424,7 +411,8 @@ func TestCreatePod(t *testing.T) { case <-channel: t.Error("Unexpected read from async channel") } - mockRegistry.UpdatePod(api.Pod{ + // TODO: Is the below actually testing something? + podRegistry.UpdatePod(api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, CurrentState: api.PodState{ Status: api.PodRunning, diff --git a/pkg/registry/registrytest/controller.go b/pkg/registry/registrytest/controller.go index 7855c611b54..be857dbaf4d 100644 --- a/pkg/registry/registrytest/controller.go +++ b/pkg/registry/registrytest/controller.go @@ -18,7 +18,6 @@ package registrytest import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -48,6 +47,6 @@ func (r *ControllerRegistry) DeleteController(ID string) error { return r.Err } -func (r *ControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *ControllerRegistry) WatchControllers(resourceVersion uint64) (watch.Interface, error) { return nil, r.Err } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index ffdd69667f3..f702e8743a8 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -17,7 +17,6 @@ limitations under the License. package registrytest import ( - "errors" "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -26,15 +25,19 @@ import ( ) type PodRegistry struct { - Err error - Pod *api.Pod - Pods []api.Pod + Err error + Machine string + Pod *api.Pod + Pods []api.Pod sync.Mutex + + mux *watch.Mux } func NewPodRegistry(pods []api.Pod) *PodRegistry { return &PodRegistry{ Pods: pods, + mux: watch.NewMux(0), } } @@ -53,8 +56,8 @@ func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { return filtered, nil } -func (r *PodRegistry) WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return nil, errors.New("unimplemented") +func (r *PodRegistry) WatchPods(resourceVersion uint64) (watch.Interface, error) { + return r.mux.Watch(), nil } func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { @@ -66,6 +69,9 @@ func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { func (r *PodRegistry) CreatePod(machine string, pod api.Pod) error { r.Lock() defer r.Unlock() + r.Machine = machine + r.Pod = &pod + r.mux.Action(watch.Added, &pod) return r.Err } @@ -73,11 +79,13 @@ func (r *PodRegistry) UpdatePod(pod api.Pod) error { r.Lock() defer r.Unlock() r.Pod = &pod + r.mux.Action(watch.Modified, &pod) return r.Err } func (r *PodRegistry) DeletePod(podId string) error { r.Lock() defer r.Unlock() + r.mux.Action(watch.Deleted, r.Pod) return r.Err } diff --git a/pkg/registry/registrytest/pod_storage.go b/pkg/registry/registrytest/pod_storage.go deleted file mode 100644 index fb787d48242..00000000000 --- a/pkg/registry/registrytest/pod_storage.go +++ /dev/null @@ -1,30 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package registrytest - -import "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - -type PodRegistryStorage struct { - PodRegistry - machine string -} - -func (rs *PodRegistryStorage) CreatePod(machine string, pod api.Pod) error { - rs.PodRegistry.Pod = &pod - rs.machine = machine - return rs.PodRegistry.Err -}