Merge pull request #259 from lavalamp/api_long_op

Implement client polling.
This commit is contained in:
brendandburns 2014-06-26 19:06:18 -07:00
commit 9e90c14369
3 changed files with 148 additions and 49 deletions

View File

@ -54,7 +54,7 @@ type StatusErr struct {
} }
func (s *StatusErr) Error() string { func (s *StatusErr) Error() string {
return fmt.Sprintf("Status: %v (%#v)", s.Status.Status, s) return fmt.Sprintf("Status: %v (%#v)", s.Status.Status, s.Status)
} }
// AuthInfo is used to store authorization information // AuthInfo is used to store authorization information
@ -103,16 +103,16 @@ func (c *Client) doRequest(request *http.Request) ([]byte, error) {
if response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent { if response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent {
return nil, fmt.Errorf("request [%#v] failed (%d) %s: %s", request, response.StatusCode, response.Status, string(body)) return nil, fmt.Errorf("request [%#v] failed (%d) %s: %s", request, response.StatusCode, response.Status, string(body))
} }
if response.StatusCode == http.StatusAccepted {
var status api.Status // If the server gave us a status back, look at what it was.
if err := api.DecodeInto(body, &status); err == nil { var status api.Status
if status.Status == api.StatusSuccess { if err := api.DecodeInto(body, &status); err == nil && status.Status != "" {
return body, nil if status.Status == api.StatusSuccess {
} else { return body, nil
return nil, &StatusErr{status}
}
} }
// Sometimes the server returns 202 even though it completely handled the request. // "Working" requests need to be handled specially.
// "Failed" requests are clearly just an error and it makes sense to return them as such.
return nil, &StatusErr{status}
} }
return body, err return body, err
} }

View File

@ -43,11 +43,12 @@ import (
// Begin a request with a verb (GET, POST, PUT, DELETE) // Begin a request with a verb (GET, POST, PUT, DELETE)
func (c *Client) Verb(verb string) *Request { func (c *Client) Verb(verb string) *Request {
return &Request{ return &Request{
verb: verb, verb: verb,
c: c, c: c,
path: "/api/v1beta1", path: "/api/v1beta1",
sync: true, sync: true,
timeout: 10 * time.Second, timeout: 10 * time.Second,
pollPeriod: 20 * time.Second,
} }
} }
@ -71,18 +72,24 @@ func (c *Client) Delete() *Request {
return c.Verb("DELETE") return c.Verb("DELETE")
} }
// Make a request to do a single poll of the completion of the given operation.
func (c *Client) PollFor(operationId string) *Request {
return c.Get().Path("operations").Path(operationId).Sync(false).PollPeriod(0)
}
// Request allows for building up a request to a server in a chained fashion. // Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to // Any errors are stored until the end of your call, so you only have to
// check once. // check once.
type Request struct { type Request struct {
c *Client c *Client
err error err error
verb string verb string
path string path string
body io.Reader body io.Reader
selector labels.Selector selector labels.Selector
timeout time.Duration timeout time.Duration
sync bool sync bool
pollPeriod time.Duration
} }
// Append an item to the request path. You must call Path at least once. // Append an item to the request path. You must call Path at least once.
@ -170,37 +177,56 @@ func (r *Request) Body(obj interface{}) *Request {
return r return r
} }
// Format and xecute the request. Returns the API object received, or an error. // PollPeriod sets the poll period.
func (r *Request) Do() Result { // If the server sends back a "working" status message, then repeatedly poll the server
// to see if the operation has completed yet, waiting 'd' between each poll.
// If you want to handle the "working" status yourself (it'll be delivered as StatusErr),
// set d to 0 to turn off this behavior.
func (r *Request) PollPeriod(d time.Duration) *Request {
if r.err != nil { if r.err != nil {
return Result{err: r.err} return r
} }
finalUrl := r.c.host + r.path r.pollPeriod = d
query := url.Values{} return r
if r.selector != nil { }
query.Add("labels", r.selector.String())
} // Format and execute the request. Returns the API object received, or an error.
if r.sync { func (r *Request) Do() Result {
query.Add("sync", "true") for {
if r.timeout != 0 { if r.err != nil {
query.Add("timeout", r.timeout.String()) return Result{err: r.err}
} }
} finalUrl := r.c.host + r.path
finalUrl += "?" + query.Encode() query := url.Values{}
req, err := http.NewRequest(r.verb, finalUrl, r.body) if r.selector != nil {
if err != nil { query.Add("labels", r.selector.String())
return Result{err: err}
}
respBody, err := r.c.doRequest(req)
if err != nil {
if statusErr, ok := err.(*StatusErr); ok {
// TODO: using the information in statusErr,
// loop querying the server to wait and retrieve
// the actual result.
_ = statusErr
} }
if r.sync {
query.Add("sync", "true")
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
}
}
finalUrl += "?" + query.Encode()
req, err := http.NewRequest(r.verb, finalUrl, r.body)
if err != nil {
return Result{err: err}
}
respBody, err := r.c.doRequest(req)
if err != nil {
if statusErr, ok := err.(*StatusErr); ok {
if statusErr.Status.Status == api.StatusWorking && r.pollPeriod != 0 {
time.Sleep(r.pollPeriod)
// Make a poll request
pollOp := r.c.PollFor(statusErr.Status.Details).PollPeriod(r.pollPeriod)
// Could also say "return r.Do()" but this way doesn't grow the callstack.
r = pollOp
continue
}
}
}
return Result{respBody, err}
} }
return Result{respBody, err}
} }
// Result contains the result of calling Request.Do(). // Result contains the result of calling Request.Do().

