diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index eaf109f515f..ceaf22c7e3c 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -70,6 +70,11 @@ type SimpleRESTStorage struct { // Set if WatchSingle is called requestedID string + // Set if WatchAll is called + requestedLabelSelector labels.Selector + requestedFieldSelector labels.Selector + requestedResourceVersion uint64 + // If non-nil, called inside the WorkFunc when answering update, delete, create. // obj receives the original input to the update, delete, or create call. injectedFunction func(obj interface{}) (returnObj interface{}, err error) @@ -95,7 +100,7 @@ func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error) if storage.injectedFunction != nil { return storage.injectedFunction(id) } - return api.Status{Status: api.StatusSuccess}, nil + return &api.Status{Status: api.StatusSuccess}, nil }), nil } @@ -130,7 +135,10 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e } // Implement ResourceWatcher. -func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) { +func (storage *SimpleRESTStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + storage.requestedLabelSelector = label + storage.requestedFieldSelector = field + storage.requestedResourceVersion = resourceVersion if err := storage.errors["watchAll"]; err != nil { return nil, err } @@ -139,8 +147,9 @@ func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) { } // Implement ResourceWatcher. -func (storage *SimpleRESTStorage) WatchSingle(id string) (watch.Interface, error) { +func (storage *SimpleRESTStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) { storage.requestedID = id + storage.requestedResourceVersion = resourceVersion if err := storage.errors["watchSingle"]; err != nil { return nil, err } @@ -164,17 +173,17 @@ func TestNotFound(t *testing.T) { Path string } cases := map[string]T{ - "PATCH method": T{"PATCH", "/prefix/version/foo"}, - "GET long prefix": T{"GET", "/prefix/"}, - "GET missing storage": T{"GET", "/prefix/version/blah"}, - "GET with extra segment": T{"GET", "/prefix/version/foo/bar/baz"}, - "POST with extra segment": T{"POST", "/prefix/version/foo/bar"}, - "DELETE without extra segment": T{"DELETE", "/prefix/version/foo"}, - "DELETE with extra segment": T{"DELETE", "/prefix/version/foo/bar/baz"}, - "PUT without extra segment": T{"PUT", "/prefix/version/foo"}, - "PUT with extra segment": T{"PUT", "/prefix/version/foo/bar/baz"}, - "watch missing storage": T{"GET", "/prefix/version/watch/"}, - "watch with bad method": T{"POST", "/prefix/version/watch/foo/bar"}, + "PATCH method": {"PATCH", "/prefix/version/foo"}, + "GET long prefix": {"GET", "/prefix/"}, + "GET missing storage": {"GET", "/prefix/version/blah"}, + "GET with extra segment": {"GET", "/prefix/version/foo/bar/baz"}, + "POST with extra segment": {"POST", "/prefix/version/foo/bar"}, + "DELETE without extra segment": {"DELETE", "/prefix/version/foo"}, + "DELETE with extra segment": {"DELETE", "/prefix/version/foo/bar/baz"}, + "PUT without extra segment": {"PUT", "/prefix/version/foo"}, + "PUT with extra segment": {"PUT", "/prefix/version/foo/bar/baz"}, + "watch missing storage": {"GET", "/prefix/version/watch/"}, + "watch with bad method": {"POST", "/prefix/version/watch/foo/bar"}, } handler := New(map[string]RESTStorage{ "foo": &SimpleRESTStorage{}, diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 0b47419fecc..3260207a3ca 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -29,6 +29,7 @@ type RESTStorage interface { New() interface{} // List selects resources in the storage which match to the selector. + // TODO: add field selector in addition to label selector. List(labels.Selector) (interface{}, error) // Get finds a resource in the storage by id and returns it. @@ -46,7 +47,10 @@ type RESTStorage interface { // ResourceWatcher should be implemented by all RESTStorage objects that // want to offer the ability to watch for changes through the watch api. type ResourceWatcher interface { - // TODO: take a query, like List, to filter out unwanted events. - WatchAll() (watch.Interface, error) - WatchSingle(id string) (watch.Interface, error) + // label selects on labels; field selects on the objects fields. Not all fields + // are supported; an error will be returned if you try to select for a field that + // isn't supported. + WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + // TODO: Decide if we need to keep WatchSingle? + WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) } diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 5b8480cd168..0898aac4ef6 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -19,10 +19,13 @@ package apiserver import ( "encoding/json" "net/http" + "net/url" + "strconv" "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -30,6 +33,24 @@ type WatchHandler struct { storage map[string]RESTStorage } +func (s *APIServer) getWatchParams(query url.Values) (id string, label, field labels.Selector, resourceVersion uint64) { + id = query.Get("id") + if s, err := labels.ParseSelector(query.Get("labels")); err != nil { + label = labels.Everything() + } else { + label = s + } + if s, err := labels.ParseSelector(query.Get("fields")); err != nil { + field = labels.Everything() + } else { + field = s + } + if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil { + resourceVersion = rv + } + return id, label, field, resourceVersion +} + // handleWatch processes a watch request func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { parts := splitPath(req.URL.Path) @@ -43,10 +64,11 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { if watcher, ok := storage.(ResourceWatcher); ok { var watching watch.Interface var err error - if id := req.URL.Query().Get("id"); id != "" { - watching, err = watcher.WatchSingle(id) + id, label, field, resourceVersion := s.getWatchParams(req.URL.Query()) + if id != "" { + watching, err = watcher.WatchSingle(id, resourceVersion) } else { - watching, err = watcher.WatchAll() + watching, err = watcher.WatchAll(label, field, resourceVersion) } if err != nil { internalError(err, w) diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index f737b209630..be9af611a82 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -148,3 +148,70 @@ func TestWatchHTTP(t *testing.T) { t.Errorf("Unexpected non-error") } } + +func TestWatchParamParsing(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := New(map[string]RESTStorage{ + "foo": simpleStorage, + }, "/prefix/version") + server := httptest.NewServer(handler) + + dest, _ := url.Parse(server.URL) + dest.Path = "/prefix/version/watch/foo" + + table := []struct { + rawQuery string + resourceVersion uint64 + labelSelector string + fieldSelector string + id string + }{ + { + rawQuery: "id=myID&resourceVersion=1234", + resourceVersion: 1234, + labelSelector: "", + fieldSelector: "", + id: "myID", + }, { + rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo", + resourceVersion: 314159, + labelSelector: "name=foo", + fieldSelector: "Host=", + id: "", + }, { + rawQuery: "", + resourceVersion: 0, + labelSelector: "", + fieldSelector: "", + id: "", + }, + } + + for _, item := range table { + simpleStorage.requestedLabelSelector = nil + simpleStorage.requestedFieldSelector = nil + simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases + simpleStorage.requestedID = "" + dest.RawQuery = item.rawQuery + resp, err := http.Get(dest.String()) + if err != nil { + t.Errorf("%v: unexpected error: %v", item.rawQuery, err) + continue + } + resp.Body.Close() + if e, a := item.id, simpleStorage.requestedID; e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + if simpleStorage.requestedID == "" { + if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + } + } +} diff --git a/pkg/registry/controllerstorage.go b/pkg/registry/controllerstorage.go index b5d53c1e28e..7a8cdff5835 100644 --- a/pkg/registry/controllerstorage.go +++ b/pkg/registry/controllerstorage.go @@ -138,12 +138,12 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication // WatchAll returns ReplicationController events via a watch.Interface, implementing // apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) WatchAll() (watch.Interface, error) { - return storage.registry.WatchControllers() +func (storage *ControllerRegistryStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return storage.registry.WatchControllers(label, field, resourceVersion) } // WatchSingle returns events for a single ReplicationController via a watch.Interface, // implementing apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) WatchSingle(id string) (watch.Interface, error) { +func (storage *ControllerRegistryStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) { return nil, errors.New("unimplemented") } diff --git a/pkg/registry/controllerstorage_test.go b/pkg/registry/controllerstorage_test.go index 07518764ddb..9dd699ca5f7 100644 --- a/pkg/registry/controllerstorage_test.go +++ b/pkg/registry/controllerstorage_test.go @@ -29,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) +// TODO: Why do we have this AND MemoryRegistry? type MockControllerRegistry struct { err error controllers []api.ReplicationController @@ -49,10 +50,12 @@ func (registry *MockControllerRegistry) CreateController(controller api.Replicat func (registry *MockControllerRegistry) UpdateController(controller api.ReplicationController) error { return registry.err } + func (registry *MockControllerRegistry) DeleteController(ID string) error { return registry.err } -func (registry *MockControllerRegistry) WatchControllers() (watch.Interface, error) { + +func (registry *MockControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return nil, registry.err } diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index f6aea24e7d1..fa50ee4353e 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -205,9 +205,13 @@ func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, er } // WatchControllers begins watching for new, changed, or deleted controllers. -// TODO: Add id/selector parameters? -func (registry *EtcdRegistry) WatchControllers() (watch.Interface, error) { - return registry.helper.WatchList("/registry/controllers", tools.Everything) +func (registry *EtcdRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + if field.String() != "" { + return nil, fmt.Errorf("no field selector implemented for controllers") + } + return registry.helper.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool { + return label.Matches(labels.Set(obj.(*api.ReplicationController).Labels)) + }) } func makeControllerKey(id string) string { diff --git a/pkg/registry/interfaces.go b/pkg/registry/interfaces.go index 0dda241197d..dfc3cd479e4 100644 --- a/pkg/registry/interfaces.go +++ b/pkg/registry/interfaces.go @@ -39,7 +39,7 @@ type PodRegistry interface { // ControllerRegistry is an interface for things that know how to store ReplicationControllers. type ControllerRegistry interface { ListControllers() ([]api.ReplicationController, error) - WatchControllers() (watch.Interface, error) + WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) GetController(controllerID string) (*api.ReplicationController, error) CreateController(controller api.ReplicationController) error UpdateController(controller api.ReplicationController) error diff --git a/pkg/registry/memory_registry.go b/pkg/registry/memory_registry.go index 59ed64d89ec..bce2fb60b10 100644 --- a/pkg/registry/memory_registry.go +++ b/pkg/registry/memory_registry.go @@ -89,7 +89,7 @@ func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController, return result, nil } -func (registry *MemoryRegistry) WatchControllers() (watch.Interface, error) { +func (registry *MemoryRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return nil, errors.New("unimplemented") } diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 4355ec1b509..2b3e46fec7c 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -296,18 +296,19 @@ func Everything(interface{}) bool { // WatchList begins watching the specified key's items. Items are decoded into // API objects, and any items passing 'filter' are sent down the returned -// watch.Interface. -func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) { +// watch.Interface. resourceVersion may be used to specify what version to begin +// watching (e.g., for reconnecting without missing any updateds). +func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { w := newEtcdWatcher(true, filter, h.Codec) - go w.etcdWatch(h.Client, key) + go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } // Watch begins watching the specified key. Events are decoded into // API objects and sent down the returned watch.Interface. -func (h *EtcdHelper) Watch(key string) (watch.Interface, error) { +func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) { w := newEtcdWatcher(false, nil, h.Codec) - go w.etcdWatch(h.Client, key) + go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } @@ -350,10 +351,10 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher { // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called // as a goroutine. -func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) { +func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { defer util.HandleCrash() defer close(w.etcdCallEnded) - _, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop) + _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) if err != etcd.ErrWatchStoppedByUser { glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) } @@ -385,18 +386,20 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { var action watch.EventType var data []byte switch res.Action { - case "create", "set": + case "create": if res.Node == nil { glog.Errorf("unexpected nil node: %#v", res) return } data = []byte(res.Node.Value) - // TODO: Is this conditional correct? - if res.EtcdIndex > 0 { - action = watch.Modified - } else { - action = watch.Added + action = watch.Added + case "set": + if res.Node == nil { + glog.Errorf("unexpected nil node: %#v", res) + return } + data = []byte(res.Node.Value) + action = watch.Modified case "delete": if res.PrevNode == nil { glog.Errorf("unexpected nil prev node: %#v", res) diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 736ee915e4f..fbfc0df6fad 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -325,6 +325,30 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) { } } +func TestWatchInterpretation_ListCreate(t *testing.T) { + w := newEtcdWatcher(true, func(interface{}) bool { + t.Errorf("unexpected filter call") + return true + }, encoding) + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBytes, _ := encoding.Encode(pod) + + go w.sendResult(&etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(podBytes), + }, + }) + + got := <-w.outgoing + if e, a := watch.Added, got.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } +} + func TestWatchInterpretation_ListAdd(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") @@ -341,7 +365,7 @@ func TestWatchInterpretation_ListAdd(t *testing.T) { }) got := <-w.outgoing - if e, a := watch.Added, got.Type; e != a { + if e, a := watch.Modified, got.Type; e != a { t.Errorf("Expected %v, got %v", e, a) } if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { @@ -420,7 +444,7 @@ func TestWatch(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/some/key") + watching, err := h.Watch("/some/key", 0) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -438,7 +462,7 @@ func TestWatch(t *testing.T) { } event := <-watching.ResultChan() - if e, a := watch.Added, event.Type; e != a { + if e, a := watch.Modified, event.Type; e != a { t.Errorf("Expected %v, got %v", e, a) } if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { @@ -462,7 +486,7 @@ func TestWatchPurposefulShutdown(t *testing.T) { h := EtcdHelper{fakeClient, codec, versioner} // Test purposeful shutdown - watching, err := h.Watch("/some/key") + watching, err := h.Watch("/some/key", 0) if err != nil { t.Fatalf("Unexpected error: %v", err) }