diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index ceaf22c7e3c..a75e347e30c 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -64,13 +64,8 @@ type SimpleRESTStorage struct { updated *Simple created *Simple - // Valid if WatchAll or WatchSingle is called - fakeWatch *watch.FakeWatcher - - // Set if WatchSingle is called - requestedID string - - // Set if WatchAll is called + // These are set when Watch is called + fakeWatch *watch.FakeWatcher requestedLabelSelector labels.Selector requestedFieldSelector labels.Selector requestedResourceVersion uint64 @@ -135,22 +130,11 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e } // Implement ResourceWatcher. -func (storage *SimpleRESTStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (storage *SimpleRESTStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { storage.requestedLabelSelector = label storage.requestedFieldSelector = field storage.requestedResourceVersion = resourceVersion - if err := storage.errors["watchAll"]; err != nil { - return nil, err - } - storage.fakeWatch = watch.NewFake() - return storage.fakeWatch, nil -} - -// Implement ResourceWatcher. -func (storage *SimpleRESTStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) { - storage.requestedID = id - storage.requestedResourceVersion = resourceVersion - if err := storage.errors["watchSingle"]; err != nil { + if err := storage.errors["watch"]; err != nil { return nil, err } storage.fakeWatch = watch.NewFake() diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 3260207a3ca..2b1aa23cab6 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -33,11 +33,13 @@ type RESTStorage interface { List(labels.Selector) (interface{}, error) // Get finds a resource in the storage by id and returns it. - // Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. + // Although it can return an arbitrary error value, IsNotFound(err) is true for the + // returned error value err when the specified resource is not found. Get(id string) (interface{}, error) // Delete finds a resource in the storage and deletes it. - // Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. + // Although it can return an arbitrary error value, IsNotFound(err) is true for the + // returned error value err when the specified resource is not found. Delete(id string) (<-chan interface{}, error) Create(interface{}) (<-chan interface{}, error) @@ -47,10 +49,9 @@ type RESTStorage interface { // ResourceWatcher should be implemented by all RESTStorage objects that // want to offer the ability to watch for changes through the watch api. type ResourceWatcher interface { - // label selects on labels; field selects on the objects fields. Not all fields - // are supported; an error will be returned if you try to select for a field that - // isn't supported. - WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) - // TODO: Decide if we need to keep WatchSingle? - WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) + // 'label' selects on labels; 'field' selects on the object's fields. Not all fields + // are supported; an error should be returned if 'field' tries to select on a field that + // isn't supported. 'resourceVersion' allows for continuing/starting a watch at a + // particular version. + Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 0898aac4ef6..b2315b78563 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -33,8 +33,7 @@ type WatchHandler struct { storage map[string]RESTStorage } -func (s *APIServer) getWatchParams(query url.Values) (id string, label, field labels.Selector, resourceVersion uint64) { - id = query.Get("id") +func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) { if s, err := labels.ParseSelector(query.Get("labels")); err != nil { label = labels.Everything() } else { @@ -48,7 +47,7 @@ func (s *APIServer) getWatchParams(query url.Values) (id string, label, field la if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil { resourceVersion = rv } - return id, label, field, resourceVersion + return label, field, resourceVersion } // handleWatch processes a watch request @@ -62,14 +61,8 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { notFound(w, req) } if watcher, ok := storage.(ResourceWatcher); ok { - var watching watch.Interface - var err error - id, label, field, resourceVersion := s.getWatchParams(req.URL.Query()) - if id != "" { - watching, err = watcher.WatchSingle(id, resourceVersion) - } else { - watching, err = watcher.WatchAll(label, field, resourceVersion) - } + label, field, resourceVersion := getWatchParams(req.URL.Query()) + watching, err := watcher.Watch(label, field, resourceVersion) if err != nil { internalError(err, w) return diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index be9af611a82..693026f8edf 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -40,6 +40,7 @@ var watchTestTable = []struct { func TestWatchWebsocket(t *testing.T) { simpleStorage := &SimpleRESTStorage{} + _ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work. handler := New(map[string]RESTStorage{ "foo": simpleStorage, }, codec, "/prefix/version") @@ -48,17 +49,13 @@ func TestWatchWebsocket(t *testing.T) { dest, _ := url.Parse(server.URL) dest.Scheme = "ws" // Required by websocket, though the server never sees it. dest.Path = "/prefix/version/watch/foo" - dest.RawQuery = "id=myID" + dest.RawQuery = "" ws, err := websocket.Dial(dest.String(), "", "http://localhost") if err != nil { t.Errorf("unexpected error: %v", err) } - if a, e := simpleStorage.requestedID, "myID"; a != e { - t.Fatalf("Expected %v, got %v", e, a) - } - try := func(action watch.EventType, object interface{}) { // Send simpleStorage.fakeWatch.Action(action, object) @@ -98,7 +95,7 @@ func TestWatchHTTP(t *testing.T) { dest, _ := url.Parse(server.URL) dest.Path = "/prefix/version/watch/foo" - dest.RawQuery = "id=myID" + dest.RawQuery = "" request, err := http.NewRequest("GET", dest.String(), nil) if err != nil { @@ -114,10 +111,6 @@ func TestWatchHTTP(t *testing.T) { t.Errorf("Unexpected response %#v", response) } - if a, e := simpleStorage.requestedID, "myID"; a != e { - t.Fatalf("Expected %v, got %v", e, a) - } - decoder := json.NewDecoder(response.Body) try := func(action watch.EventType, object interface{}) { @@ -164,26 +157,27 @@ func TestWatchParamParsing(t *testing.T) { resourceVersion uint64 labelSelector string fieldSelector string - id string }{ { - rawQuery: "id=myID&resourceVersion=1234", + rawQuery: "resourceVersion=1234", resourceVersion: 1234, labelSelector: "", fieldSelector: "", - id: "myID", }, { rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo", resourceVersion: 314159, labelSelector: "name=foo", fieldSelector: "Host=", - id: "", + }, { + rawQuery: "fields=ID%3dfoo&resourceVersion=1492", + resourceVersion: 1492, + labelSelector: "", + fieldSelector: "ID=foo", }, { rawQuery: "", resourceVersion: 0, labelSelector: "", fieldSelector: "", - id: "", }, } @@ -191,7 +185,6 @@ func TestWatchParamParsing(t *testing.T) { simpleStorage.requestedLabelSelector = nil simpleStorage.requestedFieldSelector = nil simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases - simpleStorage.requestedID = "" dest.RawQuery = item.rawQuery resp, err := http.Get(dest.String()) if err != nil { @@ -199,19 +192,14 @@ func TestWatchParamParsing(t *testing.T) { continue } resp.Body.Close() - if e, a := item.id, simpleStorage.requestedID; e != a { - t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) - } if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a { t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) } - if simpleStorage.requestedID == "" { - if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a { - t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) - } - if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a { - t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) - } + if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) } } } diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 64367b7f7ed..d5a9a0a487e 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -24,7 +24,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -37,9 +36,6 @@ type ReplicationManager struct { // To allow injection of syncReplicationController for testing. syncHandler func(controllerSpec api.ReplicationController) error - - // To allow injection of watch creation. - watchMaker func() (watch.Interface, error) } // PodControlInterface is an interface that knows how to add or delete pods diff --git a/pkg/registry/controllerstorage.go b/pkg/registry/controllerstorage.go index 7a8cdff5835..96f761d8f72 100644 --- a/pkg/registry/controllerstorage.go +++ b/pkg/registry/controllerstorage.go @@ -17,7 +17,6 @@ limitations under the License. package registry import ( - "errors" "fmt" "time" @@ -138,12 +137,6 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication // WatchAll returns ReplicationController events via a watch.Interface, implementing // apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (storage *ControllerRegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return storage.registry.WatchControllers(label, field, resourceVersion) } - -// WatchSingle returns events for a single ReplicationController via a watch.Interface, -// implementing apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) { - return nil, errors.New("unimplemented") -} diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index fbfc0df6fad..fec310c81fd 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -329,9 +329,9 @@ func TestWatchInterpretation_ListCreate(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, encoding) + }, codec) pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := encoding.Encode(pod) + podBytes, _ := codec.Encode(pod) go w.sendResult(&etcd.Response{ Action: "create",