mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-08 20:50:24 +00:00
Make client.Request more testable, break coupling with RESTClient
Moves polling to a function provided by the RESTClient, not innate to Request. Moves doRequest from RESTClient to Request for clarity.
This commit is contained in:
@@ -28,32 +28,65 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// specialParams lists parameters that are handled specially and which users of Request
|
||||
// are therefore not allowed to set manually.
|
||||
var specialParams = util.NewStringSet("sync", "timeout")
|
||||
|
||||
// PollFunc is called when a server operation returns 202 accepted. The name of the
|
||||
// operation is extracted from the response and passed to this function. Return a
|
||||
// request to retrieve the result of the operation, or false for the second argument
|
||||
// if polling should end.
|
||||
type PollFunc func(name string) (*Request, bool)
|
||||
|
||||
// HTTPClient is an interface for testing a request object.
|
||||
type HTTPClient interface {
|
||||
Do(req *http.Request) (*http.Response, error)
|
||||
}
|
||||
|
||||
// Request allows for building up a request to a server in a chained fashion.
|
||||
// Any errors are stored until the end of your call, so you only have to
|
||||
// check once.
|
||||
type Request struct {
|
||||
c *RESTClient
|
||||
err error
|
||||
verb string
|
||||
path string
|
||||
body io.Reader
|
||||
params map[string]string
|
||||
selector labels.Selector
|
||||
timeout time.Duration
|
||||
sync bool
|
||||
pollPeriod time.Duration
|
||||
// required
|
||||
client HTTPClient
|
||||
verb string
|
||||
baseURL *url.URL
|
||||
codec runtime.Codec
|
||||
|
||||
// optional, will be invoked if the server returns a 202 to decide
|
||||
// whether to poll.
|
||||
poller PollFunc
|
||||
|
||||
// accessible via method setters
|
||||
path string
|
||||
params map[string]string
|
||||
selector labels.Selector
|
||||
sync bool
|
||||
timeout time.Duration
|
||||
|
||||
// output
|
||||
err error
|
||||
body io.Reader
|
||||
}
|
||||
|
||||
// NewRequest creates a new request with the core attributes.
|
||||
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, codec runtime.Codec) *Request {
|
||||
return &Request{
|
||||
client: client,
|
||||
verb: verb,
|
||||
baseURL: baseURL,
|
||||
codec: codec,
|
||||
|
||||
path: baseURL.Path,
|
||||
}
|
||||
}
|
||||
|
||||
// Path appends an item to the request path. You must call Path at least once.
|
||||
@@ -135,6 +168,9 @@ func (r *Request) setParam(paramName, value string) *Request {
|
||||
r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName)
|
||||
return r
|
||||
}
|
||||
if r.params == nil {
|
||||
r.params = make(map[string]string)
|
||||
}
|
||||
r.params[paramName] = value
|
||||
return r
|
||||
}
|
||||
@@ -172,7 +208,7 @@ func (r *Request) Body(obj interface{}) *Request {
|
||||
case io.Reader:
|
||||
r.body = t
|
||||
case runtime.Object:
|
||||
data, err := r.c.Codec.Encode(t)
|
||||
data, err := r.codec.Encode(t)
|
||||
if err != nil {
|
||||
r.err = err
|
||||
return r
|
||||
@@ -184,21 +220,23 @@ func (r *Request) Body(obj interface{}) *Request {
|
||||
return r
|
||||
}
|
||||
|
||||
// PollPeriod sets the poll period.
|
||||
// If the server sends back a "working" status message, then repeatedly poll the server
|
||||
// to see if the operation has completed yet, waiting 'd' between each poll.
|
||||
// If you want to handle the "working" status yourself (it'll be delivered as StatusErr),
|
||||
// set d to 0 to turn off this behavior.
|
||||
func (r *Request) PollPeriod(d time.Duration) *Request {
|
||||
// NoPoll indicates a server "working" response should be returned as an error
|
||||
func (r *Request) NoPoll() *Request {
|
||||
return r.Poller(nil)
|
||||
}
|
||||
|
||||
// Poller indicates this request should use the specify poll function to determine whether
|
||||
// a server "working" response should be retried.
|
||||
func (r *Request) Poller(poller PollFunc) *Request {
|
||||
if r.err != nil {
|
||||
return r
|
||||
}
|
||||
r.pollPeriod = d
|
||||
r.poller = poller
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) finalURL() string {
|
||||
finalURL := *r.c.baseURL
|
||||
finalURL := *r.baseURL
|
||||
finalURL.Path = r.path
|
||||
query := url.Values{}
|
||||
for key, value := range r.params {
|
||||
@@ -227,18 +265,18 @@ func (r *Request) Watch() (watch.Interface, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client := r.c.Client
|
||||
client := r.client
|
||||
if client == nil {
|
||||
client = http.DefaultClient
|
||||
}
|
||||
response, err := client.Do(req)
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if response.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("Got status: %v", response.StatusCode)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("Got status: %v", resp.StatusCode)
|
||||
}
|
||||
return watch.NewStreamWatcher(watchjson.NewDecoder(response.Body, r.c.Codec)), nil
|
||||
return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.codec)), nil
|
||||
}
|
||||
|
||||
// Stream formats and executes the request, and offers streaming of the response.
|
||||
@@ -251,51 +289,106 @@ func (r *Request) Stream() (io.ReadCloser, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client := r.c.Client
|
||||
client := r.client
|
||||
if client == nil {
|
||||
client = http.DefaultClient
|
||||
}
|
||||
response, err := client.Do(req)
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return response.Body, nil
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// Do formats and executes the request. Returns the API object received, or an error.
|
||||
// Do formats and executes the request. Returns a Result object for easy response
|
||||
// processing. Handles polling the server in the event a continuation was sent.
|
||||
func (r *Request) Do() Result {
|
||||
client := r.client
|
||||
if client == nil {
|
||||
client = http.DefaultClient
|
||||
}
|
||||
|
||||
for {
|
||||
if r.err != nil {
|
||||
return Result{err: r.err}
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
|
||||
if err != nil {
|
||||
return Result{err: err}
|
||||
}
|
||||
respBody, created, err := r.c.doRequest(req)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
if s, ok := err.(APIStatus); ok {
|
||||
status := s.Status()
|
||||
if status.Status == api.StatusWorking && r.pollPeriod != 0 {
|
||||
if status.Details != nil {
|
||||
id := status.Details.ID
|
||||
if len(id) > 0 {
|
||||
glog.Infof("Waiting for completion of /operations/%s", id)
|
||||
time.Sleep(r.pollPeriod)
|
||||
// Make a poll request
|
||||
pollOp := r.c.PollFor(id).PollPeriod(r.pollPeriod)
|
||||
// Could also say "return r.Do()" but this way doesn't grow the callstack.
|
||||
r = pollOp
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return Result{err: err}
|
||||
}
|
||||
return Result{respBody, created, err, r.c.Codec}
|
||||
|
||||
respBody, created, err := r.transformResponse(resp, req)
|
||||
if poll, ok := r.shouldPoll(err); ok {
|
||||
r = poll
|
||||
continue
|
||||
}
|
||||
|
||||
return Result{respBody, created, err, r.codec}
|
||||
}
|
||||
}
|
||||
|
||||
// shouldPoll checks the server error for an incomplete operation
|
||||
// and if found returns a request that would check the response.
|
||||
// If no polling is necessary or possible, it will return false.
|
||||
func (r *Request) shouldPoll(err error) (*Request, bool) {
|
||||
if err == nil || r.poller == nil {
|
||||
return nil, false
|
||||
}
|
||||
apistatus, ok := err.(APIStatus)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
status := apistatus.Status()
|
||||
if status.Status != api.StatusWorking {
|
||||
return nil, false
|
||||
}
|
||||
if status.Details == nil || len(status.Details.ID) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
return r.poller(status.Details.ID)
|
||||
}
|
||||
|
||||
// transformResponse converts an API response into a structured API object.
|
||||
func (r *Request) transformResponse(resp *http.Response, req *http.Request) ([]byte, bool, error) {
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Did the server give us a status response?
|
||||
isStatusResponse := false
|
||||
var status api.Status
|
||||
if err := r.codec.DecodeInto(body, &status); err == nil && status.Status != "" {
|
||||
isStatusResponse = true
|
||||
}
|
||||
|
||||
switch {
|
||||
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
|
||||
if !isStatusResponse {
|
||||
return nil, false, fmt.Errorf("request [%#v] failed (%d) %s: %s", req, resp.StatusCode, resp.Status, string(body))
|
||||
}
|
||||
return nil, false, errors.FromObject(&status)
|
||||
}
|
||||
|
||||
// If the server gave us a status back, look at what it was.
|
||||
if isStatusResponse && status.Status != api.StatusSuccess {
|
||||
// "Working" requests need to be handled specially.
|
||||
// "Failed" requests are clearly just an error and it makes sense to return them as such.
|
||||
return nil, false, errors.FromObject(&status)
|
||||
}
|
||||
|
||||
created := resp.StatusCode == http.StatusCreated
|
||||
return body, created, err
|
||||
}
|
||||
|
||||
// Result contains the result of calling Request.Do().
|
||||
type Result struct {
|
||||
body []byte
|
||||
|
Reference in New Issue
Block a user