diff --git a/pkg/api/rest/resttest/resttest.go b/pkg/api/rest/resttest/resttest.go index fa3fb7de03e..a5bceff4fd6 100644 --- a/pkg/api/rest/resttest/resttest.go +++ b/pkg/api/rest/resttest/resttest.go @@ -85,12 +85,12 @@ func (t *Tester) TestCreateResetsUserData(valid runtime.Object) { objectMeta.UID = "bad-uid" objectMeta.CreationTimestamp = now - channel, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) + obj, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if obj := <-channel; obj.Object == nil { - t.Fatalf("Unexpected object from channel: %#v", obj) + if obj == nil { + t.Fatalf("Unexpected object from result: %#v", obj) } if objectMeta.UID == "bad-uid" || objectMeta.CreationTimestamp == now { t.Errorf("ObjectMeta did not reset basic fields: %#v", objectMeta) @@ -111,12 +111,12 @@ func (t *Tester) TestCreateHasMetadata(valid runtime.Object) { context = api.NewContext() } - channel, err := t.storage.(apiserver.RESTCreater).Create(context, valid) + obj, err := t.storage.(apiserver.RESTCreater).Create(context, valid) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if obj := <-channel; obj.Object == nil { - t.Fatalf("Unexpected object from channel: %#v", obj) + if obj == nil { + t.Fatalf("Unexpected object from result: %#v", obj) } if !api.HasObjectMetaSystemFieldValues(objectMeta) { t.Errorf("storage did not populate object meta field values") @@ -148,12 +148,8 @@ func (t *Tester) TestCreateGeneratesNameReturnsTryAgain(valid runtime.Object) { objectMeta.GenerateName = "test-" t.withStorageError(errors.NewAlreadyExists("kind", "thing"), func() { - ch, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - res := <-ch - if err := errors.FromObject(res.Object); err == nil || !errors.IsTryAgainLater(err) { + _, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) + if err == nil || !errors.IsTryAgainLater(err) { t.Fatalf("Unexpected error: %v", err) } }) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index f00e66076a7..55be37e4f30 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -180,17 +180,17 @@ func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Objec return api.Scheme.CopyOrDie(&storage.item), storage.errors["get"] } -func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (<-chan RESTResult, error) { +func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (runtime.Object, error) { storage.deleted = id if err := storage.errors["delete"]; err != nil { return nil, err } - return MakeAsync(func() (runtime.Object, error) { - if storage.injectedFunction != nil { - return storage.injectedFunction(&Simple{ObjectMeta: api.ObjectMeta{Name: id}}) - } - return &api.Status{Status: api.StatusSuccess}, nil - }), nil + var obj runtime.Object = &api.Status{Status: api.StatusSuccess} + var err error + if storage.injectedFunction != nil { + obj, err = storage.injectedFunction(&Simple{ObjectMeta: api.ObjectMeta{Name: id}}) + } + return obj, err } func (storage *SimpleRESTStorage) New() runtime.Object { @@ -201,30 +201,28 @@ func (storage *SimpleRESTStorage) NewList() runtime.Object { return &SimpleList{} } -func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) { +func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { storage.created = obj.(*Simple) if err := storage.errors["create"]; err != nil { return nil, err } - return MakeAsync(func() (runtime.Object, error) { - if storage.injectedFunction != nil { - return storage.injectedFunction(obj) - } - return obj, nil - }), nil + var err error + if storage.injectedFunction != nil { + obj, err = storage.injectedFunction(obj) + } + return obj, err } -func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) { +func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { storage.updated = obj.(*Simple) if err := storage.errors["update"]; err != nil { - return nil, err + return nil, false, err } - return MakeAsync(func() (runtime.Object, error) { - if storage.injectedFunction != nil { - return storage.injectedFunction(obj) - } - return obj, nil - }), nil + var err error + if storage.injectedFunction != nil { + obj, err = storage.injectedFunction(obj) + } + return obj, false, err } // Implement ResourceWatcher. @@ -994,7 +992,7 @@ func TestCreate(t *testing.T) { if !reflect.DeepEqual(&itemOut, simple) { t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body)) } - if response.StatusCode != http.StatusOK { + if response.StatusCode != http.StatusCreated { t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusOK, response) } if !selfLinker.called { diff --git a/pkg/apiserver/async.go b/pkg/apiserver/async.go index a96a151be34..ca1d0aa32cf 100644 --- a/pkg/apiserver/async.go +++ b/pkg/apiserver/async.go @@ -45,28 +45,3 @@ func MakeAsync(fn WorkFunc) <-chan RESTResult { }() return channel } - -// WorkFunc is used to perform any time consuming work for an api call, after -// the input has been validated. Pass one of these to MakeAsync to create an -// appropriate return value for the Update, Delete, and Create methods. -type WorkResultFunc func() (result RESTResult, err error) - -// MakeAsync takes a function and executes it, delivering the result in the way required -// by RESTStorage's Update, Delete, and Create methods. -func MakeAsyncResult(fn WorkResultFunc) <-chan RESTResult { - channel := make(chan RESTResult) - go func() { - defer util.HandleCrash() - obj, err := fn() - if err != nil { - channel <- RESTResult{Object: errToAPIStatus(err)} - } else { - channel <- obj - } - // 'close' is used to signal that no further values will - // be written to the channel. Not strictly necessary, but - // also won't hurt. - close(channel) - }() - return channel -} diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 4bf2329e00f..233bd12236e 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -52,7 +52,9 @@ type RESTDeleter interface { // Delete finds a resource in the storage and deletes it. // Although it can return an arbitrary error value, IsNotFound(err) is true for the // returned error value err when the specified resource is not found. - Delete(ctx api.Context, id string) (<-chan RESTResult, error) + // Delete *may* return the object that was deleted, or a status object indicating additional + // information about deletion. + Delete(ctx api.Context, id string) (runtime.Object, error) } type RESTCreater interface { @@ -61,7 +63,7 @@ type RESTCreater interface { New() runtime.Object // Create creates a new version of a resource. - Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) + Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) } type RESTUpdater interface { @@ -70,10 +72,9 @@ type RESTUpdater interface { New() runtime.Object // Update finds a resource in the storage and updates it. Some implementations - // may allow updates creates the object - they should set the Created flag of - // the returned RESTResult to true. In the event of an asynchronous error returned - // via an api.Status object, the Created flag is ignored. - Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) + // may allow updates creates the object - they should set the created boolean + // to true. + Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) } // RESTResult indicates the result of a REST transformation. diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 11e725aa19e..b09be207367 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -17,7 +17,6 @@ limitations under the License. package apiserver import ( - "fmt" "net/http" "time" @@ -145,29 +144,20 @@ func CreateResource(r RESTCreater, namespaceFn ResourceNamespaceFunc, linkFn Lin return } - out, err := r.Create(ctx, obj) + result, err := finishRequest(timeout, func() (runtime.Object, error) { + return r.Create(ctx, obj) + }) if err != nil { errorJSON(err, codec, w) return } - result, err := finishRequest(out, timeout, codec) - if err != nil { + if err := linkFn(req, result); err != nil { errorJSON(err, codec, w) return } - item := result.Object - if err := linkFn(req, item); err != nil { - errorJSON(err, codec, w) - return - } - - status := http.StatusOK - if result.Created { - status = http.StatusCreated - } - writeJSON(status, codec, item, w) + writeJSON(http.StatusCreated, codec, result, w) } } @@ -223,29 +213,27 @@ func UpdateResource(r RESTUpdater, nameFn ResourceNameFunc, objNameFunc ObjectNa return } - out, err := r.Update(ctx, obj) + wasCreated := false + result, err := finishRequest(timeout, func() (runtime.Object, error) { + obj, created, err := r.Update(ctx, obj) + wasCreated = created + return obj, err + }) if err != nil { errorJSON(err, codec, w) return } - result, err := finishRequest(out, timeout, codec) - if err != nil { - errorJSON(err, codec, w) - return - } - - item := result.Object - if err := linkFn(req, item); err != nil { + if err := linkFn(req, result); err != nil { errorJSON(err, codec, w) return } status := http.StatusOK - if result.Created { + if wasCreated { status = http.StatusCreated } - writeJSON(status, codec, item, w) + writeJSON(status, codec, result, w) } } @@ -273,13 +261,9 @@ func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceF return } - out, err := r.Delete(ctx, name) - if err != nil { - errorJSON(err, codec, w) - return - } - - result, err := finishRequest(out, timeout, codec) + result, err := finishRequest(timeout, func() (runtime.Object, error) { + return r.Delete(ctx, name) + }) if err != nil { errorJSON(err, codec, w) return @@ -287,9 +271,8 @@ func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceF // if the RESTDeleter returns a nil object, fill out a status. Callers may return a valid // object with the response. - item := result.Object - if item == nil { - item = &api.Status{ + if result == nil { + result = &api.Status{ Status: api.StatusSuccess, Code: http.StatusOK, Details: &api.StatusDetails{ @@ -297,24 +280,43 @@ func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceF Kind: kind, }, } + } else { + // when a non-status response is returned, set the self link + if _, ok := result.(*api.Status); !ok { + if err := linkFn(req, result); err != nil { + errorJSON(err, codec, w) + return + } + } } - writeJSON(http.StatusOK, codec, item, w) + writeJSON(http.StatusOK, codec, result, w) } } -// finishRequest waits for the result channel to close or clear, and writes the appropriate response. +// resultFunc is a function that returns a rest result and can be run in a goroutine +type resultFunc func() (runtime.Object, error) + +// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response. // Any api.Status object returned is considered an "error", which interrupts the normal response flow. -func finishRequest(ch <-chan RESTResult, timeout time.Duration, codec runtime.Codec) (*RESTResult, error) { - select { - case result, ok := <-ch: - if !ok { - // likely programming error - return nil, fmt.Errorf("operation channel closed without returning result") +func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, err error) { + ch := make(chan runtime.Object) + errCh := make(chan error) + go func() { + if result, err := fn(); err != nil { + errCh <- err + } else { + ch <- result } - if status, ok := result.Object.(*api.Status); ok { + }() + + select { + case result = <-ch: + if status, ok := result.(*api.Status); ok { return nil, errors.FromObject(status) } - return &result, nil + return result, nil + case err = <-errCh: + return nil, err case <-time.After(timeout): return nil, errors.NewTimeoutError("request did not complete within allowed duration") } diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 47b9803f478..ced152954b9 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -17,7 +17,6 @@ limitations under the License. package master import ( - "fmt" "net" "strconv" "time" @@ -92,15 +91,8 @@ func (m *Master) createMasterNamespaceIfNeeded(ns string) error { Namespace: "", }, } - c, err := m.storage["namespaces"].(apiserver.RESTCreater).Create(ctx, namespace) - if err != nil { - return err - } - resp := <-c - if _, ok := resp.Object.(*api.Service); ok { - return nil - } - return fmt.Errorf("unexpected response %#v", resp) + _, err := m.storage["namespaces"].(apiserver.RESTCreater).Create(ctx, namespace) + return err } // createMasterServiceIfNeeded will create the specified service if it @@ -126,18 +118,8 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I SessionAffinity: api.AffinityTypeNone, }, } - // Kids, don't do this at home: this is a hack. There's no good way to call the business - // logic which lives in the REST object from here. - c, err := m.storage["services"].(apiserver.RESTCreater).Create(ctx, svc) - if err != nil { - return err - } - resp := <-c - if _, ok := resp.Object.(*api.Service); ok { - // If all worked, we get back an *api.Service object. - return nil - } - return fmt.Errorf("unexpected response: %#v", resp.Object) + _, err := m.storage["services"].(apiserver.RESTCreater).Create(ctx, svc) + return err } // ensureEndpointsContain sets the endpoints for the given service. Also removes