Merge pull request #3707 from nikhiljindal/preOperations

Removing support for asynchronous server requests
This commit is contained in:
Clayton Coleman 2015-01-28 13:02:52 -05:00
commit 5117189e03
15 changed files with 117 additions and 527 deletions

View File

@ -136,7 +136,6 @@ func startComponents(manifestURL string) (apiServerURL string) {
cl := client.NewOrDie(&client.Config{Host: apiServer.URL, Version: testapi.Version()})
cl.PollPeriod = time.Millisecond * 100
cl.Sync = true
helper, err := master.NewEtcdHelper(etcdClient, "")
if err != nil {

View File

@ -62,7 +62,6 @@ func newApiClient(addr string, port int) *client.Client {
apiServerURL := fmt.Sprintf("http://%s:%d", addr, port)
cl := client.NewOrDie(&client.Config{Host: apiServerURL, Version: testapi.Version()})
cl.PollPeriod = time.Second * 1
cl.Sync = true
return cl
}

View File

@ -879,18 +879,6 @@ const (
// Status code 500.
StatusReasonUnknown StatusReason = ""
// StatusReasonWorking means the server is processing this request and will complete
// at a future time.
// Details (optional):
// "kind" string - the name of the resource being referenced ("operation" today)
// "id" string - the identifier of the Operation resource where updates
// will be returned
// Headers (optional):
// "Location" - HTTP header populated with a URL that can retrieved the final
// status of this operation.
// Status code 202
StatusReasonWorking StatusReason = "Working"
// StatusReasonForbidden means the server can be reached and understood the request, but refuses
// to take any further action. It is the result of the server being configured to deny access for some reason
// to the requested resource by the client.
@ -939,6 +927,12 @@ const (
// Status code 422
StatusReasonInvalid StatusReason = "Invalid"
// StatusReasonTimeout means that the request could not be completed within the given time.
// Clients can get this response only when they specified a timeout param in the request.
// The request might succeed with an increased value of timeout param.
// Status code 504
StatusReasonTimeout StatusReason = "Timeout"
// StatusReasonBadRequest means that the request itself was invalid, because the request
// doesn't make any sense, for example deleting a read-only object. This is different than
// StatusReasonInvalid above which indicates that the API call could possibly succeed, but the

View File

@ -689,18 +689,6 @@ const (
// Status code 500.
StatusReasonUnknown StatusReason = ""
// StatusReasonWorking means the server is processing this request and will complete
// at a future time.
// Details (optional):
// "kind" string - the name of the resource being referenced ("operation" today)
// "id" string - the identifier of the Operation resource where updates
// will be returned
// Headers (optional):
// "Location" - HTTP header populated with a URL that can retrieved the final
// status of this operation.
// Status code 202
StatusReasonWorking StatusReason = "Working"
// StatusReasonNotFound means one or more resources required for this operation
// could not be found.
// Details (optional):

View File

@ -649,18 +649,6 @@ const (
// Status code 500.
StatusReasonUnknown StatusReason = ""
// StatusReasonWorking means the server is processing this request and will complete
// at a future time.
// Details (optional):
// "kind" string - the name of the resource being referenced ("operation" today)
// "id" string - the identifier of the Operation resource where updates
// will be returned
// Headers (optional):
// "Location" - HTTP header populated with a URL that can retrieved the final
// status of this operation.
// Status code 202
StatusReasonWorking StatusReason = "Working"
// StatusReasonNotFound means one or more resources required for this operation
// could not be found.
// Details (optional):

View File

@ -889,18 +889,6 @@ const (
// Status code 500.
StatusReasonUnknown StatusReason = ""
// StatusReasonWorking means the server is processing this request and will complete
// at a future time.
// Details (optional):
// "kind" string - the name of the resource being referenced ("operation" today)
// "id" string - the identifier of the Operation resource where updates
// will be returned
// Headers (optional):
// "Location" - HTTP header populated with a URL that can retrieved the final
// status of this operation.
// Status code 202
StatusReasonWorking StatusReason = "Working"
// StatusReasonNotFound means one or more resources required for this operation
// could not be found.
// Details (optional):

View File

@ -91,8 +91,6 @@ func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, can
selfLinker: selfLinker,
ops: NewOperations(),
admissionControl: admissionControl,
// Delay just long enough to handle most simple write operations
asyncOpWait: time.Millisecond * 25,
}}
}

View File

@ -689,88 +689,6 @@ func TestUpdateMissing(t *testing.T) {
}
}
func TestCreate(t *testing.T) {
wait := sync.WaitGroup{}
wait.Add(1)
simpleStorage := &SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (returnObj runtime.Object, err error) {
wait.Wait()
return &Simple{}, nil
},
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", testVersion, selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
simple := &Simple{
Other: "foo",
}
data, _ := codec.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
response, err := client.Do(request)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusAccepted {
t.Errorf("Unexpected response %#v", response)
}
var itemOut api.Status
body, err := extractBody(response, &itemOut)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" {
t.Errorf("Unexpected status: %#v (%s)", itemOut, string(body))
}
wait.Done()
}
func TestCreateInvokesAdmissionControl(t *testing.T) {
wait := sync.WaitGroup{}
wait.Add(1)
simpleStorage := &SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (returnObj runtime.Object, err error) {
wait.Wait()
return &Simple{}, nil
},
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny())
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
simple := &Simple{
Other: "foo",
}
data, _ := codec.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
response, err := client.Do(request)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusForbidden {
t.Errorf("Unexpected response %#v", response)
}
}
func TestCreateNotFound(t *testing.T) {
handler := Handle(map[string]RESTStorage{
"simple": &SimpleRESTStorage{
@ -831,7 +749,7 @@ func (s *setTestSelfLinker) SetSelfLink(obj runtime.Object, selfLink string) err
return nil
}
func TestSyncCreate(t *testing.T) {
func TestCreate(t *testing.T) {
storage := SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
time.Sleep(5 * time.Millisecond)
@ -855,7 +773,7 @@ func TestSyncCreate(t *testing.T) {
Other: "bar",
}
data, _ := codec.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/ns/other/foo?sync=true", bytes.NewBuffer(data))
request, err := http.NewRequest("POST", server.URL+"/prefix/version/ns/other/foo", bytes.NewBuffer(data))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -889,6 +807,51 @@ func TestSyncCreate(t *testing.T) {
}
}
func TestCreateInvokesAdmissionControl(t *testing.T) {
storage := SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
time.Sleep(5 * time.Millisecond)
return obj, nil
},
}
selfLinker := &setTestSelfLinker{
t: t,
name: "bar",
namespace: "other",
expectedSet: "/prefix/version/ns/other/foo/bar",
}
handler := Handle(map[string]RESTStorage{
"foo": &storage,
}, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny())
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
simple := &Simple{
Other: "bar",
}
data, _ := codec.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/ns/other/foo", bytes.NewBuffer(data))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
wg := sync.WaitGroup{}
wg.Add(1)
var response *http.Response
go func() {
response, err = client.Do(request)
wg.Done()
}()
wg.Wait()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusForbidden {
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusForbidden, response)
}
}
func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *api.Status {
client := http.Client{}
request, err := http.NewRequest(method, url, bytes.NewBuffer(data))
@ -913,14 +876,13 @@ func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *a
return &status
}
func TestAsyncDelayReturnsError(t *testing.T) {
func TestDelayReturnsError(t *testing.T) {
storage := SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
return nil, apierrs.NewAlreadyExists("foo", "bar")
},
}
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix", testVersion, selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = time.Millisecond / 2
server := httptest.NewServer(handler)
defer server.Close()
@ -930,63 +892,6 @@ func TestAsyncDelayReturnsError(t *testing.T) {
}
}
func TestAsyncCreateError(t *testing.T) {
ch := make(chan struct{})
storage := SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
<-ch
return nil, apierrs.NewAlreadyExists("foo", "bar")
},
}
selfLinker := &setTestSelfLinker{
t: t,
name: "bar",
expectedSet: "/prefix/version/foo/bar",
}
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix", testVersion, selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
simple := &Simple{Other: "bar"}
data, _ := codec.Encode(simple)
status := expectApiStatus(t, "POST", fmt.Sprintf("%s/prefix/version/foo", server.URL), data, http.StatusAccepted)
if status.Status != api.StatusWorking || status.Details == nil || status.Details.ID == "" {
t.Errorf("Unexpected status %#v", status)
}
otherStatus := expectApiStatus(t, "GET", fmt.Sprintf("%s/prefix/version/operations/%s", server.URL, status.Details.ID), []byte{}, http.StatusAccepted)
if !reflect.DeepEqual(status, otherStatus) {
t.Errorf("Expected %#v, Got %#v", status, otherStatus)
}
ch <- struct{}{}
time.Sleep(time.Millisecond)
finalStatus := expectApiStatus(t, "GET", fmt.Sprintf("%s/prefix/version/operations/%s?after=1", server.URL, status.Details.ID), []byte{}, http.StatusOK)
expectedErr := apierrs.NewAlreadyExists("foo", "bar")
expectedStatus := &api.Status{
Status: api.StatusFailure,
Code: http.StatusConflict,
Reason: "AlreadyExists",
Message: expectedErr.Error(),
Details: &api.StatusDetails{
Kind: "foo",
ID: "bar",
},
}
if !reflect.DeepEqual(expectedStatus, finalStatus) {
t.Errorf("Expected %#v, Got %#v", expectedStatus, finalStatus)
if finalStatus.Details != nil {
t.Logf("Details %#v, Got %#v", *expectedStatus.Details, *finalStatus.Details)
}
}
if !selfLinker.called {
t.Errorf("Never set self link")
}
}
type UnregisteredAPIObject struct {
Value string
}
@ -1031,7 +936,7 @@ func TestWriteRAWJSONMarshalError(t *testing.T) {
}
}
func TestSyncCreateTimeout(t *testing.T) {
func TestCreateTimeout(t *testing.T) {
testOver := make(chan struct{})
defer close(testOver)
storage := SimpleRESTStorage{
@ -1049,8 +954,8 @@ func TestSyncCreateTimeout(t *testing.T) {
simple := &Simple{Other: "foo"}
data, _ := codec.Encode(simple)
itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?sync=true&timeout=4ms", data, http.StatusAccepted)
if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" {
itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?timeout=4ms", data, http.StatusAccepted)
if itemOut.Status != api.StatusFailure || itemOut.Reason != api.StatusReasonTimeout {
t.Errorf("Unexpected status %#v", itemOut)
}
}

View File

@ -199,9 +199,8 @@ func (op *Operation) StatusOrResult() (description RESTResult, finished bool) {
if op.finished == nil {
return RESTResult{Object: &api.Status{
Status: api.StatusWorking,
Reason: api.StatusReasonWorking,
Details: &api.StatusDetails{ID: op.ID, Kind: "operation"},
Status: api.StatusFailure,
Reason: api.StatusReasonTimeout,
}}, false
}
return op.result, true

View File

@ -1,225 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"bytes"
"io/ioutil"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
// TODO: remove dependency on api, apiserver should be generic
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)
func TestOperation(t *testing.T) {
ops := NewOperations()
c := make(chan RESTResult)
called := make(chan struct{})
op := ops.NewOperation(c, func(RESTResult) { go close(called) })
// Allow context switch, so that op's ID can get added to the map and Get will work.
// This is just so we can test Get. Ordinary users have no need to call Get immediately
// after calling NewOperation, because it returns the operation directly.
time.Sleep(time.Millisecond)
go func() {
time.Sleep(500 * time.Millisecond)
c <- RESTResult{Object: &Simple{ObjectMeta: api.ObjectMeta{Name: "All done"}}}
}()
if op.expired(time.Now().Add(-time.Minute)) {
t.Errorf("Expired before finished: %#v", op)
}
ops.expire(time.Minute)
if tmp := ops.Get(op.ID); tmp == nil {
t.Errorf("expire incorrectly removed the operation %#v", ops)
}
op.WaitFor(10 * time.Millisecond)
if _, completed := op.StatusOrResult(); completed {
t.Errorf("Unexpectedly fast completion")
}
const waiters = 10
var waited int32
for i := 0; i < waiters; i++ {
go func() {
op.WaitFor(time.Hour)
atomic.AddInt32(&waited, 1)
}()
}
op.WaitFor(time.Minute)
if _, completed := op.StatusOrResult(); !completed {
t.Errorf("Unexpectedly slow completion")
}
_, open := <-called
if open {
t.Errorf("expected hook to be called!")
}
time.Sleep(100 * time.Millisecond)
finished := atomic.LoadInt32(&waited)
if finished != waiters {
t.Errorf("Multiple waiters doesn't work, only %v finished", finished)
}
if op.expired(time.Now().Add(-time.Second)) {
t.Errorf("Should not be expired: %#v", op)
}
if !op.expired(time.Now().Add(-80 * time.Millisecond)) {
t.Errorf("Should be expired: %#v", op)
}
ops.expire(80 * time.Millisecond)
if tmp := ops.Get(op.ID); tmp != nil {
t.Errorf("expire failed to remove the operation %#v", ops)
}
if op.result.Object.(*Simple).Name != "All done" {
t.Errorf("Got unexpected result: %#v", op.result)
}
}
func TestOperationsList(t *testing.T) {
testOver := make(chan struct{})
defer close(testOver)
simpleStorage := &SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
// Eliminate flakes by ensuring the create operation takes longer than this test.
<-testOver
return obj, nil
},
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", "version", selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
simple := &Simple{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}
data, err := codec.Encode(simple)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
response, err := client.Post(server.URL+"/prefix/version/foo", "application/json", bytes.NewBuffer(data))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected response %#v", response)
}
response, err = client.Get(server.URL + "/prefix/version/operations")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code %#v", response)
}
body, err := ioutil.ReadAll(response.Body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
obj, err := codec.Decode(body)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
oplist, ok := obj.(*api.OperationList)
if !ok {
t.Fatalf("expected ServerOpList, got %#v", obj)
}
if len(oplist.Items) != 1 {
t.Errorf("expected 1 operation, got %#v", obj)
}
}
func TestOpGet(t *testing.T) {
testOver := make(chan struct{})
defer close(testOver)
simpleStorage := &SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
// Eliminate flakes by ensuring the create operation takes longer than this test.
<-testOver
return obj, nil
},
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", "version", selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
simple := &Simple{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}
data, err := codec.Encode(simple)
t.Log(string(data))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
response, err := client.Do(request)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected response %#v", response)
}
var itemOut api.Status
body, err := extractBody(response, &itemOut)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" {
t.Fatalf("Unexpected status: %#v (%s)", itemOut, string(body))
}
req2, err := http.NewRequest("GET", server.URL+"/prefix/version/operations/"+itemOut.Details.ID, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
_, err = client.Do(req2)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusAccepted {
t.Errorf("Unexpected response %#v", response)
}
}

