diff --git a/pkg/registry/generic/registry/store_test.go b/pkg/registry/generic/registry/store_test.go index f70133b1aa3..81381389b09 100644 --- a/pkg/registry/generic/registry/store_test.go +++ b/pkg/registry/generic/registry/store_test.go @@ -21,9 +21,9 @@ import ( "path" "reflect" "strconv" - "testing" - "sync" + "testing" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" @@ -35,12 +35,14 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" storagetesting "k8s.io/kubernetes/pkg/storage/testing" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/validation/field" + "k8s.io/kubernetes/pkg/util/wait" ) type testRESTStrategy struct { @@ -77,55 +79,8 @@ func (t *testRESTStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Obje } func (t *testRESTStrategy) Canonicalize(obj runtime.Object) {} -func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool { - return func(obj runtime.Object) bool { - actualPod := obj.(*api.Pod) - if !api.Semantic.DeepDerivative(pod.Status, actualPod.Status) { - t.Errorf("not a deep derivative %#v", actualPod) - return false - } - return api.HasObjectMetaSystemFieldValues(&actualPod.ObjectMeta) - } -} - func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Store) { - podPrefix := "/pods" - server := etcdtesting.NewEtcdTestClientServer(t) - s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) - strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true} - - return server, &Store{ - NewFunc: func() runtime.Object { return &api.Pod{} }, - NewListFunc: func() runtime.Object { return &api.PodList{} }, - QualifiedResource: api.Resource("pods"), - CreateStrategy: strategy, - UpdateStrategy: strategy, - DeleteStrategy: strategy, - KeyRootFunc: func(ctx api.Context) string { - return podPrefix - }, - KeyFunc: func(ctx api.Context, id string) (string, error) { - if _, ok := api.NamespaceFrom(ctx); !ok { - return "", fmt.Errorf("namespace is required") - } - return path.Join(podPrefix, id), nil - }, - ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil }, - PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { - return &generic.SelectionPredicate{ - Label: label, - Field: field, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod, ok := obj.(*api.Pod) - if !ok { - return nil, nil, fmt.Errorf("not a pod") - } - return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(pod.ObjectMeta, true), nil - }, - } - }, - Storage: s, - } + return newTestGenericStoreRegistry(t, false) } // setMatcher is a matcher that matches any pod with id in the set. @@ -235,6 +190,70 @@ func TestStoreList(t *testing.T) { } } +// TestStoreListResourceVersion tests that if List with ResourceVersion > 0, it will wait until +// the results are as fresh as given version. +func TestStoreListResourceVersion(t *testing.T) { + fooPod := &api.Pod{ + ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "foo"}, + Spec: api.PodSpec{NodeName: "machine"}, + } + barPod := &api.Pod{ + ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "bar"}, + Spec: api.PodSpec{NodeName: "machine"}, + } + ctx := api.WithNamespace(api.NewContext(), "test") + + server, registry := newTestGenericStoreRegistry(t, true) + defer server.Terminate(t) + + obj, err := registry.Create(ctx, fooPod) + if err != nil { + t.Fatal(err) + } + + versioner := etcdstorage.APIObjectVersioner{} + rev, err := versioner.ObjectResourceVersion(obj) + if err != nil { + t.Fatal(err) + } + + waitListCh := make(chan runtime.Object, 1) + go func(listRev uint64) { + option := &api.ListOptions{ResourceVersion: strconv.FormatUint(listRev, 10)} + // It will wait until we create the second pod. + l, err := registry.List(ctx, option) + if err != nil { + close(waitListCh) + t.Fatal(err) + return + } + waitListCh <- l + }(rev + 1) + + select { + case <-time.After(500 * time.Millisecond): + case l := <-waitListCh: + t.Fatalf("expected waiting, but get %#v", l) + } + + if _, err := registry.Create(ctx, barPod); err != nil { + t.Fatal(err) + } + + select { + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + case l, ok := <-waitListCh: + if !ok { + return + } + pl := l.(*api.PodList).Items + if len(pl) != 2 { + t.Errorf("Expected get 2 items, but got %d", len(pl)) + } + } +} + func TestStoreCreate(t *testing.T) { podA := &api.Pod{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "test"}, @@ -983,3 +1002,55 @@ func TestStoreWatch(t *testing.T) { server.Terminate(t) } } + +func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesting.EtcdTestServer, *Store) { + podPrefix := "/pods" + server := etcdtesting.NewEtcdTestClientServer(t) + strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true} + s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) + if hasCacheEnabled { + config := storage.CacherConfig{ + CacheCapacity: 10, + Storage: s, + Versioner: etcdstorage.APIObjectVersioner{}, + Type: &api.Pod{}, + ResourcePrefix: podPrefix, + KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, + NewListFunc: func() runtime.Object { return &api.PodList{} }, + } + s = storage.NewCacherFromConfig(config) + } + + return server, &Store{ + NewFunc: func() runtime.Object { return &api.Pod{} }, + NewListFunc: func() runtime.Object { return &api.PodList{} }, + QualifiedResource: api.Resource("pods"), + CreateStrategy: strategy, + UpdateStrategy: strategy, + DeleteStrategy: strategy, + KeyRootFunc: func(ctx api.Context) string { + return podPrefix + }, + KeyFunc: func(ctx api.Context, id string) (string, error) { + if _, ok := api.NamespaceFrom(ctx); !ok { + return "", fmt.Errorf("namespace is required") + } + return path.Join(podPrefix, id), nil + }, + ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil }, + PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { + return &generic.SelectionPredicate{ + Label: label, + Field: field, + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod, ok := obj.(*api.Pod) + if !ok { + return nil, nil, fmt.Errorf("not a pod") + } + return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(pod.ObjectMeta, true), nil + }, + } + }, + Storage: s, + } +}