From e40e5b53a7d1ce9b3637e7f0bba69a021e1c4d74 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 11 Aug 2014 12:58:19 -0700 Subject: [PATCH 1/4] Add Watch to /pods. --- pkg/registry/etcd/etcd.go | 13 +++++++++++++ pkg/registry/pod/registry.go | 3 +++ pkg/registry/pod/storage.go | 6 ++++++ pkg/registry/registrytest/pod.go | 6 ++++++ 4 files changed, 28 insertions(+) diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 31c4f669663..707668a1432 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -77,6 +77,19 @@ func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { return filteredPods, nil } +// WatchPods begins watching for new, changed, or deleted pods. +func (r *Registry) WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return r.WatchList("/registry/pods", resourceVersion, func(obj interface{}) bool { + pod := obj.(*api.Pod) + fields := labels.Set{ + "ID": pod.ID, + "CurrentState.Status": string(pod.CurrentState.Status), + "CurrentState.Host": pod.CurrentState.Host, + } + return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) + }) +} + // GetPod gets a specific pod specified by its ID. func (r *Registry) GetPod(podID string) (*api.Pod, error) { var pod api.Pod diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 12440353ad5..8e027119991 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -19,12 +19,15 @@ package pod import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // Registry is an interface implemented by things that know how to store Pod objects. type Registry interface { // ListPods obtains a list of pods that match selector. ListPods(selector labels.Selector) ([]api.Pod, error) + // Watch for new/changed/deleted pods + WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) // Get a specific pod GetPod(podID string) (*api.Pod, error) // Create a pod based on a specification, schedule it onto a specific machine. diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index 1dc1ea01c0e..ea665874f53 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -29,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "code.google.com/p/go-uuid/uuid" "github.com/golang/glog" @@ -122,6 +123,11 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { return result, err } +// Watch begins watching for new, changed, or deleted pods. +func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return rs.registry.WatchPods(label, field, resourceVersion) +} + func (rs RegistryStorage) New() interface{} { return &api.Pod{} } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index b2803328a46..ffdd69667f3 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -17,10 +17,12 @@ limitations under the License. package registrytest import ( + "errors" "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) type PodRegistry struct { @@ -51,6 +53,10 @@ func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { return filtered, nil } +func (r *PodRegistry) WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return nil, errors.New("unimplemented") +} + func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { r.Lock() defer r.Unlock() From ca55bfb29c0b0962f1bb3c30a849a1fa5ee9ccad Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 14 Aug 2014 12:12:59 -0700 Subject: [PATCH 2/4] Add mux type to watch package --- pkg/watch/mux.go | 138 ++++++++++++++++++++++++++++++++++++++++++ pkg/watch/mux_test.go | 90 +++++++++++++++++++++++++++ 2 files changed, 228 insertions(+) create mode 100644 pkg/watch/mux.go create mode 100644 pkg/watch/mux_test.go diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go new file mode 100644 index 00000000000..6ec0cec8bb6 --- /dev/null +++ b/pkg/watch/mux.go @@ -0,0 +1,138 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" +) + +// Mux distributes event notifications among any number of watchers. +type Mux struct { + lock sync.Mutex + + watchers map[int64]*muxWatcher + nextWatcher int64 + + incoming chan Event +} + +// NewMux creates a new Mux. queueLength is the maximum number of events to queue. +// When queueLength is 0, Action will block until any prior event has been +// completely distributed. It is guaranteed that events will be distibuted in the +// order in which they occurr, but the order in which a single event is distributed +// among all of the watchers is unspecified. +func NewMux(queueLength int) *Mux { + m := &Mux{ + watchers: map[int64]*muxWatcher{}, + incoming: make(chan Event, queueLength), + } + go m.loop() + return m +} + +// Watch adds a new watcher to the list and returns an Interface for it. +// Note: new watchers will only receive new events. They won't get an entire history +// of previous events. +func (m *Mux) Watch() Interface { + m.lock.Lock() + defer m.lock.Unlock() + id := m.nextWatcher + m.nextWatcher++ + w := &muxWatcher{ + result: make(chan Event), + id: id, + m: m, + } + m.watchers[id] = w + return w +} + +// stopWatching stops the given watcher and removes it from the list. +func (m *Mux) stopWatching(id int64) { + m.lock.Lock() + defer m.lock.Unlock() + w, ok := m.watchers[id] + if !ok { + // No need to do anything, it's already been removed from the list. + return + } + delete(m.watchers, id) + close(w.result) +} + +// closeAll disconnects all watchers (presumably in response to a Shutdown call). +func (m *Mux) closeAll() { + m.lock.Lock() + defer m.lock.Unlock() + for _, w := range m.watchers { + close(w.result) + } + // Delete everything from the map, since presence/absence in the map is used + // by stopWatching to avoid double-closing the channel. + m.watchers = map[int64]*muxWatcher{} +} + +// Action distributes the given event among all watchers. +func (m *Mux) Action(action EventType, obj interface{}) { + m.incoming <- Event{action, obj} +} + +// Shutdown disconnects all watchers (but any queued events will still be distributed). +// You must not call Action after calling Shutdown. +func (m *Mux) Shutdown() { + close(m.incoming) +} + +// loop recieves from m.incoming and distributes to all watchers. +func (m *Mux) loop() { + // Deliberately not catching crashes here. Yes, bring down the process if there's a + // bug in watch.Mux. + for { + event, ok := <-m.incoming + if !ok { + break + } + m.distribute(event) + } + m.closeAll() +} + +// distribute sends event to all watchers. Blocking. +func (m *Mux) distribute(event Event) { + m.lock.Lock() + defer m.lock.Unlock() + for _, w := range m.watchers { + w.result <- event + } +} + +// muxWatcher handles a single watcher of a mux +type muxWatcher struct { + result chan Event + id int64 + m *Mux +} + +// ResultChan returns a channel to use for waiting on events. +func (mw *muxWatcher) ResultChan() <-chan Event { + return mw.result +} + +// Stop stops watching and removes mw from its list. +func (mw *muxWatcher) Stop() { + mw.m.stopWatching(mw.id) +} diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go new file mode 100644 index 00000000000..80e5c2865c8 --- /dev/null +++ b/pkg/watch/mux_test.go @@ -0,0 +1,90 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "reflect" + "sync" + "testing" +) + +func TestMux(t *testing.T) { + type myType struct { + ID string + Value string + } + table := []Event{ + {Added, myType{"foo", "hello world 1"}}, + {Added, myType{"bar", "hello world 2"}}, + {Modified, myType{"foo", "goodbye world 3"}}, + {Deleted, myType{"bar", "hello world 4"}}, + } + + // The mux we're testing + m := NewMux(0) + + // Add a bunch of watchers + const testWatchers = 2 + wg := sync.WaitGroup{} + wg.Add(testWatchers) + for i := 0; i < testWatchers; i++ { + // Verify that each watcher gets the events in the correct order + go func(watcher int, w Interface) { + tableLine := 0 + for { + event, ok := <-w.ResultChan() + if !ok { + break + } + if e, a := table[tableLine], event; !reflect.DeepEqual(e, a) { + t.Errorf("Watcher %v, line %v: Expected (%v, %#v), got (%v, %#v)", + watcher, tableLine, e.Type, e.Object, a.Type, a.Object) + } else { + t.Logf("Got (%v, %#v)", event.Type, event.Object) + } + tableLine++ + } + wg.Done() + }(i, m.Watch()) + } + + for i, item := range table { + t.Logf("Sending %v", i) + m.Action(item.Type, item.Object) + } + + m.Shutdown() + + wg.Wait() +} + +func TestMuxWatcherClose(t *testing.T) { + m := NewMux(0) + w := m.Watch() + w2 := m.Watch() + w.Stop() + m.Shutdown() + if _, open := <-w.ResultChan(); open { + t.Errorf("Stop didn't work?") + } + if _, open := <-w2.ResultChan(); open { + t.Errorf("Shutdown didn't work?") + } + // Extra stops don't hurt things + w.Stop() + w2.Stop() +} From d900134a60ccc16c427262dc12c2b1db7fac53c0 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 14 Aug 2014 13:23:07 -0700 Subject: [PATCH 3/4] Add filter to watch --- pkg/watch/filter.go | 68 +++++++++++++++++++++++++++++++++ pkg/watch/filter_test.go | 82 ++++++++++++++++++++++++++++++++++++++++ pkg/watch/mux.go | 5 ++- 3 files changed, 153 insertions(+), 2 deletions(-) create mode 100644 pkg/watch/filter.go create mode 100644 pkg/watch/filter_test.go diff --git a/pkg/watch/filter.go b/pkg/watch/filter.go new file mode 100644 index 00000000000..61955ec4614 --- /dev/null +++ b/pkg/watch/filter.go @@ -0,0 +1,68 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import () + +// FilterFunc should take an event, possibly modify it in some way, and return +// the modified event. If the event should be ignored, then return keep=false. +type FilterFunc func(in Event) (out Event, keep bool) + +// Filter passes all events through f before allowing them to pass on. +// Putting a filter on a watch, as an unavoidable side-effect due to the way +// go channels work, effectively causes the watch's event channel to have its +// queue length increased by one. +func Filter(w Interface, f FilterFunc) Interface { + fw := &filteredWatch{ + incoming: w, + result: make(chan Event), + f: f, + } + go fw.loop() + return fw +} + +type filteredWatch struct { + incoming Interface + result chan Event + f FilterFunc +} + +// ResultChan returns a channel which will receive filtered events. +func (fw *filteredWatch) ResultChan() <-chan Event { + return fw.result +} + +// Stop stops the upstream watch, which will eventually stop this watch. +func (fw *filteredWatch) Stop() { + fw.incoming.Stop() +} + +// loop waits for new values, filters them, and resends them. +func (fw *filteredWatch) loop() { + defer close(fw.result) + for { + event, ok := <-fw.incoming.ResultChan() + if !ok { + break + } + filtered, keep := fw.f(event) + if keep { + fw.result <- filtered + } + } +} diff --git a/pkg/watch/filter_test.go b/pkg/watch/filter_test.go new file mode 100644 index 00000000000..a6e59ca3ed9 --- /dev/null +++ b/pkg/watch/filter_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "reflect" + "testing" +) + +func TestFilter(t *testing.T) { + table := []Event{ + {Added, "foo"}, + {Added, "bar"}, + {Added, "baz"}, + {Added, "qux"}, + {Added, "zoo"}, + } + + source := NewFake() + filtered := Filter(source, func(e Event) (Event, bool) { + return e, e.Object.(string)[0] != 'b' + }) + + go func() { + for _, item := range table { + source.Action(item.Type, item.Object) + } + source.Stop() + }() + + var got []string + for { + event, ok := <-filtered.ResultChan() + if !ok { + break + } + got = append(got, event.Object.(string)) + } + + if e, a := []string{"foo", "qux", "zoo"}, got; !reflect.DeepEqual(e, a) { + t.Errorf("got %v, wanted %v", e, a) + } +} + +func TestFilterStop(t *testing.T) { + source := NewFake() + filtered := Filter(source, func(e Event) (Event, bool) { + return e, e.Object.(string)[0] != 'b' + }) + + go func() { + source.Add("foo") + filtered.Stop() + }() + + var got []string + for { + event, ok := <-filtered.ResultChan() + if !ok { + break + } + got = append(got, event.Object.(string)) + } + + if e, a := []string{"foo"}, got; !reflect.DeepEqual(e, a) { + t.Errorf("got %v, wanted %v", e, a) + } +} diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 6ec0cec8bb6..4935e204ffe 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -20,7 +20,8 @@ import ( "sync" ) -// Mux distributes event notifications among any number of watchers. +// Mux distributes event notifications among any number of watchers. Every event +// is delivered to every watcher. type Mux struct { lock sync.Mutex @@ -33,7 +34,7 @@ type Mux struct { // NewMux creates a new Mux. queueLength is the maximum number of events to queue. // When queueLength is 0, Action will block until any prior event has been // completely distributed. It is guaranteed that events will be distibuted in the -// order in which they occurr, but the order in which a single event is distributed +// order in which they ocurr, but the order in which a single event is distributed // among all of the watchers is unspecified. func NewMux(queueLength int) *Mux { m := &Mux{ From 4b2867fd8a7fdb4cb065f46a227b527d7651c504 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 14 Aug 2014 14:14:31 -0700 Subject: [PATCH 4/4] Standardize watch usage in registry/storage objects. Fix up extremely confusing pod test object. --- pkg/registry/controller/registry.go | 3 +- pkg/registry/controller/storage.go | 12 ++- pkg/registry/etcd/etcd.go | 21 +---- pkg/registry/pod/registry.go | 2 +- pkg/registry/pod/storage.go | 14 ++- pkg/registry/pod/storage_test.go | 114 ++++++++++------------- pkg/registry/registrytest/controller.go | 3 +- pkg/registry/registrytest/pod.go | 20 ++-- pkg/registry/registrytest/pod_storage.go | 30 ------ 9 files changed, 96 insertions(+), 123 deletions(-) delete mode 100644 pkg/registry/registrytest/pod_storage.go diff --git a/pkg/registry/controller/registry.go b/pkg/registry/controller/registry.go index 0765b2188bf..f88f503b4ff 100644 --- a/pkg/registry/controller/registry.go +++ b/pkg/registry/controller/registry.go @@ -18,14 +18,13 @@ package controller import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // Registry is an interface for things that know how to store ReplicationControllers. type Registry interface { ListControllers() ([]api.ReplicationController, error) - WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchControllers(resourceVersion uint64) (watch.Interface, error) GetController(controllerID string) (*api.ReplicationController, error) CreateController(controller api.ReplicationController) error UpdateController(controller api.ReplicationController) error diff --git a/pkg/registry/controller/storage.go b/pkg/registry/controller/storage.go index 5b1b7316f1c..6eb90f14b51 100644 --- a/pkg/registry/controller/storage.go +++ b/pkg/registry/controller/storage.go @@ -131,7 +131,17 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { // Watch returns ReplicationController events via a watch.Interface. // It implements apiserver.ResourceWatcher. func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return rs.registry.WatchControllers(label, field, resourceVersion) + if !field.Empty() { + return nil, fmt.Errorf("no field selector implemented for controllers") + } + incoming, err := rs.registry.WatchControllers(resourceVersion) + if err != nil { + return nil, err + } + return watch.Filter(incoming, func(e watch.Event) (watch.Event, bool) { + repController := e.Object.(*api.ReplicationController) + return e, label.Matches(labels.Set(repController.Labels)) + }), nil } func (rs *RegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) { diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 707668a1432..bf6b70f9bc8 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -78,16 +78,8 @@ func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { } // WatchPods begins watching for new, changed, or deleted pods. -func (r *Registry) WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return r.WatchList("/registry/pods", resourceVersion, func(obj interface{}) bool { - pod := obj.(*api.Pod) - fields := labels.Set{ - "ID": pod.ID, - "CurrentState.Status": string(pod.CurrentState.Status), - "CurrentState.Host": pod.CurrentState.Host, - } - return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) - }) +func (r *Registry) WatchPods(resourceVersion uint64) (watch.Interface, error) { + return r.WatchList("/registry/pods", resourceVersion, tools.Everything) } // GetPod gets a specific pod specified by its ID. @@ -222,13 +214,8 @@ func (r *Registry) ListControllers() ([]api.ReplicationController, error) { } // WatchControllers begins watching for new, changed, or deleted controllers. -func (r *Registry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - if !field.Empty() { - return nil, fmt.Errorf("no field selector implemented for controllers") - } - return r.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool { - return label.Matches(labels.Set(obj.(*api.ReplicationController).Labels)) - }) +func (r *Registry) WatchControllers(resourceVersion uint64) (watch.Interface, error) { + return r.WatchList("/registry/controllers", resourceVersion, tools.Everything) } func makeControllerKey(id string) string { diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 8e027119991..0a2f0822be7 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -27,7 +27,7 @@ type Registry interface { // ListPods obtains a list of pods that match selector. ListPods(selector labels.Selector) ([]api.Pod, error) // Watch for new/changed/deleted pods - WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchPods(resourceVersion uint64) (watch.Interface, error) // Get a specific pod GetPod(podID string) (*api.Pod, error) // Create a pod based on a specification, schedule it onto a specific machine. diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index ea665874f53..85f64eea2fd 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -125,7 +125,19 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { // Watch begins watching for new, changed, or deleted pods. func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return rs.registry.WatchPods(label, field, resourceVersion) + source, err := rs.registry.WatchPods(resourceVersion) + if err != nil { + return nil, err + } + return watch.Filter(source, func(e watch.Event) (watch.Event, bool) { + pod := e.Object.(*api.Pod) + fields := labels.Set{ + "ID": pod.ID, + "DesiredState.Status": string(pod.CurrentState.Status), + "DesiredState.Host": pod.CurrentState.Host, + } + return e, label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) + }), nil } func (rs RegistryStorage) New() interface{} { diff --git a/pkg/registry/pod/storage_test.go b/pkg/registry/pod/storage_test.go index d3e4d5cc6ce..dfa76274942 100644 --- a/pkg/registry/pod/storage_test.go +++ b/pkg/registry/pod/storage_test.go @@ -55,9 +55,8 @@ func expectPod(t *testing.T, ch <-chan interface{}) (*api.Pod, bool) { } func TestCreatePodRegistryError(t *testing.T) { - podRegistry := ®istrytest.PodRegistry{ - Err: fmt.Errorf("test error"), - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ scheduler: ®istrytest.Scheduler{}, registry: podRegistry, @@ -96,12 +95,11 @@ func TestCreatePodSchedulerError(t *testing.T) { } func TestCreatePodSetsIds(t *testing.T) { - mockRegistry := ®istrytest.PodRegistryStorage{ - PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ scheduler: ®istrytest.Scheduler{Machine: "test"}, - registry: mockRegistry, + registry: podRegistry, } desiredState := api.PodState{ Manifest: api.ContainerManifest{ @@ -113,26 +111,25 @@ func TestCreatePodSetsIds(t *testing.T) { if err != nil { t.Errorf("Expected %#v, Got %#v", nil, err) } - expectApiStatusError(t, ch, mockRegistry.Err.Error()) + expectApiStatusError(t, ch, podRegistry.Err.Error()) - if len(mockRegistry.PodRegistry.Pod.ID) == 0 { + if len(podRegistry.Pod.ID) == 0 { t.Errorf("Expected pod ID to be set, Got %#v", pod) } - if mockRegistry.PodRegistry.Pod.DesiredState.Manifest.ID != mockRegistry.PodRegistry.Pod.ID { + if podRegistry.Pod.DesiredState.Manifest.ID != podRegistry.Pod.ID { t.Errorf("Expected manifest ID to be equal to pod ID, Got %#v", pod) } } func TestListPodsError(t *testing.T) { - mockRegistry := registrytest.PodRegistry{ - Err: fmt.Errorf("test error"), - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, } pods, err := storage.List(labels.Everything()) - if err != mockRegistry.Err { - t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err) + if err != podRegistry.Err { + t.Errorf("Expected %#v, Got %#v", podRegistry.Err, err) } if len(pods.(api.PodList).Items) != 0 { t.Errorf("Unexpected non-zero pod list: %#v", pods) @@ -140,9 +137,9 @@ func TestListPodsError(t *testing.T) { } func TestListEmptyPodList(t *testing.T) { - mockRegistry := registrytest.PodRegistry{} + podRegistry := registrytest.NewPodRegistry(nil) storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, } pods, err := storage.List(labels.Everything()) if err != nil { @@ -155,22 +152,21 @@ func TestListEmptyPodList(t *testing.T) { } func TestListPodList(t *testing.T) { - mockRegistry := registrytest.PodRegistry{ - Pods: []api.Pod{ - { - JSONBase: api.JSONBase{ - ID: "foo", - }, + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pods = []api.Pod{ + { + JSONBase: api.JSONBase{ + ID: "foo", }, - { - JSONBase: api.JSONBase{ - ID: "bar", - }, + }, + { + JSONBase: api.JSONBase{ + ID: "bar", }, }, } storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, } podsObj, err := storage.List(labels.Everything()) pods := podsObj.(api.PodList) @@ -190,9 +186,9 @@ func TestListPodList(t *testing.T) { } func TestPodDecode(t *testing.T) { - mockRegistry := registrytest.PodRegistry{} + podRegistry := registrytest.NewPodRegistry(nil) storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, } expected := &api.Pod{ JSONBase: api.JSONBase{ @@ -215,13 +211,10 @@ func TestPodDecode(t *testing.T) { } func TestGetPod(t *testing.T) { - mockRegistry := registrytest.PodRegistry{ - Pod: &api.Pod{ - JSONBase: api.JSONBase{ID: "foo"}, - }, - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pod = &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, } obj, err := storage.Get("foo") pod := obj.(*api.Pod) @@ -229,20 +222,17 @@ func TestGetPod(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(*mockRegistry.Pod, *pod) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.Pod, *pod) + if e, a := podRegistry.Pod, pod; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a) } } func TestGetPodCloud(t *testing.T) { fakeCloud := &cloudprovider.FakeCloud{} - mockRegistry := registrytest.PodRegistry{ - Pod: &api.Pod{ - JSONBase: api.JSONBase{ID: "foo"}, - }, - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pod = &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, cloudProvider: fakeCloud, } obj, err := storage.Get("foo") @@ -251,8 +241,8 @@ func TestGetPodCloud(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(*mockRegistry.Pod, *pod) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.Pod, *pod) + if e, a := podRegistry.Pod, pod; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a) } if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "ip-address" { t.Errorf("Unexpected calls: %#v", fakeCloud.Calls) @@ -354,12 +344,11 @@ func TestMakePodStatus(t *testing.T) { } func TestPodStorageValidatesCreate(t *testing.T) { - mockRegistry := ®istrytest.PodRegistryStorage{ - PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ scheduler: ®istrytest.Scheduler{Machine: "test"}, - registry: mockRegistry, + registry: podRegistry, } pod := &api.Pod{} c, err := storage.Create(pod) @@ -372,12 +361,11 @@ func TestPodStorageValidatesCreate(t *testing.T) { } func TestPodStorageValidatesUpdate(t *testing.T) { - mockRegistry := ®istrytest.PodRegistryStorage{ - PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, - } + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ scheduler: ®istrytest.Scheduler{Machine: "test"}, - registry: mockRegistry, + registry: podRegistry, } pod := &api.Pod{} c, err := storage.Update(pod) @@ -390,16 +378,15 @@ func TestPodStorageValidatesUpdate(t *testing.T) { } func TestCreatePod(t *testing.T) { - mockRegistry := registrytest.PodRegistry{ - Pod: &api.Pod{ - JSONBase: api.JSONBase{ID: "foo"}, - CurrentState: api.PodState{ - Host: "machine", - }, + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pod = &api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + CurrentState: api.PodState{ + Host: "machine", }, } storage := RegistryStorage{ - registry: &mockRegistry, + registry: podRegistry, podPollPeriod: time.Millisecond * 100, scheduler: scheduler.MakeRoundRobinScheduler(), minionLister: minion.NewRegistry([]string{"machine"}), @@ -424,7 +411,8 @@ func TestCreatePod(t *testing.T) { case <-channel: t.Error("Unexpected read from async channel") } - mockRegistry.UpdatePod(api.Pod{ + // TODO: Is the below actually testing something? + podRegistry.UpdatePod(api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, CurrentState: api.PodState{ Status: api.PodRunning, diff --git a/pkg/registry/registrytest/controller.go b/pkg/registry/registrytest/controller.go index 7855c611b54..be857dbaf4d 100644 --- a/pkg/registry/registrytest/controller.go +++ b/pkg/registry/registrytest/controller.go @@ -18,7 +18,6 @@ package registrytest import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -48,6 +47,6 @@ func (r *ControllerRegistry) DeleteController(ID string) error { return r.Err } -func (r *ControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *ControllerRegistry) WatchControllers(resourceVersion uint64) (watch.Interface, error) { return nil, r.Err } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index ffdd69667f3..f702e8743a8 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -17,7 +17,6 @@ limitations under the License. package registrytest import ( - "errors" "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -26,15 +25,19 @@ import ( ) type PodRegistry struct { - Err error - Pod *api.Pod - Pods []api.Pod + Err error + Machine string + Pod *api.Pod + Pods []api.Pod sync.Mutex + + mux *watch.Mux } func NewPodRegistry(pods []api.Pod) *PodRegistry { return &PodRegistry{ Pods: pods, + mux: watch.NewMux(0), } } @@ -53,8 +56,8 @@ func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { return filtered, nil } -func (r *PodRegistry) WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return nil, errors.New("unimplemented") +func (r *PodRegistry) WatchPods(resourceVersion uint64) (watch.Interface, error) { + return r.mux.Watch(), nil } func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { @@ -66,6 +69,9 @@ func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { func (r *PodRegistry) CreatePod(machine string, pod api.Pod) error { r.Lock() defer r.Unlock() + r.Machine = machine + r.Pod = &pod + r.mux.Action(watch.Added, &pod) return r.Err } @@ -73,11 +79,13 @@ func (r *PodRegistry) UpdatePod(pod api.Pod) error { r.Lock() defer r.Unlock() r.Pod = &pod + r.mux.Action(watch.Modified, &pod) return r.Err } func (r *PodRegistry) DeletePod(podId string) error { r.Lock() defer r.Unlock() + r.mux.Action(watch.Deleted, r.Pod) return r.Err } diff --git a/pkg/registry/registrytest/pod_storage.go b/pkg/registry/registrytest/pod_storage.go deleted file mode 100644 index fb787d48242..00000000000 --- a/pkg/registry/registrytest/pod_storage.go +++ /dev/null @@ -1,30 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package registrytest - -import "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - -type PodRegistryStorage struct { - PodRegistry - machine string -} - -func (rs *PodRegistryStorage) CreatePod(machine string, pod api.Pod) error { - rs.PodRegistry.Pod = &pod - rs.machine = machine - return rs.PodRegistry.Err -}