mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Implement client polling.
This commit is contained in:
parent
de06869d30
commit
fd5e3b0b04
@ -54,7 +54,7 @@ type StatusErr struct {
|
||||
}
|
||||
|
||||
func (s *StatusErr) Error() string {
|
||||
return fmt.Sprintf("Status: %v (%#v)", s.Status.Status, s)
|
||||
return fmt.Sprintf("Status: %v (%#v)", s.Status.Status, s.Status)
|
||||
}
|
||||
|
||||
// AuthInfo is used to store authorization information
|
||||
@ -103,16 +103,16 @@ func (c *Client) doRequest(request *http.Request) ([]byte, error) {
|
||||
if response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent {
|
||||
return nil, fmt.Errorf("request [%#v] failed (%d) %s: %s", request, response.StatusCode, response.Status, string(body))
|
||||
}
|
||||
if response.StatusCode == http.StatusAccepted {
|
||||
var status api.Status
|
||||
if err := api.DecodeInto(body, &status); err == nil {
|
||||
if status.Status == api.StatusSuccess {
|
||||
return body, nil
|
||||
} else {
|
||||
return nil, &StatusErr{status}
|
||||
}
|
||||
|
||||
// If the server gave us a status back, look at what it was.
|
||||
var status api.Status
|
||||
if err := api.DecodeInto(body, &status); err == nil && status.Status != "" {
|
||||
if status.Status == api.StatusSuccess {
|
||||
return body, nil
|
||||
}
|
||||
// Sometimes the server returns 202 even though it completely handled the request.
|
||||
// "Working" requests need to be handled specially.
|
||||
// "Failed" requests are clearly just an error and it makes sense to return them as such.
|
||||
return nil, &StatusErr{status}
|
||||
}
|
||||
return body, err
|
||||
}
|
||||
|
@ -43,11 +43,12 @@ import (
|
||||
// Begin a request with a verb (GET, POST, PUT, DELETE)
|
||||
func (c *Client) Verb(verb string) *Request {
|
||||
return &Request{
|
||||
verb: verb,
|
||||
c: c,
|
||||
path: "/api/v1beta1",
|
||||
sync: true,
|
||||
timeout: 10 * time.Second,
|
||||
verb: verb,
|
||||
c: c,
|
||||
path: "/api/v1beta1",
|
||||
sync: true,
|
||||
timeout: 10 * time.Second,
|
||||
pollPeriod: 20 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
@ -71,18 +72,24 @@ func (c *Client) Delete() *Request {
|
||||
return c.Verb("DELETE")
|
||||
}
|
||||
|
||||
// Make a request to do a single poll of the completion of the given operation.
|
||||
func (c *Client) PollFor(operationId string) *Request {
|
||||
return c.Get().Path("operations").Path(operationId).Sync(false).PollPeriod(0)
|
||||
}
|
||||
|
||||
// Request allows for building up a request to a server in a chained fashion.
|
||||
// Any errors are stored until the end of your call, so you only have to
|
||||
// check once.
|
||||
type Request struct {
|
||||
c *Client
|
||||
err error
|
||||
verb string
|
||||
path string
|
||||
body io.Reader
|
||||
selector labels.Selector
|
||||
timeout time.Duration
|
||||
sync bool
|
||||
c *Client
|
||||
err error
|
||||
verb string
|
||||
path string
|
||||
body io.Reader
|
||||
selector labels.Selector
|
||||
timeout time.Duration
|
||||
sync bool
|
||||
pollPeriod time.Duration
|
||||
}
|
||||
|
||||
// Append an item to the request path. You must call Path at least once.
|
||||
@ -170,37 +177,56 @@ func (r *Request) Body(obj interface{}) *Request {
|
||||
return r
|
||||
}
|
||||
|
||||
// Format and xecute the request. Returns the API object received, or an error.
|
||||
func (r *Request) Do() Result {
|
||||
// PollPeriod sets the poll period.
|
||||
// If the server sends back a "working" status message, then repeatedly poll the server
|
||||
// to see if the operation has completed yet, waiting 'd' between each poll.
|
||||
// If you want to handle the "working" status yourself (it'll be delivered as StatusErr),
|
||||
// set d to 0 to turn off this behavior.
|
||||
func (r *Request) PollPeriod(d time.Duration) *Request {
|
||||
if r.err != nil {
|
||||
return Result{err: r.err}
|
||||
return r
|
||||
}
|
||||
finalUrl := r.c.host + r.path
|
||||
query := url.Values{}
|
||||
if r.selector != nil {
|
||||
query.Add("labels", r.selector.String())
|
||||
}
|
||||
if r.sync {
|
||||
query.Add("sync", "true")
|
||||
if r.timeout != 0 {
|
||||
query.Add("timeout", r.timeout.String())
|
||||
r.pollPeriod = d
|
||||
return r
|
||||
}
|
||||
|
||||
// Format and execute the request. Returns the API object received, or an error.
|
||||
func (r *Request) Do() Result {
|
||||
for {
|
||||
if r.err != nil {
|
||||
return Result{err: r.err}
|
||||
}
|
||||
}
|
||||
finalUrl += "?" + query.Encode()
|
||||
req, err := http.NewRequest(r.verb, finalUrl, r.body)
|
||||
if err != nil {
|
||||
return Result{err: err}
|
||||
}
|
||||
respBody, err := r.c.doRequest(req)
|
||||
if err != nil {
|
||||
if statusErr, ok := err.(*StatusErr); ok {
|
||||
// TODO: using the information in statusErr,
|
||||
// loop querying the server to wait and retrieve
|
||||
// the actual result.
|
||||
_ = statusErr
|
||||
finalUrl := r.c.host + r.path
|
||||
query := url.Values{}
|
||||
if r.selector != nil {
|
||||
query.Add("labels", r.selector.String())
|
||||
}
|
||||
if r.sync {
|
||||
query.Add("sync", "true")
|
||||
if r.timeout != 0 {
|
||||
query.Add("timeout", r.timeout.String())
|
||||
}
|
||||
}
|
||||
finalUrl += "?" + query.Encode()
|
||||
req, err := http.NewRequest(r.verb, finalUrl, r.body)
|
||||
if err != nil {
|
||||
return Result{err: err}
|
||||
}
|
||||
respBody, err := r.c.doRequest(req)
|
||||
if err != nil {
|
||||
if statusErr, ok := err.(*StatusErr); ok {
|
||||
if statusErr.Status.Status == api.StatusWorking && r.pollPeriod != 0 {
|
||||
time.Sleep(r.pollPeriod)
|
||||
// Make a poll request
|
||||
pollOp := r.c.PollFor(statusErr.Status.Details).PollPeriod(r.pollPeriod)
|
||||
// Could also say "return r.Do()" but this way doesn't grow the callstack.
|
||||
r = pollOp
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return Result{respBody, err}
|
||||
}
|
||||
return Result{respBody, err}
|
||||
}
|
||||
|
||||
// Result contains the result of calling Request.Do().
|
||||
|
@ -19,6 +19,7 @@ package client
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
@ -230,3 +231,75 @@ func TestSync(t *testing.T) {
|
||||
t.Errorf("'Sync' doesn't work")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetPollPeriod(t *testing.T) {
|
||||
c := New("", nil)
|
||||
r := c.Get()
|
||||
if r.pollPeriod == 0 {
|
||||
t.Errorf("polling should be on by default")
|
||||
}
|
||||
r.PollPeriod(time.Hour)
|
||||
if r.pollPeriod != time.Hour {
|
||||
t.Errorf("'PollPeriod' doesn't work")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPolling(t *testing.T) {
|
||||
objects := []interface{}{
|
||||
&api.Status{Status: api.StatusWorking, Details: "1234"},
|
||||
&api.Status{Status: api.StatusWorking, Details: "1234"},
|
||||
&api.Status{Status: api.StatusWorking, Details: "1234"},
|
||||
&api.Status{Status: api.StatusWorking, Details: "1234"},
|
||||
&api.Status{Status: api.StatusSuccess},
|
||||
}
|
||||
|
||||
callNumber := 0
|
||||
testServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
data, err := api.Encode(objects[callNumber])
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected encode error")
|
||||
}
|
||||
callNumber++
|
||||
w.Write(data)
|
||||
}))
|
||||
|
||||
auth := AuthInfo{User: "user", Password: "pass"}
|
||||
s := New(testServer.URL, &auth)
|
||||
|
||||
trials := []func(){
|
||||
func() {
|
||||
// Check that we do indeed poll when asked to.
|
||||
obj, err := s.Get().PollPeriod(5 * time.Millisecond).Do().Get()
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v %#v", err, err)
|
||||
return
|
||||
}
|
||||
if s, ok := obj.(*api.Status); !ok || s.Status != api.StatusSuccess {
|
||||
t.Errorf("Unexpected return object: %#v", obj)
|
||||
return
|
||||
}
|
||||
if callNumber != len(objects) {
|
||||
t.Errorf("Unexpected number of calls: %v", callNumber)
|
||||
}
|
||||
},
|
||||
func() {
|
||||
// Check that we don't poll when asked not to.
|
||||
obj, err := s.Get().PollPeriod(0).Do().Get()
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected non error: %v", obj)
|
||||
return
|
||||
}
|
||||
if se, ok := err.(*StatusErr); !ok || se.Status.Status != api.StatusWorking {
|
||||
t.Errorf("Unexpected kind of error: %#v", err)
|
||||
return
|
||||
}
|
||||
if callNumber != 1 {
|
||||
t.Errorf("Unexpected number of calls: %v", callNumber)
|
||||
}
|
||||
},
|
||||
}
|
||||
for _, f := range trials {
|
||||
callNumber = 0
|
||||
f()
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user