Merge pull request #4419 from smarterclayton/expand_generic_resources

Expand the generic registry
This commit is contained in:
Clayton Coleman 2015-02-13 13:53:20 -05:00
commit c977a45864
22 changed files with 478 additions and 108 deletions

View File

@ -87,12 +87,12 @@ func CheckGeneratedNameError(strategy RESTCreateStrategy, err error, obj runtime
}
// objectMetaAndKind retrieves kind and ObjectMeta from a runtime object, or returns an error.
func objectMetaAndKind(strategy RESTCreateStrategy, obj runtime.Object) (*api.ObjectMeta, string, error) {
func objectMetaAndKind(typer runtime.ObjectTyper, obj runtime.Object) (*api.ObjectMeta, string, error) {
objectMeta, err := api.ObjectMetaFor(obj)
if err != nil {
return nil, "", errors.NewInternalError(err)
}
_, kind, err := strategy.ObjectVersionAndKind(obj)
_, kind, err := typer.ObjectVersionAndKind(obj)
if err != nil {
return nil, "", errors.NewInternalError(err)
}

View File

@ -23,6 +23,27 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)
// ObjectFunc is a function to act on a given object. An error may be returned
// if the hook cannot be completed. An ObjectFunc may transform the provided
// object.
type ObjectFunc func(obj runtime.Object) error
// AllFuncs returns an ObjectFunc that attempts to run all of the provided functions
// in order, returning early if there are any errors.
func AllFuncs(fns ...ObjectFunc) ObjectFunc {
return func(obj runtime.Object) error {
for _, fn := range fns {
if fn == nil {
continue
}
if err := fn(obj); err != nil {
return err
}
}
return nil
}
}
// rcStrategy implements behavior for Replication Controllers.
// TODO: move to a replicationcontroller specific package.
type rcStrategy struct {
@ -60,7 +81,7 @@ type podStrategy struct {
// Pods is the default logic that applies when creating and updating Pod
// objects.
var Pods RESTCreateStrategy = podStrategy{api.Scheme, api.SimpleNameGenerator}
var Pods = podStrategy{api.Scheme, api.SimpleNameGenerator}
// NamespaceScoped is true for pods.
func (podStrategy) NamespaceScoped() bool {
@ -70,7 +91,9 @@ func (podStrategy) NamespaceScoped() bool {
// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation.
func (podStrategy) ResetBeforeCreate(obj runtime.Object) {
pod := obj.(*api.Pod)
pod.Status = api.PodStatus{}
pod.Status = api.PodStatus{
Phase: api.PodPending,
}
}
// Validate validates a new pod.
@ -79,6 +102,16 @@ func (podStrategy) Validate(obj runtime.Object) errors.ValidationErrorList {
return validation.ValidatePod(pod)
}
// AllowCreateOnUpdate is false for pods.
func (podStrategy) AllowCreateOnUpdate() bool {
return false
}
// ValidateUpdate is the default update validation for an end user.
func (podStrategy) ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList {
return validation.ValidatePodUpdate(obj.(*api.Pod), old.(*api.Pod))
}
// svcStrategy implements behavior for Services
// TODO: move to a service specific package.
type svcStrategy struct {

59
pkg/api/rest/update.go Normal file
View File

@ -0,0 +1,59 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)
// RESTUpdateStrategy defines the minimum validation, accepted input, and
// name generation behavior to update an object that follows Kubernetes
// API conventions. A resource may have many UpdateStrategies, depending on
// the call pattern in use.
type RESTUpdateStrategy interface {
runtime.ObjectTyper
// NamespaceScoped returns true if the object must be within a namespace.
NamespaceScoped() bool
// AllowCreateOnUpdate returns true if the object can be created by a PUT.
AllowCreateOnUpdate() bool
// ValidateUpdate is invoked after default fields in the object have been filled in before
// the object is persisted.
ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList
}
// BeforeUpdate ensures that common operations for all resources are performed on update. It only returns
// errors that can be converted to api.Status. It will invoke update validation with the provided existing
// and updated objects.
func BeforeUpdate(strategy RESTUpdateStrategy, ctx api.Context, obj, old runtime.Object) error {
objectMeta, kind, kerr := objectMetaAndKind(strategy, obj)
if kerr != nil {
return kerr
}
if strategy.NamespaceScoped() {
if !api.ValidNamespace(ctx, objectMeta) {
return errors.NewBadRequest("the namespace of the provided object does not match the namespace sent on the request")
}
} else {
objectMeta.Namespace = api.NamespaceNone
}
if errs := strategy.ValidateUpdate(obj, old); len(errs) > 0 {
return errors.NewInvalid(kind, objectMeta.Name, errs)
}
return nil
}

View File

@ -561,7 +561,8 @@ func ValidatePodSpec(spec *api.PodSpec) errs.ValidationErrorList {
return allErrs
}
// ValidatePodUpdate tests to see if the update is legal
// ValidatePodUpdate tests to see if the update is legal for an end user to make. newPod is updated with fields
// that cannot be changed.
func ValidatePodUpdate(newPod, oldPod *api.Pod) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
@ -584,6 +585,7 @@ func ValidatePodUpdate(newPod, oldPod *api.Pod) errs.ValidationErrorList {
allErrs = append(allErrs, errs.NewFieldInvalid("spec.containers", newPod.Spec.Containers, "some fields are immutable"))
}
newPod.Status = oldPod.Status
return allErrs
}

View File

@ -148,7 +148,11 @@ func CreateResource(r RESTCreater, ctxFn ContextFunc, namespaceFn ResourceNamesp
}
result, err := finishRequest(timeout, func() (runtime.Object, error) {
return r.Create(ctx, obj)
out, err := r.Create(ctx, obj)
if status, ok := out.(*api.Status); ok && err == nil && status.Code == 0 {
status.Code = http.StatusCreated
}
return out, err
})
if err != nil {
errorJSON(err, codec, w)

View File

@ -18,7 +18,6 @@ package event
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@ -28,28 +27,6 @@ import (
// registry implements custom changes to generic.Etcd.
type registry struct {
*etcdgeneric.Etcd
ttl uint64
}
// Create stores the object with a ttl, so that events don't stay in the system forever.
func (r registry) Create(ctx api.Context, id string, obj runtime.Object) error {
key, err := r.Etcd.KeyFunc(ctx, id)
if err != nil {
return err
}
err = r.Etcd.Helper.CreateObj(key, obj, r.ttl)
return etcderr.InterpretCreateError(err, r.Etcd.EndpointName, id)
}
// Update replaces an existing instance of the object, and sets a ttl so that the event
// doesn't stay in the system forever.
func (r registry) Update(ctx api.Context, id string, obj runtime.Object) error {
key, err := r.Etcd.KeyFunc(ctx, id)
if err != nil {
return err
}
err = r.Etcd.Helper.SetObj(key, obj, r.ttl)
return etcderr.InterpretUpdateError(err, r.Etcd.EndpointName, id)
}
// NewEtcdRegistry returns a registry which will store Events in the given
@ -66,8 +43,10 @@ func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry {
KeyFunc: func(ctx api.Context, id string) (string, error) {
return etcdgeneric.NamespaceKeyFunc(ctx, "/registry/events", id)
},
TTLFunc: func(runtime.Object, bool) (uint64, error) {
return ttl, nil
},
Helper: h,
},
ttl: ttl,
}
}

View File

@ -98,7 +98,7 @@ func TestEventCreate(t *testing.T) {
for name, item := range table {
fakeClient, registry := NewTestEventEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.Create(ctx, key, item.toCreate)
err := registry.CreateWithName(ctx, key, item.toCreate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}
@ -200,7 +200,7 @@ func TestEventUpdate(t *testing.T) {
for name, item := range table {
fakeClient, registry := NewTestEventEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.Update(ctx, key, item.toUpdate)
err := registry.UpdateWithName(ctx, key, item.toUpdate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}

View File

@ -56,7 +56,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
}
api.FillObjectMetaSystemFields(ctx, &event.ObjectMeta)
err := rs.registry.Create(ctx, event.Name, event)
err := rs.registry.CreateWithName(ctx, event.Name, event)
if err != nil {
return nil, err
}
@ -79,7 +79,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
}
api.FillObjectMetaSystemFields(ctx, &event.ObjectMeta)
err := rs.registry.Update(ctx, event.Name, event)
err := rs.registry.UpdateWithName(ctx, event.Name, event)
if err != nil {
return nil, false, err
}
@ -87,8 +87,8 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
return out, false, err
}
func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
obj, err := rs.registry.Get(ctx, id)
func (rs *REST) Delete(ctx api.Context, name string) (runtime.Object, error) {
obj, err := rs.registry.Get(ctx, name)
if err != nil {
return nil, err
}
@ -96,7 +96,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
if !ok {
return nil, fmt.Errorf("invalid object type")
}
return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id)
return rs.registry.Delete(ctx, name)
}
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {

View File

@ -20,10 +20,13 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kubeerr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// Etcd implements generic.Registry, backing it with etcd storage.
@ -31,6 +34,15 @@ import (
// non-generic functions if needed.
// You must supply a value for every field below before use; these are
// left public as it's meant to be overridable if need be.
// This object is intended to be copyable so that it can be used in
// different ways but share the same underlying behavior.
//
// The intended use of this type is embedding within a Kind specific
// RESTStorage implementation. This type provides CRUD semantics on
// a Kubelike resource, handling details like conflict detection with
// ResourceVersion and semantics. The RESTCreateStrategy and
// RESTUpdateStrategy are generic across all backends, and encapsulate
// logic specific to the API.
type Etcd struct {
// Called to make a new object, should return e.g., &api.Pod{}
NewFunc func() runtime.Object
@ -45,7 +57,34 @@ type Etcd struct {
KeyRootFunc func(ctx api.Context) string
// Called for Create/Update/Get/Delete
KeyFunc func(ctx api.Context, id string) (string, error)
KeyFunc func(ctx api.Context, name string) (string, error)
// Called to get the name of an object
ObjectNameFunc func(obj runtime.Object) (string, error)
// Return the TTL objects should be persisted with. Update is true if this
// is an operation against an existing object.
TTLFunc func(obj runtime.Object, update bool) (uint64, error)
// Called on all objects returned from the underlying store, after
// the exit hooks are invoked. Decorators are intended for integrations
// that are above etcd and should only be used for specific cases where
// storage of the value in etcd is not appropriate, since they cannot
// be watched.
Decorator rest.ObjectFunc
// Allows extended behavior during creation
CreateStrategy rest.RESTCreateStrategy
// On create of an object, attempt to run a further operation.
AfterCreate rest.ObjectFunc
// Allows extended behavior during updates
UpdateStrategy rest.RESTUpdateStrategy
// On update of an object, attempt to run a further operation.
AfterUpdate rest.ObjectFunc
// If true, return the object that was deleted. Otherwise, return a generic
// success status response.
ReturnDeletedObject bool
// On deletion of an object, attempt to run a further operation.
AfterDelete rest.ObjectFunc
// Used for all etcd access functions
Helper tools.EtcdHelper
@ -63,16 +102,16 @@ func NamespaceKeyRootFunc(ctx api.Context, prefix string) string {
// NamespaceKeyFunc is the default function for constructing etcd paths to a resource relative to prefix enforcing namespace rules.
// If no namespace is on context, it errors.
func NamespaceKeyFunc(ctx api.Context, prefix string, id string) (string, error) {
func NamespaceKeyFunc(ctx api.Context, prefix string, name string) (string, error) {
key := NamespaceKeyRootFunc(ctx, prefix)
ns, ok := api.NamespaceFrom(ctx)
if !ok || len(ns) == 0 {
return "", kubeerr.NewBadRequest("Namespace parameter required.")
}
if len(id) == 0 {
return "", kubeerr.NewBadRequest("Namespace parameter required.")
if len(name) == 0 {
return "", kubeerr.NewBadRequest("Name parameter required.")
}
key = key + "/" + id
key = key + "/" + name
return key, nil
}
@ -83,52 +122,203 @@ func (e *Etcd) List(ctx api.Context, m generic.Matcher) (runtime.Object, error)
if err != nil {
return nil, err
}
return generic.FilterList(list, m)
return generic.FilterList(list, m, generic.DecoratorFunc(e.Decorator))
}
// Create inserts a new item.
func (e *Etcd) Create(ctx api.Context, id string, obj runtime.Object) error {
key, err := e.KeyFunc(ctx, id)
// CreateWithName inserts a new item with the provided name
func (e *Etcd) CreateWithName(ctx api.Context, name string, obj runtime.Object) error {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return err
}
err = e.Helper.CreateObj(key, obj, 0)
return etcderr.InterpretCreateError(err, e.EndpointName, id)
if e.CreateStrategy != nil {
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return err
}
}
ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, false)
if err != nil {
return err
}
}
err = e.Helper.CreateObj(key, obj, ttl)
err = etcderr.InterpretCreateError(err, e.EndpointName, name)
if err == nil && e.Decorator != nil {
err = e.Decorator(obj)
}
return err
}
// Update updates the item.
func (e *Etcd) Update(ctx api.Context, id string, obj runtime.Object) error {
key, err := e.KeyFunc(ctx, id)
// Create inserts a new item according to the unique key from the object.
func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}
name, err := e.ObjectNameFunc(obj)
if err != nil {
return nil, err
}
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, false)
if err != nil {
return nil, err
}
}
out := e.NewFunc()
if err := e.Helper.Create(key, obj, out, ttl); err != nil {
err = etcderr.InterpretCreateError(err, e.EndpointName, name)
err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
return nil, err
}
if e.AfterCreate != nil {
if err := e.AfterCreate(out); err != nil {
return nil, err
}
}
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, err
}
}
return out, nil
}
// UpdateWithName updates the item with the provided name
func (e *Etcd) UpdateWithName(ctx api.Context, name string, obj runtime.Object) error {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return err
}
// TODO: verify that SetObj checks ResourceVersion before succeeding.
err = e.Helper.SetObj(key, obj, 0 /* ttl */)
return etcderr.InterpretUpdateError(err, e.EndpointName, id)
ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, false)
if err != nil {
return err
}
}
err = e.Helper.SetObj(key, obj, ttl)
err = etcderr.InterpretUpdateError(err, e.EndpointName, name)
if err == nil && e.Decorator != nil {
err = e.Decorator(obj)
}
return err
}
// Update performs an atomic update and set of the object. Returns the result of the update
// or an error. If the registry allows create-on-update, the create flow will be executed.
func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
name, err := e.ObjectNameFunc(obj)
if err != nil {
return nil, false, err
}
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, false, err
}
creating := false
out := e.NewFunc()
err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, error) {
version, err := e.Helper.ResourceVersioner.ResourceVersion(existing)
if err != nil {
return nil, err
}
if version == 0 {
if !e.UpdateStrategy.AllowCreateOnUpdate() {
return nil, kubeerr.NewAlreadyExists(e.EndpointName, name)
}
creating = true
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}
return obj, nil
}
creating = false
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, err
}
// TODO: expose TTL
return obj, nil
})
if err != nil {
if creating {
err = etcderr.InterpretCreateError(err, e.EndpointName, name)
err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
} else {
err = etcderr.InterpretUpdateError(err, e.EndpointName, name)
}
return nil, false, err
}
if creating {
if e.AfterCreate != nil {
if err := e.AfterCreate(out); err != nil {
return nil, false, err
}
}
} else {
if e.AfterUpdate != nil {
if err := e.AfterUpdate(out); err != nil {
return nil, false, err
}
}
}
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, false, err
}
}
return out, creating, nil
}
// Get retrieves the item from etcd.
func (e *Etcd) Get(ctx api.Context, id string) (runtime.Object, error) {
func (e *Etcd) Get(ctx api.Context, name string) (runtime.Object, error) {
obj := e.NewFunc()
key, err := e.KeyFunc(ctx, id)
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
err = e.Helper.ExtractObj(key, obj, false)
if err != nil {
return nil, etcderr.InterpretGetError(err, e.EndpointName, id)
return nil, etcderr.InterpretGetError(err, e.EndpointName, name)
}
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, err
}
}
return obj, nil
}
// Delete removes the item from etcd.
func (e *Etcd) Delete(ctx api.Context, id string) error {
key, err := e.KeyFunc(ctx, id)
func (e *Etcd) Delete(ctx api.Context, name string) (runtime.Object, error) {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return err
return nil, err
}
err = e.Helper.Delete(key, false)
return etcderr.InterpretDeleteError(err, e.EndpointName, id)
obj := e.NewFunc()
if err := e.Helper.DeleteObj(key, obj); err != nil {
return nil, etcderr.InterpretDeleteError(err, e.EndpointName, name)
}
if e.AfterDelete != nil {
if err := e.AfterDelete(obj); err != nil {
return nil, err
}
}
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, err
}
}
if e.ReturnDeletedObject {
return obj, nil
}
return &api.Status{Status: api.StatusSuccess}, nil
}
// Watch starts a watch for the items that m matches.
@ -140,6 +330,16 @@ func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion string)
}
return e.Helper.WatchList(e.KeyRootFunc(ctx), version, func(obj runtime.Object) bool {
matches, err := m.Matches(obj)
return err == nil && matches
if err != nil {
glog.Errorf("unable to match watch: %v", err)
return false
}
if matches && e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
glog.Errorf("unable to decorate watch: %v", err)
return false
}
}
return matches
})
}

