diff --git a/cmd/cloudcfg/cloudcfg.go b/cmd/cloudcfg/cloudcfg.go index dabea2fccbd..8675ebec7f4 100644 --- a/cmd/cloudcfg/cloudcfg.go +++ b/cmd/cloudcfg/cloudcfg.go @@ -151,11 +151,11 @@ func executeAPIRequest(method string, auth *kube_client.AuthInfo) bool { r := s.Verb(verb). Path("api/v1beta1"). Path(parseStorage()). - Selector(*selector) + ParseSelector(*selector) if method == "create" || method == "update" { r.Body(readConfig(parseStorage())) } - obj, err := r.Do() + obj, err := r.Do().Get() if err != nil { log.Fatalf("Got request error: %v\n", err) return false diff --git a/pkg/client/client.go b/pkg/client/client.go index 2fc8cb694e0..444fa65bb7c 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -17,23 +17,20 @@ limitations under the License. package client import ( - "bytes" "crypto/tls" - "encoding/json" "fmt" "io" "io/ioutil" "log" "net/http" - "net/url" - "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) // ClientInterface holds the methods for clients of Kubenetes, an interface to allow mock testing type ClientInterface interface { - ListPods(selector map[string]string) (api.PodList, error) + ListPods(selector labels.Selector) (api.PodList, error) GetPod(name string) (api.Pod, error) DeletePod(name string) error CreatePod(api.Pod) (api.Pod, error) @@ -123,143 +120,79 @@ func (c *Client) rawRequest(method, path string, requestBody io.Reader, target i return body, err } -func (client Client) makeURL(path string) string { +func (client *Client) makeURL(path string) string { return client.host + "/api/v1beta1/" + path } -// EncodeSelector transforms a selector expressed as a key/value map, into a -// comma separated, key=value encoding. -func EncodeSelector(selector map[string]string) string { - parts := make([]string, 0, len(selector)) - for key, value := range selector { - parts = append(parts, key+"="+value) - } - return url.QueryEscape(strings.Join(parts, ",")) -} - -// DecodeSelector transforms a selector from a comma separated, key=value format into -// a key/value map. -func DecodeSelector(selector string) map[string]string { - result := map[string]string{} - if len(selector) == 0 { - return result - } - parts := strings.Split(selector, ",") - for _, part := range parts { - pieces := strings.Split(part, "=") - if len(pieces) == 2 { - result[pieces[0]] = pieces[1] - } else { - log.Printf("Invalid selector: %s", selector) - } - } - return result -} - // ListPods takes a selector, and returns the list of pods that match that selector -func (client Client) ListPods(selector map[string]string) (api.PodList, error) { - path := "pods" - if selector != nil && len(selector) > 0 { - path += "?labels=" + EncodeSelector(selector) - } - var result api.PodList - _, err := client.rawRequest("GET", path, nil, &result) - return result, err +func (client *Client) ListPods(selector labels.Selector) (result api.PodList, err error) { + err = client.Get().Path("pods").Selector(selector).Do().Into(&result) + return } // GetPod takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs -func (client Client) GetPod(name string) (api.Pod, error) { - var result api.Pod - _, err := client.rawRequest("GET", "pods/"+name, nil, &result) - return result, err +func (client *Client) GetPod(name string) (result api.Pod, err error) { + err = client.Get().Path("pods").Path(name).Do().Into(&result) + return } // DeletePod takes the name of the pod, and returns an error if one occurs -func (client Client) DeletePod(name string) error { - _, err := client.rawRequest("DELETE", "pods/"+name, nil, nil) - return err +func (client *Client) DeletePod(name string) error { + return client.Delete().Path("pods").Path(name).Do().Error() } // CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs -func (client Client) CreatePod(pod api.Pod) (api.Pod, error) { - var result api.Pod - body, err := json.Marshal(pod) - if err == nil { - _, err = client.rawRequest("POST", "pods", bytes.NewBuffer(body), &result) - } - return result, err +func (client *Client) CreatePod(pod api.Pod) (result api.Pod, err error) { + err = client.Post().Path("pods").Body(pod).Do().Into(&result) + return } // UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs -func (client Client) UpdatePod(pod api.Pod) (api.Pod, error) { - var result api.Pod - body, err := json.Marshal(pod) - if err == nil { - _, err = client.rawRequest("PUT", "pods/"+pod.ID, bytes.NewBuffer(body), &result) - } - return result, err +func (client *Client) UpdatePod(pod api.Pod) (result api.Pod, err error) { + err = client.Put().Path("pods").Path(pod.ID).Body(pod).Do().Into(&result) + return } // GetReplicationController returns information about a particular replication controller -func (client Client) GetReplicationController(name string) (api.ReplicationController, error) { - var result api.ReplicationController - _, err := client.rawRequest("GET", "replicationControllers/"+name, nil, &result) - return result, err +func (client *Client) GetReplicationController(name string) (result api.ReplicationController, err error) { + err = client.Get().Path("replicationControllers").Path(name).Do().Into(&result) + return } // CreateReplicationController creates a new replication controller -func (client Client) CreateReplicationController(controller api.ReplicationController) (api.ReplicationController, error) { - var result api.ReplicationController - body, err := json.Marshal(controller) - if err == nil { - _, err = client.rawRequest("POST", "replicationControllers", bytes.NewBuffer(body), &result) - } - return result, err +func (client *Client) CreateReplicationController(controller api.ReplicationController) (result api.ReplicationController, err error) { + err = client.Post().Path("replicationControllers").Body(controller).Do().Into(&result) + return } // UpdateReplicationController updates an existing replication controller -func (client Client) UpdateReplicationController(controller api.ReplicationController) (api.ReplicationController, error) { - var result api.ReplicationController - body, err := json.Marshal(controller) - if err == nil { - _, err = client.rawRequest("PUT", "replicationControllers/"+controller.ID, bytes.NewBuffer(body), &result) - } - return result, err +func (client *Client) UpdateReplicationController(controller api.ReplicationController) (result api.ReplicationController, err error) { + err = client.Put().Path("replicationControllers").Path(controller.ID).Body(controller).Do().Into(&result) + return } -func (client Client) DeleteReplicationController(name string) error { - _, err := client.rawRequest("DELETE", "replicationControllers/"+name, nil, nil) - return err +func (client *Client) DeleteReplicationController(name string) error { + return client.Delete().Path("replicationControllers").Path(name).Do().Error() } // GetReplicationController returns information about a particular replication controller -func (client Client) GetService(name string) (api.Service, error) { - var result api.Service - _, err := client.rawRequest("GET", "services/"+name, nil, &result) - return result, err +func (client *Client) GetService(name string) (result api.Service, err error) { + err = client.Get().Path("services").Path(name).Do().Into(&result) + return } // CreateReplicationController creates a new replication controller -func (client Client) CreateService(svc api.Service) (api.Service, error) { - var result api.Service - body, err := json.Marshal(svc) - if err == nil { - _, err = client.rawRequest("POST", "services", bytes.NewBuffer(body), &result) - } - return result, err +func (client *Client) CreateService(svc api.Service) (result api.Service, err error) { + err = client.Post().Path("services").Body(svc).Do().Into(&result) + return } // UpdateReplicationController updates an existing replication controller -func (client Client) UpdateService(svc api.Service) (api.Service, error) { - var result api.Service - body, err := json.Marshal(svc) - if err == nil { - _, err = client.rawRequest("PUT", "services/"+svc.ID, bytes.NewBuffer(body), &result) - } - return result, err +func (client *Client) UpdateService(svc api.Service) (result api.Service, err error) { + err = client.Put().Path("services").Path(svc.ID).Body(svc).Do().Into(&result) + return } -func (client Client) DeleteService(name string) error { - _, err := client.rawRequest("DELETE", "services/"+name, nil, nil) - return err +func (client *Client) DeleteService(name string) error { + return client.Delete().Path("services").Path(name).Do().Error() } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 07061e980e8..41563790a37 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -17,7 +17,6 @@ limitations under the License. package client import ( - "encoding/json" "net/http" "net/http/httptest" "net/url" @@ -96,7 +95,7 @@ func TestListPodsLabels(t *testing.T) { } c.Setup() c.QueryValidator["labels"] = validateLabels - selector := map[string]string{"foo": "bar", "name": "baz"} + selector := labels.Set{"foo": "bar", "name": "baz"}.AsSelector() receivedPodList, err := c.ListPods(selector) c.Validate(t, receivedPodList, err) } @@ -260,7 +259,7 @@ func TestCreateController(t *testing.T) { func body(obj interface{}, raw *string) *string { if obj != nil { - bs, _ := json.Marshal(obj) + bs, _ := api.Encode(obj) body := string(bs) return &body } diff --git a/pkg/client/request.go b/pkg/client/request.go index 3c711fe3763..c75e13f7a6d 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -34,7 +34,6 @@ import ( // auth, err := LoadAuth(filename) // c := New(url, auth) // resp, err := c.Verb("GET"). -// Path("api/v1beta1"). // Path("pods"). // Selector("area=staging"). // Timeout(10*time.Second). @@ -46,17 +45,39 @@ func (c *Client) Verb(verb string) *Request { return &Request{ verb: verb, c: c, - path: "/", + path: "/api/v1beta1", } } +// Begin a POST request. +func (c *Client) Post() *Request { + return c.Verb("POST") +} + +// Begin a PUT request. +func (c *Client) Put() *Request { + return c.Verb("PUT") +} + +// Begin a GET request. +func (c *Client) Get() *Request { + return c.Verb("GET") +} + +// Begin a DELETE request. +func (c *Client) Delete() *Request { + return c.Verb("DELETE") +} + // Request allows for building up a request to a server in a chained fashion. +// Any errors are stored until the end of your call, so you only have to +// check once. type Request struct { c *Client err error verb string path string - body interface{} + body io.Reader selector labels.Selector timeout time.Duration } @@ -70,8 +91,8 @@ func (r *Request) Path(item string) *Request { return r } -// Use the given item as a resource label selector. Optional. -func (r *Request) Selector(item string) *Request { +// Parse the given string as a resource label selector. Optional. +func (r *Request) ParseSelector(item string) *Request { if r.err != nil { return r } @@ -79,6 +100,15 @@ func (r *Request) Selector(item string) *Request { return r } +// Use the given selector. +func (r *Request) Selector(s labels.Selector) *Request { + if r.err != nil { + return r + } + r.selector = s + return r +} + // Use the given duration as a timeout. Optional. func (r *Request) Timeout(d time.Duration) *Request { if r.err != nil { @@ -96,14 +126,31 @@ func (r *Request) Body(obj interface{}) *Request { if r.err != nil { return r } - r.body = obj + switch t := obj.(type) { + case string: + data, err := ioutil.ReadFile(t) + if err != nil { + r.err = err + return r + } + r.body = bytes.NewBuffer(data) + case []byte: + r.body = bytes.NewBuffer(t) + default: + data, err := api.Encode(obj) + if err != nil { + r.err = err + return r + } + r.body = bytes.NewBuffer(data) + } return r } -// Format and xecute the request. Returns the API object received, or an error. -func (r *Request) Do() (interface{}, error) { +// Format and execute the request. +func (r *Request) Do() Result { if r.err != nil { - return nil, r.err + return Result{err: r.err} } finalUrl := r.c.host + r.path query := url.Values{} @@ -114,32 +161,42 @@ func (r *Request) Do() (interface{}, error) { query.Add("timeout", r.timeout.String()) } finalUrl += "?" + query.Encode() - var body io.Reader - if r.body != nil { - switch t := r.body.(type) { - case string: - data, err := ioutil.ReadFile(t) - if err != nil { - return nil, err - } - body = bytes.NewBuffer(data) - case []byte: - body = bytes.NewBuffer(t) - default: - data, err := api.Encode(r.body) - if err != nil { - return nil, err - } - body = bytes.NewBuffer(data) - } - } - req, err := http.NewRequest(r.verb, finalUrl, body) + req, err := http.NewRequest(r.verb, finalUrl, r.body) if err != nil { - return nil, err + return Result{err: err} } - str, err := r.c.doRequest(req) - if err != nil { - return nil, err - } - return api.Decode([]byte(str)) + respBody, err := r.c.doRequest(req) + return Result{respBody, err} +} + +// Result contains the result of calling Request.Do(). +type Result struct { + body []byte + err error +} + +// Raw returns the raw result. +func (r Result) Raw() ([]byte, error) { + return r.body, r.err +} + +// Get returns the result as an object. +func (r Result) Get() (interface{}, error) { + if r.err != nil { + return nil, r.err + } + return api.Decode(r.body) +} + +// Into stores the result into obj, if possible.. +func (r Result) Into(obj interface{}) error { + if r.err != nil { + return r.err + } + return api.DecodeInto(r.body, obj) +} + +// Returns the error executing the request, nil if no error occurred. +func (r Result) Error() error { + return r.err } diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index 41070591cd0..a9bc8c1c39b 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -42,10 +43,10 @@ func TestDoRequestNewWay(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - Selector("name=foo"). + ParseSelector("name=foo"). Timeout(time.Second). Body([]byte(reqBody)). - Do() + Do().Get() if err != nil { t.Errorf("Unexpected error: %v %#v", err, err) return @@ -55,7 +56,7 @@ func TestDoRequestNewWay(t *testing.T) { } else if !reflect.DeepEqual(obj, expectedObj) { t.Errorf("Expected: %#v, got %#v", expectedObj, obj) } - fakeHandler.ValidateRequest(t, "/foo/bar/baz", "POST", &reqBody) + fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &reqBody) if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } @@ -80,10 +81,10 @@ func TestDoRequestNewWayObj(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - Selector("name=foo"). + Selector(labels.Set{"name": "foo"}.AsSelector()). Timeout(time.Second). Body(reqObj). - Do() + Do().Get() if err != nil { t.Errorf("Unexpected error: %v %#v", err, err) return @@ -94,7 +95,7 @@ func TestDoRequestNewWayObj(t *testing.T) { t.Errorf("Expected: %#v, got %#v", expectedObj, obj) } tmpStr := string(reqBodyExpected) - fakeHandler.ValidateRequest(t, "/foo/bar/baz", "POST", &tmpStr) + fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } @@ -125,10 +126,10 @@ func TestDoRequestNewWayFile(t *testing.T) { obj, err := s.Verb("POST"). Path("foo/bar"). Path("baz"). - Selector("name=foo"). + ParseSelector("name=foo"). Timeout(time.Second). Body(file.Name()). - Do() + Do().Get() if err != nil { t.Errorf("Unexpected error: %v %#v", err, err) return @@ -139,7 +140,7 @@ func TestDoRequestNewWayFile(t *testing.T) { t.Errorf("Expected: %#v, got %#v", expectedObj, obj) } tmpStr := string(reqBodyExpected) - fakeHandler.ValidateRequest(t, "/foo/bar/baz", "POST", &tmpStr) + fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } @@ -147,3 +148,19 @@ func TestDoRequestNewWayFile(t *testing.T) { t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived) } } + +func TestVerbs(t *testing.T) { + c := New("", nil) + if r := c.Post(); r.verb != "POST" { + t.Errorf("Post verb is wrong") + } + if r := c.Put(); r.verb != "PUT" { + t.Errorf("Put verb is wrong") + } + if r := c.Get(); r.verb != "GET" { + t.Errorf("Get verb is wrong") + } + if r := c.Delete(); r.verb != "DELETE" { + t.Errorf("Delete verb is wrong") + } +} diff --git a/pkg/cloudcfg/cloudcfg.go b/pkg/cloudcfg/cloudcfg.go index 0bd6a195a8c..41e93b4f3cb 100644 --- a/pkg/cloudcfg/cloudcfg.go +++ b/pkg/cloudcfg/cloudcfg.go @@ -28,6 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "gopkg.in/v1/yaml" ) @@ -71,9 +72,9 @@ func Update(name string, client client.ClientInterface, updatePeriod time.Durati if err != nil { return err } - labels := controller.DesiredState.ReplicaSelector + s := labels.Set(controller.DesiredState.ReplicaSelector).AsSelector() - podList, err := client.ListPods(labels) + podList, err := client.ListPods(s) if err != nil { return err } diff --git a/pkg/cloudcfg/cloudcfg_test.go b/pkg/cloudcfg/cloudcfg_test.go index eb586c878ba..eb3cbcc50ef 100644 --- a/pkg/cloudcfg/cloudcfg_test.go +++ b/pkg/cloudcfg/cloudcfg_test.go @@ -24,6 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) // TODO: This doesn't reduce typing enough to make it worth the less readable errors. Remove. @@ -44,7 +45,7 @@ type FakeKubeClient struct { ctrl api.ReplicationController } -func (client *FakeKubeClient) ListPods(selector map[string]string) (api.PodList, error) { +func (client *FakeKubeClient) ListPods(selector labels.Selector) (api.PodList, error) { client.actions = append(client.actions, Action{action: "list-pods"}) return client.pods, nil } diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index a3876f2d48c..5658dc6d424 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" ) @@ -177,7 +178,8 @@ func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod { } func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error { - podList, err := rm.kubeClient.ListPods(controllerSpec.DesiredState.ReplicaSelector) + s := labels.Set(controllerSpec.DesiredState.ReplicaSelector).AsSelector() + podList, err := rm.kubeClient.ListPods(s) if err != nil { return err }