mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 06:54:01 +00:00
Remove asynchronous channel on RESTStorage interfaces
This commit is contained in:
parent
d167c11b59
commit
79cb93002e
@ -85,12 +85,12 @@ func (t *Tester) TestCreateResetsUserData(valid runtime.Object) {
|
||||
objectMeta.UID = "bad-uid"
|
||||
objectMeta.CreationTimestamp = now
|
||||
|
||||
channel, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid)
|
||||
obj, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if obj := <-channel; obj.Object == nil {
|
||||
t.Fatalf("Unexpected object from channel: %#v", obj)
|
||||
if obj == nil {
|
||||
t.Fatalf("Unexpected object from result: %#v", obj)
|
||||
}
|
||||
if objectMeta.UID == "bad-uid" || objectMeta.CreationTimestamp == now {
|
||||
t.Errorf("ObjectMeta did not reset basic fields: %#v", objectMeta)
|
||||
@ -111,12 +111,12 @@ func (t *Tester) TestCreateHasMetadata(valid runtime.Object) {
|
||||
context = api.NewContext()
|
||||
}
|
||||
|
||||
channel, err := t.storage.(apiserver.RESTCreater).Create(context, valid)
|
||||
obj, err := t.storage.(apiserver.RESTCreater).Create(context, valid)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if obj := <-channel; obj.Object == nil {
|
||||
t.Fatalf("Unexpected object from channel: %#v", obj)
|
||||
if obj == nil {
|
||||
t.Fatalf("Unexpected object from result: %#v", obj)
|
||||
}
|
||||
if !api.HasObjectMetaSystemFieldValues(objectMeta) {
|
||||
t.Errorf("storage did not populate object meta field values")
|
||||
@ -148,12 +148,8 @@ func (t *Tester) TestCreateGeneratesNameReturnsTryAgain(valid runtime.Object) {
|
||||
|
||||
objectMeta.GenerateName = "test-"
|
||||
t.withStorageError(errors.NewAlreadyExists("kind", "thing"), func() {
|
||||
ch, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
res := <-ch
|
||||
if err := errors.FromObject(res.Object); err == nil || !errors.IsTryAgainLater(err) {
|
||||
_, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid)
|
||||
if err == nil || !errors.IsTryAgainLater(err) {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
})
|
||||
|
@ -180,17 +180,17 @@ func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Objec
|
||||
return api.Scheme.CopyOrDie(&storage.item), storage.errors["get"]
|
||||
}
|
||||
|
||||
func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (<-chan RESTResult, error) {
|
||||
func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (runtime.Object, error) {
|
||||
storage.deleted = id
|
||||
if err := storage.errors["delete"]; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return MakeAsync(func() (runtime.Object, error) {
|
||||
if storage.injectedFunction != nil {
|
||||
return storage.injectedFunction(&Simple{ObjectMeta: api.ObjectMeta{Name: id}})
|
||||
}
|
||||
return &api.Status{Status: api.StatusSuccess}, nil
|
||||
}), nil
|
||||
var obj runtime.Object = &api.Status{Status: api.StatusSuccess}
|
||||
var err error
|
||||
if storage.injectedFunction != nil {
|
||||
obj, err = storage.injectedFunction(&Simple{ObjectMeta: api.ObjectMeta{Name: id}})
|
||||
}
|
||||
return obj, err
|
||||
}
|
||||
|
||||
func (storage *SimpleRESTStorage) New() runtime.Object {
|
||||
@ -201,30 +201,28 @@ func (storage *SimpleRESTStorage) NewList() runtime.Object {
|
||||
return &SimpleList{}
|
||||
}
|
||||
|
||||
func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) {
|
||||
func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
|
||||
storage.created = obj.(*Simple)
|
||||
if err := storage.errors["create"]; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return MakeAsync(func() (runtime.Object, error) {
|
||||
if storage.injectedFunction != nil {
|
||||
return storage.injectedFunction(obj)
|
||||
}
|
||||
return obj, nil
|
||||
}), nil
|
||||
var err error
|
||||
if storage.injectedFunction != nil {
|
||||
obj, err = storage.injectedFunction(obj)
|
||||
}
|
||||
return obj, err
|
||||
}
|
||||
|
||||
func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) {
|
||||
func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
|
||||
storage.updated = obj.(*Simple)
|
||||
if err := storage.errors["update"]; err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
return MakeAsync(func() (runtime.Object, error) {
|
||||
if storage.injectedFunction != nil {
|
||||
return storage.injectedFunction(obj)
|
||||
}
|
||||
return obj, nil
|
||||
}), nil
|
||||
var err error
|
||||
if storage.injectedFunction != nil {
|
||||
obj, err = storage.injectedFunction(obj)
|
||||
}
|
||||
return obj, false, err
|
||||
}
|
||||
|
||||
// Implement ResourceWatcher.
|
||||
@ -994,7 +992,7 @@ func TestCreate(t *testing.T) {
|
||||
if !reflect.DeepEqual(&itemOut, simple) {
|
||||
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body))
|
||||
}
|
||||
if response.StatusCode != http.StatusOK {
|
||||
if response.StatusCode != http.StatusCreated {
|
||||
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusOK, response)
|
||||
}
|
||||
if !selfLinker.called {
|
||||
|
@ -45,28 +45,3 @@ func MakeAsync(fn WorkFunc) <-chan RESTResult {
|
||||
}()
|
||||
return channel
|
||||
}
|
||||
|
||||
// WorkFunc is used to perform any time consuming work for an api call, after
|
||||
// the input has been validated. Pass one of these to MakeAsync to create an
|
||||
// appropriate return value for the Update, Delete, and Create methods.
|
||||
type WorkResultFunc func() (result RESTResult, err error)
|
||||
|
||||
// MakeAsync takes a function and executes it, delivering the result in the way required
|
||||
// by RESTStorage's Update, Delete, and Create methods.
|
||||
func MakeAsyncResult(fn WorkResultFunc) <-chan RESTResult {
|
||||
channel := make(chan RESTResult)
|
||||
go func() {
|
||||
defer util.HandleCrash()
|
||||
obj, err := fn()
|
||||
if err != nil {
|
||||
channel <- RESTResult{Object: errToAPIStatus(err)}
|
||||
} else {
|
||||
channel <- obj
|
||||
}
|
||||
// 'close' is used to signal that no further values will
|
||||
// be written to the channel. Not strictly necessary, but
|
||||
// also won't hurt.
|
||||
close(channel)
|
||||
}()
|
||||
return channel
|
||||
}
|
||||
|
@ -52,7 +52,9 @@ type RESTDeleter interface {
|
||||
// Delete finds a resource in the storage and deletes it.
|
||||
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
|
||||
// returned error value err when the specified resource is not found.
|
||||
Delete(ctx api.Context, id string) (<-chan RESTResult, error)
|
||||
// Delete *may* return the object that was deleted, or a status object indicating additional
|
||||
// information about deletion.
|
||||
Delete(ctx api.Context, id string) (runtime.Object, error)
|
||||
}
|
||||
|
||||
type RESTCreater interface {
|
||||
@ -61,7 +63,7 @@ type RESTCreater interface {
|
||||
New() runtime.Object
|
||||
|
||||
// Create creates a new version of a resource.
|
||||
Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error)
|
||||
Create(ctx api.Context, obj runtime.Object) (runtime.Object, error)
|
||||
}
|
||||
|
||||
type RESTUpdater interface {
|
||||
@ -70,10 +72,9 @@ type RESTUpdater interface {
|
||||
New() runtime.Object
|
||||
|
||||
// Update finds a resource in the storage and updates it. Some implementations
|
||||
// may allow updates creates the object - they should set the Created flag of
|
||||
// the returned RESTResult to true. In the event of an asynchronous error returned
|
||||
// via an api.Status object, the Created flag is ignored.
|
||||
Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error)
|
||||
// may allow updates creates the object - they should set the created boolean
|
||||
// to true.
|
||||
Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error)
|
||||
}
|
||||
|
||||
// RESTResult indicates the result of a REST transformation.
|
||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
@ -145,29 +144,20 @@ func CreateResource(r RESTCreater, namespaceFn ResourceNamespaceFunc, linkFn Lin
|
||||
return
|
||||
}
|
||||
|
||||
out, err := r.Create(ctx, obj)
|
||||
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
||||
return r.Create(ctx, obj)
|
||||
})
|
||||
if err != nil {
|
||||
errorJSON(err, codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := finishRequest(out, timeout, codec)
|
||||
if err != nil {
|
||||
if err := linkFn(req, result); err != nil {
|
||||
errorJSON(err, codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
item := result.Object
|
||||
if err := linkFn(req, item); err != nil {
|
||||
errorJSON(err, codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
status := http.StatusOK
|
||||
if result.Created {
|
||||
status = http.StatusCreated
|
||||
}
|
||||
writeJSON(status, codec, item, w)
|
||||
writeJSON(http.StatusCreated, codec, result, w)
|
||||
}
|
||||
}
|
||||
|
||||
@ -223,29 +213,27 @@ func UpdateResource(r RESTUpdater, nameFn ResourceNameFunc, objNameFunc ObjectNa
|
||||
return
|
||||
}
|
||||
|
||||
out, err := r.Update(ctx, obj)
|
||||
wasCreated := false
|
||||
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
||||
obj, created, err := r.Update(ctx, obj)
|
||||
wasCreated = created
|
||||
return obj, err
|
||||
})
|
||||
if err != nil {
|
||||
errorJSON(err, codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := finishRequest(out, timeout, codec)
|
||||
if err != nil {
|
||||
errorJSON(err, codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
item := result.Object
|
||||
if err := linkFn(req, item); err != nil {
|
||||
if err := linkFn(req, result); err != nil {
|
||||
errorJSON(err, codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
status := http.StatusOK
|
||||
if result.Created {
|
||||
if wasCreated {
|
||||
status = http.StatusCreated
|
||||
}
|
||||
writeJSON(status, codec, item, w)
|
||||
writeJSON(status, codec, result, w)
|
||||
}
|
||||
}
|
||||
|
||||
@ -273,13 +261,9 @@ func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceF
|
||||
return
|
||||
}
|
||||
|
||||
out, err := r.Delete(ctx, name)
|
||||
if err != nil {
|
||||
errorJSON(err, codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := finishRequest(out, timeout, codec)
|
||||
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
||||
return r.Delete(ctx, name)
|
||||
})
|
||||
if err != nil {
|
||||
errorJSON(err, codec, w)
|
||||
return
|
||||
@ -287,9 +271,8 @@ func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceF
|
||||
|
||||
// if the RESTDeleter returns a nil object, fill out a status. Callers may return a valid
|
||||
// object with the response.
|
||||
item := result.Object
|
||||
if item == nil {
|
||||
item = &api.Status{
|
||||
if result == nil {
|
||||
result = &api.Status{
|
||||
Status: api.StatusSuccess,
|
||||
Code: http.StatusOK,
|
||||
Details: &api.StatusDetails{
|
||||
@ -297,24 +280,43 @@ func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceF
|
||||
Kind: kind,
|
||||
},
|
||||
}
|
||||
} else {
|
||||
// when a non-status response is returned, set the self link
|
||||
if _, ok := result.(*api.Status); !ok {
|
||||
if err := linkFn(req, result); err != nil {
|
||||
errorJSON(err, codec, w)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
writeJSON(http.StatusOK, codec, item, w)
|
||||
writeJSON(http.StatusOK, codec, result, w)
|
||||
}
|
||||
}
|
||||
|
||||
// finishRequest waits for the result channel to close or clear, and writes the appropriate response.
|
||||
// resultFunc is a function that returns a rest result and can be run in a goroutine
|
||||
type resultFunc func() (runtime.Object, error)
|
||||
|
||||
// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response.
|
||||
// Any api.Status object returned is considered an "error", which interrupts the normal response flow.
|
||||
func finishRequest(ch <-chan RESTResult, timeout time.Duration, codec runtime.Codec) (*RESTResult, error) {
|
||||
select {
|
||||
case result, ok := <-ch:
|
||||
if !ok {
|
||||
// likely programming error
|
||||
return nil, fmt.Errorf("operation channel closed without returning result")
|
||||
func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, err error) {
|
||||
ch := make(chan runtime.Object)
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
if result, err := fn(); err != nil {
|
||||
errCh <- err
|
||||
} else {
|
||||
ch <- result
|
||||
}
|
||||
if status, ok := result.Object.(*api.Status); ok {
|
||||
}()
|
||||
|
||||
select {
|
||||
case result = <-ch:
|
||||
if status, ok := result.(*api.Status); ok {
|
||||
return nil, errors.FromObject(status)
|
||||
}
|
||||
return &result, nil
|
||||
return result, nil
|
||||
case err = <-errCh:
|
||||
return nil, err
|
||||
case <-time.After(timeout):
|
||||
return nil, errors.NewTimeoutError("request did not complete within allowed duration")
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||
package master
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
@ -92,15 +91,8 @@ func (m *Master) createMasterNamespaceIfNeeded(ns string) error {
|
||||
Namespace: "",
|
||||
},
|
||||
}
|
||||
c, err := m.storage["namespaces"].(apiserver.RESTCreater).Create(ctx, namespace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp := <-c
|
||||
if _, ok := resp.Object.(*api.Service); ok {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("unexpected response %#v", resp)
|
||||
_, err := m.storage["namespaces"].(apiserver.RESTCreater).Create(ctx, namespace)
|
||||
return err
|
||||
}
|
||||
|
||||
// createMasterServiceIfNeeded will create the specified service if it
|
||||
@ -126,18 +118,8 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I
|
||||
SessionAffinity: api.AffinityTypeNone,
|
||||
},
|
||||
}
|
||||
// Kids, don't do this at home: this is a hack. There's no good way to call the business
|
||||
// logic which lives in the REST object from here.
|
||||
c, err := m.storage["services"].(apiserver.RESTCreater).Create(ctx, svc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp := <-c
|
||||
if _, ok := resp.Object.(*api.Service); ok {
|
||||
// If all worked, we get back an *api.Service object.
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("unexpected response: %#v", resp.Object)
|
||||
_, err := m.storage["services"].(apiserver.RESTCreater).Create(ctx, svc)
|
||||
return err
|
||||
}
|
||||
|
||||
// ensureEndpointsContain sets the endpoints for the given service. Also removes
|
||||
|
Loading…
Reference in New Issue
Block a user