mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-29 21:29:24 +00:00
Support collection deletion in apiserver.
This commit is contained in:
@@ -24,6 +24,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
kubeerr "k8s.io/kubernetes/pkg/api/errors"
|
||||
etcderr "k8s.io/kubernetes/pkg/api/errors/etcd"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/validation"
|
||||
@@ -436,6 +437,37 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions)
|
||||
return e.finalizeDelete(out, true)
|
||||
}
|
||||
|
||||
// DeleteCollection remove all items returned by List with a given ListOptions from etcd.
|
||||
//
|
||||
// DeleteCollection is currently NOT atomic. It can happen that only subset of objects
|
||||
// will be deleted from etcd, and then an error will be returned.
|
||||
// In case of success, the list of deleted objects will be returned.
|
||||
//
|
||||
// TODO: Currently, there is no easy way to remove 'directory' entry from etcd (if we
|
||||
// are removing all objects of a given type) with the current API (it's technically
|
||||
// possibly with etcd API, but watch is not delivered correctly then).
|
||||
// It will be possible to fix it with v3 etcd API.
|
||||
func (e *Etcd) DeleteCollection(ctx api.Context, options *api.DeleteOptions, listOptions *unversioned.ListOptions) (runtime.Object, error) {
|
||||
listObj, err := e.List(ctx, listOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items, err := meta.ExtractList(listObj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, item := range items {
|
||||
accessor, err := meta.Accessor(item)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := e.Delete(ctx, accessor.Name(), options); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return listObj, nil
|
||||
}
|
||||
|
||||
func (e *Etcd) finalizeDelete(obj runtime.Object, runHooks bool) (runtime.Object, error) {
|
||||
if runHooks && e.AfterDelete != nil {
|
||||
if err := e.AfterDelete(obj); err != nil {
|
||||
|
||||
@@ -25,6 +25,9 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/registry/generic"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
@@ -90,7 +93,20 @@ func NewTestGenericEtcdRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Etc
|
||||
return path.Join(podPrefix, id), nil
|
||||
},
|
||||
ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil },
|
||||
Storage: s,
|
||||
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
|
||||
return &generic.SelectionPredicate{
|
||||
Label: label,
|
||||
Field: field,
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod, ok := obj.(*api.Pod)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("not a pod")
|
||||
}
|
||||
return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(pod.ObjectMeta, true), nil
|
||||
},
|
||||
}
|
||||
},
|
||||
Storage: s,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -366,6 +382,78 @@ func TestEtcdDelete(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdDeleteCollection(t *testing.T) {
|
||||
podA := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
podB := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
|
||||
|
||||
testContext := api.WithNamespace(api.NewContext(), "test")
|
||||
server, registry := NewTestGenericEtcdRegistry(t)
|
||||
defer server.Terminate(t)
|
||||
|
||||
if _, err := registry.Create(testContext, podA); err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if _, err := registry.Create(testContext, podB); err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Delete all pods.
|
||||
deleted, err := registry.DeleteCollection(testContext, nil, &unversioned.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
deletedPods := deleted.(*api.PodList)
|
||||
if len(deletedPods.Items) != 2 {
|
||||
t.Errorf("Unexpected number of pods deleted: %d, expected: 2", len(deletedPods.Items))
|
||||
}
|
||||
|
||||
if _, err := registry.Get(testContext, podA.Name); !errors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if _, err := registry.Get(testContext, podB.Name); !errors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test whether objects deleted with DeleteCollection are correctly delivered
|
||||
// to watchers.
|
||||
func TestEtcdDeleteCollectionWithWatch(t *testing.T) {
|
||||
podA := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
|
||||
testContext := api.WithNamespace(api.NewContext(), "test")
|
||||
server, registry := NewTestGenericEtcdRegistry(t)
|
||||
defer server.Terminate(t)
|
||||
|
||||
objCreated, err := registry.Create(testContext, podA)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
podCreated := objCreated.(*api.Pod)
|
||||
|
||||
watcher, err := registry.WatchPredicate(testContext, setMatcher{sets.NewString("foo")}, podCreated.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if _, err := registry.DeleteCollection(testContext, nil, &unversioned.ListOptions{}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
got, open := <-watcher.ResultChan()
|
||||
if !open {
|
||||
t.Errorf("Unexpected channel close")
|
||||
} else {
|
||||
if got.Type != "DELETED" {
|
||||
t.Errorf("Unexpected event type: %s", got.Type)
|
||||
}
|
||||
gotObject := got.Object.(*api.Pod)
|
||||
gotObject.ResourceVersion = podCreated.ResourceVersion
|
||||
if e, a := podCreated, gotObject; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected: %#v, got: %#v", e, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdWatch(t *testing.T) {
|
||||
testContext := api.WithNamespace(api.NewContext(), "test")
|
||||
noNamespaceContext := api.NewContext()
|
||||
|
||||
Reference in New Issue
Block a user