From fd5e3b0b04879a1fb7651501f29febc70c47f841 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 26 Jun 2014 16:10:38 -0700 Subject: [PATCH] Implement client polling. --- pkg/client/client.go | 20 +++---- pkg/client/request.go | 104 +++++++++++++++++++++++-------------- pkg/client/request_test.go | 73 ++++++++++++++++++++++++++ 3 files changed, 148 insertions(+), 49 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index a3a2e51945b..6ee978abc44 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -54,7 +54,7 @@ type StatusErr struct { } func (s *StatusErr) Error() string { - return fmt.Sprintf("Status: %v (%#v)", s.Status.Status, s) + return fmt.Sprintf("Status: %v (%#v)", s.Status.Status, s.Status) } // AuthInfo is used to store authorization information @@ -103,16 +103,16 @@ func (c *Client) doRequest(request *http.Request) ([]byte, error) { if response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent { return nil, fmt.Errorf("request [%#v] failed (%d) %s: %s", request, response.StatusCode, response.Status, string(body)) } - if response.StatusCode == http.StatusAccepted { - var status api.Status - if err := api.DecodeInto(body, &status); err == nil { - if status.Status == api.StatusSuccess { - return body, nil - } else { - return nil, &StatusErr{status} - } + + // If the server gave us a status back, look at what it was. + var status api.Status + if err := api.DecodeInto(body, &status); err == nil && status.Status != "" { + if status.Status == api.StatusSuccess { + return body, nil } - // Sometimes the server returns 202 even though it completely handled the request. + // "Working" requests need to be handled specially. + // "Failed" requests are clearly just an error and it makes sense to return them as such. + return nil, &StatusErr{status} } return body, err } diff --git a/pkg/client/request.go b/pkg/client/request.go index ea643421433..16d5f7061d8 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -43,11 +43,12 @@ import ( // Begin a request with a verb (GET, POST, PUT, DELETE) func (c *Client) Verb(verb string) *Request { return &Request{ - verb: verb, - c: c, - path: "/api/v1beta1", - sync: true, - timeout: 10 * time.Second, + verb: verb, + c: c, + path: "/api/v1beta1", + sync: true, + timeout: 10 * time.Second, + pollPeriod: 20 * time.Second, } } @@ -71,18 +72,24 @@ func (c *Client) Delete() *Request { return c.Verb("DELETE") } +// Make a request to do a single poll of the completion of the given operation. +func (c *Client) PollFor(operationId string) *Request { + return c.Get().Path("operations").Path(operationId).Sync(false).PollPeriod(0) +} + // Request allows for building up a request to a server in a chained fashion. // Any errors are stored until the end of your call, so you only have to // check once. type Request struct { - c *Client - err error - verb string - path string - body io.Reader - selector labels.Selector - timeout time.Duration - sync bool + c *Client + err error + verb string + path string + body io.Reader + selector labels.Selector + timeout time.Duration + sync bool + pollPeriod time.Duration } // Append an item to the request path. You must call Path at least once. @@ -170,37 +177,56 @@ func (r *Request) Body(obj interface{}) *Request { return r } -// Format and xecute the request. Returns the API object received, or an error. -func (r *Request) Do() Result { +// PollPeriod sets the poll period. +// If the server sends back a "working" status message, then repeatedly poll the server +// to see if the operation has completed yet, waiting 'd' between each poll. +// If you want to handle the "working" status yourself (it'll be delivered as StatusErr), +// set d to 0 to turn off this behavior. +func (r *Request) PollPeriod(d time.Duration) *Request { if r.err != nil { - return Result{err: r.err} + return r } - finalUrl := r.c.host + r.path - query := url.Values{} - if r.selector != nil { - query.Add("labels", r.selector.String()) - } - if r.sync { - query.Add("sync", "true") - if r.timeout != 0 { - query.Add("timeout", r.timeout.String()) + r.pollPeriod = d + return r +} + +// Format and execute the request. Returns the API object received, or an error. +func (r *Request) Do() Result { + for { + if r.err != nil { + return Result{err: r.err} } - } - finalUrl += "?" + query.Encode() - req, err := http.NewRequest(r.verb, finalUrl, r.body) - if err != nil { - return Result{err: err} - } - respBody, err := r.c.doRequest(req) - if err != nil { - if statusErr, ok := err.(*StatusErr); ok { - // TODO: using the information in statusErr, - // loop querying the server to wait and retrieve - // the actual result. - _ = statusErr + finalUrl := r.c.host + r.path + query := url.Values{} + if r.selector != nil { + query.Add("labels", r.selector.String()) } + if r.sync { + query.Add("sync", "true") + if r.timeout != 0 { + query.Add("timeout", r.timeout.String()) + } + } + finalUrl += "?" + query.Encode() + req, err := http.NewRequest(r.verb, finalUrl, r.body) + if err != nil { + return Result{err: err} + } + respBody, err := r.c.doRequest(req) + if err != nil { + if statusErr, ok := err.(*StatusErr); ok { + if statusErr.Status.Status == api.StatusWorking && r.pollPeriod != 0 { + time.Sleep(r.pollPeriod) + // Make a poll request + pollOp := r.c.PollFor(statusErr.Status.Details).PollPeriod(r.pollPeriod) + // Could also say "return r.Do()" but this way doesn't grow the callstack. + r = pollOp + continue + } + } + } + return Result{respBody, err} } - return Result{respBody, err} } // Result contains the result of calling Request.Do(). diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index c6ec542c240..104e8a70715 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -19,6 +19,7 @@ package client import ( "bytes" "io/ioutil" + "net/http" "net/http/httptest" "reflect" "testing" @@ -230,3 +231,75 @@ func TestSync(t *testing.T) { t.Errorf("'Sync' doesn't work") } } + +func TestSetPollPeriod(t *testing.T) { + c := New("", nil) + r := c.Get() + if r.pollPeriod == 0 { + t.Errorf("polling should be on by default") + } + r.PollPeriod(time.Hour) + if r.pollPeriod != time.Hour { + t.Errorf("'PollPeriod' doesn't work") + } +} + +func TestPolling(t *testing.T) { + objects := []interface{}{ + &api.Status{Status: api.StatusWorking, Details: "1234"}, + &api.Status{Status: api.StatusWorking, Details: "1234"}, + &api.Status{Status: api.StatusWorking, Details: "1234"}, + &api.Status{Status: api.StatusWorking, Details: "1234"}, + &api.Status{Status: api.StatusSuccess}, + } + + callNumber := 0 + testServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + data, err := api.Encode(objects[callNumber]) + if err != nil { + t.Errorf("Unexpected encode error") + } + callNumber++ + w.Write(data) + })) + + auth := AuthInfo{User: "user", Password: "pass"} + s := New(testServer.URL, &auth) + + trials := []func(){ + func() { + // Check that we do indeed poll when asked to. + obj, err := s.Get().PollPeriod(5 * time.Millisecond).Do().Get() + if err != nil { + t.Errorf("Unexpected error: %v %#v", err, err) + return + } + if s, ok := obj.(*api.Status); !ok || s.Status != api.StatusSuccess { + t.Errorf("Unexpected return object: %#v", obj) + return + } + if callNumber != len(objects) { + t.Errorf("Unexpected number of calls: %v", callNumber) + } + }, + func() { + // Check that we don't poll when asked not to. + obj, err := s.Get().PollPeriod(0).Do().Get() + if err == nil { + t.Errorf("Unexpected non error: %v", obj) + return + } + if se, ok := err.(*StatusErr); !ok || se.Status.Status != api.StatusWorking { + t.Errorf("Unexpected kind of error: %#v", err) + return + } + if callNumber != 1 { + t.Errorf("Unexpected number of calls: %v", callNumber) + } + }, + } + for _, f := range trials { + callNumber = 0 + f() + } +}