mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 12:07:47 +00:00
Merge pull request #28524 from hongchaodeng/listopt
Automatic merge from submit-queue ListOptions: add test for ResourceVersion > 0 in List ref: #28472 Done: - Add a test for ResourceVersion > 0 in registry (cache) store List() - Fix the docs.
This commit is contained in:
commit
6a4de0927c
@ -2232,7 +2232,11 @@ type ListOptions struct {
|
||||
FieldSelector fields.Selector
|
||||
// If true, watch for changes to this list
|
||||
Watch bool
|
||||
// The resource version to watch (no effect on list yet)
|
||||
// For watch, it's the resource version to watch.
|
||||
// For list,
|
||||
// - if unset, then the result is returned from remote storage based on quorum-read flag;
|
||||
// - if it's 0, then we simply return what we currently have in cache, no guarantee;
|
||||
// - if set to non zero, then the result is as fresh as given rv.
|
||||
ResourceVersion string
|
||||
// Timeout for the list/watch call.
|
||||
TimeoutSeconds *int64
|
||||
|
@ -21,9 +21,9 @@ import (
|
||||
"path"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
@ -35,12 +35,14 @@ import (
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/registry/generic"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
||||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
storagetesting "k8s.io/kubernetes/pkg/storage/testing"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/validation/field"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
type testRESTStrategy struct {
|
||||
@ -77,55 +79,8 @@ func (t *testRESTStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Obje
|
||||
}
|
||||
func (t *testRESTStrategy) Canonicalize(obj runtime.Object) {}
|
||||
|
||||
func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
|
||||
return func(obj runtime.Object) bool {
|
||||
actualPod := obj.(*api.Pod)
|
||||
if !api.Semantic.DeepDerivative(pod.Status, actualPod.Status) {
|
||||
t.Errorf("not a deep derivative %#v", actualPod)
|
||||
return false
|
||||
}
|
||||
return api.HasObjectMetaSystemFieldValues(&actualPod.ObjectMeta)
|
||||
}
|
||||
}
|
||||
|
||||
func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Store) {
|
||||
podPrefix := "/pods"
|
||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
|
||||
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
|
||||
|
||||
return server, &Store{
|
||||
NewFunc: func() runtime.Object { return &api.Pod{} },
|
||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
||||
QualifiedResource: api.Resource("pods"),
|
||||
CreateStrategy: strategy,
|
||||
UpdateStrategy: strategy,
|
||||
DeleteStrategy: strategy,
|
||||
KeyRootFunc: func(ctx api.Context) string {
|
||||
return podPrefix
|
||||
},
|
||||
KeyFunc: func(ctx api.Context, id string) (string, error) {
|
||||
if _, ok := api.NamespaceFrom(ctx); !ok {
|
||||
return "", fmt.Errorf("namespace is required")
|
||||
}
|
||||
return path.Join(podPrefix, id), nil
|
||||
},
|
||||
ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil },
|
||||
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
|
||||
return &generic.SelectionPredicate{
|
||||
Label: label,
|
||||
Field: field,
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod, ok := obj.(*api.Pod)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("not a pod")
|
||||
}
|
||||
return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(pod.ObjectMeta, true), nil
|
||||
},
|
||||
}
|
||||
},
|
||||
Storage: s,
|
||||
}
|
||||
return newTestGenericStoreRegistry(t, false)
|
||||
}
|
||||
|
||||
// setMatcher is a matcher that matches any pod with id in the set.
|
||||
@ -235,6 +190,70 @@ func TestStoreList(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestStoreListResourceVersion tests that if List with ResourceVersion > 0, it will wait until
|
||||
// the results are as fresh as given version.
|
||||
func TestStoreListResourceVersion(t *testing.T) {
|
||||
fooPod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "foo"},
|
||||
Spec: api.PodSpec{NodeName: "machine"},
|
||||
}
|
||||
barPod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "bar"},
|
||||
Spec: api.PodSpec{NodeName: "machine"},
|
||||
}
|
||||
ctx := api.WithNamespace(api.NewContext(), "test")
|
||||
|
||||
server, registry := newTestGenericStoreRegistry(t, true)
|
||||
defer server.Terminate(t)
|
||||
|
||||
obj, err := registry.Create(ctx, fooPod)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
versioner := etcdstorage.APIObjectVersioner{}
|
||||
rev, err := versioner.ObjectResourceVersion(obj)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
waitListCh := make(chan runtime.Object, 1)
|
||||
go func(listRev uint64) {
|
||||
option := &api.ListOptions{ResourceVersion: strconv.FormatUint(listRev, 10)}
|
||||
// It will wait until we create the second pod.
|
||||
l, err := registry.List(ctx, option)
|
||||
if err != nil {
|
||||
close(waitListCh)
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
waitListCh <- l
|
||||
}(rev + 1)
|
||||
|
||||
select {
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
case l := <-waitListCh:
|
||||
t.Fatalf("expected waiting, but get %#v", l)
|
||||
}
|
||||
|
||||
if _, err := registry.Create(ctx, barPod); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
||||
case l, ok := <-waitListCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
pl := l.(*api.PodList).Items
|
||||
if len(pl) != 2 {
|
||||
t.Errorf("Expected get 2 items, but got %d", len(pl))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreCreate(t *testing.T) {
|
||||
podA := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "test"},
|
||||
@ -983,3 +1002,55 @@ func TestStoreWatch(t *testing.T) {
|
||||
server.Terminate(t)
|
||||
}
|
||||
}
|
||||
|
||||
func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesting.EtcdTestServer, *Store) {
|
||||
podPrefix := "/pods"
|
||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
|
||||
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
|
||||
if hasCacheEnabled {
|
||||
config := storage.CacherConfig{
|
||||
CacheCapacity: 10,
|
||||
Storage: s,
|
||||
Versioner: etcdstorage.APIObjectVersioner{},
|
||||
Type: &api.Pod{},
|
||||
ResourcePrefix: podPrefix,
|
||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
|
||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
||||
}
|
||||
s = storage.NewCacherFromConfig(config)
|
||||
}
|
||||
|
||||
return server, &Store{
|
||||
NewFunc: func() runtime.Object { return &api.Pod{} },
|
||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
||||
QualifiedResource: api.Resource("pods"),
|
||||
CreateStrategy: strategy,
|
||||
UpdateStrategy: strategy,
|
||||
DeleteStrategy: strategy,
|
||||
KeyRootFunc: func(ctx api.Context) string {
|
||||
return podPrefix
|
||||
},
|
||||
KeyFunc: func(ctx api.Context, id string) (string, error) {
|
||||
if _, ok := api.NamespaceFrom(ctx); !ok {
|
||||
return "", fmt.Errorf("namespace is required")
|
||||
}
|
||||
return path.Join(podPrefix, id), nil
|
||||
},
|
||||
ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil },
|
||||
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
|
||||
return &generic.SelectionPredicate{
|
||||
Label: label,
|
||||
Field: field,
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod, ok := obj.(*api.Pod)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("not a pod")
|
||||
}
|
||||
return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(pod.ObjectMeta, true), nil
|
||||
},
|
||||
}
|
||||
},
|
||||
Storage: s,
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user