mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-12 20:57:20 +00:00
Allow clients to determine the difference between create or update on PUT
PUT allows an object to be created (http 201). This allows REST code to indicate an object has been created and clients to react to it. APIServer now deals with <-chan RESTResult instead of <-chan runtime.Object, allowing more data to be passed through.
This commit is contained in:
@@ -103,7 +103,7 @@ func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Objec
|
||||
return api.Scheme.CopyOrDie(&storage.item), storage.errors["get"]
|
||||
}
|
||||
|
||||
func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
|
||||
func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (<-chan RESTResult, error) {
|
||||
storage.deleted = id
|
||||
if err := storage.errors["delete"]; err != nil {
|
||||
return nil, err
|
||||
@@ -120,7 +120,7 @@ func (storage *SimpleRESTStorage) New() runtime.Object {
|
||||
return &Simple{}
|
||||
}
|
||||
|
||||
func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) {
|
||||
func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) {
|
||||
storage.created = obj.(*Simple)
|
||||
if err := storage.errors["create"]; err != nil {
|
||||
return nil, err
|
||||
@@ -133,7 +133,7 @@ func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (<
|
||||
}), nil
|
||||
}
|
||||
|
||||
func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) {
|
||||
func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) {
|
||||
storage.updated = obj.(*Simple)
|
||||
if err := storage.errors["update"]; err != nil {
|
||||
return nil, err
|
||||
|
@@ -28,13 +28,38 @@ type WorkFunc func() (result runtime.Object, err error)
|
||||
|
||||
// MakeAsync takes a function and executes it, delivering the result in the way required
|
||||
// by RESTStorage's Update, Delete, and Create methods.
|
||||
func MakeAsync(fn WorkFunc) <-chan runtime.Object {
|
||||
channel := make(chan runtime.Object)
|
||||
func MakeAsync(fn WorkFunc) <-chan RESTResult {
|
||||
channel := make(chan RESTResult)
|
||||
go func() {
|
||||
defer util.HandleCrash()
|
||||
obj, err := fn()
|
||||
if err != nil {
|
||||
channel <- errToAPIStatus(err)
|
||||
channel <- RESTResult{Object: errToAPIStatus(err)}
|
||||
} else {
|
||||
channel <- RESTResult{Object: obj}
|
||||
}
|
||||
// 'close' is used to signal that no further values will
|
||||
// be written to the channel. Not strictly necessary, but
|
||||
// also won't hurt.
|
||||
close(channel)
|
||||
}()
|
||||
return channel
|
||||
}
|
||||
|
||||
// WorkFunc is used to perform any time consuming work for an api call, after
|
||||
// the input has been validated. Pass one of these to MakeAsync to create an
|
||||
// appropriate return value for the Update, Delete, and Create methods.
|
||||
type WorkResultFunc func() (result RESTResult, err error)
|
||||
|
||||
// MakeAsync takes a function and executes it, delivering the result in the way required
|
||||
// by RESTStorage's Update, Delete, and Create methods.
|
||||
func MakeAsyncResult(fn WorkResultFunc) <-chan RESTResult {
|
||||
channel := make(chan RESTResult)
|
||||
go func() {
|
||||
defer util.HandleCrash()
|
||||
obj, err := fn()
|
||||
if err != nil {
|
||||
channel <- RESTResult{Object: errToAPIStatus(err)}
|
||||
} else {
|
||||
channel <- obj
|
||||
}
|
||||
|
@@ -65,6 +65,7 @@ func RecoverPanics(handler http.Handler) http.Handler {
|
||||
defer httplog.NewLogged(req, &w).StacktraceWhen(
|
||||
httplog.StatusIsNot(
|
||||
http.StatusOK,
|
||||
http.StatusCreated,
|
||||
http.StatusAccepted,
|
||||
http.StatusMovedPermanently,
|
||||
http.StatusTemporaryRedirect,
|
||||
|
@@ -41,10 +41,27 @@ type RESTStorage interface {
|
||||
// Delete finds a resource in the storage and deletes it.
|
||||
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
|
||||
// returned error value err when the specified resource is not found.
|
||||
Delete(ctx api.Context, id string) (<-chan runtime.Object, error)
|
||||
Delete(ctx api.Context, id string) (<-chan RESTResult, error)
|
||||
|
||||
Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error)
|
||||
Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error)
|
||||
// Create creates a new version of a resource.
|
||||
Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error)
|
||||
|
||||
// Update finds a resource in the storage and updates it. Some implementations
|
||||
// may allow updates creates the object - they should set the Created flag of
|
||||
// the returned RESTResultto true. In the event of an asynchronous error returned
|
||||
// via an api.Status object, the Created flag is ignored.
|
||||
Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error)
|
||||
}
|
||||
|
||||
// RESTResult indicates the result of a REST transformation.
|
||||
type RESTResult struct {
|
||||
// The result of this operation. May be nil if the operation has no meaningful
|
||||
// result (like Delete)
|
||||
runtime.Object
|
||||
|
||||
// May be set true to indicate that the Update operation resulted in the object
|
||||
// being created.
|
||||
Created bool
|
||||
}
|
||||
|
||||
// ResourceWatcher should be implemented by all RESTStorage objects that
|
||||
|
@@ -53,7 +53,8 @@ func (h *OperationHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
obj, complete := op.StatusOrResult()
|
||||
result, complete := op.StatusOrResult()
|
||||
obj := result.Object
|
||||
if complete {
|
||||
writeJSON(http.StatusOK, h.codec, obj, w)
|
||||
} else {
|
||||
@@ -64,9 +65,9 @@ func (h *OperationHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// Operation represents an ongoing action which the server is performing.
|
||||
type Operation struct {
|
||||
ID string
|
||||
result runtime.Object
|
||||
onReceive func(runtime.Object)
|
||||
awaiting <-chan runtime.Object
|
||||
result RESTResult
|
||||
onReceive func(RESTResult)
|
||||
awaiting <-chan RESTResult
|
||||
finished *time.Time
|
||||
lock sync.Mutex
|
||||
notify chan struct{}
|
||||
@@ -93,7 +94,7 @@ func NewOperations() *Operations {
|
||||
|
||||
// NewOperation adds a new operation. It is lock-free. 'onReceive' will be called
|
||||
// with the value read from 'from', when it is read.
|
||||
func (ops *Operations) NewOperation(from <-chan runtime.Object, onReceive func(runtime.Object)) *Operation {
|
||||
func (ops *Operations) NewOperation(from <-chan RESTResult, onReceive func(RESTResult)) *Operation {
|
||||
id := atomic.AddInt64(&ops.lastID, 1)
|
||||
op := &Operation{
|
||||
ID: strconv.FormatInt(id, 10),
|
||||
@@ -192,16 +193,16 @@ func (op *Operation) expired(limitTime time.Time) bool {
|
||||
|
||||
// StatusOrResult returns status information or the result of the operation if it is complete,
|
||||
// with a bool indicating true in the latter case.
|
||||
func (op *Operation) StatusOrResult() (description runtime.Object, finished bool) {
|
||||
func (op *Operation) StatusOrResult() (description RESTResult, finished bool) {
|
||||
op.lock.Lock()
|
||||
defer op.lock.Unlock()
|
||||
|
||||
if op.finished == nil {
|
||||
return &api.Status{
|
||||
return RESTResult{Object: &api.Status{
|
||||
Status: api.StatusWorking,
|
||||
Reason: api.StatusReasonWorking,
|
||||
Details: &api.StatusDetails{ID: op.ID, Kind: "operation"},
|
||||
}, false
|
||||
}}, false
|
||||
}
|
||||
return op.result, true
|
||||
}
|
||||
|
@@ -34,16 +34,16 @@ import (
|
||||
func TestOperation(t *testing.T) {
|
||||
ops := NewOperations()
|
||||
|
||||
c := make(chan runtime.Object)
|
||||
c := make(chan RESTResult)
|
||||
called := make(chan struct{})
|
||||
op := ops.NewOperation(c, func(runtime.Object) { go close(called) })
|
||||
op := ops.NewOperation(c, func(RESTResult) { go close(called) })
|
||||
// Allow context switch, so that op's ID can get added to the map and Get will work.
|
||||
// This is just so we can test Get. Ordinary users have no need to call Get immediately
|
||||
// after calling NewOperation, because it returns the operation directly.
|
||||
time.Sleep(time.Millisecond)
|
||||
go func() {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
c <- &Simple{ObjectMeta: api.ObjectMeta{Name: "All done"}}
|
||||
c <- RESTResult{Object: &Simple{ObjectMeta: api.ObjectMeta{Name: "All done"}}}
|
||||
}()
|
||||
|
||||
if op.expired(time.Now().Add(-time.Minute)) {
|
||||
@@ -96,7 +96,7 @@ func TestOperation(t *testing.T) {
|
||||
t.Errorf("expire failed to remove the operation %#v", ops)
|
||||
}
|
||||
|
||||
if op.result.(*Simple).Name != "All done" {
|
||||
if op.result.Object.(*Simple).Name != "All done" {
|
||||
t.Errorf("Got unexpected result: %#v", op.result)
|
||||
}
|
||||
}
|
||||
|
@@ -29,6 +29,7 @@ import (
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// RESTHandler implements HTTP verbs on a set of RESTful resources identified by name.
|
||||
type RESTHandler struct {
|
||||
storage map[string]RESTStorage
|
||||
codec runtime.Codec
|
||||
@@ -78,9 +79,9 @@ func (h *RESTHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request)
|
||||
}
|
||||
|
||||
// curry adapts either of the self link setting functions into a function appropriate for operation's hook.
|
||||
func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(runtime.Object) {
|
||||
return func(obj runtime.Object) {
|
||||
if err := f(obj, req); err != nil {
|
||||
func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(RESTResult) {
|
||||
return func(obj RESTResult) {
|
||||
if err := f(obj.Object, req); err != nil {
|
||||
glog.Errorf("unable to set self link for %#v: %v", obj, err)
|
||||
}
|
||||
}
|
||||
@@ -217,7 +218,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
||||
}
|
||||
|
||||
// createOperation creates an operation to process a channel response.
|
||||
func (h *RESTHandler) createOperation(out <-chan runtime.Object, sync bool, timeout time.Duration, onReceive func(runtime.Object)) *Operation {
|
||||
func (h *RESTHandler) createOperation(out <-chan RESTResult, sync bool, timeout time.Duration, onReceive func(RESTResult)) *Operation {
|
||||
op := h.ops.NewOperation(out, onReceive)
|
||||
if sync {
|
||||
op.WaitFor(timeout)
|
||||
@@ -230,9 +231,13 @@ func (h *RESTHandler) createOperation(out <-chan runtime.Object, sync bool, time
|
||||
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
|
||||
// Operation to receive the result and returning its ID down the writer.
|
||||
func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.ResponseWriter) {
|
||||
obj, complete := op.StatusOrResult()
|
||||
result, complete := op.StatusOrResult()
|
||||
obj := result.Object
|
||||
if complete {
|
||||
status := http.StatusOK
|
||||
if result.Created {
|
||||
status = http.StatusCreated
|
||||
}
|
||||
switch stat := obj.(type) {
|
||||
case *api.Status:
|
||||
if stat.Code != 0 {
|
||||
|
69
pkg/apiserver/resthandler_test.go
Normal file
69
pkg/apiserver/resthandler_test.go
Normal file
@@ -0,0 +1,69 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
func TestFinishReq(t *testing.T) {
|
||||
handler := &RESTHandler{codec: api.Codec}
|
||||
op := &Operation{finished: &time.Time{}, result: RESTResult{Object: &api.Status{Code: http.StatusNotFound}}}
|
||||
resp := httptest.NewRecorder()
|
||||
handler.finishReq(op, nil, resp)
|
||||
status := &api.Status{}
|
||||
if err := json.Unmarshal([]byte(resp.Body.String()), status); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if resp.Code != http.StatusNotFound || status.Code != http.StatusNotFound {
|
||||
t.Errorf("unexpected status: %#v", status)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFinishReqUnwrap(t *testing.T) {
|
||||
handler := &RESTHandler{codec: api.Codec}
|
||||
op := &Operation{finished: &time.Time{}, result: RESTResult{Created: true, Object: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}}}
|
||||
resp := httptest.NewRecorder()
|
||||
handler.finishReq(op, nil, resp)
|
||||
obj := &api.Pod{}
|
||||
if err := json.Unmarshal([]byte(resp.Body.String()), obj); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if resp.Code != http.StatusCreated || obj.Name != "foo" {
|
||||
t.Errorf("unexpected object: %#v", obj)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFinishReqUnwrapStatus(t *testing.T) {
|
||||
handler := &RESTHandler{codec: api.Codec}
|
||||
op := &Operation{finished: &time.Time{}, result: RESTResult{Created: true, Object: &api.Status{Code: http.StatusNotFound}}}
|
||||
resp := httptest.NewRecorder()
|
||||
handler.finishReq(op, nil, resp)
|
||||
obj := &api.Status{}
|
||||
if err := json.Unmarshal([]byte(resp.Body.String()), obj); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if resp.Code != http.StatusNotFound || obj.Code != http.StatusNotFound {
|
||||
t.Errorf("unexpected object: %#v", obj)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user