Merge pull request #12528 from wojtek-t/rewrite_events

Rewrite events registry to be generic
This commit is contained in:
Piotr Szczesniak 2015-08-12 12:02:13 +02:00
commit 7186b48d48
14 changed files with 150 additions and 683 deletions

View File

@ -1238,6 +1238,14 @@
"required": false,
"allowMultiple": false
},
{
"type": "v1.DeleteOptions",
"paramType": "body",
"name": "body",
"description": "",
"required": true,
"allowMultiple": false
},
{
"type": "string",
"paramType": "path",

View File

@ -53,7 +53,7 @@ import (
controlleretcd "k8s.io/kubernetes/pkg/registry/controller/etcd"
"k8s.io/kubernetes/pkg/registry/endpoint"
endpointsetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd"
"k8s.io/kubernetes/pkg/registry/event"
eventetcd "k8s.io/kubernetes/pkg/registry/event/etcd"
expcontrolleretcd "k8s.io/kubernetes/pkg/registry/experimental/controller/etcd"
limitrangeetcd "k8s.io/kubernetes/pkg/registry/limitrange/etcd"
"k8s.io/kubernetes/pkg/registry/minion"
@ -432,7 +432,7 @@ func (m *Master) init(c *Config) {
podTemplateStorage := podtemplateetcd.NewREST(c.DatabaseStorage)
eventRegistry := event.NewEtcdRegistry(c.DatabaseStorage, uint64(c.EventTTL.Seconds()))
eventStorage := eventetcd.NewStorage(c.DatabaseStorage, uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewStorage(c.DatabaseStorage)
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewStorage(c.DatabaseStorage)
@ -492,7 +492,7 @@ func (m *Master) init(c *Config) {
"endpoints": endpointsStorage,
"nodes": nodeStorage,
"nodes/status": nodeStatusStorage,
"events": event.NewStorage(eventRegistry),
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,

View File

@ -0,0 +1,62 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/event"
"k8s.io/kubernetes/pkg/registry/generic"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
*etcdgeneric.Etcd
}
func NewStorage(s storage.Interface, ttl uint64) *REST {
prefix := "/events"
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Event{} },
NewListFunc: func() runtime.Object { return &api.EventList{} },
KeyRootFunc: func(ctx api.Context) string {
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
},
KeyFunc: func(ctx api.Context, id string) (string, error) {
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, id)
},
ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*api.Event).Name, nil
},
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
return event.MatchEvent(label, field)
},
TTLFunc: func(runtime.Object, uint64, bool) (uint64, error) {
return ttl, nil
},
EndpointName: "events",
CreateStrategy: event.Strategy,
UpdateStrategy: event.Strategy,
Storage: s,
}
return &REST{store}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package event
package etcd
import (
"reflect"
@ -23,7 +23,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/registry/generic"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/runtime"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -36,22 +35,24 @@ import (
var testTTL uint64 = 60
func NewTestEventEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
func NewTestEventStorage(t *testing.T) (*tools.FakeEtcdClient, *REST) {
f := tools.NewFakeEtcdClient(t)
f.TestIndex = true
s := etcdstorage.NewEtcdStorage(f, testapi.Codec(), etcdtest.PathPrefix())
return f, NewEtcdRegistry(s, testTTL)
return f, NewStorage(s, testTTL)
}
func TestEventCreate(t *testing.T) {
eventA := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Reason: "forTesting",
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Reason: "forTesting",
InvolvedObject: api.ObjectReference{Name: "bar", Namespace: api.NamespaceDefault},
}
eventB := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Reason: "forTesting",
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Reason: "forTesting",
InvolvedObject: api.ObjectReference{Name: "bar", Namespace: api.NamespaceDefault},
}
nodeWithEventA := tools.EtcdResponseWithError{
@ -100,14 +101,24 @@ func TestEventCreate(t *testing.T) {
}
for name, item := range table {
fakeClient, registry := NewTestEventEtcdRegistry(t)
fakeClient, storage := NewTestEventStorage(t)
fakeClient.Data[path] = item.existing
err := registry.CreateWithName(ctx, key, item.toCreate)
_, err := storage.Create(ctx, item.toCreate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}
if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) {
// nullify fields set by infrastructure
received := fakeClient.Data[path]
var event api.Event
if err := testapi.Codec().DecodeInto([]byte(received.R.Node.Value), &event); err != nil {
t.Errorf("unexpected error: %v", err)
}
event.ObjectMeta.CreationTimestamp = util.Time{}
event.ObjectMeta.UID = ""
received.R.Node.Value = runtime.EncodeOrDie(testapi.Codec(), &event)
if e, a := item.expect, received; !reflect.DeepEqual(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
}
}
@ -115,16 +126,19 @@ func TestEventCreate(t *testing.T) {
func TestEventUpdate(t *testing.T) {
eventA := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Reason: "forTesting",
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Reason: "forTesting",
InvolvedObject: api.ObjectReference{Name: "foo", Namespace: api.NamespaceDefault},
}
eventB := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: api.NamespaceDefault},
Reason: "for testing again",
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Reason: "for testing again",
InvolvedObject: api.ObjectReference{Name: "foo", Namespace: api.NamespaceDefault},
}
eventC := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "pan", Namespace: api.NamespaceDefault, ResourceVersion: "1"},
Reason: "for testing again something else",
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault, ResourceVersion: "1"},
Reason: "for testing again something else",
InvolvedObject: api.ObjectReference{Name: "foo", Namespace: api.NamespaceDefault},
}
nodeWithEventA := tools.EtcdResponseWithError{
@ -203,14 +217,24 @@ func TestEventUpdate(t *testing.T) {
}
for name, item := range table {
fakeClient, registry := NewTestEventEtcdRegistry(t)
fakeClient, storage := NewTestEventStorage(t)
fakeClient.Data[path] = item.existing
err := registry.UpdateWithName(ctx, key, item.toUpdate)
_, _, err := storage.Update(ctx, item.toUpdate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}
if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) {
// nullify fields set by infrastructure
received := fakeClient.Data[path]
var event api.Event
if err := testapi.Codec().DecodeInto([]byte(received.R.Node.Value), &event); err != nil {
t.Errorf("unexpected error: %v", err)
}
event.ObjectMeta.CreationTimestamp = util.Time{}
event.ObjectMeta.UID = ""
received.R.Node.Value = runtime.EncodeOrDie(testapi.Codec(), &event)
if e, a := item.expect, received; !reflect.DeepEqual(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectGoPrintDiff(e, a))
}
}