View File

@ -203,7 +203,7 @@ func TestEtcdCreate(t *testing.T) {
for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.Create(api.NewContext(), key, item.toCreate)
err := registry.CreateWithName(api.NewContext(), key, item.toCreate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}
@ -278,7 +278,7 @@ func TestEtcdUpdate(t *testing.T) {
for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.Update(api.NewContext(), key, item.toUpdate)
err := registry.UpdateWithName(api.NewContext(), key, item.toUpdate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}
@ -390,11 +390,14 @@ func TestEtcdDelete(t *testing.T) {
for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.Delete(api.NewContext(), key)
_, err := registry.Delete(api.NewContext(), key)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}
if item.expect.E != nil {
item.expect.E.(*etcd.EtcdError).Index = fakeClient.ChangeIndex
}
if e, a := item.expect, fakeClient.Data[path]; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
}

View File

@ -67,21 +67,26 @@ func (m matcherFunc) Matches(obj runtime.Object) (bool, error) {
return m(obj)
}
// DecoratorFunc can mutate the provided object prior to being returned.
type DecoratorFunc func(obj runtime.Object) error
// Registry knows how to store & list any runtime.Object. Can be used for
// any object types which don't require special features from the storage
// layer.
// DEPRECATED: replace with direct implementation of RESTStorage
type Registry interface {
List(api.Context, Matcher) (runtime.Object, error)
Create(ctx api.Context, id string, obj runtime.Object) error
Update(ctx api.Context, id string, obj runtime.Object) error
CreateWithName(ctx api.Context, id string, obj runtime.Object) error
UpdateWithName(ctx api.Context, id string, obj runtime.Object) error
Get(ctx api.Context, id string) (runtime.Object, error)
Delete(ctx api.Context, id string) error
Delete(ctx api.Context, id string) (runtime.Object, error)
Watch(ctx api.Context, m Matcher, resourceVersion string) (watch.Interface, error)
}
// FilterList filters any list object that conforms to the api conventions,
// provided that 'm' works with the concrete type of list.
func FilterList(list runtime.Object, m Matcher) (filtered runtime.Object, err error) {
// provided that 'm' works with the concrete type of list. d is an optional
// decorator for the returned functions. Only matching items are decorated.
func FilterList(list runtime.Object, m Matcher, d DecoratorFunc) (filtered runtime.Object, err error) {
// TODO: push a matcher down into tools.EtcdHelper to avoid all this
// nonsense. This is a lot of unnecessary copies.
items, err := runtime.ExtractList(list)
@ -95,6 +100,11 @@ func FilterList(list runtime.Object, m Matcher) (filtered runtime.Object, err er
return nil, err
}
if match {
if d != nil {
if err := d(obj); err != nil {
return nil, err
}
}
filteredItems = append(filteredItems, obj)
}
}

View File

@ -124,6 +124,7 @@ func TestFilterList(t *testing.T) {
}
return i.ID[0] == 'b', nil
}),
nil,
)
if err != nil {
t.Fatalf("Unexpected error %v", err)

View File

@ -109,7 +109,7 @@ func TestLimitRangeCreate(t *testing.T) {
for name, item := range table {
fakeClient, registry := NewTestLimitRangeEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.Create(ctx, key, item.toCreate)
err := registry.CreateWithName(ctx, key, item.toCreate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}

View File

@ -62,7 +62,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
}
api.FillObjectMetaSystemFields(ctx, &limitRange.ObjectMeta)
err := rs.registry.Create(ctx, limitRange.Name, limitRange)
err := rs.registry.CreateWithName(ctx, limitRange.Name, limitRange)
if err != nil {
return nil, err
}
@ -97,7 +97,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
return nil, false, errors.NewInvalid("limitRange", editLimitRange.Name, errs)
}
if err := rs.registry.Update(ctx, editLimitRange.Name, editLimitRange); err != nil {
if err := rs.registry.UpdateWithName(ctx, editLimitRange.Name, editLimitRange); err != nil {
return nil, false, err
}
out, err := rs.registry.Get(ctx, editLimitRange.Name)
@ -114,7 +114,7 @@ func (rs *REST) Delete(ctx api.Context, name string) (runtime.Object, error) {
if !ok {
return nil, fmt.Errorf("invalid object type")
}
return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, name)
return rs.registry.Delete(ctx, name)
}
// Get gets a LimitRange with the specified name

View File

@ -48,7 +48,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
if err := rest.BeforeCreate(rest.Namespaces, ctx, obj); err != nil {
return nil, err
}
if err := rs.registry.Create(ctx, namespace.Name, namespace); err != nil {
if err := rs.registry.CreateWithName(ctx, namespace.Name, namespace); err != nil {
err = rest.CheckGeneratedNameError(rest.Namespaces, err, namespace)
return nil, err
}
@ -72,7 +72,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
return nil, false, kerrors.NewInvalid("namespace", namespace.Name, errs)
}
if err := rs.registry.Update(ctx, oldNamespace.Name, oldNamespace); err != nil {
if err := rs.registry.UpdateWithName(ctx, oldNamespace.Name, oldNamespace); err != nil {
return nil, false, err
}
out, err := rs.registry.Get(ctx, oldNamespace.Name)
@ -80,8 +80,8 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
}
// Delete deletes the Namespace with the specified name
func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
obj, err := rs.registry.Get(ctx, id)
func (rs *REST) Delete(ctx api.Context, name string) (runtime.Object, error) {
obj, err := rs.registry.Get(ctx, name)
if err != nil {
return nil, err
}
@ -89,7 +89,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
if !ok {
return nil, fmt.Errorf("invalid object type")
}
return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id)
return rs.registry.Delete(ctx, name)
}
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {

View File

@ -49,7 +49,7 @@ func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Obje
if r.Err != nil {
return nil, r.Err
}
return generic.FilterList(r.ObjectList, m)
return generic.FilterList(r.ObjectList, m, nil)
}
func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
@ -63,7 +63,7 @@ func (r *GenericRegistry) Get(ctx api.Context, id string) (runtime.Object, error
return r.Object, r.Err
}
func (r *GenericRegistry) Create(ctx api.Context, id string, obj runtime.Object) error {
func (r *GenericRegistry) CreateWithName(ctx api.Context, id string, obj runtime.Object) error {
r.Lock()
defer r.Unlock()
r.Object = obj
@ -71,7 +71,7 @@ func (r *GenericRegistry) Create(ctx api.Context, id string, obj runtime.Object)
return r.Err
}
func (r *GenericRegistry) Update(ctx api.Context, id string, obj runtime.Object) error {
func (r *GenericRegistry) UpdateWithName(ctx api.Context, id string, obj runtime.Object) error {
r.Lock()
defer r.Unlock()
r.Object = obj
@ -79,9 +79,9 @@ func (r *GenericRegistry) Update(ctx api.Context, id string, obj runtime.Object)
return r.Err
}
func (r *GenericRegistry) Delete(ctx api.Context, id string) error {
func (r *GenericRegistry) Delete(ctx api.Context, id string) (runtime.Object, error) {
r.Lock()
defer r.Unlock()
r.Broadcaster.Action(watch.Deleted, r.Object)
return r.Err
return &api.Status{Status: api.StatusSuccess}, r.Err
}

View File

@ -53,7 +53,7 @@ func (r *registry) ApplyStatus(ctx api.Context, usage *api.ResourceQuotaUsage) e
resourceQuota := obj.(*api.ResourceQuota)
resourceQuota.ResourceVersion = usage.ResourceVersion
resourceQuota.Status = usage.Status
return r.Update(ctx, resourceQuota.Name, resourceQuota)
return r.UpdateWithName(ctx, resourceQuota.Name, resourceQuota)
}
// NewEtcdRegistry returns a registry which will store ResourceQuota in the given helper

View File

@ -104,7 +104,7 @@ func TestResourceQuotaCreate(t *testing.T) {
for name, item := range table {
fakeClient, registry := NewTestLimitRangeEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.Create(ctx, key, item.toCreate)
err := registry.CreateWithName(ctx, key, item.toCreate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v, %v", name, err, path)
}

View File

@ -65,7 +65,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
}
api.FillObjectMetaSystemFields(ctx, &resourceQuota.ObjectMeta)
err := rs.registry.Create(ctx, resourceQuota.Name, resourceQuota)
err := rs.registry.CreateWithName(ctx, resourceQuota.Name, resourceQuota)
if err != nil {
return nil, err
}
@ -100,7 +100,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
return nil, false, errors.NewInvalid("resourceQuota", editResourceQuota.Name, errs)
}
if err := rs.registry.Update(ctx, editResourceQuota.Name, editResourceQuota); err != nil {
if err := rs.registry.UpdateWithName(ctx, editResourceQuota.Name, editResourceQuota); err != nil {
return nil, false, err
}
out, err := rs.registry.Get(ctx, editResourceQuota.Name)
@ -117,7 +117,7 @@ func (rs *REST) Delete(ctx api.Context, name string) (runtime.Object, error) {
if !ok {
return nil, fmt.Errorf("invalid object type")
}
return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, name)
return rs.registry.Delete(ctx, name)
}
// Get gets a ResourceQuota with the specified name

View File

@ -236,7 +236,19 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignore
if err != nil && !IsEtcdNotFound(err) {
return "", 0, err
}
if err != nil || response.Node == nil || len(response.Node.Value) == 0 {
return h.extractObj(response, err, objPtr, ignoreNotFound, false)
}
func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, modifiedIndex uint64, err error) {
var node *etcd.Node
if response != nil {
if prevNode {
node = response.PrevNode
} else {
node = response.Node
}
}
if inErr != nil || node == nil || len(node.Value) == 0 {
if ignoreNotFound {
v, err := conversion.EnforcePtr(objPtr)
if err != nil {
@ -244,18 +256,18 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignore
}
v.Set(reflect.Zero(v.Type()))
return "", 0, nil
} else if err != nil {
return "", 0, err
} else if inErr != nil {
return "", 0, inErr
}
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
return "", 0, fmt.Errorf("unable to locate a value on the response: %#v", response)
}
body = response.Node.Value
body = node.Value
err = h.Codec.DecodeInto([]byte(body), objPtr)
if h.ResourceVersioner != nil {
_ = h.ResourceVersioner.SetResourceVersion(objPtr, response.Node.ModifiedIndex)
_ = h.ResourceVersioner.SetResourceVersion(objPtr, node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted
}
return body, response.Node.ModifiedIndex, err
return body, node.ModifiedIndex, err
}
// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live in seconds,
@ -275,12 +287,50 @@ func (h *EtcdHelper) CreateObj(key string, obj runtime.Object, ttl uint64) error
return err
}
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live in seconds,
// and 0 means forever. If no error is returned, out will be set to the read value from etcd.
func (h *EtcdHelper) Create(key string, obj, out runtime.Object, ttl uint64) error {
data, err := h.Codec.Encode(obj)
if err != nil {
return err
}
if h.ResourceVersioner != nil {
if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
}
response, err := h.Client.Create(key, string(data), ttl)
if err != nil {
return err
}
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
return err
}
// Delete removes the specified key.
func (h *EtcdHelper) Delete(key string, recursive bool) error {
_, err := h.Client.Delete(key, recursive)
return err
}
// DeleteObj removes the specified key and returns the value that existed at that spot.
func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
response, err := h.Client.Delete(key, false)
if !IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return err
}
// SetObj marshals obj via json, and stores under key. Will do an atomic update if obj's ResourceVersion
// field is set. 'ttl' is time-to-live in seconds, and 0 means forever.
func (h *EtcdHelper) SetObj(key string, obj runtime.Object, ttl uint64) error {
@ -359,10 +409,11 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo
return nil
}
_, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
response, err := h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
if IsEtcdTestFailed(err) {
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)
return err
}
}

