mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-17 15:13:08 +00:00
Making all operations synchronous
This commit is contained in:
@@ -93,8 +93,6 @@ func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, can
|
||||
selfLinker: selfLinker,
|
||||
ops: NewOperations(),
|
||||
admissionControl: admissionControl,
|
||||
// Delay just long enough to handle most simple write operations
|
||||
asyncOpWait: time.Millisecond * 25,
|
||||
}}
|
||||
}
|
||||
|
||||
|
@@ -689,88 +689,6 @@ func TestUpdateMissing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreate(t *testing.T) {
|
||||
wait := sync.WaitGroup{}
|
||||
wait.Add(1)
|
||||
simpleStorage := &SimpleRESTStorage{
|
||||
injectedFunction: func(obj runtime.Object) (returnObj runtime.Object, err error) {
|
||||
wait.Wait()
|
||||
return &Simple{}, nil
|
||||
},
|
||||
}
|
||||
handler := Handle(map[string]RESTStorage{
|
||||
"foo": simpleStorage,
|
||||
}, codec, "/prefix", testVersion, selfLinker, admissionControl)
|
||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
client := http.Client{}
|
||||
|
||||
simple := &Simple{
|
||||
Other: "foo",
|
||||
}
|
||||
data, _ := codec.Encode(simple)
|
||||
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if response.StatusCode != http.StatusAccepted {
|
||||
t.Errorf("Unexpected response %#v", response)
|
||||
}
|
||||
|
||||
var itemOut api.Status
|
||||
body, err := extractBody(response, &itemOut)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" {
|
||||
t.Errorf("Unexpected status: %#v (%s)", itemOut, string(body))
|
||||
}
|
||||
wait.Done()
|
||||
}
|
||||
|
||||
func TestCreateInvokesAdmissionControl(t *testing.T) {
|
||||
wait := sync.WaitGroup{}
|
||||
wait.Add(1)
|
||||
simpleStorage := &SimpleRESTStorage{
|
||||
injectedFunction: func(obj runtime.Object) (returnObj runtime.Object, err error) {
|
||||
wait.Wait()
|
||||
return &Simple{}, nil
|
||||
},
|
||||
}
|
||||
handler := Handle(map[string]RESTStorage{
|
||||
"foo": simpleStorage,
|
||||
}, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny())
|
||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
client := http.Client{}
|
||||
|
||||
simple := &Simple{
|
||||
Other: "foo",
|
||||
}
|
||||
data, _ := codec.Encode(simple)
|
||||
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if response.StatusCode != http.StatusForbidden {
|
||||
t.Errorf("Unexpected response %#v", response)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateNotFound(t *testing.T) {
|
||||
handler := Handle(map[string]RESTStorage{
|
||||
"simple": &SimpleRESTStorage{
|
||||
@@ -831,7 +749,7 @@ func (s *setTestSelfLinker) SetSelfLink(obj runtime.Object, selfLink string) err
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSyncCreate(t *testing.T) {
|
||||
func TestCreate(t *testing.T) {
|
||||
storage := SimpleRESTStorage{
|
||||
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
@@ -855,7 +773,7 @@ func TestSyncCreate(t *testing.T) {
|
||||
Other: "bar",
|
||||
}
|
||||
data, _ := codec.Encode(simple)
|
||||
request, err := http.NewRequest("POST", server.URL+"/prefix/version/ns/other/foo?sync=true", bytes.NewBuffer(data))
|
||||
request, err := http.NewRequest("POST", server.URL+"/prefix/version/ns/other/foo", bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -889,6 +807,51 @@ func TestSyncCreate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateInvokesAdmissionControl(t *testing.T) {
|
||||
storage := SimpleRESTStorage{
|
||||
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
return obj, nil
|
||||
},
|
||||
}
|
||||
selfLinker := &setTestSelfLinker{
|
||||
t: t,
|
||||
name: "bar",
|
||||
namespace: "other",
|
||||
expectedSet: "/prefix/version/ns/other/foo/bar",
|
||||
}
|
||||
handler := Handle(map[string]RESTStorage{
|
||||
"foo": &storage,
|
||||
}, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny())
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
client := http.Client{}
|
||||
|
||||
simple := &Simple{
|
||||
Other: "bar",
|
||||
}
|
||||
data, _ := codec.Encode(simple)
|
||||
request, err := http.NewRequest("POST", server.URL+"/prefix/version/ns/other/foo", bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
var response *http.Response
|
||||
go func() {
|
||||
response, err = client.Do(request)
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if response.StatusCode != http.StatusForbidden {
|
||||
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusForbidden, response)
|
||||
}
|
||||
}
|
||||
|
||||
func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *api.Status {
|
||||
client := http.Client{}
|
||||
request, err := http.NewRequest(method, url, bytes.NewBuffer(data))
|
||||
@@ -913,14 +876,13 @@ func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *a
|
||||
return &status
|
||||
}
|
||||
|
||||
func TestAsyncDelayReturnsError(t *testing.T) {
|
||||
func TestDelayReturnsError(t *testing.T) {
|
||||
storage := SimpleRESTStorage{
|
||||
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
|
||||
return nil, apierrs.NewAlreadyExists("foo", "bar")
|
||||
},
|
||||
}
|
||||
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix", testVersion, selfLinker, admissionControl)
|
||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = time.Millisecond / 2
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
@@ -930,63 +892,6 @@ func TestAsyncDelayReturnsError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAsyncCreateError(t *testing.T) {
|
||||
ch := make(chan struct{})
|
||||
storage := SimpleRESTStorage{
|
||||
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
|
||||
<-ch
|
||||
return nil, apierrs.NewAlreadyExists("foo", "bar")
|
||||
},
|
||||
}
|
||||
selfLinker := &setTestSelfLinker{
|
||||
t: t,
|
||||
name: "bar",
|
||||
expectedSet: "/prefix/version/foo/bar",
|
||||
}
|
||||
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix", testVersion, selfLinker, admissionControl)
|
||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
simple := &Simple{Other: "bar"}
|
||||
data, _ := codec.Encode(simple)
|
||||
|
||||
status := expectApiStatus(t, "POST", fmt.Sprintf("%s/prefix/version/foo", server.URL), data, http.StatusAccepted)
|
||||
if status.Status != api.StatusWorking || status.Details == nil || status.Details.ID == "" {
|
||||
t.Errorf("Unexpected status %#v", status)
|
||||
}
|
||||
|
||||
otherStatus := expectApiStatus(t, "GET", fmt.Sprintf("%s/prefix/version/operations/%s", server.URL, status.Details.ID), []byte{}, http.StatusAccepted)
|
||||
if !reflect.DeepEqual(status, otherStatus) {
|
||||
t.Errorf("Expected %#v, Got %#v", status, otherStatus)
|
||||
}
|
||||
|
||||
ch <- struct{}{}
|
||||
time.Sleep(time.Millisecond)
|
||||
|
||||
finalStatus := expectApiStatus(t, "GET", fmt.Sprintf("%s/prefix/version/operations/%s?after=1", server.URL, status.Details.ID), []byte{}, http.StatusOK)
|
||||
expectedErr := apierrs.NewAlreadyExists("foo", "bar")
|
||||
expectedStatus := &api.Status{
|
||||
Status: api.StatusFailure,
|
||||
Code: http.StatusConflict,
|
||||
Reason: "AlreadyExists",
|
||||
Message: expectedErr.Error(),
|
||||
Details: &api.StatusDetails{
|
||||
Kind: "foo",
|
||||
ID: "bar",
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(expectedStatus, finalStatus) {
|
||||
t.Errorf("Expected %#v, Got %#v", expectedStatus, finalStatus)
|
||||
if finalStatus.Details != nil {
|
||||
t.Logf("Details %#v, Got %#v", *expectedStatus.Details, *finalStatus.Details)
|
||||
}
|
||||
}
|
||||
if !selfLinker.called {
|
||||
t.Errorf("Never set self link")
|
||||
}
|
||||
}
|
||||
|
||||
type UnregisteredAPIObject struct {
|
||||
Value string
|
||||
}
|
||||
@@ -1031,7 +936,7 @@ func TestWriteRAWJSONMarshalError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncCreateTimeout(t *testing.T) {
|
||||
func TestCreateTimeout(t *testing.T) {
|
||||
testOver := make(chan struct{})
|
||||
defer close(testOver)
|
||||
storage := SimpleRESTStorage{
|
||||
@@ -1049,8 +954,8 @@ func TestSyncCreateTimeout(t *testing.T) {
|
||||
|
||||
simple := &Simple{Other: "foo"}
|
||||
data, _ := codec.Encode(simple)
|
||||
itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?sync=true&timeout=4ms", data, http.StatusAccepted)
|
||||
if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" {
|
||||
itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?timeout=4ms", data, http.StatusAccepted)
|
||||
if itemOut.Status != api.StatusFailure || itemOut.Reason != api.StatusReasonTimeout {
|
||||
t.Errorf("Unexpected status %#v", itemOut)
|
||||
}
|
||||
}
|
||||
|
@@ -199,9 +199,8 @@ func (op *Operation) StatusOrResult() (description RESTResult, finished bool) {
|
||||
|
||||
if op.finished == nil {
|
||||
return RESTResult{Object: &api.Status{
|
||||
Status: api.StatusWorking,
|
||||
Reason: api.StatusReasonWorking,
|
||||
Details: &api.StatusDetails{ID: op.ID, Kind: "operation"},
|
||||
Status: api.StatusFailure,
|
||||
Reason: api.StatusReasonTimeout,
|
||||
}}, false
|
||||
}
|
||||
return op.result, true
|
||||
|
@@ -1,225 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
// TODO: remove dependency on api, apiserver should be generic
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
)
|
||||
|
||||
func TestOperation(t *testing.T) {
|
||||
ops := NewOperations()
|
||||
|
||||
c := make(chan RESTResult)
|
||||
called := make(chan struct{})
|
||||
op := ops.NewOperation(c, func(RESTResult) { go close(called) })
|
||||
// Allow context switch, so that op's ID can get added to the map and Get will work.
|
||||
// This is just so we can test Get. Ordinary users have no need to call Get immediately
|
||||
// after calling NewOperation, because it returns the operation directly.
|
||||
time.Sleep(time.Millisecond)
|
||||
go func() {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
c <- RESTResult{Object: &Simple{ObjectMeta: api.ObjectMeta{Name: "All done"}}}
|
||||
}()
|
||||
|
||||
if op.expired(time.Now().Add(-time.Minute)) {
|
||||
t.Errorf("Expired before finished: %#v", op)
|
||||
}
|
||||
ops.expire(time.Minute)
|
||||
if tmp := ops.Get(op.ID); tmp == nil {
|
||||
t.Errorf("expire incorrectly removed the operation %#v", ops)
|
||||
}
|
||||
|
||||
op.WaitFor(10 * time.Millisecond)
|
||||
if _, completed := op.StatusOrResult(); completed {
|
||||
t.Errorf("Unexpectedly fast completion")
|
||||
}
|
||||
|
||||
const waiters = 10
|
||||
var waited int32
|
||||
for i := 0; i < waiters; i++ {
|
||||
go func() {
|
||||
op.WaitFor(time.Hour)
|
||||
atomic.AddInt32(&waited, 1)
|
||||
}()
|
||||
}
|
||||
|
||||
op.WaitFor(time.Minute)
|
||||
if _, completed := op.StatusOrResult(); !completed {
|
||||
t.Errorf("Unexpectedly slow completion")
|
||||
}
|
||||
|
||||
_, open := <-called
|
||||
if open {
|
||||
t.Errorf("expected hook to be called!")
|
||||
}
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
finished := atomic.LoadInt32(&waited)
|
||||
if finished != waiters {
|
||||
t.Errorf("Multiple waiters doesn't work, only %v finished", finished)
|
||||
}
|
||||
|
||||
if op.expired(time.Now().Add(-time.Second)) {
|
||||
t.Errorf("Should not be expired: %#v", op)
|
||||
}
|
||||
if !op.expired(time.Now().Add(-80 * time.Millisecond)) {
|
||||
t.Errorf("Should be expired: %#v", op)
|
||||
}
|
||||
|
||||
ops.expire(80 * time.Millisecond)
|
||||
if tmp := ops.Get(op.ID); tmp != nil {
|
||||
t.Errorf("expire failed to remove the operation %#v", ops)
|
||||
}
|
||||
|
||||
if op.result.Object.(*Simple).Name != "All done" {
|
||||
t.Errorf("Got unexpected result: %#v", op.result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperationsList(t *testing.T) {
|
||||
testOver := make(chan struct{})
|
||||
defer close(testOver)
|
||||
simpleStorage := &SimpleRESTStorage{
|
||||
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
|
||||
// Eliminate flakes by ensuring the create operation takes longer than this test.
|
||||
<-testOver
|
||||
return obj, nil
|
||||
},
|
||||
}
|
||||
handler := Handle(map[string]RESTStorage{
|
||||
"foo": simpleStorage,
|
||||
}, codec, "/prefix", "version", selfLinker, admissionControl)
|
||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
client := http.Client{}
|
||||
|
||||
simple := &Simple{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
||||
}
|
||||
data, err := codec.Encode(simple)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
response, err := client.Post(server.URL+"/prefix/version/foo", "application/json", bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if response.StatusCode != http.StatusAccepted {
|
||||
t.Fatalf("Unexpected response %#v", response)
|
||||
}
|
||||
|
||||
response, err = client.Get(server.URL + "/prefix/version/operations")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if response.StatusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code %#v", response)
|
||||
}
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
obj, err := codec.Decode(body)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
oplist, ok := obj.(*api.OperationList)
|
||||
if !ok {
|
||||
t.Fatalf("expected ServerOpList, got %#v", obj)
|
||||
}
|
||||
if len(oplist.Items) != 1 {
|
||||
t.Errorf("expected 1 operation, got %#v", obj)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpGet(t *testing.T) {
|
||||
testOver := make(chan struct{})
|
||||
defer close(testOver)
|
||||
simpleStorage := &SimpleRESTStorage{
|
||||
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
|
||||
// Eliminate flakes by ensuring the create operation takes longer than this test.
|
||||
<-testOver
|
||||
return obj, nil
|
||||
},
|
||||
}
|
||||
handler := Handle(map[string]RESTStorage{
|
||||
"foo": simpleStorage,
|
||||
}, codec, "/prefix", "version", selfLinker, admissionControl)
|
||||
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
client := http.Client{}
|
||||
|
||||
simple := &Simple{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
||||
}
|
||||
data, err := codec.Encode(simple)
|
||||
t.Log(string(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if response.StatusCode != http.StatusAccepted {
|
||||
t.Fatalf("Unexpected response %#v", response)
|
||||
}
|
||||
|
||||
var itemOut api.Status
|
||||
body, err := extractBody(response, &itemOut)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" {
|
||||
t.Fatalf("Unexpected status: %#v (%s)", itemOut, string(body))
|
||||
}
|
||||
|
||||
req2, err := http.NewRequest("GET", server.URL+"/prefix/version/operations/"+itemOut.Details.ID, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
_, err = client.Do(req2)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if response.StatusCode != http.StatusAccepted {
|
||||
t.Errorf("Unexpected response %#v", response)
|
||||
}
|
||||
}
|
@@ -37,7 +37,6 @@ type RESTHandler struct {
|
||||
canonicalPrefix string
|
||||
selfLinker runtime.SelfLinker
|
||||
ops *Operations
|
||||
asyncOpWait time.Duration
|
||||
admissionControl admission.Interface
|
||||
}
|
||||
|
||||
@@ -144,12 +143,11 @@ func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(
|
||||
// DELETE /foo/bar delete 'bar'
|
||||
// Returns 404 if the method/pattern doesn't match one of these entries
|
||||
// The s accepts several query parameters:
|
||||
// sync=[false|true] Synchronous request (only applies to create, update, delete operations)
|
||||
// timeout=<duration> Timeout for synchronous requests, only applies if sync=true
|
||||
// timeout=<duration> Timeout for synchronous requests
|
||||
// labels=<label-selector> Used for filtering list operations
|
||||
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage, namespace, kind string) {
|
||||
ctx := api.WithNamespace(api.NewContext(), namespace)
|
||||
sync := req.URL.Query().Get("sync") == "true"
|
||||
// TODO: Document the timeout query parameter.
|
||||
timeout := parseTimeout(req.URL.Query().Get("timeout"))
|
||||
switch req.Method {
|
||||
case "GET":
|
||||
@@ -235,7 +233,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
||||
errorJSON(err, h.codec, w)
|
||||
return
|
||||
}
|
||||
op := h.createOperation(out, sync, timeout, curry(h.setSelfLinkAddName, req))
|
||||
op := h.createOperation(out, timeout, curry(h.setSelfLinkAddName, req))
|
||||
h.finishReq(op, req, w)
|
||||
|
||||
case "DELETE":
|
||||
@@ -261,7 +259,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
||||
errorJSON(err, h.codec, w)
|
||||
return
|
||||
}
|
||||
op := h.createOperation(out, sync, timeout, nil)
|
||||
op := h.createOperation(out, timeout, nil)
|
||||
h.finishReq(op, req, w)
|
||||
|
||||
case "PUT":
|
||||
@@ -299,7 +297,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
||||
errorJSON(err, h.codec, w)
|
||||
return
|
||||
}
|
||||
op := h.createOperation(out, sync, timeout, curry(h.setSelfLink, req))
|
||||
op := h.createOperation(out, timeout, curry(h.setSelfLink, req))
|
||||
h.finishReq(op, req, w)
|
||||
|
||||
default:
|
||||
@@ -308,13 +306,9 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
||||
}
|
||||
|
||||
// createOperation creates an operation to process a channel response.
|
||||
func (h *RESTHandler) createOperation(out <-chan RESTResult, sync bool, timeout time.Duration, onReceive func(RESTResult)) *Operation {
|
||||
func (h *RESTHandler) createOperation(out <-chan RESTResult, timeout time.Duration, onReceive func(RESTResult)) *Operation {
|
||||
op := h.ops.NewOperation(out, onReceive)
|
||||
if sync {
|
||||
op.WaitFor(timeout)
|
||||
} else if h.asyncOpWait != 0 {
|
||||
op.WaitFor(h.asyncOpWait)
|
||||
}
|
||||
op.WaitFor(timeout)
|
||||
return op
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user