View File

@ -19,6 +19,7 @@ package client
import ( import (
"bytes" "bytes"
"io/ioutil" "io/ioutil"
"net/http"
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"testing" "testing"
@ -230,3 +231,75 @@ func TestSync(t *testing.T) {
t.Errorf("'Sync' doesn't work") t.Errorf("'Sync' doesn't work")
} }
} }
func TestSetPollPeriod(t *testing.T) {
c := New("", nil)
r := c.Get()
if r.pollPeriod == 0 {
t.Errorf("polling should be on by default")
}
r.PollPeriod(time.Hour)
if r.pollPeriod != time.Hour {
t.Errorf("'PollPeriod' doesn't work")
}
}
func TestPolling(t *testing.T) {
objects := []interface{}{
&api.Status{Status: api.StatusWorking, Details: "1234"},
&api.Status{Status: api.StatusWorking, Details: "1234"},
&api.Status{Status: api.StatusWorking, Details: "1234"},
&api.Status{Status: api.StatusWorking, Details: "1234"},
&api.Status{Status: api.StatusSuccess},
}
callNumber := 0
testServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, err := api.Encode(objects[callNumber])
if err != nil {
t.Errorf("Unexpected encode error")
}
callNumber++
w.Write(data)
}))
auth := AuthInfo{User: "user", Password: "pass"}
s := New(testServer.URL, &auth)
trials := []func(){
func() {
// Check that we do indeed poll when asked to.
obj, err := s.Get().PollPeriod(5 * time.Millisecond).Do().Get()
if err != nil {
t.Errorf("Unexpected error: %v %#v", err, err)
return
}
if s, ok := obj.(*api.Status); !ok || s.Status != api.StatusSuccess {
t.Errorf("Unexpected return object: %#v", obj)
return
}
if callNumber != len(objects) {
t.Errorf("Unexpected number of calls: %v", callNumber)
}
},
func() {
// Check that we don't poll when asked not to.
obj, err := s.Get().PollPeriod(0).Do().Get()
if err == nil {
t.Errorf("Unexpected non error: %v", obj)
return
}
if se, ok := err.(*StatusErr); !ok || se.Status.Status != api.StatusWorking {
t.Errorf("Unexpected kind of error: %#v", err)
return
}
if callNumber != 1 {
t.Errorf("Unexpected number of calls: %v", callNumber)
}
},
}
for _, f := range trials {
callNumber = 0
f()
}
}