From 283fdba6ab4914dcfbe0b6e605cbd658aad1006b Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 5 Aug 2014 13:33:25 -0700 Subject: [PATCH 1/7] Add more parameters to Watch * Add labels selector (same as List) * Add fields selector * Plan to let you select pods by Host and/or Status * Add resourceVersion to let you resume a watch where you left off. --- pkg/apiserver/apiserver_test.go | 37 ++++++++------ pkg/apiserver/interfaces.go | 10 ++-- pkg/apiserver/watch.go | 28 +++++++++-- pkg/apiserver/watch_test.go | 67 ++++++++++++++++++++++++++ pkg/registry/controllerstorage.go | 6 +-- pkg/registry/controllerstorage_test.go | 5 +- pkg/registry/etcdregistry.go | 10 ++-- pkg/registry/interfaces.go | 2 +- pkg/registry/memory_registry.go | 2 +- pkg/tools/etcd_tools.go | 29 ++++++----- pkg/tools/etcd_tools_test.go | 32 ++++++++++-- 11 files changed, 182 insertions(+), 46 deletions(-) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index eaf109f515f..ceaf22c7e3c 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -70,6 +70,11 @@ type SimpleRESTStorage struct { // Set if WatchSingle is called requestedID string + // Set if WatchAll is called + requestedLabelSelector labels.Selector + requestedFieldSelector labels.Selector + requestedResourceVersion uint64 + // If non-nil, called inside the WorkFunc when answering update, delete, create. // obj receives the original input to the update, delete, or create call. injectedFunction func(obj interface{}) (returnObj interface{}, err error) @@ -95,7 +100,7 @@ func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error) if storage.injectedFunction != nil { return storage.injectedFunction(id) } - return api.Status{Status: api.StatusSuccess}, nil + return &api.Status{Status: api.StatusSuccess}, nil }), nil } @@ -130,7 +135,10 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e } // Implement ResourceWatcher. -func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) { +func (storage *SimpleRESTStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + storage.requestedLabelSelector = label + storage.requestedFieldSelector = field + storage.requestedResourceVersion = resourceVersion if err := storage.errors["watchAll"]; err != nil { return nil, err } @@ -139,8 +147,9 @@ func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) { } // Implement ResourceWatcher. -func (storage *SimpleRESTStorage) WatchSingle(id string) (watch.Interface, error) { +func (storage *SimpleRESTStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) { storage.requestedID = id + storage.requestedResourceVersion = resourceVersion if err := storage.errors["watchSingle"]; err != nil { return nil, err } @@ -164,17 +173,17 @@ func TestNotFound(t *testing.T) { Path string } cases := map[string]T{ - "PATCH method": T{"PATCH", "/prefix/version/foo"}, - "GET long prefix": T{"GET", "/prefix/"}, - "GET missing storage": T{"GET", "/prefix/version/blah"}, - "GET with extra segment": T{"GET", "/prefix/version/foo/bar/baz"}, - "POST with extra segment": T{"POST", "/prefix/version/foo/bar"}, - "DELETE without extra segment": T{"DELETE", "/prefix/version/foo"}, - "DELETE with extra segment": T{"DELETE", "/prefix/version/foo/bar/baz"}, - "PUT without extra segment": T{"PUT", "/prefix/version/foo"}, - "PUT with extra segment": T{"PUT", "/prefix/version/foo/bar/baz"}, - "watch missing storage": T{"GET", "/prefix/version/watch/"}, - "watch with bad method": T{"POST", "/prefix/version/watch/foo/bar"}, + "PATCH method": {"PATCH", "/prefix/version/foo"}, + "GET long prefix": {"GET", "/prefix/"}, + "GET missing storage": {"GET", "/prefix/version/blah"}, + "GET with extra segment": {"GET", "/prefix/version/foo/bar/baz"}, + "POST with extra segment": {"POST", "/prefix/version/foo/bar"}, + "DELETE without extra segment": {"DELETE", "/prefix/version/foo"}, + "DELETE with extra segment": {"DELETE", "/prefix/version/foo/bar/baz"}, + "PUT without extra segment": {"PUT", "/prefix/version/foo"}, + "PUT with extra segment": {"PUT", "/prefix/version/foo/bar/baz"}, + "watch missing storage": {"GET", "/prefix/version/watch/"}, + "watch with bad method": {"POST", "/prefix/version/watch/foo/bar"}, } handler := New(map[string]RESTStorage{ "foo": &SimpleRESTStorage{}, diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 0b47419fecc..3260207a3ca 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -29,6 +29,7 @@ type RESTStorage interface { New() interface{} // List selects resources in the storage which match to the selector. + // TODO: add field selector in addition to label selector. List(labels.Selector) (interface{}, error) // Get finds a resource in the storage by id and returns it. @@ -46,7 +47,10 @@ type RESTStorage interface { // ResourceWatcher should be implemented by all RESTStorage objects that // want to offer the ability to watch for changes through the watch api. type ResourceWatcher interface { - // TODO: take a query, like List, to filter out unwanted events. - WatchAll() (watch.Interface, error) - WatchSingle(id string) (watch.Interface, error) + // label selects on labels; field selects on the objects fields. Not all fields + // are supported; an error will be returned if you try to select for a field that + // isn't supported. + WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + // TODO: Decide if we need to keep WatchSingle? + WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) } diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 5b8480cd168..0898aac4ef6 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -19,10 +19,13 @@ package apiserver import ( "encoding/json" "net/http" + "net/url" + "strconv" "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -30,6 +33,24 @@ type WatchHandler struct { storage map[string]RESTStorage } +func (s *APIServer) getWatchParams(query url.Values) (id string, label, field labels.Selector, resourceVersion uint64) { + id = query.Get("id") + if s, err := labels.ParseSelector(query.Get("labels")); err != nil { + label = labels.Everything() + } else { + label = s + } + if s, err := labels.ParseSelector(query.Get("fields")); err != nil { + field = labels.Everything() + } else { + field = s + } + if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil { + resourceVersion = rv + } + return id, label, field, resourceVersion +} + // handleWatch processes a watch request func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { parts := splitPath(req.URL.Path) @@ -43,10 +64,11 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { if watcher, ok := storage.(ResourceWatcher); ok { var watching watch.Interface var err error - if id := req.URL.Query().Get("id"); id != "" { - watching, err = watcher.WatchSingle(id) + id, label, field, resourceVersion := s.getWatchParams(req.URL.Query()) + if id != "" { + watching, err = watcher.WatchSingle(id, resourceVersion) } else { - watching, err = watcher.WatchAll() + watching, err = watcher.WatchAll(label, field, resourceVersion) } if err != nil { internalError(err, w) diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index f737b209630..be9af611a82 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -148,3 +148,70 @@ func TestWatchHTTP(t *testing.T) { t.Errorf("Unexpected non-error") } } + +func TestWatchParamParsing(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := New(map[string]RESTStorage{ + "foo": simpleStorage, + }, "/prefix/version") + server := httptest.NewServer(handler) + + dest, _ := url.Parse(server.URL) + dest.Path = "/prefix/version/watch/foo" + + table := []struct { + rawQuery string + resourceVersion uint64 + labelSelector string + fieldSelector string + id string + }{ + { + rawQuery: "id=myID&resourceVersion=1234", + resourceVersion: 1234, + labelSelector: "", + fieldSelector: "", + id: "myID", + }, { + rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo", + resourceVersion: 314159, + labelSelector: "name=foo", + fieldSelector: "Host=", + id: "", + }, { + rawQuery: "", + resourceVersion: 0, + labelSelector: "", + fieldSelector: "", + id: "", + }, + } + + for _, item := range table { + simpleStorage.requestedLabelSelector = nil + simpleStorage.requestedFieldSelector = nil + simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases + simpleStorage.requestedID = "" + dest.RawQuery = item.rawQuery + resp, err := http.Get(dest.String()) + if err != nil { + t.Errorf("%v: unexpected error: %v", item.rawQuery, err) + continue + } + resp.Body.Close() + if e, a := item.id, simpleStorage.requestedID; e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + if simpleStorage.requestedID == "" { + if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + } + } +} diff --git a/pkg/registry/controllerstorage.go b/pkg/registry/controllerstorage.go index b5d53c1e28e..7a8cdff5835 100644 --- a/pkg/registry/controllerstorage.go +++ b/pkg/registry/controllerstorage.go @@ -138,12 +138,12 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication // WatchAll returns ReplicationController events via a watch.Interface, implementing // apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) WatchAll() (watch.Interface, error) { - return storage.registry.WatchControllers() +func (storage *ControllerRegistryStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return storage.registry.WatchControllers(label, field, resourceVersion) } // WatchSingle returns events for a single ReplicationController via a watch.Interface, // implementing apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) WatchSingle(id string) (watch.Interface, error) { +func (storage *ControllerRegistryStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) { return nil, errors.New("unimplemented") } diff --git a/pkg/registry/controllerstorage_test.go b/pkg/registry/controllerstorage_test.go index 07518764ddb..9dd699ca5f7 100644 --- a/pkg/registry/controllerstorage_test.go +++ b/pkg/registry/controllerstorage_test.go @@ -29,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) +// TODO: Why do we have this AND MemoryRegistry? type MockControllerRegistry struct { err error controllers []api.ReplicationController @@ -49,10 +50,12 @@ func (registry *MockControllerRegistry) CreateController(controller api.Replicat func (registry *MockControllerRegistry) UpdateController(controller api.ReplicationController) error { return registry.err } + func (registry *MockControllerRegistry) DeleteController(ID string) error { return registry.err } -func (registry *MockControllerRegistry) WatchControllers() (watch.Interface, error) { + +func (registry *MockControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return nil, registry.err } diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index f6aea24e7d1..fa50ee4353e 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -205,9 +205,13 @@ func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, er } // WatchControllers begins watching for new, changed, or deleted controllers. -// TODO: Add id/selector parameters? -func (registry *EtcdRegistry) WatchControllers() (watch.Interface, error) { - return registry.helper.WatchList("/registry/controllers", tools.Everything) +func (registry *EtcdRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + if field.String() != "" { + return nil, fmt.Errorf("no field selector implemented for controllers") + } + return registry.helper.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool { + return label.Matches(labels.Set(obj.(*api.ReplicationController).Labels)) + }) } func makeControllerKey(id string) string { diff --git a/pkg/registry/interfaces.go b/pkg/registry/interfaces.go index 0dda241197d..dfc3cd479e4 100644 --- a/pkg/registry/interfaces.go +++ b/pkg/registry/interfaces.go @@ -39,7 +39,7 @@ type PodRegistry interface { // ControllerRegistry is an interface for things that know how to store ReplicationControllers. type ControllerRegistry interface { ListControllers() ([]api.ReplicationController, error) - WatchControllers() (watch.Interface, error) + WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) GetController(controllerID string) (*api.ReplicationController, error) CreateController(controller api.ReplicationController) error UpdateController(controller api.ReplicationController) error diff --git a/pkg/registry/memory_registry.go b/pkg/registry/memory_registry.go index 59ed64d89ec..bce2fb60b10 100644 --- a/pkg/registry/memory_registry.go +++ b/pkg/registry/memory_registry.go @@ -89,7 +89,7 @@ func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController, return result, nil } -func (registry *MemoryRegistry) WatchControllers() (watch.Interface, error) { +func (registry *MemoryRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return nil, errors.New("unimplemented") } diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 4355ec1b509..2b3e46fec7c 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -296,18 +296,19 @@ func Everything(interface{}) bool { // WatchList begins watching the specified key's items. Items are decoded into // API objects, and any items passing 'filter' are sent down the returned -// watch.Interface. -func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) { +// watch.Interface. resourceVersion may be used to specify what version to begin +// watching (e.g., for reconnecting without missing any updateds). +func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { w := newEtcdWatcher(true, filter, h.Codec) - go w.etcdWatch(h.Client, key) + go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } // Watch begins watching the specified key. Events are decoded into // API objects and sent down the returned watch.Interface. -func (h *EtcdHelper) Watch(key string) (watch.Interface, error) { +func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) { w := newEtcdWatcher(false, nil, h.Codec) - go w.etcdWatch(h.Client, key) + go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } @@ -350,10 +351,10 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher { // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called // as a goroutine. -func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) { +func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { defer util.HandleCrash() defer close(w.etcdCallEnded) - _, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop) + _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) if err != etcd.ErrWatchStoppedByUser { glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) } @@ -385,18 +386,20 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { var action watch.EventType var data []byte switch res.Action { - case "create", "set": + case "create": if res.Node == nil { glog.Errorf("unexpected nil node: %#v", res) return } data = []byte(res.Node.Value) - // TODO: Is this conditional correct? - if res.EtcdIndex > 0 { - action = watch.Modified - } else { - action = watch.Added + action = watch.Added + case "set": + if res.Node == nil { + glog.Errorf("unexpected nil node: %#v", res) + return } + data = []byte(res.Node.Value) + action = watch.Modified case "delete": if res.PrevNode == nil { glog.Errorf("unexpected nil prev node: %#v", res) diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 736ee915e4f..fbfc0df6fad 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -325,6 +325,30 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) { } } +func TestWatchInterpretation_ListCreate(t *testing.T) { + w := newEtcdWatcher(true, func(interface{}) bool { + t.Errorf("unexpected filter call") + return true + }, encoding) + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBytes, _ := encoding.Encode(pod) + + go w.sendResult(&etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(podBytes), + }, + }) + + got := <-w.outgoing + if e, a := watch.Added, got.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } +} + func TestWatchInterpretation_ListAdd(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") @@ -341,7 +365,7 @@ func TestWatchInterpretation_ListAdd(t *testing.T) { }) got := <-w.outgoing - if e, a := watch.Added, got.Type; e != a { + if e, a := watch.Modified, got.Type; e != a { t.Errorf("Expected %v, got %v", e, a) } if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { @@ -420,7 +444,7 @@ func TestWatch(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/some/key") + watching, err := h.Watch("/some/key", 0) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -438,7 +462,7 @@ func TestWatch(t *testing.T) { } event := <-watching.ResultChan() - if e, a := watch.Added, event.Type; e != a { + if e, a := watch.Modified, event.Type; e != a { t.Errorf("Expected %v, got %v", e, a) } if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { @@ -462,7 +486,7 @@ func TestWatchPurposefulShutdown(t *testing.T) { h := EtcdHelper{fakeClient, codec, versioner} // Test purposeful shutdown - watching, err := h.Watch("/some/key") + watching, err := h.Watch("/some/key", 0) if err != nil { t.Fatalf("Unexpected error: %v", err) } From 097147545c063caee5c50fcf3a5ea54f24731a8b Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 5 Aug 2014 15:23:33 -0700 Subject: [PATCH 2/7] Allow more general parameters to be made by client. Also fix style and comments. --- pkg/client/client.go | 4 +- pkg/client/client_test.go | 6 +-- pkg/client/request.go | 79 ++++++++++++++++++++++++-------------- pkg/client/request_test.go | 28 ++++++++++++-- 4 files changed, 79 insertions(+), 38 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 3a41f8613da..7356b4934bb 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -169,7 +169,7 @@ func (c *Client) makeURL(path string) string { // ListPods takes a selector, and returns the list of pods that match that selector func (c *Client) ListPods(selector labels.Selector) (result api.PodList, err error) { - err = c.Get().Path("pods").Selector(selector).Do().Into(&result) + err = c.Get().Path("pods").SelectorParam("labels", selector).Do().Into(&result) return } @@ -202,7 +202,7 @@ func (c *Client) UpdatePod(pod api.Pod) (result api.Pod, err error) { // ListReplicationControllers takes a selector, and returns the list of replication controllers that match that selector func (c *Client) ListReplicationControllers(selector labels.Selector) (result api.ReplicationControllerList, err error) { - err = c.Get().Path("replicationControllers").Selector(selector).Do().Into(&result) + err = c.Get().Path("replicationControllers").SelectorParam("labels", selector).Do().Into(&result) return } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 47a0324bf06..1ddea772b1e 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -42,7 +42,7 @@ func TestListEmptyPods(t *testing.T) { Request: testRequest{Method: "GET", Path: "/pods"}, Response: Response{StatusCode: 200, Body: api.PodList{}}, } - podList, err := c.Setup().ListPods(nil) + podList, err := c.Setup().ListPods(labels.Everything()) c.Validate(t, podList, err) } @@ -65,7 +65,7 @@ func TestListPods(t *testing.T) { }, }, } - receivedPodList, err := c.Setup().ListPods(nil) + receivedPodList, err := c.Setup().ListPods(labels.Everything()) c.Validate(t, receivedPodList, err) } @@ -191,7 +191,7 @@ func TestListControllers(t *testing.T) { }, }, } - receivedControllerList, err := c.Setup().ListReplicationControllers(nil) + receivedControllerList, err := c.Setup().ListReplicationControllers(labels.Everything()) c.Validate(t, receivedControllerList, err) } diff --git a/pkg/client/request.go b/pkg/client/request.go index 729aa13281a..e307f50e65b 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -24,6 +24,7 @@ import ( "net/http" "net/url" "path" + "strconv" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -33,18 +34,19 @@ import ( "github.com/golang/glog" ) -// Server contains info locating a kubernetes api server. -// Example usage: +// Verb begins a request with a verb (GET, POST, PUT, DELETE) +// +// Example usage of Client's request building interface: // auth, err := LoadAuth(filename) // c := New(url, auth) // resp, err := c.Verb("GET"). // Path("pods"). -// Selector("area=staging"). +// SelectorParam("labels", "area=staging"). // Timeout(10*time.Second). // Do() -// list, ok := resp.(api.PodList) - -// Verb begins a request with a verb (GET, POST, PUT, DELETE) +// if err != nil { ... } +// list, ok := resp.(*api.PodList) +// func (c *Client) Verb(verb string) *Request { return &Request{ verb: verb, @@ -52,26 +54,27 @@ func (c *Client) Verb(verb string) *Request { path: "/api/v1beta1", sync: c.Sync, timeout: c.Timeout, + params: map[string]string{}, pollPeriod: c.PollPeriod, } } -// Post begins a POST request. +// Post begins a POST request. Short for c.Verb("POST"). func (c *Client) Post() *Request { return c.Verb("POST") } -// Put begins a PUT request. +// Put begins a PUT request. Short for c.Verb("PUT"). func (c *Client) Put() *Request { return c.Verb("PUT") } -// Get begins a GET request. +// Get begins a GET request. Short for c.Verb("GET"). func (c *Client) Get() *Request { return c.Verb("GET") } -// Delete begins a DELETE request. +// Delete begins a DELETE request. Short for c.Verb("DELETE"). func (c *Client) Delete() *Request { return c.Verb("DELETE") } @@ -90,6 +93,7 @@ type Request struct { verb string path string body io.Reader + params map[string]string selector labels.Selector timeout time.Duration sync bool @@ -105,7 +109,7 @@ func (r *Request) Path(item string) *Request { return r } -// Sync sets sync/async call status. +// Sync sets sync/async call status by setting the "sync" parameter to "true"/"false" func (r *Request) Sync(sync bool) *Request { if r.err != nil { return r @@ -123,25 +127,41 @@ func (r *Request) AbsPath(path string) *Request { return r } -// ParseSelector parses the given string as a resource label selector. Optional. -func (r *Request) ParseSelector(item string) *Request { +// ParseSelectorParam parses the given string as a resource label selector. +// This is a convenience function so you don't have to first check that it's a +// validly formatted selector. +func (r *Request) ParseSelectorParam(paramName, item string) *Request { if r.err != nil { return r } - r.selector, r.err = labels.ParseSelector(item) + if sel, err := labels.ParseSelector(item); err != nil { + r.err = err + } else { + r.params[paramName] = sel.String() + } return r } -// Selector makes the request use the given selector. -func (r *Request) Selector(s labels.Selector) *Request { +// SelectorParam adds the given selector as a query parameter with the name paramName. +func (r *Request) SelectorParam(paramName string, s labels.Selector) *Request { if r.err != nil { return r } - r.selector = s + r.params[paramName] = s.String() return r } -// Timeout makes the request use the given duration as a timeout. Optional. +// UintParam creates a query parameter with the given value. +func (r *Request) UintParam(paramName string, u uint64) *Request { + if r.err != nil { + return r + } + r.params[paramName] = strconv.FormatUint(u, 10) + return r +} + +// Timeout makes the request use the given duration as a timeout. Sets the "timeout" +// parameter. Ignored if sync=false. func (r *Request) Timeout(d time.Duration) *Request { if r.err != nil { return r @@ -153,6 +173,7 @@ func (r *Request) Timeout(d time.Duration) *Request { // Body makes the request use obj as the body. Optional. // If obj is a string, try to read a file of that name. // If obj is a []byte, send it directly. +// If obj is an io.Reader, use it directly. // Otherwise, assume obj is an api type and marshall it correctly. func (r *Request) Body(obj interface{}) *Request { if r.err != nil { @@ -160,23 +181,21 @@ func (r *Request) Body(obj interface{}) *Request { } switch t := obj.(type) { case string: - data, err := ioutil.ReadFile(t) - if err != nil { + if data, err := ioutil.ReadFile(t); err != nil { r.err = err - return r + } else { + r.body = bytes.NewBuffer(data) } - r.body = bytes.NewBuffer(data) case []byte: r.body = bytes.NewBuffer(t) case io.Reader: - r.body = obj.(io.Reader) + r.body = t default: - data, err := api.Encode(obj) - if err != nil { + if data, err := api.Encode(obj); err != nil { r.err = err - return r + } else { + r.body = bytes.NewBuffer(data) } - r.body = bytes.NewBuffer(data) } return r } @@ -197,9 +216,11 @@ func (r *Request) PollPeriod(d time.Duration) *Request { func (r *Request) finalURL() string { finalURL := r.c.host + r.path query := url.Values{} - if r.selector != nil { - query.Add("labels", r.selector.String()) + for key, value := range r.params { + query.Add(key, value) } + // sync and timeout are handled specially here, to allow setting them + // in any order. if r.sync { query.Add("sync", "true") if r.timeout != 0 { diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index 0b80f4819cd..6dd9e72cd7d 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -49,7 +49,7 @@ func TestDoRequestNewWay(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - ParseSelector("name=foo"). + ParseSelectorParam("labels", "name=foo"). Timeout(time.Second). Body([]byte(reqBody)). Do().Get() @@ -87,7 +87,7 @@ func TestDoRequestNewWayReader(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - Selector(labels.Set{"name": "foo"}.AsSelector()). + SelectorParam("labels", labels.Set{"name": "foo"}.AsSelector()). Sync(false). Timeout(time.Second). Body(bytes.NewBuffer(reqBodyExpected)). @@ -127,7 +127,7 @@ func TestDoRequestNewWayObj(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - Selector(labels.Set{"name": "foo"}.AsSelector()). + SelectorParam("labels", labels.Set{"name": "foo"}.AsSelector()). Timeout(time.Second). Body(reqObj). Do().Get() @@ -180,7 +180,7 @@ func TestDoRequestNewWayFile(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - ParseSelector("name=foo"). + ParseSelectorParam("labels", "name=foo"). Timeout(time.Second). Body(file.Name()). Do().Get() @@ -244,6 +244,26 @@ func TestSync(t *testing.T) { } } +func TestUintParam(t *testing.T) { + table := []struct { + name string + testVal uint64 + expectStr string + }{ + {"foo", 31415, "?foo=31415"}, + {"bar", 42, "?bar=42"}, + {"baz", 0, "?baz=0"}, + } + + for _, item := range table { + c := New("", nil) + r := c.Get().AbsPath("").UintParam(item.name, item.testVal) + if e, a := item.expectStr, r.finalURL(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + } +} + func TestSetPollPeriod(t *testing.T) { c := New("", nil) r := c.Get() From 85ff1d3e7fd51d3704a23e58fd30b64f035a44ac Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 5 Aug 2014 16:19:32 -0700 Subject: [PATCH 3/7] Add fake client to make testing easier. --- cmd/kubecfg/kubecfg.go | 2 +- pkg/client/client.go | 15 ++++++ pkg/client/fake.go | 105 ++++++++++++++++++++++++++++++++++++++++ pkg/client/fake_test.go | 37 ++++++++++++++ 4 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 pkg/client/fake.go create mode 100644 pkg/client/fake_test.go diff --git a/cmd/kubecfg/kubecfg.go b/cmd/kubecfg/kubecfg.go index 9b6a9dc2d2c..c99006af064 100644 --- a/cmd/kubecfg/kubecfg.go +++ b/cmd/kubecfg/kubecfg.go @@ -247,7 +247,7 @@ func executeAPIRequest(method string, s *kube_client.Client) bool { r := s.Verb(verb). Path(path). - ParseSelector(*selector) + ParseSelectorParam("labels", *selector) if setBody { if version != 0 { data := readConfig(storage) diff --git a/pkg/client/client.go b/pkg/client/client.go index 7356b4934bb..e7c16761158 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -28,11 +28,14 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) // Interface holds the methods for clients of Kubenetes, // an interface to allow mock testing. +// TODO: split this up by resource? +// TODO: these should return/take pointers. type Interface interface { ListPods(selector labels.Selector) (api.PodList, error) GetPod(name string) (api.Pod, error) @@ -45,6 +48,7 @@ type Interface interface { CreateReplicationController(api.ReplicationController) (api.ReplicationController, error) UpdateReplicationController(api.ReplicationController) (api.ReplicationController, error) DeleteReplicationController(string) error + WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) GetService(name string) (api.Service, error) CreateService(api.Service) (api.Service, error) @@ -233,6 +237,17 @@ func (c *Client) DeleteReplicationController(name string) error { return c.Delete().Path("replicationControllers").Path(name).Do().Error() } +// WatchReplicationControllers returns a watch.Interface that watches the requested controllers. +func (c *Client) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return c.Get(). + Path("watch"). + Path("replicationControllers"). + UintParam("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} + // GetService returns information about a particular service. func (c *Client) GetService(name string) (result api.Service, err error) { err = c.Get().Path("services").Path(name).Do().Into(&result) diff --git a/pkg/client/fake.go b/pkg/client/fake.go new file mode 100644 index 00000000000..e6f61a11c8c --- /dev/null +++ b/pkg/client/fake.go @@ -0,0 +1,105 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// FakeClient implements Interface. Meant to be embedded into a struct to get a default +// implementation. This makes faking out just the method you want to test easier. +type FakeClient struct { + // FakeClient by default keeps a simple list of the methods that have been called. + Actions []string +} + +func (client *FakeClient) ListPods(selector labels.Selector) (api.PodList, error) { + client.Actions = append(client.Actions, "list-pods") + return api.PodList{}, nil +} + +func (client *FakeClient) GetPod(name string) (api.Pod, error) { + client.Actions = append(client.Actions, "get-pod") + return api.Pod{}, nil +} + +func (client *FakeClient) DeletePod(name string) error { + client.Actions = append(client.Actions, "delete-pod") + return nil +} + +func (client *FakeClient) CreatePod(pod api.Pod) (api.Pod, error) { + client.Actions = append(client.Actions, "create-pod") + return api.Pod{}, nil +} + +func (client *FakeClient) UpdatePod(pod api.Pod) (api.Pod, error) { + client.Actions = append(client.Actions, "update-pod") + return api.Pod{}, nil +} + +func (client *FakeClient) ListReplicationControllers(selector labels.Selector) (api.ReplicationControllerList, error) { + client.Actions = append(client.Actions, "list-controllers") + return api.ReplicationControllerList{}, nil +} + +func (client *FakeClient) GetReplicationController(name string) (api.ReplicationController, error) { + client.Actions = append(client.Actions, "get-controller") + return api.ReplicationController{}, nil +} + +func (client *FakeClient) CreateReplicationController(controller api.ReplicationController) (api.ReplicationController, error) { + client.Actions = append(client.Actions, "create-controller") + return api.ReplicationController{}, nil +} + +func (client *FakeClient) UpdateReplicationController(controller api.ReplicationController) (api.ReplicationController, error) { + client.Actions = append(client.Actions, "update-controller") + return api.ReplicationController{}, nil +} + +func (client *FakeClient) DeleteReplicationController(controller string) error { + client.Actions = append(client.Actions, "delete-controller") + return nil +} + +func (client *FakeClient) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + client.Actions = append(client.Actions, "watch-controllers") + return watch.NewFake(), nil +} + +func (client *FakeClient) GetService(name string) (api.Service, error) { + client.Actions = append(client.Actions, "get-controller") + return api.Service{}, nil +} + +func (client *FakeClient) CreateService(controller api.Service) (api.Service, error) { + client.Actions = append(client.Actions, "create-service") + return api.Service{}, nil +} + +func (client *FakeClient) UpdateService(controller api.Service) (api.Service, error) { + client.Actions = append(client.Actions, "update-service") + return api.Service{}, nil +} + +func (client *FakeClient) DeleteService(controller string) error { + client.Actions = append(client.Actions, "delete-service") + return nil +} diff --git a/pkg/client/fake_test.go b/pkg/client/fake_test.go new file mode 100644 index 00000000000..9c739e24142 --- /dev/null +++ b/pkg/client/fake_test.go @@ -0,0 +1,37 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "testing" +) + +// This test file just ensures that FakeClient and structs it is embedded in +// implement Interface. + +func TestFakeImplementsInterface(t *testing.T) { + _ = Interface(&FakeClient{}) +} + +type MyFake struct { + *FakeClient +} + +func TestEmbeddedFakeImplementsInterface(t *testing.T) { + _ = Interface(MyFake{&FakeClient{}}) + _ = Interface(&MyFake{&FakeClient{}}) +} From 51caf759c374812d5787a636bfb6ddfd33369cb1 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 5 Aug 2014 16:20:15 -0700 Subject: [PATCH 4/7] Add WatchReplicationControllers to kubecfg's fake --- pkg/kubecfg/kubecfg_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/kubecfg/kubecfg_test.go b/pkg/kubecfg/kubecfg_test.go index 234a97619b8..b1bc55c0661 100644 --- a/pkg/kubecfg/kubecfg_test.go +++ b/pkg/kubecfg/kubecfg_test.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) type Action struct { @@ -90,6 +91,11 @@ func (client *FakeKubeClient) DeleteReplicationController(controller string) err return nil } +func (client *FakeKubeClient) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + client.actions = append(client.actions, Action{action: "watch-controllers"}) + return watch.NewFake(), nil +} + func (client *FakeKubeClient) GetService(name string) (api.Service, error) { client.actions = append(client.actions, Action{action: "get-service", value: name}) return api.Service{}, nil From 71709ae09e2077073e823b96b2bf6166d79ea6e3 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 5 Aug 2014 16:20:50 -0700 Subject: [PATCH 5/7] Make replication controller use client --- pkg/controller/replication_controller.go | 25 ++++++++----------- pkg/controller/replication_controller_test.go | 25 ++++++++++++------- pkg/registry/etcdregistry.go | 2 +- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index d9845c59d1f..64367b7f7ed 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -85,28 +85,23 @@ func MakeReplicationManager(kubeClient client.Interface) *ReplicationManager { }, } rm.syncHandler = rm.syncReplicationController - rm.watchMaker = rm.makeAPIWatch return rm } // Run begins watching and syncing. func (rm *ReplicationManager) Run(period time.Duration) { rm.syncTime = time.Tick(period) - go util.Forever(func() { rm.watchControllers() }, period) + index := uint64(0) + go util.Forever(func() { rm.watchControllers(&index) }, period) } -// makeAPIWatch starts watching via the apiserver. -func (rm *ReplicationManager) makeAPIWatch() (watch.Interface, error) { - // TODO: Fix this ugly type assertion. - return rm.kubeClient.(*client.Client). - Get(). - Path("watch"). - Path("replicationControllers"). - Watch() -} - -func (rm *ReplicationManager) watchControllers() { - watching, err := rm.watchMaker() +// index is a pointer to the resource version to use/update. +func (rm *ReplicationManager) watchControllers(index *uint64) { + watching, err := rm.kubeClient.WatchReplicationControllers( + labels.Everything(), + labels.Everything(), + *index, + ) if err != nil { glog.Errorf("Unexpected failure to watch: %v", err) time.Sleep(5 * time.Second) @@ -128,6 +123,8 @@ func (rm *ReplicationManager) watchControllers() { if rc, ok := event.Object.(*api.ReplicationController); !ok { glog.Errorf("unexpected object: %#v", event.Object) } else { + // If we get disconnected, start where we left off. + *index = rc.ResourceVersion + 1 rm.syncHandler(*rc) } } diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index bebaa948202..b62d5fb70ab 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -28,6 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -305,7 +306,7 @@ func TestSyncronize(t *testing.T) { w.WriteHeader(http.StatusNotFound) t.Errorf("Unexpected request for %v", req.RequestURI) }) - testServer := httptest.NewTLSServer(mux) + testServer := httptest.NewServer(mux) client := client.New(testServer.URL, nil) manager := MakeReplicationManager(client) fakePodControl := FakePodControl{} @@ -316,13 +317,18 @@ func TestSyncronize(t *testing.T) { validateSyncReplication(t, &fakePodControl, 7, 0) } -func TestWatchControllers(t *testing.T) { - fakeWatcher := watch.NewFake() - manager := MakeReplicationManager(nil) - manager.watchMaker = func() (watch.Interface, error) { - return fakeWatcher, nil - } +type FakeWatcher struct { + w *watch.FakeWatcher + *client.FakeClient +} +func (fw FakeWatcher) WatchReplicationControllers(l, f labels.Selector, rv uint64) (watch.Interface, error) { + return fw.w, nil +} + +func TestWatchControllers(t *testing.T) { + client := FakeWatcher{watch.NewFake(), &client.FakeClient{}} + manager := MakeReplicationManager(client) var testControllerSpec api.ReplicationController received := make(chan struct{}) manager.syncHandler = func(controllerSpec api.ReplicationController) error { @@ -333,11 +339,12 @@ func TestWatchControllers(t *testing.T) { return nil } - go manager.watchControllers() + index := uint64(0) + go manager.watchControllers(&index) // Test normal case testControllerSpec.ID = "foo" - fakeWatcher.Add(&testControllerSpec) + client.w.Add(&testControllerSpec) select { case <-received: diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index fa50ee4353e..d4cc759fcb6 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -206,7 +206,7 @@ func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, er // WatchControllers begins watching for new, changed, or deleted controllers. func (registry *EtcdRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - if field.String() != "" { + if !field.Empty() { return nil, fmt.Errorf("no field selector implemented for controllers") } return registry.helper.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool { From 49cded3800632693be330eb14cf27d11cf785556 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Wed, 6 Aug 2014 14:55:37 -0700 Subject: [PATCH 6/7] Simplify ResourceWatcher interface to one function. --- pkg/apiserver/apiserver_test.go | 24 +++----------- pkg/apiserver/interfaces.go | 17 +++++----- pkg/apiserver/watch.go | 15 +++------ pkg/apiserver/watch_test.go | 40 +++++++++--------------- pkg/controller/replication_controller.go | 4 --- pkg/registry/controllerstorage.go | 9 +----- pkg/tools/etcd_tools_test.go | 4 +-- 7 files changed, 34 insertions(+), 79 deletions(-) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index ceaf22c7e3c..a75e347e30c 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -64,13 +64,8 @@ type SimpleRESTStorage struct { updated *Simple created *Simple - // Valid if WatchAll or WatchSingle is called - fakeWatch *watch.FakeWatcher - - // Set if WatchSingle is called - requestedID string - - // Set if WatchAll is called + // These are set when Watch is called + fakeWatch *watch.FakeWatcher requestedLabelSelector labels.Selector requestedFieldSelector labels.Selector requestedResourceVersion uint64 @@ -135,22 +130,11 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e } // Implement ResourceWatcher. -func (storage *SimpleRESTStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (storage *SimpleRESTStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { storage.requestedLabelSelector = label storage.requestedFieldSelector = field storage.requestedResourceVersion = resourceVersion - if err := storage.errors["watchAll"]; err != nil { - return nil, err - } - storage.fakeWatch = watch.NewFake() - return storage.fakeWatch, nil -} - -// Implement ResourceWatcher. -func (storage *SimpleRESTStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) { - storage.requestedID = id - storage.requestedResourceVersion = resourceVersion - if err := storage.errors["watchSingle"]; err != nil { + if err := storage.errors["watch"]; err != nil { return nil, err } storage.fakeWatch = watch.NewFake() diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 3260207a3ca..2b1aa23cab6 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -33,11 +33,13 @@ type RESTStorage interface { List(labels.Selector) (interface{}, error) // Get finds a resource in the storage by id and returns it. - // Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. + // Although it can return an arbitrary error value, IsNotFound(err) is true for the + // returned error value err when the specified resource is not found. Get(id string) (interface{}, error) // Delete finds a resource in the storage and deletes it. - // Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. + // Although it can return an arbitrary error value, IsNotFound(err) is true for the + // returned error value err when the specified resource is not found. Delete(id string) (<-chan interface{}, error) Create(interface{}) (<-chan interface{}, error) @@ -47,10 +49,9 @@ type RESTStorage interface { // ResourceWatcher should be implemented by all RESTStorage objects that // want to offer the ability to watch for changes through the watch api. type ResourceWatcher interface { - // label selects on labels; field selects on the objects fields. Not all fields - // are supported; an error will be returned if you try to select for a field that - // isn't supported. - WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) - // TODO: Decide if we need to keep WatchSingle? - WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) + // 'label' selects on labels; 'field' selects on the object's fields. Not all fields + // are supported; an error should be returned if 'field' tries to select on a field that + // isn't supported. 'resourceVersion' allows for continuing/starting a watch at a + // particular version. + Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 0898aac4ef6..b2315b78563 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -33,8 +33,7 @@ type WatchHandler struct { storage map[string]RESTStorage } -func (s *APIServer) getWatchParams(query url.Values) (id string, label, field labels.Selector, resourceVersion uint64) { - id = query.Get("id") +func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) { if s, err := labels.ParseSelector(query.Get("labels")); err != nil { label = labels.Everything() } else { @@ -48,7 +47,7 @@ func (s *APIServer) getWatchParams(query url.Values) (id string, label, field la if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil { resourceVersion = rv } - return id, label, field, resourceVersion + return label, field, resourceVersion } // handleWatch processes a watch request @@ -62,14 +61,8 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { notFound(w, req) } if watcher, ok := storage.(ResourceWatcher); ok { - var watching watch.Interface - var err error - id, label, field, resourceVersion := s.getWatchParams(req.URL.Query()) - if id != "" { - watching, err = watcher.WatchSingle(id, resourceVersion) - } else { - watching, err = watcher.WatchAll(label, field, resourceVersion) - } + label, field, resourceVersion := getWatchParams(req.URL.Query()) + watching, err := watcher.Watch(label, field, resourceVersion) if err != nil { internalError(err, w) return diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index be9af611a82..693026f8edf 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -40,6 +40,7 @@ var watchTestTable = []struct { func TestWatchWebsocket(t *testing.T) { simpleStorage := &SimpleRESTStorage{} + _ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work. handler := New(map[string]RESTStorage{ "foo": simpleStorage, }, codec, "/prefix/version") @@ -48,17 +49,13 @@ func TestWatchWebsocket(t *testing.T) { dest, _ := url.Parse(server.URL) dest.Scheme = "ws" // Required by websocket, though the server never sees it. dest.Path = "/prefix/version/watch/foo" - dest.RawQuery = "id=myID" + dest.RawQuery = "" ws, err := websocket.Dial(dest.String(), "", "http://localhost") if err != nil { t.Errorf("unexpected error: %v", err) } - if a, e := simpleStorage.requestedID, "myID"; a != e { - t.Fatalf("Expected %v, got %v", e, a) - } - try := func(action watch.EventType, object interface{}) { // Send simpleStorage.fakeWatch.Action(action, object) @@ -98,7 +95,7 @@ func TestWatchHTTP(t *testing.T) { dest, _ := url.Parse(server.URL) dest.Path = "/prefix/version/watch/foo" - dest.RawQuery = "id=myID" + dest.RawQuery = "" request, err := http.NewRequest("GET", dest.String(), nil) if err != nil { @@ -114,10 +111,6 @@ func TestWatchHTTP(t *testing.T) { t.Errorf("Unexpected response %#v", response) } - if a, e := simpleStorage.requestedID, "myID"; a != e { - t.Fatalf("Expected %v, got %v", e, a) - } - decoder := json.NewDecoder(response.Body) try := func(action watch.EventType, object interface{}) { @@ -164,26 +157,27 @@ func TestWatchParamParsing(t *testing.T) { resourceVersion uint64 labelSelector string fieldSelector string - id string }{ { - rawQuery: "id=myID&resourceVersion=1234", + rawQuery: "resourceVersion=1234", resourceVersion: 1234, labelSelector: "", fieldSelector: "", - id: "myID", }, { rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo", resourceVersion: 314159, labelSelector: "name=foo", fieldSelector: "Host=", - id: "", + }, { + rawQuery: "fields=ID%3dfoo&resourceVersion=1492", + resourceVersion: 1492, + labelSelector: "", + fieldSelector: "ID=foo", }, { rawQuery: "", resourceVersion: 0, labelSelector: "", fieldSelector: "", - id: "", }, } @@ -191,7 +185,6 @@ func TestWatchParamParsing(t *testing.T) { simpleStorage.requestedLabelSelector = nil simpleStorage.requestedFieldSelector = nil simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases - simpleStorage.requestedID = "" dest.RawQuery = item.rawQuery resp, err := http.Get(dest.String()) if err != nil { @@ -199,19 +192,14 @@ func TestWatchParamParsing(t *testing.T) { continue } resp.Body.Close() - if e, a := item.id, simpleStorage.requestedID; e != a { - t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) - } if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a { t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) } - if simpleStorage.requestedID == "" { - if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a { - t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) - } - if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a { - t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) - } + if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) } } } diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 64367b7f7ed..d5a9a0a487e 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -24,7 +24,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -37,9 +36,6 @@ type ReplicationManager struct { // To allow injection of syncReplicationController for testing. syncHandler func(controllerSpec api.ReplicationController) error - - // To allow injection of watch creation. - watchMaker func() (watch.Interface, error) } // PodControlInterface is an interface that knows how to add or delete pods diff --git a/pkg/registry/controllerstorage.go b/pkg/registry/controllerstorage.go index 7a8cdff5835..96f761d8f72 100644 --- a/pkg/registry/controllerstorage.go +++ b/pkg/registry/controllerstorage.go @@ -17,7 +17,6 @@ limitations under the License. package registry import ( - "errors" "fmt" "time" @@ -138,12 +137,6 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication // WatchAll returns ReplicationController events via a watch.Interface, implementing // apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (storage *ControllerRegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return storage.registry.WatchControllers(label, field, resourceVersion) } - -// WatchSingle returns events for a single ReplicationController via a watch.Interface, -// implementing apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) { - return nil, errors.New("unimplemented") -} diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index fbfc0df6fad..fec310c81fd 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -329,9 +329,9 @@ func TestWatchInterpretation_ListCreate(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, encoding) + }, codec) pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := encoding.Encode(pod) + podBytes, _ := codec.Encode(pod) go w.sendResult(&etcd.Response{ Action: "create", From 5dd130a350388f8c3a9aaefe3508905752870d15 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 8 Aug 2014 13:50:04 -0700 Subject: [PATCH 7/7] Prevent accidental setting of sync or timeout --- pkg/apiserver/watch_test.go | 2 +- pkg/client/request.go | 40 +++++++++++++------ pkg/client/request_test.go | 19 +++++++++ pkg/controller/replication_controller.go | 12 +++--- pkg/controller/replication_controller_test.go | 4 +- 5 files changed, 55 insertions(+), 22 deletions(-) diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 693026f8edf..dbbfa7dedab 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -146,7 +146,7 @@ func TestWatchParamParsing(t *testing.T) { simpleStorage := &SimpleRESTStorage{} handler := New(map[string]RESTStorage{ "foo": simpleStorage, - }, "/prefix/version") + }, codec, "/prefix/version") server := httptest.NewServer(handler) dest, _ := url.Parse(server.URL) diff --git a/pkg/client/request.go b/pkg/client/request.go index e307f50e65b..ceb01215994 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -30,10 +30,15 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) +// specialParams lists parameters that are handled specially and which users of Request +// are therefore not allowed to set manually. +var specialParams = util.NewStringSet("sync", "timeout") + // Verb begins a request with a verb (GET, POST, PUT, DELETE) // // Example usage of Client's request building interface: @@ -134,12 +139,12 @@ func (r *Request) ParseSelectorParam(paramName, item string) *Request { if r.err != nil { return r } - if sel, err := labels.ParseSelector(item); err != nil { + sel, err := labels.ParseSelector(item) + if err != nil { r.err = err - } else { - r.params[paramName] = sel.String() + return r } - return r + return r.setParam(paramName, sel.String()) } // SelectorParam adds the given selector as a query parameter with the name paramName. @@ -147,8 +152,7 @@ func (r *Request) SelectorParam(paramName string, s labels.Selector) *Request { if r.err != nil { return r } - r.params[paramName] = s.String() - return r + return r.setParam(paramName, s.String()) } // UintParam creates a query parameter with the given value. @@ -156,7 +160,15 @@ func (r *Request) UintParam(paramName string, u uint64) *Request { if r.err != nil { return r } - r.params[paramName] = strconv.FormatUint(u, 10) + return r.setParam(paramName, strconv.FormatUint(u, 10)) +} + +func (r *Request) setParam(paramName, value string) *Request { + if specialParams.Has(paramName) { + r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName) + return r + } + r.params[paramName] = value return r } @@ -181,21 +193,23 @@ func (r *Request) Body(obj interface{}) *Request { } switch t := obj.(type) { case string: - if data, err := ioutil.ReadFile(t); err != nil { + data, err := ioutil.ReadFile(t) + if err != nil { r.err = err - } else { - r.body = bytes.NewBuffer(data) + return r } + r.body = bytes.NewBuffer(data) case []byte: r.body = bytes.NewBuffer(t) case io.Reader: r.body = t default: - if data, err := api.Encode(obj); err != nil { + data, err := api.Encode(obj) + if err != nil { r.err = err - } else { - r.body = bytes.NewBuffer(data) + return r } + r.body = bytes.NewBuffer(data) } return r } diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index 6dd9e72cd7d..5e5ddc4cc55 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -264,6 +264,25 @@ func TestUintParam(t *testing.T) { } } +func TestUnacceptableParamNames(t *testing.T) { + table := []struct { + name string + testVal string + expectSuccess bool + }{ + {"sync", "foo", false}, + {"timeout", "42", false}, + } + + for _, item := range table { + c := New("", nil) + r := c.Get().setParam(item.name, item.testVal) + if e, a := item.expectSuccess, r.err == nil; e != a { + t.Errorf("expected %v, got %v (%v)", e, a, r.err) + } + } +} + func TestSetPollPeriod(t *testing.T) { c := New("", nil) r := c.Get() diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index d5a9a0a487e..8bacad7e870 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -87,16 +87,16 @@ func MakeReplicationManager(kubeClient client.Interface) *ReplicationManager { // Run begins watching and syncing. func (rm *ReplicationManager) Run(period time.Duration) { rm.syncTime = time.Tick(period) - index := uint64(0) - go util.Forever(func() { rm.watchControllers(&index) }, period) + resourceVersion := uint64(0) + go util.Forever(func() { rm.watchControllers(&resourceVersion) }, period) } -// index is a pointer to the resource version to use/update. -func (rm *ReplicationManager) watchControllers(index *uint64) { +// resourceVersion is a pointer to the resource version to use/update. +func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) { watching, err := rm.kubeClient.WatchReplicationControllers( labels.Everything(), labels.Everything(), - *index, + *resourceVersion, ) if err != nil { glog.Errorf("Unexpected failure to watch: %v", err) @@ -120,7 +120,7 @@ func (rm *ReplicationManager) watchControllers(index *uint64) { glog.Errorf("unexpected object: %#v", event.Object) } else { // If we get disconnected, start where we left off. - *index = rc.ResourceVersion + 1 + *resourceVersion = rc.ResourceVersion + 1 rm.syncHandler(*rc) } } diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index b62d5fb70ab..dcbab8b0af1 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -339,8 +339,8 @@ func TestWatchControllers(t *testing.T) { return nil } - index := uint64(0) - go manager.watchControllers(&index) + resourceVersion := uint64(0) + go manager.watchControllers(&resourceVersion) // Test normal case testControllerSpec.ID = "foo"