diff --git a/cmd/kubecfg/kubecfg.go b/cmd/kubecfg/kubecfg.go index 9b6a9dc2d2c..c99006af064 100644 --- a/cmd/kubecfg/kubecfg.go +++ b/cmd/kubecfg/kubecfg.go @@ -247,7 +247,7 @@ func executeAPIRequest(method string, s *kube_client.Client) bool { r := s.Verb(verb). Path(path). - ParseSelector(*selector) + ParseSelectorParam("labels", *selector) if setBody { if version != 0 { data := readConfig(storage) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index eaf109f515f..a75e347e30c 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -64,11 +64,11 @@ type SimpleRESTStorage struct { updated *Simple created *Simple - // Valid if WatchAll or WatchSingle is called - fakeWatch *watch.FakeWatcher - - // Set if WatchSingle is called - requestedID string + // These are set when Watch is called + fakeWatch *watch.FakeWatcher + requestedLabelSelector labels.Selector + requestedFieldSelector labels.Selector + requestedResourceVersion uint64 // If non-nil, called inside the WorkFunc when answering update, delete, create. // obj receives the original input to the update, delete, or create call. @@ -95,7 +95,7 @@ func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error) if storage.injectedFunction != nil { return storage.injectedFunction(id) } - return api.Status{Status: api.StatusSuccess}, nil + return &api.Status{Status: api.StatusSuccess}, nil }), nil } @@ -130,18 +130,11 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e } // Implement ResourceWatcher. -func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) { - if err := storage.errors["watchAll"]; err != nil { - return nil, err - } - storage.fakeWatch = watch.NewFake() - return storage.fakeWatch, nil -} - -// Implement ResourceWatcher. -func (storage *SimpleRESTStorage) WatchSingle(id string) (watch.Interface, error) { - storage.requestedID = id - if err := storage.errors["watchSingle"]; err != nil { +func (storage *SimpleRESTStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + storage.requestedLabelSelector = label + storage.requestedFieldSelector = field + storage.requestedResourceVersion = resourceVersion + if err := storage.errors["watch"]; err != nil { return nil, err } storage.fakeWatch = watch.NewFake() @@ -164,17 +157,17 @@ func TestNotFound(t *testing.T) { Path string } cases := map[string]T{ - "PATCH method": T{"PATCH", "/prefix/version/foo"}, - "GET long prefix": T{"GET", "/prefix/"}, - "GET missing storage": T{"GET", "/prefix/version/blah"}, - "GET with extra segment": T{"GET", "/prefix/version/foo/bar/baz"}, - "POST with extra segment": T{"POST", "/prefix/version/foo/bar"}, - "DELETE without extra segment": T{"DELETE", "/prefix/version/foo"}, - "DELETE with extra segment": T{"DELETE", "/prefix/version/foo/bar/baz"}, - "PUT without extra segment": T{"PUT", "/prefix/version/foo"}, - "PUT with extra segment": T{"PUT", "/prefix/version/foo/bar/baz"}, - "watch missing storage": T{"GET", "/prefix/version/watch/"}, - "watch with bad method": T{"POST", "/prefix/version/watch/foo/bar"}, + "PATCH method": {"PATCH", "/prefix/version/foo"}, + "GET long prefix": {"GET", "/prefix/"}, + "GET missing storage": {"GET", "/prefix/version/blah"}, + "GET with extra segment": {"GET", "/prefix/version/foo/bar/baz"}, + "POST with extra segment": {"POST", "/prefix/version/foo/bar"}, + "DELETE without extra segment": {"DELETE", "/prefix/version/foo"}, + "DELETE with extra segment": {"DELETE", "/prefix/version/foo/bar/baz"}, + "PUT without extra segment": {"PUT", "/prefix/version/foo"}, + "PUT with extra segment": {"PUT", "/prefix/version/foo/bar/baz"}, + "watch missing storage": {"GET", "/prefix/version/watch/"}, + "watch with bad method": {"POST", "/prefix/version/watch/foo/bar"}, } handler := New(map[string]RESTStorage{ "foo": &SimpleRESTStorage{}, diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 0b47419fecc..2b1aa23cab6 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -29,14 +29,17 @@ type RESTStorage interface { New() interface{} // List selects resources in the storage which match to the selector. + // TODO: add field selector in addition to label selector. List(labels.Selector) (interface{}, error) // Get finds a resource in the storage by id and returns it. - // Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. + // Although it can return an arbitrary error value, IsNotFound(err) is true for the + // returned error value err when the specified resource is not found. Get(id string) (interface{}, error) // Delete finds a resource in the storage and deletes it. - // Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. + // Although it can return an arbitrary error value, IsNotFound(err) is true for the + // returned error value err when the specified resource is not found. Delete(id string) (<-chan interface{}, error) Create(interface{}) (<-chan interface{}, error) @@ -46,7 +49,9 @@ type RESTStorage interface { // ResourceWatcher should be implemented by all RESTStorage objects that // want to offer the ability to watch for changes through the watch api. type ResourceWatcher interface { - // TODO: take a query, like List, to filter out unwanted events. - WatchAll() (watch.Interface, error) - WatchSingle(id string) (watch.Interface, error) + // 'label' selects on labels; 'field' selects on the object's fields. Not all fields + // are supported; an error should be returned if 'field' tries to select on a field that + // isn't supported. 'resourceVersion' allows for continuing/starting a watch at a + // particular version. + Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 5b8480cd168..b2315b78563 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -19,10 +19,13 @@ package apiserver import ( "encoding/json" "net/http" + "net/url" + "strconv" "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -30,6 +33,23 @@ type WatchHandler struct { storage map[string]RESTStorage } +func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) { + if s, err := labels.ParseSelector(query.Get("labels")); err != nil { + label = labels.Everything() + } else { + label = s + } + if s, err := labels.ParseSelector(query.Get("fields")); err != nil { + field = labels.Everything() + } else { + field = s + } + if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil { + resourceVersion = rv + } + return label, field, resourceVersion +} + // handleWatch processes a watch request func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { parts := splitPath(req.URL.Path) @@ -41,13 +61,8 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { notFound(w, req) } if watcher, ok := storage.(ResourceWatcher); ok { - var watching watch.Interface - var err error - if id := req.URL.Query().Get("id"); id != "" { - watching, err = watcher.WatchSingle(id) - } else { - watching, err = watcher.WatchAll() - } + label, field, resourceVersion := getWatchParams(req.URL.Query()) + watching, err := watcher.Watch(label, field, resourceVersion) if err != nil { internalError(err, w) return diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index f737b209630..dbbfa7dedab 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -40,6 +40,7 @@ var watchTestTable = []struct { func TestWatchWebsocket(t *testing.T) { simpleStorage := &SimpleRESTStorage{} + _ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work. handler := New(map[string]RESTStorage{ "foo": simpleStorage, }, codec, "/prefix/version") @@ -48,17 +49,13 @@ func TestWatchWebsocket(t *testing.T) { dest, _ := url.Parse(server.URL) dest.Scheme = "ws" // Required by websocket, though the server never sees it. dest.Path = "/prefix/version/watch/foo" - dest.RawQuery = "id=myID" + dest.RawQuery = "" ws, err := websocket.Dial(dest.String(), "", "http://localhost") if err != nil { t.Errorf("unexpected error: %v", err) } - if a, e := simpleStorage.requestedID, "myID"; a != e { - t.Fatalf("Expected %v, got %v", e, a) - } - try := func(action watch.EventType, object interface{}) { // Send simpleStorage.fakeWatch.Action(action, object) @@ -98,7 +95,7 @@ func TestWatchHTTP(t *testing.T) { dest, _ := url.Parse(server.URL) dest.Path = "/prefix/version/watch/foo" - dest.RawQuery = "id=myID" + dest.RawQuery = "" request, err := http.NewRequest("GET", dest.String(), nil) if err != nil { @@ -114,10 +111,6 @@ func TestWatchHTTP(t *testing.T) { t.Errorf("Unexpected response %#v", response) } - if a, e := simpleStorage.requestedID, "myID"; a != e { - t.Fatalf("Expected %v, got %v", e, a) - } - decoder := json.NewDecoder(response.Body) try := func(action watch.EventType, object interface{}) { @@ -148,3 +141,65 @@ func TestWatchHTTP(t *testing.T) { t.Errorf("Unexpected non-error") } } + +func TestWatchParamParsing(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := New(map[string]RESTStorage{ + "foo": simpleStorage, + }, codec, "/prefix/version") + server := httptest.NewServer(handler) + + dest, _ := url.Parse(server.URL) + dest.Path = "/prefix/version/watch/foo" + + table := []struct { + rawQuery string + resourceVersion uint64 + labelSelector string + fieldSelector string + }{ + { + rawQuery: "resourceVersion=1234", + resourceVersion: 1234, + labelSelector: "", + fieldSelector: "", + }, { + rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo", + resourceVersion: 314159, + labelSelector: "name=foo", + fieldSelector: "Host=", + }, { + rawQuery: "fields=ID%3dfoo&resourceVersion=1492", + resourceVersion: 1492, + labelSelector: "", + fieldSelector: "ID=foo", + }, { + rawQuery: "", + resourceVersion: 0, + labelSelector: "", + fieldSelector: "", + }, + } + + for _, item := range table { + simpleStorage.requestedLabelSelector = nil + simpleStorage.requestedFieldSelector = nil + simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases + dest.RawQuery = item.rawQuery + resp, err := http.Get(dest.String()) + if err != nil { + t.Errorf("%v: unexpected error: %v", item.rawQuery, err) + continue + } + resp.Body.Close() + if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a { + t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) + } + } +} diff --git a/pkg/client/client.go b/pkg/client/client.go index 3a41f8613da..e7c16761158 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -28,11 +28,14 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) // Interface holds the methods for clients of Kubenetes, // an interface to allow mock testing. +// TODO: split this up by resource? +// TODO: these should return/take pointers. type Interface interface { ListPods(selector labels.Selector) (api.PodList, error) GetPod(name string) (api.Pod, error) @@ -45,6 +48,7 @@ type Interface interface { CreateReplicationController(api.ReplicationController) (api.ReplicationController, error) UpdateReplicationController(api.ReplicationController) (api.ReplicationController, error) DeleteReplicationController(string) error + WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) GetService(name string) (api.Service, error) CreateService(api.Service) (api.Service, error) @@ -169,7 +173,7 @@ func (c *Client) makeURL(path string) string { // ListPods takes a selector, and returns the list of pods that match that selector func (c *Client) ListPods(selector labels.Selector) (result api.PodList, err error) { - err = c.Get().Path("pods").Selector(selector).Do().Into(&result) + err = c.Get().Path("pods").SelectorParam("labels", selector).Do().Into(&result) return } @@ -202,7 +206,7 @@ func (c *Client) UpdatePod(pod api.Pod) (result api.Pod, err error) { // ListReplicationControllers takes a selector, and returns the list of replication controllers that match that selector func (c *Client) ListReplicationControllers(selector labels.Selector) (result api.ReplicationControllerList, err error) { - err = c.Get().Path("replicationControllers").Selector(selector).Do().Into(&result) + err = c.Get().Path("replicationControllers").SelectorParam("labels", selector).Do().Into(&result) return } @@ -233,6 +237,17 @@ func (c *Client) DeleteReplicationController(name string) error { return c.Delete().Path("replicationControllers").Path(name).Do().Error() } +// WatchReplicationControllers returns a watch.Interface that watches the requested controllers. +func (c *Client) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return c.Get(). + Path("watch"). + Path("replicationControllers"). + UintParam("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} + // GetService returns information about a particular service. func (c *Client) GetService(name string) (result api.Service, err error) { err = c.Get().Path("services").Path(name).Do().Into(&result) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 47a0324bf06..1ddea772b1e 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -42,7 +42,7 @@ func TestListEmptyPods(t *testing.T) { Request: testRequest{Method: "GET", Path: "/pods"}, Response: Response{StatusCode: 200, Body: api.PodList{}}, } - podList, err := c.Setup().ListPods(nil) + podList, err := c.Setup().ListPods(labels.Everything()) c.Validate(t, podList, err) } @@ -65,7 +65,7 @@ func TestListPods(t *testing.T) { }, }, } - receivedPodList, err := c.Setup().ListPods(nil) + receivedPodList, err := c.Setup().ListPods(labels.Everything()) c.Validate(t, receivedPodList, err) } @@ -191,7 +191,7 @@ func TestListControllers(t *testing.T) { }, }, } - receivedControllerList, err := c.Setup().ListReplicationControllers(nil) + receivedControllerList, err := c.Setup().ListReplicationControllers(labels.Everything()) c.Validate(t, receivedControllerList, err) } diff --git a/pkg/client/fake.go b/pkg/client/fake.go new file mode 100644 index 00000000000..e6f61a11c8c --- /dev/null +++ b/pkg/client/fake.go @@ -0,0 +1,105 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// FakeClient implements Interface. Meant to be embedded into a struct to get a default +// implementation. This makes faking out just the method you want to test easier. +type FakeClient struct { + // FakeClient by default keeps a simple list of the methods that have been called. + Actions []string +} + +func (client *FakeClient) ListPods(selector labels.Selector) (api.PodList, error) { + client.Actions = append(client.Actions, "list-pods") + return api.PodList{}, nil +} + +func (client *FakeClient) GetPod(name string) (api.Pod, error) { + client.Actions = append(client.Actions, "get-pod") + return api.Pod{}, nil +} + +func (client *FakeClient) DeletePod(name string) error { + client.Actions = append(client.Actions, "delete-pod") + return nil +} + +func (client *FakeClient) CreatePod(pod api.Pod) (api.Pod, error) { + client.Actions = append(client.Actions, "create-pod") + return api.Pod{}, nil +} + +func (client *FakeClient) UpdatePod(pod api.Pod) (api.Pod, error) { + client.Actions = append(client.Actions, "update-pod") + return api.Pod{}, nil +} + +func (client *FakeClient) ListReplicationControllers(selector labels.Selector) (api.ReplicationControllerList, error) { + client.Actions = append(client.Actions, "list-controllers") + return api.ReplicationControllerList{}, nil +} + +func (client *FakeClient) GetReplicationController(name string) (api.ReplicationController, error) { + client.Actions = append(client.Actions, "get-controller") + return api.ReplicationController{}, nil +} + +func (client *FakeClient) CreateReplicationController(controller api.ReplicationController) (api.ReplicationController, error) { + client.Actions = append(client.Actions, "create-controller") + return api.ReplicationController{}, nil +} + +func (client *FakeClient) UpdateReplicationController(controller api.ReplicationController) (api.ReplicationController, error) { + client.Actions = append(client.Actions, "update-controller") + return api.ReplicationController{}, nil +} + +func (client *FakeClient) DeleteReplicationController(controller string) error { + client.Actions = append(client.Actions, "delete-controller") + return nil +} + +func (client *FakeClient) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + client.Actions = append(client.Actions, "watch-controllers") + return watch.NewFake(), nil +} + +func (client *FakeClient) GetService(name string) (api.Service, error) { + client.Actions = append(client.Actions, "get-controller") + return api.Service{}, nil +} + +func (client *FakeClient) CreateService(controller api.Service) (api.Service, error) { + client.Actions = append(client.Actions, "create-service") + return api.Service{}, nil +} + +func (client *FakeClient) UpdateService(controller api.Service) (api.Service, error) { + client.Actions = append(client.Actions, "update-service") + return api.Service{}, nil +} + +func (client *FakeClient) DeleteService(controller string) error { + client.Actions = append(client.Actions, "delete-service") + return nil +} diff --git a/pkg/client/fake_test.go b/pkg/client/fake_test.go new file mode 100644 index 00000000000..9c739e24142 --- /dev/null +++ b/pkg/client/fake_test.go @@ -0,0 +1,37 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "testing" +) + +// This test file just ensures that FakeClient and structs it is embedded in +// implement Interface. + +func TestFakeImplementsInterface(t *testing.T) { + _ = Interface(&FakeClient{}) +} + +type MyFake struct { + *FakeClient +} + +func TestEmbeddedFakeImplementsInterface(t *testing.T) { + _ = Interface(MyFake{&FakeClient{}}) + _ = Interface(&MyFake{&FakeClient{}}) +} diff --git a/pkg/client/request.go b/pkg/client/request.go index 729aa13281a..ceb01215994 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -24,27 +24,34 @@ import ( "net/http" "net/url" "path" + "strconv" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) -// Server contains info locating a kubernetes api server. -// Example usage: +// specialParams lists parameters that are handled specially and which users of Request +// are therefore not allowed to set manually. +var specialParams = util.NewStringSet("sync", "timeout") + +// Verb begins a request with a verb (GET, POST, PUT, DELETE) +// +// Example usage of Client's request building interface: // auth, err := LoadAuth(filename) // c := New(url, auth) // resp, err := c.Verb("GET"). // Path("pods"). -// Selector("area=staging"). +// SelectorParam("labels", "area=staging"). // Timeout(10*time.Second). // Do() -// list, ok := resp.(api.PodList) - -// Verb begins a request with a verb (GET, POST, PUT, DELETE) +// if err != nil { ... } +// list, ok := resp.(*api.PodList) +// func (c *Client) Verb(verb string) *Request { return &Request{ verb: verb, @@ -52,26 +59,27 @@ func (c *Client) Verb(verb string) *Request { path: "/api/v1beta1", sync: c.Sync, timeout: c.Timeout, + params: map[string]string{}, pollPeriod: c.PollPeriod, } } -// Post begins a POST request. +// Post begins a POST request. Short for c.Verb("POST"). func (c *Client) Post() *Request { return c.Verb("POST") } -// Put begins a PUT request. +// Put begins a PUT request. Short for c.Verb("PUT"). func (c *Client) Put() *Request { return c.Verb("PUT") } -// Get begins a GET request. +// Get begins a GET request. Short for c.Verb("GET"). func (c *Client) Get() *Request { return c.Verb("GET") } -// Delete begins a DELETE request. +// Delete begins a DELETE request. Short for c.Verb("DELETE"). func (c *Client) Delete() *Request { return c.Verb("DELETE") } @@ -90,6 +98,7 @@ type Request struct { verb string path string body io.Reader + params map[string]string selector labels.Selector timeout time.Duration sync bool @@ -105,7 +114,7 @@ func (r *Request) Path(item string) *Request { return r } -// Sync sets sync/async call status. +// Sync sets sync/async call status by setting the "sync" parameter to "true"/"false" func (r *Request) Sync(sync bool) *Request { if r.err != nil { return r @@ -123,25 +132,48 @@ func (r *Request) AbsPath(path string) *Request { return r } -// ParseSelector parses the given string as a resource label selector. Optional. -func (r *Request) ParseSelector(item string) *Request { +// ParseSelectorParam parses the given string as a resource label selector. +// This is a convenience function so you don't have to first check that it's a +// validly formatted selector. +func (r *Request) ParseSelectorParam(paramName, item string) *Request { if r.err != nil { return r } - r.selector, r.err = labels.ParseSelector(item) - return r + sel, err := labels.ParseSelector(item) + if err != nil { + r.err = err + return r + } + return r.setParam(paramName, sel.String()) } -// Selector makes the request use the given selector. -func (r *Request) Selector(s labels.Selector) *Request { +// SelectorParam adds the given selector as a query parameter with the name paramName. +func (r *Request) SelectorParam(paramName string, s labels.Selector) *Request { if r.err != nil { return r } - r.selector = s + return r.setParam(paramName, s.String()) +} + +// UintParam creates a query parameter with the given value. +func (r *Request) UintParam(paramName string, u uint64) *Request { + if r.err != nil { + return r + } + return r.setParam(paramName, strconv.FormatUint(u, 10)) +} + +func (r *Request) setParam(paramName, value string) *Request { + if specialParams.Has(paramName) { + r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName) + return r + } + r.params[paramName] = value return r } -// Timeout makes the request use the given duration as a timeout. Optional. +// Timeout makes the request use the given duration as a timeout. Sets the "timeout" +// parameter. Ignored if sync=false. func (r *Request) Timeout(d time.Duration) *Request { if r.err != nil { return r @@ -153,6 +185,7 @@ func (r *Request) Timeout(d time.Duration) *Request { // Body makes the request use obj as the body. Optional. // If obj is a string, try to read a file of that name. // If obj is a []byte, send it directly. +// If obj is an io.Reader, use it directly. // Otherwise, assume obj is an api type and marshall it correctly. func (r *Request) Body(obj interface{}) *Request { if r.err != nil { @@ -169,7 +202,7 @@ func (r *Request) Body(obj interface{}) *Request { case []byte: r.body = bytes.NewBuffer(t) case io.Reader: - r.body = obj.(io.Reader) + r.body = t default: data, err := api.Encode(obj) if err != nil { @@ -197,9 +230,11 @@ func (r *Request) PollPeriod(d time.Duration) *Request { func (r *Request) finalURL() string { finalURL := r.c.host + r.path query := url.Values{} - if r.selector != nil { - query.Add("labels", r.selector.String()) + for key, value := range r.params { + query.Add(key, value) } + // sync and timeout are handled specially here, to allow setting them + // in any order. if r.sync { query.Add("sync", "true") if r.timeout != 0 { diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index 0b80f4819cd..5e5ddc4cc55 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -49,7 +49,7 @@ func TestDoRequestNewWay(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - ParseSelector("name=foo"). + ParseSelectorParam("labels", "name=foo"). Timeout(time.Second). Body([]byte(reqBody)). Do().Get() @@ -87,7 +87,7 @@ func TestDoRequestNewWayReader(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - Selector(labels.Set{"name": "foo"}.AsSelector()). + SelectorParam("labels", labels.Set{"name": "foo"}.AsSelector()). Sync(false). Timeout(time.Second). Body(bytes.NewBuffer(reqBodyExpected)). @@ -127,7 +127,7 @@ func TestDoRequestNewWayObj(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - Selector(labels.Set{"name": "foo"}.AsSelector()). + SelectorParam("labels", labels.Set{"name": "foo"}.AsSelector()). Timeout(time.Second). Body(reqObj). Do().Get() @@ -180,7 +180,7 @@ func TestDoRequestNewWayFile(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - ParseSelector("name=foo"). + ParseSelectorParam("labels", "name=foo"). Timeout(time.Second). Body(file.Name()). Do().Get() @@ -244,6 +244,45 @@ func TestSync(t *testing.T) { } } +func TestUintParam(t *testing.T) { + table := []struct { + name string + testVal uint64 + expectStr string + }{ + {"foo", 31415, "?foo=31415"}, + {"bar", 42, "?bar=42"}, + {"baz", 0, "?baz=0"}, + } + + for _, item := range table { + c := New("", nil) + r := c.Get().AbsPath("").UintParam(item.name, item.testVal) + if e, a := item.expectStr, r.finalURL(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + } +} + +func TestUnacceptableParamNames(t *testing.T) { + table := []struct { + name string + testVal string + expectSuccess bool + }{ + {"sync", "foo", false}, + {"timeout", "42", false}, + } + + for _, item := range table { + c := New("", nil) + r := c.Get().setParam(item.name, item.testVal) + if e, a := item.expectSuccess, r.err == nil; e != a { + t.Errorf("expected %v, got %v (%v)", e, a, r.err) + } + } +} + func TestSetPollPeriod(t *testing.T) { c := New("", nil) r := c.Get() diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index d9845c59d1f..8bacad7e870 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -24,7 +24,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -37,9 +36,6 @@ type ReplicationManager struct { // To allow injection of syncReplicationController for testing. syncHandler func(controllerSpec api.ReplicationController) error - - // To allow injection of watch creation. - watchMaker func() (watch.Interface, error) } // PodControlInterface is an interface that knows how to add or delete pods @@ -85,28 +81,23 @@ func MakeReplicationManager(kubeClient client.Interface) *ReplicationManager { }, } rm.syncHandler = rm.syncReplicationController - rm.watchMaker = rm.makeAPIWatch return rm } // Run begins watching and syncing. func (rm *ReplicationManager) Run(period time.Duration) { rm.syncTime = time.Tick(period) - go util.Forever(func() { rm.watchControllers() }, period) + resourceVersion := uint64(0) + go util.Forever(func() { rm.watchControllers(&resourceVersion) }, period) } -// makeAPIWatch starts watching via the apiserver. -func (rm *ReplicationManager) makeAPIWatch() (watch.Interface, error) { - // TODO: Fix this ugly type assertion. - return rm.kubeClient.(*client.Client). - Get(). - Path("watch"). - Path("replicationControllers"). - Watch() -} - -func (rm *ReplicationManager) watchControllers() { - watching, err := rm.watchMaker() +// resourceVersion is a pointer to the resource version to use/update. +func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) { + watching, err := rm.kubeClient.WatchReplicationControllers( + labels.Everything(), + labels.Everything(), + *resourceVersion, + ) if err != nil { glog.Errorf("Unexpected failure to watch: %v", err) time.Sleep(5 * time.Second) @@ -128,6 +119,8 @@ func (rm *ReplicationManager) watchControllers() { if rc, ok := event.Object.(*api.ReplicationController); !ok { glog.Errorf("unexpected object: %#v", event.Object) } else { + // If we get disconnected, start where we left off. + *resourceVersion = rc.ResourceVersion + 1 rm.syncHandler(*rc) } } diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index bebaa948202..dcbab8b0af1 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -28,6 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -305,7 +306,7 @@ func TestSyncronize(t *testing.T) { w.WriteHeader(http.StatusNotFound) t.Errorf("Unexpected request for %v", req.RequestURI) }) - testServer := httptest.NewTLSServer(mux) + testServer := httptest.NewServer(mux) client := client.New(testServer.URL, nil) manager := MakeReplicationManager(client) fakePodControl := FakePodControl{} @@ -316,13 +317,18 @@ func TestSyncronize(t *testing.T) { validateSyncReplication(t, &fakePodControl, 7, 0) } -func TestWatchControllers(t *testing.T) { - fakeWatcher := watch.NewFake() - manager := MakeReplicationManager(nil) - manager.watchMaker = func() (watch.Interface, error) { - return fakeWatcher, nil - } +type FakeWatcher struct { + w *watch.FakeWatcher + *client.FakeClient +} +func (fw FakeWatcher) WatchReplicationControllers(l, f labels.Selector, rv uint64) (watch.Interface, error) { + return fw.w, nil +} + +func TestWatchControllers(t *testing.T) { + client := FakeWatcher{watch.NewFake(), &client.FakeClient{}} + manager := MakeReplicationManager(client) var testControllerSpec api.ReplicationController received := make(chan struct{}) manager.syncHandler = func(controllerSpec api.ReplicationController) error { @@ -333,11 +339,12 @@ func TestWatchControllers(t *testing.T) { return nil } - go manager.watchControllers() + resourceVersion := uint64(0) + go manager.watchControllers(&resourceVersion) // Test normal case testControllerSpec.ID = "foo" - fakeWatcher.Add(&testControllerSpec) + client.w.Add(&testControllerSpec) select { case <-received: diff --git a/pkg/kubecfg/kubecfg_test.go b/pkg/kubecfg/kubecfg_test.go index 234a97619b8..b1bc55c0661 100644 --- a/pkg/kubecfg/kubecfg_test.go +++ b/pkg/kubecfg/kubecfg_test.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) type Action struct { @@ -90,6 +91,11 @@ func (client *FakeKubeClient) DeleteReplicationController(controller string) err return nil } +func (client *FakeKubeClient) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + client.actions = append(client.actions, Action{action: "watch-controllers"}) + return watch.NewFake(), nil +} + func (client *FakeKubeClient) GetService(name string) (api.Service, error) { client.actions = append(client.actions, Action{action: "get-service", value: name}) return api.Service{}, nil diff --git a/pkg/registry/controllerstorage.go b/pkg/registry/controllerstorage.go index b5d53c1e28e..96f761d8f72 100644 --- a/pkg/registry/controllerstorage.go +++ b/pkg/registry/controllerstorage.go @@ -17,7 +17,6 @@ limitations under the License. package registry import ( - "errors" "fmt" "time" @@ -138,12 +137,6 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication // WatchAll returns ReplicationController events via a watch.Interface, implementing // apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) WatchAll() (watch.Interface, error) { - return storage.registry.WatchControllers() -} - -// WatchSingle returns events for a single ReplicationController via a watch.Interface, -// implementing apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) WatchSingle(id string) (watch.Interface, error) { - return nil, errors.New("unimplemented") +func (storage *ControllerRegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return storage.registry.WatchControllers(label, field, resourceVersion) } diff --git a/pkg/registry/controllerstorage_test.go b/pkg/registry/controllerstorage_test.go index 07518764ddb..9dd699ca5f7 100644 --- a/pkg/registry/controllerstorage_test.go +++ b/pkg/registry/controllerstorage_test.go @@ -29,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) +// TODO: Why do we have this AND MemoryRegistry? type MockControllerRegistry struct { err error controllers []api.ReplicationController @@ -49,10 +50,12 @@ func (registry *MockControllerRegistry) CreateController(controller api.Replicat func (registry *MockControllerRegistry) UpdateController(controller api.ReplicationController) error { return registry.err } + func (registry *MockControllerRegistry) DeleteController(ID string) error { return registry.err } -func (registry *MockControllerRegistry) WatchControllers() (watch.Interface, error) { + +func (registry *MockControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return nil, registry.err } diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index f6aea24e7d1..d4cc759fcb6 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -205,9 +205,13 @@ func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, er } // WatchControllers begins watching for new, changed, or deleted controllers. -// TODO: Add id/selector parameters? -func (registry *EtcdRegistry) WatchControllers() (watch.Interface, error) { - return registry.helper.WatchList("/registry/controllers", tools.Everything) +func (registry *EtcdRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + if !field.Empty() { + return nil, fmt.Errorf("no field selector implemented for controllers") + } + return registry.helper.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool { + return label.Matches(labels.Set(obj.(*api.ReplicationController).Labels)) + }) } func makeControllerKey(id string) string { diff --git a/pkg/registry/interfaces.go b/pkg/registry/interfaces.go index 0dda241197d..dfc3cd479e4 100644 --- a/pkg/registry/interfaces.go +++ b/pkg/registry/interfaces.go @@ -39,7 +39,7 @@ type PodRegistry interface { // ControllerRegistry is an interface for things that know how to store ReplicationControllers. type ControllerRegistry interface { ListControllers() ([]api.ReplicationController, error) - WatchControllers() (watch.Interface, error) + WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) GetController(controllerID string) (*api.ReplicationController, error) CreateController(controller api.ReplicationController) error UpdateController(controller api.ReplicationController) error diff --git a/pkg/registry/memory_registry.go b/pkg/registry/memory_registry.go index 59ed64d89ec..bce2fb60b10 100644 --- a/pkg/registry/memory_registry.go +++ b/pkg/registry/memory_registry.go @@ -89,7 +89,7 @@ func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController, return result, nil } -func (registry *MemoryRegistry) WatchControllers() (watch.Interface, error) { +func (registry *MemoryRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return nil, errors.New("unimplemented") } diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 4355ec1b509..2b3e46fec7c 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -296,18 +296,19 @@ func Everything(interface{}) bool { // WatchList begins watching the specified key's items. Items are decoded into // API objects, and any items passing 'filter' are sent down the returned -// watch.Interface. -func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) { +// watch.Interface. resourceVersion may be used to specify what version to begin +// watching (e.g., for reconnecting without missing any updateds). +func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { w := newEtcdWatcher(true, filter, h.Codec) - go w.etcdWatch(h.Client, key) + go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } // Watch begins watching the specified key. Events are decoded into // API objects and sent down the returned watch.Interface. -func (h *EtcdHelper) Watch(key string) (watch.Interface, error) { +func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) { w := newEtcdWatcher(false, nil, h.Codec) - go w.etcdWatch(h.Client, key) + go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } @@ -350,10 +351,10 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher { // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called // as a goroutine. -func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) { +func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { defer util.HandleCrash() defer close(w.etcdCallEnded) - _, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop) + _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) if err != etcd.ErrWatchStoppedByUser { glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) } @@ -385,18 +386,20 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { var action watch.EventType var data []byte switch res.Action { - case "create", "set": + case "create": if res.Node == nil { glog.Errorf("unexpected nil node: %#v", res) return } data = []byte(res.Node.Value) - // TODO: Is this conditional correct? - if res.EtcdIndex > 0 { - action = watch.Modified - } else { - action = watch.Added + action = watch.Added + case "set": + if res.Node == nil { + glog.Errorf("unexpected nil node: %#v", res) + return } + data = []byte(res.Node.Value) + action = watch.Modified case "delete": if res.PrevNode == nil { glog.Errorf("unexpected nil prev node: %#v", res) diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 736ee915e4f..fec310c81fd 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -325,6 +325,30 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) { } } +func TestWatchInterpretation_ListCreate(t *testing.T) { + w := newEtcdWatcher(true, func(interface{}) bool { + t.Errorf("unexpected filter call") + return true + }, codec) + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBytes, _ := codec.Encode(pod) + + go w.sendResult(&etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(podBytes), + }, + }) + + got := <-w.outgoing + if e, a := watch.Added, got.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } +} + func TestWatchInterpretation_ListAdd(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") @@ -341,7 +365,7 @@ func TestWatchInterpretation_ListAdd(t *testing.T) { }) got := <-w.outgoing - if e, a := watch.Added, got.Type; e != a { + if e, a := watch.Modified, got.Type; e != a { t.Errorf("Expected %v, got %v", e, a) } if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { @@ -420,7 +444,7 @@ func TestWatch(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/some/key") + watching, err := h.Watch("/some/key", 0) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -438,7 +462,7 @@ func TestWatch(t *testing.T) { } event := <-watching.ResultChan() - if e, a := watch.Added, event.Type; e != a { + if e, a := watch.Modified, event.Type; e != a { t.Errorf("Expected %v, got %v", e, a) } if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { @@ -462,7 +486,7 @@ func TestWatchPurposefulShutdown(t *testing.T) { h := EtcdHelper{fakeClient, codec, versioner} // Test purposeful shutdown - watching, err := h.Watch("/some/key") + watching, err := h.Watch("/some/key", 0) if err != nil { t.Fatalf("Unexpected error: %v", err) }