View File

@ -520,13 +520,13 @@ func TestAtomicUpdateNoChange(t *testing.T) {
// Update an existing node with the same data
callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
fakeClient.Err = errors.New("should not be called")
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
fakeClient.Err = errors.New("should not be called")
callbackCalled = true
return objUpdate, nil
})
if err != nil {
t.Errorf("Unexpected error %#v", err)
t.Fatalf("Unexpected error %#v", err)
}
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")

View File

@ -34,6 +34,7 @@ type EtcdResponseWithError struct {
// TestLogger is a type passed to Test functions to support formatted test logs.
type TestLogger interface {
Fatalf(format string, args ...interface{})
Errorf(format string, args ...interface{})
Logf(format string, args ...interface{})
}
@ -85,6 +86,10 @@ func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient {
return ret
}
func (f *FakeEtcdClient) SetError(err error) {
f.Err = err
}
func (f *FakeEtcdClient) GetCluster() []string {
return f.Machines
}
@ -93,6 +98,13 @@ func (f *FakeEtcdClient) ExpectNotFoundGet(key string) {
f.expectNotFoundGetSet[key] = struct{}{}
}
func (f *FakeEtcdClient) NewError(code int) *etcd.EtcdError {
return &etcd.EtcdError{
ErrorCode: code,
Index: f.ChangeIndex,
}
}
func (f *FakeEtcdClient) generateIndex() uint64 {
if !f.TestIndex {
return 0
@ -121,6 +133,10 @@ func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response,
}
func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
if f.Err != nil {
return nil, f.Err
}
f.Mutex.Lock()
defer f.Mutex.Unlock()
defer f.updateResponse(key)
@ -128,9 +144,9 @@ func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response,
result := f.Data[key]
if result.R == nil {
if _, ok := f.expectNotFoundGetSet[key]; !ok {
f.t.Errorf("Unexpected get for %s", key)
f.t.Fatalf("data for %s was not defined prior to invoking Get", key)
}
return &etcd.Response{}, EtcdErrorNotFound
return &etcd.Response{}, f.NewError(EtcdErrorCodeNotFound)
}
f.t.Logf("returning %v: %#v %#v", key, result.R, result.E)
@ -166,7 +182,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
if f.nodeExists(key) {
prevResult := f.Data[key]
createdIndex := prevResult.R.Node.CreatedIndex
f.t.Logf("updating %v, index %v -> %v", key, createdIndex, i)
f.t.Logf("updating %v, index %v -> %v (ttl: %d)", key, createdIndex, i, ttl)
result := EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
@ -181,7 +197,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
return result.R, nil
}
f.t.Logf("creating %v, index %v", key, i)
f.t.Logf("creating %v, index %v (ttl: %d)", key, i, ttl)
result := EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
@ -262,15 +278,27 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err
f.Mutex.Lock()
defer f.Mutex.Unlock()
existing := f.Data[key]
index := f.generateIndex()
f.Data[key] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
R: &etcd.Response{},
E: &etcd.EtcdError{
ErrorCode: EtcdErrorCodeNotFound,
Index: index,
},
E: EtcdErrorNotFound,
}
res := &etcd.Response{
Action: "delete",
Node: nil,
PrevNode: nil,
EtcdIndex: index,
}
if existing.R != nil && existing.R.Node != nil {
res.PrevNode = existing.R.Node
}
f.DeletedKeys = append(f.DeletedKeys, key)
return &etcd.Response{}, nil
return res, nil
}
func (f *FakeEtcdClient) WaitForWatchCompletion() {