View File

@ -37,7 +37,6 @@ type RESTHandler struct {
canonicalPrefix string
selfLinker runtime.SelfLinker
ops *Operations
asyncOpWait time.Duration
admissionControl admission.Interface
}
@ -144,12 +143,11 @@ func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(
// DELETE /foo/bar delete 'bar'
// Returns 404 if the method/pattern doesn't match one of these entries
// The s accepts several query parameters:
// sync=[false|true] Synchronous request (only applies to create, update, delete operations)
// timeout=<duration> Timeout for synchronous requests, only applies if sync=true
// timeout=<duration> Timeout for synchronous requests
// labels=<label-selector> Used for filtering list operations
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage, namespace, kind string) {
ctx := api.WithNamespace(api.NewContext(), namespace)
sync := req.URL.Query().Get("sync") == "true"
// TODO: Document the timeout query parameter.
timeout := parseTimeout(req.URL.Query().Get("timeout"))
switch req.Method {
case "GET":
@ -235,7 +233,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
errorJSON(err, h.codec, w)
return
}
op := h.createOperation(out, sync, timeout, curry(h.setSelfLinkAddName, req))
op := h.createOperation(out, timeout, curry(h.setSelfLinkAddName, req))
h.finishReq(op, req, w)
case "DELETE":
@ -261,7 +259,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
errorJSON(err, h.codec, w)
return
}
op := h.createOperation(out, sync, timeout, nil)
op := h.createOperation(out, timeout, nil)
h.finishReq(op, req, w)
case "PUT":
@ -299,7 +297,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
errorJSON(err, h.codec, w)
return
}
op := h.createOperation(out, sync, timeout, curry(h.setSelfLink, req))
op := h.createOperation(out, timeout, curry(h.setSelfLink, req))
h.finishReq(op, req, w)
default:
@ -308,13 +306,9 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
}
// createOperation creates an operation to process a channel response.
func (h *RESTHandler) createOperation(out <-chan RESTResult, sync bool, timeout time.Duration, onReceive func(RESTResult)) *Operation {
func (h *RESTHandler) createOperation(out <-chan RESTResult, timeout time.Duration, onReceive func(RESTResult)) *Operation {
op := h.ops.NewOperation(out, onReceive)
if sync {
op.WaitFor(timeout)
} else if h.asyncOpWait != 0 {
op.WaitFor(h.asyncOpWait)
}
op.WaitFor(timeout)
return op
}

View File

@ -40,7 +40,7 @@ import (
// specialParams lists parameters that are handled specially and which users of Request
// are therefore not allowed to set manually.
var specialParams = util.NewStringSet("sync", "timeout")
var specialParams = util.NewStringSet("timeout")
// PollFunc is called when a server operation returns 202 accepted. The name of the
// operation is extracted from the response and passed to this function. Return a
@ -108,7 +108,6 @@ type Request struct {
resource string
resourceName string
selector labels.Selector
sync bool
timeout time.Duration
// output
@ -178,15 +177,6 @@ func (r *Request) Name(resourceName string) *Request {
return r
}
// Sync sets sync/async call status by setting the "sync" parameter to "true"/"false".
func (r *Request) Sync(sync bool) *Request {
if r.err != nil {
return r
}
r.sync = sync
return r
}
// Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
func (r *Request) Namespace(namespace string) *Request {
if r.err != nil {
@ -271,7 +261,7 @@ func (r *Request) setParam(paramName, value string) *Request {
}
// Timeout makes the request use the given duration as a timeout. Sets the "timeout"
// parameter. Ignored if sync=false.
// parameter.
func (r *Request) Timeout(d time.Duration) *Request {
if r.err != nil {
return r
@ -360,13 +350,9 @@ func (r *Request) finalURL() string {
query.Add("namespace", r.namespace)
}
// sync and timeout are handled specially here, to allow setting them
// in any order.
if r.sync {
query.Add("sync", "true")
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
}
// timeout is handled specially here.
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
}
finalURL.RawQuery = query.Encode()
return finalURL.String()

View File

@ -64,8 +64,7 @@ func TestRequestWithErrorWontChange(t *testing.T) {
NoPoll().
Body("foo").
Poller(skipPolling).
Timeout(time.Millisecond).
Sync(true)
Timeout(time.Millisecond)
if changed != &r {
t.Errorf("returned request should point to the same object")
}
@ -501,7 +500,7 @@ func TestDoRequestNewWay(t *testing.T) {
} else if !reflect.DeepEqual(obj, expectedObj) {
t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
}
fakeHandler.ValidateRequest(t, "/api/v1beta2/foo/bar/baz?labels=name%3Dfoo", "POST", &reqBody)
fakeHandler.ValidateRequest(t, "/api/v1beta2/foo/bar/baz?labels=name%3Dfoo&timeout=1s", "POST", &reqBody)
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived)
}
@ -524,7 +523,6 @@ func TestDoRequestNewWayReader(t *testing.T) {
Name("baz").
Prefix("foo").
SelectorParam("labels", labels.Set{"name": "foo"}.AsSelector()).
Sync(true).
Timeout(time.Second).
Body(bytes.NewBuffer(reqBodyExpected)).
Do().Get()
@ -538,7 +536,7 @@ func TestDoRequestNewWayReader(t *testing.T) {
t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo&sync=true&timeout=1s", "POST", &tmpStr)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo&timeout=1s", "POST", &tmpStr)
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived)
}
@ -574,7 +572,7 @@ func TestDoRequestNewWayObj(t *testing.T) {
t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta2/foo/bar/baz?labels=name%3Dfoo", "POST", &tmpStr)
fakeHandler.ValidateRequest(t, "/api/v1beta2/foo/bar/baz?labels=name%3Dfoo&timeout=1s", "POST", &tmpStr)
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived)
}
@ -626,7 +624,7 @@ func TestDoRequestNewWayFile(t *testing.T) {
t.Errorf("expected object was not created")
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo", "POST", &tmpStr)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo&timeout=1s", "POST", &tmpStr)
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived)
}
@ -669,7 +667,7 @@ func TestWasCreated(t *testing.T) {
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo", "PUT", &tmpStr)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo&timeout=1s", "PUT", &tmpStr)
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived)
}
@ -700,22 +698,6 @@ func TestAbsPath(t *testing.T) {
}
}
func TestSync(t *testing.T) {
c := NewOrDie(&Config{})
r := c.Get()
if r.sync {
t.Errorf("sync has wrong default")
}
r.Sync(false)
if r.sync {
t.Errorf("'Sync' doesn't work")
}
r.Sync(true)
if !r.sync {
t.Errorf("'Sync' doesn't work")
}
}
func TestUintParam(t *testing.T) {
table := []struct {
name string
@ -742,7 +724,6 @@ func TestUnacceptableParamNames(t *testing.T) {
testVal string
expectSuccess bool
}{
{"sync", "foo", false},
{"timeout", "42", false},
}

View File

@ -55,7 +55,6 @@ type RESTClient struct {
// be called.
Poller PollFunc
Sync bool
PollPeriod time.Duration
Timeout time.Duration
}
@ -80,10 +79,7 @@ func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyB
LegacyBehavior: legacyBehavior,
// Make asynchronous requests by default
Sync: false,
// Poll frequently when asynchronous requests are provided
// Poll frequently
PollPeriod: time.Second * 2,
}
}
@ -110,7 +106,7 @@ func (c *RESTClient) Verb(verb string) *Request {
if poller == nil {
poller = c.DefaultPoll
}
return NewRequest(c.Client, verb, c.baseURL, c.Codec, c.LegacyBehavior, c.LegacyBehavior).Poller(poller).Sync(c.Sync).Timeout(c.Timeout)
return NewRequest(c.Client, verb, c.baseURL, c.Codec, c.LegacyBehavior, c.LegacyBehavior).Poller(poller).Timeout(c.Timeout)
}
// Post begins a POST request. Short for c.Verb("POST").
@ -135,7 +131,7 @@ func (c *RESTClient) Delete() *Request {
// PollFor makes a request to do a single poll of the completion of the given operation.
func (c *RESTClient) Operation(name string) *Request {
return c.Get().Resource("operations").Name(name).Sync(false).NoPoll()
return c.Get().Resource("operations").Name(name).NoPoll()
}
// DefaultPoll performs a polling action based on the PollPeriod set on the Client.

View File

@ -153,9 +153,8 @@ var aEndpoints string = `
}
`
// To ensure that a POST completes before a dependent GET, make operations
// effectively synchronous with the following parameters.
var syncFlags = "?sync=true&timeout=60s"
// To ensure that a POST completes before a dependent GET, set a timeout.
var timeoutFlag = "?timeout=60s"
// Requests to try. Each one should be forbidden or not forbidden
// depending on the authentication and authorization setup of the master.
@ -182,11 +181,11 @@ func getTestRequests() []struct {
}{
// Normal methods on pods
{"GET", "/api/v1beta1/pods", "", code200},
{"POST", "/api/v1beta1/pods" + syncFlags, aPod, code200},
{"PUT", "/api/v1beta1/pods/a" + syncFlags, aPod, code500}, // See #2114 about why 500
{"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code200},
{"PUT", "/api/v1beta1/pods/a" + timeoutFlag, aPod, code500}, // See #2114 about why 500
{"GET", "/api/v1beta1/pods", "", code200},
{"GET", "/api/v1beta1/pods/a", "", code200},
{"DELETE", "/api/v1beta1/pods/a" + syncFlags, "", code200},
{"DELETE", "/api/v1beta1/pods/a" + timeoutFlag, "", code200},
// Non-standard methods (not expected to work,
// but expected to pass/fail authorization prior to
@ -202,53 +201,53 @@ func getTestRequests() []struct {
// Normal methods on services
{"GET", "/api/v1beta1/services", "", code200},
{"POST", "/api/v1beta1/services" + syncFlags, aService, code200},
{"PUT", "/api/v1beta1/services/a" + syncFlags, aService, code409}, // TODO: GET and put back server-provided fields to avoid a 422
{"POST", "/api/v1beta1/services" + timeoutFlag, aService, code200},
{"PUT", "/api/v1beta1/services/a" + timeoutFlag, aService, code409}, // TODO: GET and put back server-provided fields to avoid a 422
{"GET", "/api/v1beta1/services", "", code200},
{"GET", "/api/v1beta1/services/a", "", code200},
{"DELETE", "/api/v1beta1/services/a" + syncFlags, "", code200},
{"DELETE", "/api/v1beta1/services/a" + timeoutFlag, "", code200},
// Normal methods on replicationControllers
{"GET", "/api/v1beta1/replicationControllers", "", code200},
{"POST", "/api/v1beta1/replicationControllers" + syncFlags, aRC, code200},
{"PUT", "/api/v1beta1/replicationControllers/a" + syncFlags, aRC, code409}, // See #2115 about why 409
{"POST", "/api/v1beta1/replicationControllers" + timeoutFlag, aRC, code200},
{"PUT", "/api/v1beta1/replicationControllers/a" + timeoutFlag, aRC, code409}, // See #2115 about why 409
{"GET", "/api/v1beta1/replicationControllers", "", code200},
{"GET", "/api/v1beta1/replicationControllers/a", "", code200},
{"DELETE", "/api/v1beta1/replicationControllers/a" + syncFlags, "", code200},
{"DELETE", "/api/v1beta1/replicationControllers/a" + timeoutFlag, "", code200},
// Normal methods on endpoints
{"GET", "/api/v1beta1/endpoints", "", code200},
{"POST", "/api/v1beta1/endpoints" + syncFlags, aEndpoints, code200},
{"PUT", "/api/v1beta1/endpoints/a" + syncFlags, aEndpoints, code200},
{"POST", "/api/v1beta1/endpoints" + timeoutFlag, aEndpoints, code200},
{"PUT", "/api/v1beta1/endpoints/a" + timeoutFlag, aEndpoints, code200},
{"GET", "/api/v1beta1/endpoints", "", code200},
{"GET", "/api/v1beta1/endpoints/a", "", code200},
{"DELETE", "/api/v1beta1/endpoints/a" + syncFlags, "", code405},
{"DELETE", "/api/v1beta1/endpoints/a" + timeoutFlag, "", code405},
// Normal methods on minions
{"GET", "/api/v1beta1/minions", "", code200},
{"POST", "/api/v1beta1/minions" + syncFlags, aMinion, code200},
{"PUT", "/api/v1beta1/minions/a" + syncFlags, aMinion, code422}, // TODO: GET and put back server-provided fields to avoid a 422
{"POST", "/api/v1beta1/minions" + timeoutFlag, aMinion, code200},
{"PUT", "/api/v1beta1/minions/a" + timeoutFlag, aMinion, code422}, // TODO: GET and put back server-provided fields to avoid a 422
{"GET", "/api/v1beta1/minions", "", code200},
{"GET", "/api/v1beta1/minions/a", "", code200},
{"DELETE", "/api/v1beta1/minions/a" + syncFlags, "", code200},
{"DELETE", "/api/v1beta1/minions/a" + timeoutFlag, "", code200},
// Normal methods on events
{"GET", "/api/v1beta1/events", "", code200},
{"POST", "/api/v1beta1/events" + syncFlags, aEvent, code200},
{"PUT", "/api/v1beta1/events/a" + syncFlags, aEvent, code405},
{"POST", "/api/v1beta1/events" + timeoutFlag, aEvent, code200},
{"PUT", "/api/v1beta1/events/a" + timeoutFlag, aEvent, code405},
{"GET", "/api/v1beta1/events", "", code200},
{"GET", "/api/v1beta1/events", "", code200},
{"GET", "/api/v1beta1/events/a", "", code200},
{"DELETE", "/api/v1beta1/events/a" + syncFlags, "", code200},
{"DELETE", "/api/v1beta1/events/a" + timeoutFlag, "", code200},
// Normal methods on bindings
{"GET", "/api/v1beta1/bindings", "", code405}, // Bindings are write-only
{"POST", "/api/v1beta1/pods" + syncFlags, aPod, code200}, // Need a pod to bind or you get a 404
{"POST", "/api/v1beta1/bindings" + syncFlags, aBinding, code200},
{"PUT", "/api/v1beta1/bindings/a" + syncFlags, aBinding, code405},
{"GET", "/api/v1beta1/bindings", "", code405}, // Bindings are write-only
{"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code200}, // Need a pod to bind or you get a 404
{"POST", "/api/v1beta1/bindings" + timeoutFlag, aBinding, code200},
{"PUT", "/api/v1beta1/bindings/a" + timeoutFlag, aBinding, code405},
{"GET", "/api/v1beta1/bindings", "", code405},
{"GET", "/api/v1beta1/bindings/a", "", code405}, // No bindings instances
{"DELETE", "/api/v1beta1/bindings/a" + syncFlags, "", code405},
{"DELETE", "/api/v1beta1/bindings/a" + timeoutFlag, "", code405},
// Non-existent object type.
{"GET", "/api/v1beta1/foo", "", code404},
@ -256,7 +255,7 @@ func getTestRequests() []struct {
{"PUT", "/api/v1beta1/foo/a", `{"foo": "foo"}`, code404},
{"GET", "/api/v1beta1/foo", "", code404},
{"GET", "/api/v1beta1/foo/a", "", code404},
{"DELETE", "/api/v1beta1/foo" + syncFlags, "", code404},
{"DELETE", "/api/v1beta1/foo" + timeoutFlag, "", code404},
// Operations
{"GET", "/api/v1beta1/operations", "", code200},
@ -271,6 +270,7 @@ func getTestRequests() []struct {
// Non-object endpoints
{"GET", "/", "", code200},
{"GET", "/api", "", code200},
{"GET", "/healthz", "", code200},
{"GET", "/version", "", code200},
}
@ -635,20 +635,20 @@ func TestNamespaceAuthorization(t *testing.T) {
statusCodes map[int]bool // allowed status codes.
}{
{"POST", "/api/v1beta1/pods" + syncFlags + "&namespace=foo", aPod, code200},
{"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=foo", aPod, code200},
{"GET", "/api/v1beta1/pods?namespace=foo", "", code200},
{"GET", "/api/v1beta1/pods/a?namespace=foo", "", code200},
{"DELETE", "/api/v1beta1/pods/a" + syncFlags + "&namespace=foo", "", code200},
{"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=foo", "", code200},
{"POST", "/api/v1beta1/pods" + syncFlags + "&namespace=bar", aPod, code403},
{"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=bar", aPod, code403},
{"GET", "/api/v1beta1/pods?namespace=bar", "", code403},
{"GET", "/api/v1beta1/pods/a?namespace=bar", "", code403},
{"DELETE", "/api/v1beta1/pods/a" + syncFlags + "&namespace=bar", "", code403},
{"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=bar", "", code403},
{"POST", "/api/v1beta1/pods" + syncFlags, aPod, code403},
{"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code403},
{"GET", "/api/v1beta1/pods", "", code403},
{"GET", "/api/v1beta1/pods/a", "", code403},
{"DELETE", "/api/v1beta1/pods/a" + syncFlags, "", code403},
{"DELETE", "/api/v1beta1/pods/a" + timeoutFlag, "", code403},
}
for _, r := range requests {
@ -718,15 +718,15 @@ func TestKindAuthorization(t *testing.T) {
body string
statusCodes map[int]bool // allowed status codes.
}{
{"POST", "/api/v1beta1/services" + syncFlags, aService, code200},
{"POST", "/api/v1beta1/services" + timeoutFlag, aService, code200},
{"GET", "/api/v1beta1/services", "", code200},
{"GET", "/api/v1beta1/services/a", "", code200},
{"DELETE", "/api/v1beta1/services/a" + syncFlags, "", code200},
{"DELETE", "/api/v1beta1/services/a" + timeoutFlag, "", code200},
{"POST", "/api/v1beta1/pods" + syncFlags, aPod, code403},
{"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code403},
{"GET", "/api/v1beta1/pods", "", code403},
{"GET", "/api/v1beta1/pods/a", "", code403},
{"DELETE", "/api/v1beta1/pods/a" + syncFlags, "", code403},
{"DELETE", "/api/v1beta1/pods/a" + timeoutFlag, "", code403},
}
for _, r := range requests {