View File

@ -1,53 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package event
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/registry/generic"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// registry implements custom changes to generic.Etcd.
type registry struct {
*etcdgeneric.Etcd
}
// NewEtcdRegistry returns a registry which will store Events in the given
// EtcdStorage. ttl is the time that Events will be retained by the system.
func NewEtcdRegistry(s storage.Interface, ttl uint64) generic.Registry {
prefix := "/events"
return registry{
Etcd: &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Event{} },
NewListFunc: func() runtime.Object { return &api.EventList{} },
EndpointName: "events",
KeyRootFunc: func(ctx api.Context) string {
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
},
KeyFunc: func(ctx api.Context, id string) (string, error) {
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, id)
},
TTLFunc: func(runtime.Object, uint64, bool) (uint64, error) {
return ttl, nil
},
Storage: s,
},
}
}

View File

@ -26,94 +26,51 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/util/fielderrors"
)
// REST adapts an event registry into apiserver's RESTStorage model.
type REST struct {
registry generic.Registry
type eventStrategy struct {
runtime.ObjectTyper
api.NameGenerator
}
// NewStorage returns a new REST. You must use a registry created by
// NewEtcdRegistry unless you're testing.
func NewStorage(registry generic.Registry) *REST {
return &REST{
registry: registry,
}
// Strategy is the default logic that pplies when creating and updating
// Event objects via the REST API.
var Strategy = eventStrategy{api.Scheme, api.SimpleNameGenerator}
func (eventStrategy) NamespaceScoped() bool {
return true
}
func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
event, ok := obj.(*api.Event)
if !ok {
return nil, errors.NewInternalError(fmt.Errorf("received object is not of type event: %#v", obj))
}
if api.NamespaceValue(ctx) != "" {
if !api.ValidNamespace(ctx, &event.ObjectMeta) {
return nil, errors.NewConflict("event", event.Namespace, fmt.Errorf("event.namespace does not match the provided context"))
}
}
if errs := validation.ValidateEvent(event); len(errs) > 0 {
return nil, errors.NewInvalid("event", event.Name, errs)
}
api.FillObjectMetaSystemFields(ctx, &event.ObjectMeta)
err := rs.registry.CreateWithName(ctx, event.Name, event)
if err != nil {
return nil, err
}
return rs.registry.Get(ctx, event.Name)
func (eventStrategy) PrepareForCreate(obj runtime.Object) {
}
// Update replaces an existing Event instance in storage.registry, with the given instance.
func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
event, ok := obj.(*api.Event)
if !ok {
return nil, false, errors.NewInternalError(fmt.Errorf("received object is not of type event: %#v", obj))
}
if api.NamespaceValue(ctx) != "" {
if !api.ValidNamespace(ctx, &event.ObjectMeta) {
return nil, false, errors.NewConflict("event", event.Namespace, fmt.Errorf("event.namespace does not match the provided context"))
}
}
if errs := validation.ValidateEvent(event); len(errs) > 0 {
return nil, false, errors.NewInvalid("event", event.Name, errs)
}
api.FillObjectMetaSystemFields(ctx, &event.ObjectMeta)
err := rs.registry.UpdateWithName(ctx, event.Name, event)
if err != nil {
return nil, false, err
}
out, err := rs.registry.Get(ctx, event.Name)
return out, false, err
func (eventStrategy) PrepareForUpdate(obj, old runtime.Object) {
}
func (rs *REST) Delete(ctx api.Context, name string) (runtime.Object, error) {
obj, err := rs.registry.Get(ctx, name)
if err != nil {
return nil, err
}
_, ok := obj.(*api.Event)
if !ok {
return nil, errors.NewInternalError(fmt.Errorf("stored object %s is not of type event: %#v", name, obj))
}
return rs.registry.Delete(ctx, name, nil)
func (eventStrategy) Validate(ctx api.Context, obj runtime.Object) fielderrors.ValidationErrorList {
event := obj.(*api.Event)
return validation.ValidateEvent(event)
}
func (rs *REST) Get(ctx api.Context, name string) (runtime.Object, error) {
obj, err := rs.registry.Get(ctx, name)
if err != nil {
return nil, err
}
event, ok := obj.(*api.Event)
if !ok {
return nil, errors.NewInternalError(fmt.Errorf("stored object %s is not of type event: %#v", name, obj))
}
return event, err
func (eventStrategy) AllowCreateOnUpdate() bool {
return true
}
func (rs *REST) getAttrs(obj runtime.Object) (objLabels labels.Set, objFields fields.Set, err error) {
func (eventStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielderrors.ValidationErrorList {
event := obj.(*api.Event)
return validation.ValidateEvent(event)
}
func (eventStrategy) AllowUnconditionalUpdate() bool {
return true
}
func MatchEvent(label labels.Selector, field fields.Selector) generic.Matcher {
return &generic.SelectionPredicate{label, field, getAttrs}
}
func getAttrs(obj runtime.Object) (objLabels labels.Set, objFields fields.Set, err error) {
event, ok := obj.(*api.Event)
if !ok {
return nil, nil, errors.NewInternalError(fmt.Errorf("object is not of type event: %#v", obj))
@ -135,22 +92,3 @@ func (rs *REST) getAttrs(obj runtime.Object) (objLabels labels.Set, objFields fi
"source": event.Source.Component,
}, nil
}
func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) {
return rs.registry.ListPredicate(ctx, &generic.SelectionPredicate{Label: label, Field: field, GetAttrs: rs.getAttrs})
}
// Watch returns Events events via a watch.Interface.
// It implements rest.Watcher.
func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.WatchPredicate(ctx, &generic.SelectionPredicate{Label: label, Field: field, GetAttrs: rs.getAttrs}, resourceVersion)
}
// New returns a new api.Event
func (*REST) New() runtime.Object {
return &api.Event{}
}
func (*REST) NewList() runtime.Object {
return &api.EventList{}
}

View File

@ -21,24 +21,12 @@ import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
)
type testRegistry struct {
*registrytest.GenericRegistry
}
func NewTestREST() (testRegistry, *REST) {
reg := testRegistry{registrytest.NewGeneric(nil)}
return reg, NewStorage(reg)
}
func testEvent(name string) *api.Event {
return &api.Event{
ObjectMeta: api.ObjectMeta{
@ -52,115 +40,7 @@ func testEvent(name string) *api.Event {
}
}
func TestRESTCreate(t *testing.T) {
table := []struct {
ctx api.Context
event *api.Event
valid bool
}{
{
ctx: api.NewDefaultContext(),
event: testEvent("foo"),
valid: true,
}, {
ctx: api.NewContext(),
event: testEvent("bar"),
valid: true,
}, {
ctx: api.WithNamespace(api.NewContext(), "nondefault"),
event: testEvent("bazzzz"),
valid: false,
},
}
for _, item := range table {
_, storage := NewTestREST()
c, err := storage.Create(item.ctx, item.event)
if !item.valid {
if err == nil {
ctxNS := api.NamespaceValue(item.ctx)
t.Errorf("unexpected non-error for %v (%v, %v)", item.event.Name, ctxNS, item.event.Namespace)
}
continue
}
if err != nil {
t.Errorf("%v: Unexpected error %v", item.event.Name, err)
continue
}
if !api.HasObjectMetaSystemFieldValues(&item.event.ObjectMeta) {
t.Errorf("storage did not populate object meta field values")
}
if e, a := item.event, c; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
// Ensure we implement the interface
_ = rest.Watcher(storage)
}
}
func TestRESTUpdate(t *testing.T) {
_, rest := NewTestREST()
eventA := testEvent("foo")
_, err := rest.Create(api.NewDefaultContext(), eventA)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
got, err := rest.Get(api.NewDefaultContext(), eventA.Name)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if e, a := eventA, got; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
eventB := testEvent("bar")
_, _, err = rest.Update(api.NewDefaultContext(), eventB)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
got2, err := rest.Get(api.NewDefaultContext(), eventB.Name)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if e, a := eventB, got2; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
}
func TestRESTDelete(t *testing.T) {
_, rest := NewTestREST()
eventA := testEvent("foo")
_, err := rest.Create(api.NewDefaultContext(), eventA)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
c, err := rest.Delete(api.NewDefaultContext(), eventA.Name)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if stat := c.(*api.Status); stat.Status != api.StatusSuccess {
t.Errorf("unexpected status: %v", stat)
}
}
func TestRESTGet(t *testing.T) {
_, rest := NewTestREST()
eventA := testEvent("foo")
_, err := rest.Create(api.NewDefaultContext(), eventA)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
got, err := rest.Get(api.NewDefaultContext(), eventA.Name)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if e, a := eventA, got; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
}
func TestRESTgetAttrs(t *testing.T) {
_, rest := NewTestREST()
func TestGetAttrs(t *testing.T) {
eventA := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "f0118"},
InvolvedObject: api.ObjectReference{
@ -175,7 +55,7 @@ func TestRESTgetAttrs(t *testing.T) {
Reason: "ForTesting",
Source: api.EventSource{Component: "test"},
}
label, field, err := rest.getAttrs(eventA)
label, field, err := getAttrs(eventA)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
@ -198,82 +78,3 @@ func TestRESTgetAttrs(t *testing.T) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
}
func TestRESTList(t *testing.T) {
reg, rest := NewTestREST()
eventA := &api.Event{
InvolvedObject: api.ObjectReference{
Kind: "Pod",
Name: "foo",
UID: "long uid string",
APIVersion: testapi.Version(),
ResourceVersion: "0",
FieldPath: "",
},
Reason: "ForTesting",
Source: api.EventSource{Component: "GoodSource"},
}
eventB := &api.Event{
InvolvedObject: api.ObjectReference{
Kind: "Pod",
Name: "bar",
UID: "other long uid string",
APIVersion: testapi.Version(),
ResourceVersion: "0",
FieldPath: "",
},
Reason: "ForTesting",
Source: api.EventSource{Component: "GoodSource"},
}
eventC := &api.Event{
InvolvedObject: api.ObjectReference{
Kind: "Pod",
Name: "baz",
UID: "yet another long uid string",
APIVersion: testapi.Version(),
ResourceVersion: "0",
FieldPath: "",
},
Reason: "ForTesting",
Source: api.EventSource{Component: "OtherSource"},
}
reg.ObjectList = &api.EventList{
Items: []api.Event{*eventA, *eventB, *eventC},
}
got, err := rest.List(api.NewContext(), labels.Everything(), fields.Set{"source": "GoodSource"}.AsSelector())
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
expect := &api.EventList{
Items: []api.Event{*eventA, *eventB},
}
if e, a := expect, got; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
}
func TestRESTWatch(t *testing.T) {
eventA := &api.Event{
InvolvedObject: api.ObjectReference{
Kind: "Pod",
Name: "foo",
UID: "long uid string",
APIVersion: testapi.Version(),
ResourceVersion: "0",
FieldPath: "",
},
Reason: "ForTesting",
}
reg, rest := NewTestREST()
wi, err := rest.Watch(api.NewContext(), labels.Everything(), fields.Everything(), "0")
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
go func() {
reg.Broadcaster.Action(watch.Added, eventA)
}()
got := <-wi.ResultChan()
if e, a := eventA, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
}

View File

@ -174,30 +174,6 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object
return generic.FilterList(list, m, generic.DecoratorFunc(e.Decorator))
}
// CreateWithName inserts a new item with the provided name
// DEPRECATED: use Create instead
func (e *Etcd) CreateWithName(ctx api.Context, name string, obj runtime.Object) error {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return err
}
if e.CreateStrategy != nil {
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return err
}
}
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return err
}
err = e.Storage.Create(key, obj, nil, ttl)
err = etcderr.InterpretCreateError(err, e.EndpointName, name)
if err == nil && e.Decorator != nil {
err = e.Decorator(obj)
}
return err
}
// Create inserts a new item according to the unique key from the object.
func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
trace := util.NewTrace("Create " + reflect.TypeOf(obj).String())
@ -238,25 +214,6 @@ func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, erro
return out, nil
}
// UpdateWithName updates the item with the provided name
// DEPRECATED: use Update instead
func (e *Etcd) UpdateWithName(ctx api.Context, name string, obj runtime.Object) error {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return err
}
ttl, err := e.calculateTTL(obj, 0, true)
if err != nil {
return err
}
err = e.Storage.Set(key, obj, nil, ttl)
err = etcderr.InterpretUpdateError(err, e.EndpointName, name)
if err == nil && e.Decorator != nil {
err = e.Decorator(obj)
}
return err
}
// Update performs an atomic update and set of the object. Returns the result of the update
// or an error. If the registry allows create-on-update, the create flow will be executed.
// A bool is returned along with the object and any errors, to indicate object creation.

View File

@ -307,83 +307,6 @@ func TestEtcdCreate(t *testing.T) {
}
}
// DEPRECATED
func TestEtcdCreateWithName(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.PodSpec{NodeName: "machine"},
}
podB := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.PodSpec{NodeName: "machine2"},
}
nodeWithPodA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podA),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
E: nil,
}
emptyNode := tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
key := "foo"
table := map[string]struct {
existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError
toCreate runtime.Object
objOK func(obj runtime.Object) bool
errOK func(error) bool
}{
"normal": {
existing: emptyNode,
toCreate: podA,
objOK: hasCreated(t, podA),
errOK: func(err error) bool { return err == nil },
},
"preExisting": {
existing: nodeWithPodA,
expect: nodeWithPodA,
toCreate: podB,
errOK: errors.IsAlreadyExists,
},
}
for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
path := etcdtest.AddPrefix("pods/foo")
fakeClient.Data[path] = item.existing
err := registry.CreateWithName(api.NewDefaultContext(), key, item.toCreate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}
actual := fakeClient.Data[path]
if item.objOK != nil {
obj, err := api.Scheme.Decode([]byte(actual.R.Node.Value))
if err != nil {
t.Errorf("unable to decode stored value for %#v", actual)
continue
}
if !item.objOK(obj) {
t.Errorf("%v: unexpected response: %v", name, actual)
}
} else {
if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
}
}
}
}
func TestEtcdUpdate(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
@ -520,82 +443,6 @@ func TestEtcdUpdate(t *testing.T) {
}
}
// DEPRECATED
func TestEtcdUpdateWithName(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{NodeName: "machine"},
}
podB := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"},
Spec: api.PodSpec{NodeName: "machine2"},
}
nodeWithPodA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podA),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
E: nil,
}
nodeWithPodB := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podB),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
E: nil,
}
emptyNode := tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
key := "foo"
table := map[string]struct {
existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError
toUpdate runtime.Object
errOK func(error) bool
}{
"normal": {
existing: nodeWithPodA,
expect: nodeWithPodB,
toUpdate: podB,
errOK: func(err error) bool { return err == nil },
},
"notExisting": {
existing: emptyNode,
expect: nodeWithPodA,
toUpdate: podA,
// TODO: Should updating a non-existing thing fail?
errOK: func(err error) bool { return err == nil },
},
}
for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
path := etcdtest.AddPrefix("pods/foo")
fakeClient.Data[path] = item.existing
err := registry.UpdateWithName(api.NewContext(), key, item.toUpdate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}
if e, a := item.expect, fakeClient.Data[path]; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
}
}
}
func TestEtcdGet(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"},

View File

@ -17,11 +17,9 @@ limitations under the License.
package generic
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// AttrFunc returns label and field sets for List or Watch to compare against, or an error.
@ -124,19 +122,6 @@ var (
// DecoratorFunc can mutate the provided object prior to being returned.
type DecoratorFunc func(obj runtime.Object) error
// Registry knows how to store & list any runtime.Object. Can be used for
// any object types which don't require special features from the storage
// layer.
// DEPRECATED: replace with direct implementation of RESTStorage
type Registry interface {
ListPredicate(api.Context, Matcher) (runtime.Object, error)
CreateWithName(ctx api.Context, id string, obj runtime.Object) error
UpdateWithName(ctx api.Context, id string, obj runtime.Object) error
Get(ctx api.Context, id string) (runtime.Object, error)
Delete(ctx api.Context, id string, options *api.DeleteOptions) (runtime.Object, error)
WatchPredicate(ctx api.Context, m Matcher, resourceVersion string) (watch.Interface, error)
}
// FilterList filters any list object that conforms to the api conventions,
// provided that 'm' works with the concrete type of list. d is an optional
// decorator for the returned functions. Only matching items are decorated.

View File

@ -25,7 +25,6 @@ import (
"k8s.io/kubernetes/pkg/api/rest/resttest"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -36,10 +35,6 @@ import (
"github.com/coreos/go-etcd/etcd"
)
type testRegistry struct {
*registrytest.GenericRegistry
}
func newStorage(t *testing.T) (*REST, *StatusREST, *tools.FakeEtcdClient, storage.Interface) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeEtcdClient.TestIndex = true

View File

@ -25,7 +25,6 @@ import (
"k8s.io/kubernetes/pkg/api/rest/resttest"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -36,10 +35,6 @@ import (
"github.com/coreos/go-etcd/etcd"
)
type testRegistry struct {
*registrytest.GenericRegistry
}
func newStorage(t *testing.T) (*REST, *StatusREST, *tools.FakeEtcdClient, storage.Interface) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeEtcdClient.TestIndex = true

View File

@ -1,92 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registrytest
import (
"sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// GenericRegistry knows how to store & list any runtime.Object.
type GenericRegistry struct {
Err error
Object runtime.Object
ObjectList runtime.Object
sync.Mutex
Broadcaster *watch.Broadcaster
}
func NewGeneric(list runtime.Object) *GenericRegistry {
return &GenericRegistry{
ObjectList: list,
Broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull),
}
}
func (r *GenericRegistry) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object, error) {
r.Lock()
defer r.Unlock()
if r.Err != nil {
return nil, r.Err
}
return generic.FilterList(r.ObjectList, m, nil)
}
func (r *GenericRegistry) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.Broadcaster.Watch(), nil
}
func (r *GenericRegistry) Get(ctx api.Context, id string) (runtime.Object, error) {
r.Lock()
defer r.Unlock()
if r.Err != nil {
return nil, r.Err
}
if r.Object != nil {
return r.Object, nil
}
panic("generic registry should either have an object or an error for Get")
}
func (r *GenericRegistry) CreateWithName(ctx api.Context, id string, obj runtime.Object) error {
r.Lock()
defer r.Unlock()
r.Object = obj
r.Broadcaster.Action(watch.Added, obj)
return r.Err
}
func (r *GenericRegistry) UpdateWithName(ctx api.Context, id string, obj runtime.Object) error {
r.Lock()
defer r.Unlock()
r.Object = obj
r.Broadcaster.Action(watch.Modified, obj)
return r.Err
}
func (r *GenericRegistry) Delete(ctx api.Context, id string, options *api.DeleteOptions) (runtime.Object, error) {
r.Lock()
defer r.Unlock()
r.Broadcaster.Action(watch.Deleted, r.Object)
return &api.Status{Status: api.StatusSuccess}, r.Err
}