Merge pull request #40822 from sttts/sttts-more-cutoffs-6

Automatic merge from submit-queue

genericapiserver: cut off more dependencies – episode 6

Follow-up of https://github.com/kubernetes/kubernetes/pull/40808

approved based on #40363
This commit is contained in:
Kubernetes Submit Queue 2017-02-02 05:10:10 -08:00 committed by GitHub
commit 4a4678aafa
29 changed files with 406 additions and 293 deletions

View File

@ -63,7 +63,7 @@ type DiscoveryServerOptions struct {
// NewCommandStartMaster provides a CLI handler for 'start master' command // NewCommandStartMaster provides a CLI handler for 'start master' command
func NewCommandStartDiscoveryServer(out, err io.Writer) *cobra.Command { func NewCommandStartDiscoveryServer(out, err io.Writer) *cobra.Command {
o := &DiscoveryServerOptions{ o := &DiscoveryServerOptions{
Etcd: genericoptions.NewEtcdOptions(), Etcd: genericoptions.NewEtcdOptions(api.Scheme),
SecureServing: genericoptions.NewSecureServingOptions(), SecureServing: genericoptions.NewSecureServingOptions(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(), Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
Authorization: genericoptions.NewDelegatingAuthorizationOptions(), Authorization: genericoptions.NewDelegatingAuthorizationOptions(),

View File

@ -65,7 +65,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions { func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{ s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(), GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(), Etcd: genericoptions.NewEtcdOptions(api.Scheme),
SecureServing: genericoptions.NewSecureServingOptions(), SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(), InsecureServing: genericoptions.NewInsecureServingOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(), Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),

View File

@ -49,6 +49,7 @@ func newStorageFactory() genericapiserver.StorageFactory {
config := storagebackend.Config{ config := storagebackend.Config{
Prefix: genericoptions.DefaultEtcdPathPrefix, Prefix: genericoptions.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1:2379"}, ServerList: []string{"http://127.0.0.1:2379"},
Copier: api.Scheme,
} }
storageFactory := genericapiserver.NewDefaultStorageFactory(config, "application/json", api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig()) storageFactory := genericapiserver.NewDefaultStorageFactory(config, "application/json", api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
@ -67,7 +68,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions { func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{ s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(), GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(), Etcd: genericoptions.NewEtcdOptions(api.Scheme),
SecureServing: genericoptions.NewSecureServingOptions(), SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(), InsecureServing: genericoptions.NewInsecureServingOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(), Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),

View File

@ -15,6 +15,7 @@ go_library(
], ],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api:go_default_library",
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/kubeapiserver/options:go_default_library", "//pkg/kubeapiserver/options:go_default_library",
"//vendor:github.com/spf13/pflag", "//vendor:github.com/spf13/pflag",

View File

@ -21,6 +21,7 @@ import (
"time" "time"
genericoptions "k8s.io/apiserver/pkg/server/options" genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/kubernetes/pkg/api"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
// add the kubernetes feature gates // add the kubernetes feature gates
@ -47,7 +48,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions { func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{ s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(), GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(), Etcd: genericoptions.NewEtcdOptions(api.Scheme),
SecureServing: genericoptions.NewSecureServingOptions(), SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(), InsecureServing: genericoptions.NewInsecureServingOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(), Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),

View File

@ -1235,7 +1235,7 @@ func TestStoreWatch(t *testing.T) {
func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (factory.DestroyFunc, *Store) { func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (factory.DestroyFunc, *Store) {
podPrefix := "/pods" podPrefix := "/pods"
server, sc := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) server, sc := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, api.Scheme)
strategy := &testRESTStrategy{api.Scheme, names.SimpleNameGenerator, true, false, true} strategy := &testRESTStrategy{api.Scheme, names.SimpleNameGenerator, true, false, true}
sc.Codec = testapi.Default.StorageCodec() sc.Codec = testapi.Default.StorageCodec()

View File

@ -75,7 +75,7 @@ func init() {
// setUp is a convience function for setting up for (most) tests. // setUp is a convience function for setting up for (most) tests.
func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertions) { func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
etcdServer, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) etcdServer, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme)
config := NewConfig() config := NewConfig()
config.PublicAddress = net.ParseIP("192.168.10.4") config.PublicAddress = net.ParseIP("192.168.10.4")

View File

@ -113,6 +113,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
defaultConfig := storagebackend.Config{ defaultConfig := storagebackend.Config{
Prefix: options.DefaultEtcdPathPrefix, Prefix: options.DefaultEtcdPathPrefix,
ServerList: defaultEtcdLocation, ServerList: defaultEtcdLocation,
Copier: scheme,
} }
storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig()) storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.SetEtcdLocation(test.resource, test.servers) storageFactory.SetEtcdLocation(test.resource, test.servers)

View File

@ -59,7 +59,7 @@ import (
// setUp is a convience function for setting up for (most) tests. // setUp is a convience function for setting up for (most) tests.
func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) { func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, api.Scheme)
config := &Config{ config := &Config{
GenericConfig: genericapiserver.NewConfig(), GenericConfig: genericapiserver.NewConfig(),

View File

@ -38,7 +38,7 @@ import (
) )
func NewEtcdStorage(t *testing.T, group string) (*storagebackend.Config, *etcdtesting.EtcdTestServer) { func NewEtcdStorage(t *testing.T, group string) (*storagebackend.Config, *etcdtesting.EtcdTestServer) {
server, config := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) server, config := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, api.Scheme)
config.Codec = testapi.Groups[group].StorageCodec() config.Codec = testapi.Groups[group].StorageCodec()
return config, server return config, server
} }

View File

@ -18,7 +18,6 @@ go_library(
], ],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api:go_default_library",
"//pkg/storage/etcd/util:go_default_library", "//pkg/storage/etcd/util:go_default_library",
"//vendor:github.com/coreos/etcd/client", "//vendor:github.com/coreos/etcd/client",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
@ -46,22 +45,24 @@ go_test(
library = ":go_default_library", library = ":go_default_library",
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api:go_default_library",
"//pkg/api/testapi:go_default_library",
"//pkg/api/testing:go_default_library",
"//pkg/storage/etcd/etcdtest:go_default_library", "//pkg/storage/etcd/etcdtest:go_default_library",
"//pkg/storage/etcd/testing:go_default_library", "//pkg/storage/etcd/testing:go_default_library",
"//pkg/storage/testing:go_default_library", "//pkg/storage/testing:go_default_library",
"//pkg/storage/tests:go_default_library",
"//vendor:github.com/coreos/etcd/client", "//vendor:github.com/coreos/etcd/client",
"//vendor:golang.org/x/net/context", "//vendor:golang.org/x/net/context",
"//vendor:k8s.io/apimachinery/pkg/api/equality", "//vendor:k8s.io/apimachinery/pkg/api/equality",
"//vendor:k8s.io/apimachinery/pkg/api/testing",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/conversion", "//vendor:k8s.io/apimachinery/pkg/conversion",
"//vendor:k8s.io/apimachinery/pkg/fields", "//vendor:k8s.io/apimachinery/pkg/fields",
"//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer", "//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
"//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/apiserver/pkg/apis/example",
"//vendor:k8s.io/apiserver/pkg/apis/example/v1",
"//vendor:k8s.io/apiserver/pkg/storage", "//vendor:k8s.io/apiserver/pkg/storage",
], ],
) )

View File

@ -36,19 +36,18 @@ import (
"k8s.io/apiserver/pkg/storage/etcd/metrics" "k8s.io/apiserver/pkg/storage/etcd/metrics"
utilcache "k8s.io/apiserver/pkg/util/cache" utilcache "k8s.io/apiserver/pkg/util/cache"
utiltrace "k8s.io/apiserver/pkg/util/trace" utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/kubernetes/pkg/api"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
) )
// Creates a new storage interface from the client // Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time // TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface { func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier) storage.Interface {
return &etcdHelper{ return &etcdHelper{
etcdMembersAPI: etcd.NewMembersAPI(client), etcdMembersAPI: etcd.NewMembersAPI(client),
etcdKeysAPI: etcd.NewKeysAPI(client), etcdKeysAPI: etcd.NewKeysAPI(client),
codec: codec, codec: codec,
versioner: APIObjectVersioner{}, versioner: APIObjectVersioner{},
copier: api.Scheme, copier: copier,
pathPrefix: path.Join("/", prefix), pathPrefix: path.Join("/", prefix),
quorum: quorum, quorum: quorum,
cache: utilcache.NewCache(cacheSize), cache: utilcache.NewCache(cacheSize),

View File

@ -25,26 +25,30 @@ import (
etcd "github.com/coreos/etcd/client" etcd "github.com/coreos/etcd/client"
"golang.org/x/net/context" "golang.org/x/net/context"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
storagetesting "k8s.io/kubernetes/pkg/storage/testing" storagetesting "k8s.io/kubernetes/pkg/storage/testing"
storagetests "k8s.io/kubernetes/pkg/storage/tests"
) )
func testScheme(t *testing.T) (*runtime.Scheme, runtime.Codec) { func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) {
scheme := runtime.NewScheme() scheme := runtime.NewScheme()
scheme.Log(t) scheme.Log(t)
scheme.AddKnownTypes(api.Registry.GroupOrDie(api.GroupName).GroupVersion, &storagetesting.TestResource{}) scheme.AddKnownTypes(schema.GroupVersion{Version: runtime.APIVersionInternal}, &storagetesting.TestResource{})
scheme.AddKnownTypes(testapi.Default.InternalGroupVersion(), &storagetesting.TestResource{}) scheme.AddKnownTypes(schema.GroupVersion{Version: runtime.APIVersionInternal}, &storagetesting.TestResource{})
example.AddToScheme(scheme)
examplev1.AddToScheme(scheme)
if err := scheme.AddConversionFuncs( if err := scheme.AddConversionFuncs(
func(in *storagetesting.TestResource, out *storagetesting.TestResource, s conversion.Scope) error { func(in *storagetesting.TestResource, out *storagetesting.TestResource, s conversion.Scope) error {
*out = *in *out = *in
@ -57,17 +61,17 @@ func testScheme(t *testing.T) (*runtime.Scheme, runtime.Codec) {
); err != nil { ); err != nil {
panic(err) panic(err)
} }
codec := serializer.NewCodecFactory(scheme).LegacyCodec(api.Registry.GroupOrDie(api.GroupName).GroupVersion) codecs := serializer.NewCodecFactory(scheme)
return scheme, codec return scheme, codecs
} }
func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper { func newEtcdHelper(client etcd.Client, scheme *runtime.Scheme, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize).(*etcdHelper) return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme).(*etcdHelper)
} }
// Returns an encoded version of api.Pod with the given name. // Returns an encoded version of example.Pod with the given name.
func getEncodedPod(name string) string { func getEncodedPod(name string, codec runtime.Codec) string {
pod, _ := runtime.Encode(testapi.Default.Codec(), &api.Pod{ pod, _ := runtime.Encode(codec, &examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: name}, ObjectMeta: metav1.ObjectMeta{Name: name},
}) })
return string(pod) return string(pod)
@ -81,9 +85,9 @@ func createObj(t *testing.T, helper etcdHelper, name string, obj, out runtime.Ob
return err return err
} }
func createPodList(t *testing.T, helper etcdHelper, list *api.PodList) error { func createPodList(t *testing.T, helper etcdHelper, list *example.PodList) error {
for i := range list.Items { for i := range list.Items {
returnedObj := &api.Pod{} returnedObj := &example.Pod{}
err := createObj(t, helper, list.Items[i].Name, &list.Items[i], returnedObj, 0) err := createObj(t, helper, list.Items[i].Name, &list.Items[i], returnedObj, 0)
if err != nil { if err != nil {
return err return err
@ -94,29 +98,31 @@ func createPodList(t *testing.T, helper etcdHelper, list *api.PodList) error {
} }
func TestList(t *testing.T) { func TestList(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
list := api.PodList{ list := example.PodList{
Items: []api.Pod{ Items: []example.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{Name: "bar"}, ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: apitesting.DeepEqualSafePodSpec(), Spec: storagetests.DeepEqualSafePodSpec(),
}, },
{ {
ObjectMeta: metav1.ObjectMeta{Name: "baz"}, ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: apitesting.DeepEqualSafePodSpec(), Spec: storagetests.DeepEqualSafePodSpec(),
}, },
{ {
ObjectMeta: metav1.ObjectMeta{Name: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: apitesting.DeepEqualSafePodSpec(), Spec: storagetests.DeepEqualSafePodSpec(),
}, },
}, },
} }
createPodList(t, helper, &list) createPodList(t, helper, &list)
var got api.PodList var got example.PodList
// TODO: a sorted filter function could be applied such implied // TODO: a sorted filter function could be applied such implied
// ordering on the returned list doesn't matter. // ordering on the returned list doesn't matter.
err := helper.List(context.TODO(), "/", "", storage.Everything, &got) err := helper.List(context.TODO(), "/", "", storage.Everything, &got)
@ -130,23 +136,25 @@ func TestList(t *testing.T) {
} }
func TestListFiltered(t *testing.T) { func TestListFiltered(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
list := api.PodList{ list := example.PodList{
Items: []api.Pod{ Items: []example.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{Name: "bar"}, ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: apitesting.DeepEqualSafePodSpec(), Spec: storagetests.DeepEqualSafePodSpec(),
}, },
{ {
ObjectMeta: metav1.ObjectMeta{Name: "baz"}, ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: apitesting.DeepEqualSafePodSpec(), Spec: storagetests.DeepEqualSafePodSpec(),
}, },
{ {
ObjectMeta: metav1.ObjectMeta{Name: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: apitesting.DeepEqualSafePodSpec(), Spec: storagetests.DeepEqualSafePodSpec(),
}, },
}, },
} }
@ -157,11 +165,11 @@ func TestListFiltered(t *testing.T) {
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}), Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, nil return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, nil
}, },
} }
var got api.PodList var got example.PodList
err := helper.List(context.TODO(), "/", "", p, &got) err := helper.List(context.TODO(), "/", "", p, &got)
if err != nil { if err != nil {
t.Errorf("Unexpected error %v", err) t.Errorf("Unexpected error %v", err)
@ -174,31 +182,33 @@ func TestListFiltered(t *testing.T) {
// TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query // TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query
func TestListAcrossDirectories(t *testing.T) { func TestListAcrossDirectories(t *testing.T) {
scheme, codecs := testScheme(t)
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
defer server.Terminate(t) defer server.Terminate(t)
roothelper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) roothelper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
helper1 := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()+"/dir1") helper1 := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()+"/dir1")
helper2 := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()+"/dir2") helper2 := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()+"/dir2")
list := api.PodList{ list := example.PodList{
Items: []api.Pod{ Items: []example.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{Name: "baz"}, ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: apitesting.DeepEqualSafePodSpec(), Spec: storagetests.DeepEqualSafePodSpec(),
}, },
{ {
ObjectMeta: metav1.ObjectMeta{Name: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: apitesting.DeepEqualSafePodSpec(), Spec: storagetests.DeepEqualSafePodSpec(),
}, },
{ {
ObjectMeta: metav1.ObjectMeta{Name: "bar"}, ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: apitesting.DeepEqualSafePodSpec(), Spec: storagetests.DeepEqualSafePodSpec(),
}, },
}, },
} }
returnedObj := &api.Pod{} returnedObj := &example.Pod{}
// create the 1st 2 elements in one directory // create the 1st 2 elements in one directory
createObj(t, helper1, list.Items[0].Name, &list.Items[0], returnedObj, 0) createObj(t, helper1, list.Items[0].Name, &list.Items[0], returnedObj, 0)
list.Items[0] = *returnedObj list.Items[0] = *returnedObj
@ -208,7 +218,7 @@ func TestListAcrossDirectories(t *testing.T) {
createObj(t, helper2, list.Items[2].Name, &list.Items[2], returnedObj, 0) createObj(t, helper2, list.Items[2].Name, &list.Items[2], returnedObj, 0)
list.Items[2] = *returnedObj list.Items[2] = *returnedObj
var got api.PodList var got example.PodList
err := roothelper.List(context.TODO(), "/", "", storage.Everything, &got) err := roothelper.List(context.TODO(), "/", "", storage.Everything, &got)
if err != nil { if err != nil {
t.Errorf("Unexpected error %v", err) t.Errorf("Unexpected error %v", err)
@ -219,15 +229,17 @@ func TestListAcrossDirectories(t *testing.T) {
} }
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
key := "/some/key" key := "/some/key"
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
expect := api.Pod{ expect := example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: apitesting.DeepEqualSafePodSpec(), Spec: storagetests.DeepEqualSafePodSpec(),
} }
var got api.Pod var got example.Pod
if err := helper.Create(context.TODO(), key, &expect, &got, 0); err != nil { if err := helper.Create(context.TODO(), key, &expect, &got, 0); err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -241,12 +253,14 @@ func TestGet(t *testing.T) {
} }
func TestGetNotFoundErr(t *testing.T) { func TestGetNotFoundErr(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: "v1"})
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
boguskey := "/some/boguskey" boguskey := "/some/boguskey"
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
var got api.Pod var got example.Pod
err := helper.Get(context.TODO(), boguskey, "", &got, false) err := helper.Get(context.TODO(), boguskey, "", &got, false)
if !storage.IsNotFound(err) { if !storage.IsNotFound(err) {
t.Errorf("Unexpected reponse on key=%v, err=%v", boguskey, err) t.Errorf("Unexpected reponse on key=%v, err=%v", boguskey, err)
@ -254,16 +268,18 @@ func TestGetNotFoundErr(t *testing.T) {
} }
func TestCreate(t *testing.T) { func TestCreate(t *testing.T) {
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
returnedObj := &api.Pod{} returnedObj := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 5) err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 5)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
_, err = runtime.Encode(testapi.Default.Codec(), obj) _, err = runtime.Encode(codec, obj)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -271,7 +287,7 @@ func TestCreate(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
_, err = runtime.Encode(testapi.Default.Codec(), returnedObj) _, err = runtime.Encode(codec, returnedObj)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -281,10 +297,12 @@ func TestCreate(t *testing.T) {
} }
func TestCreateNilOutParam(t *testing.T) { func TestCreateNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
err := helper.Create(context.TODO(), "/some/key", obj, nil, 5) err := helper.Create(context.TODO(), "/some/key", obj, nil, 5)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
@ -292,11 +310,12 @@ func TestCreateNilOutParam(t *testing.T) {
} }
func TestGuaranteedUpdate(t *testing.T) { func TestGuaranteedUpdate(t *testing.T) {
_, codec := testScheme(t) scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
key := "/some/key" key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
@ -337,11 +356,12 @@ func TestGuaranteedUpdate(t *testing.T) {
} }
func TestGuaranteedUpdateNoChange(t *testing.T) { func TestGuaranteedUpdateNoChange(t *testing.T) {
_, codec := testScheme(t) scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
key := "/some/key" key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
@ -367,11 +387,12 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
} }
func TestGuaranteedUpdateKeyNotFound(t *testing.T) { func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
_, codec := testScheme(t) scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
key := "/some/key" key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
// Create a new node. // Create a new node.
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
@ -394,11 +415,12 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
} }
func TestGuaranteedUpdate_CreateCollision(t *testing.T) { func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
_, codec := testScheme(t) scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
key := "/some/key" key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
const concurrency = 10 const concurrency = 10
var wgDone sync.WaitGroup var wgDone sync.WaitGroup
@ -443,13 +465,15 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
} }
func TestGuaranteedUpdateUIDMismatch(t *testing.T) { func TestGuaranteedUpdateUIDMismatch(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix()) prefix := path.Join("/", etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix) helper := newEtcdHelper(server.Client, scheme, codec, prefix)
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
podPtr := &api.Pod{} podPtr := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0) err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0)
if err != nil { if err != nil {
t.Fatalf("Unexpected error %#v", err) t.Fatalf("Unexpected error %#v", err)
@ -463,13 +487,15 @@ func TestGuaranteedUpdateUIDMismatch(t *testing.T) {
} }
func TestDeleteUIDMismatch(t *testing.T) { func TestDeleteUIDMismatch(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix()) prefix := path.Join("/", etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix) helper := newEtcdHelper(server.Client, scheme, codec, prefix)
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
podPtr := &api.Pod{} podPtr := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0) err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0)
if err != nil { if err != nil {
t.Fatalf("Unexpected error %#v", err) t.Fatalf("Unexpected error %#v", err)
@ -503,23 +529,25 @@ func (f *fakeDeleteKeysAPI) Get(ctx context.Context, key string, opts *etcd.GetO
// deletion yet. Etcd will fail the deletion and report the conflict. etcdHelper // deletion yet. Etcd will fail the deletion and report the conflict. etcdHelper
// should retry until there is no conflict. // should retry until there is no conflict.
func TestDeleteWithRetry(t *testing.T) { func TestDeleteWithRetry(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix()) prefix := path.Join("/", etcdtest.PathPrefix())
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
// fakeGet returns a large ModifiedIndex to emulate the case that another // fakeGet returns a large ModifiedIndex to emulate the case that another
// party has updated the object. // party has updated the object.
fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) { fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
data, _ := runtime.Encode(testapi.Default.Codec(), obj) data, _ := runtime.Encode(codec, obj)
return &etcd.Response{Node: &etcd.Node{Value: string(data), ModifiedIndex: 99}}, nil return &etcd.Response{Node: &etcd.Node{Value: string(data), ModifiedIndex: 99}}, nil
} }
expectedRetries := 3 expectedRetries := 3
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix) helper := newEtcdHelper(server.Client, scheme, codec, prefix)
fake := &fakeDeleteKeysAPI{KeysAPI: helper.etcdKeysAPI, fakeGetCap: expectedRetries, fakeGetFunc: fakeGet} fake := &fakeDeleteKeysAPI{KeysAPI: helper.etcdKeysAPI, fakeGetCap: expectedRetries, fakeGetFunc: fakeGet}
helper.etcdKeysAPI = fake helper.etcdKeysAPI = fake
returnedObj := &api.Pod{} returnedObj := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 0) err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 0)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)

View File

@ -21,12 +21,14 @@ import (
"testing" "testing"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
@ -49,11 +51,12 @@ func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) {
var _ etcdCache = &fakeEtcdCache{} var _ etcdCache = &fakeEtcdCache{}
func TestWatchInterpretations(t *testing.T) { func TestWatchInterpretations(t *testing.T) {
codec := testapi.Default.Codec() _, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
// Declare some pods to make the test cases compact. // Declare some pods to make the test cases compact.
podFoo := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
podBar := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
podBaz := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}} podBaz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}}
// All of these test cases will be run with the firstLetterIsB Filter. // All of these test cases will be run with the firstLetterIsB Filter.
table := map[string]struct { table := map[string]struct {
@ -126,7 +129,7 @@ func TestWatchInterpretations(t *testing.T) {
}, },
} }
firstLetterIsB := func(obj runtime.Object) bool { firstLetterIsB := func(obj runtime.Object) bool {
return obj.(*api.Pod).Name[0] == 'b' return obj.(*example.Pod).Name[0] == 'b'
} }
for name, item := range table { for name, item := range table {
for _, action := range item.actions { for _, action := range item.actions {
@ -168,7 +171,8 @@ func TestWatchInterpretations(t *testing.T) {
} }
func TestWatchInterpretation_ResponseNotSet(t *testing.T) { func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
_, codec := testScheme(t) _, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e) t.Errorf("Unexpected emit: %v", e)
@ -181,7 +185,8 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
} }
func TestWatchInterpretation_ResponseNoNode(t *testing.T) { func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
_, codec := testScheme(t) _, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
actions := []string{"create", "set", "compareAndSwap", "delete"} actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions { for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
@ -196,7 +201,8 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
} }
func TestWatchInterpretation_ResponseBadData(t *testing.T) { func TestWatchInterpretation_ResponseBadData(t *testing.T) {
_, codec := testScheme(t) _, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
actions := []string{"create", "set", "compareAndSwap", "delete"} actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions { for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
@ -220,9 +226,10 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
} }
func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
codec := testapi.Default.Codec() _, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
filter := func(obj runtime.Object) bool { filter := func(obj runtime.Object) bool {
return obj.(*api.Pod).Name != "bar" return obj.(*example.Pod).Name != "bar"
} }
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{})
@ -231,8 +238,8 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
eventChan <- e eventChan <- e
} }
fooPod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} fooPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
barPod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} barPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
fooBytes, err := runtime.Encode(codec, fooPod) fooBytes, err := runtime.Encode(codec, fooPod)
if err != nil { if err != nil {
t.Fatalf("Encode failed: %v", err) t.Fatalf("Encode failed: %v", err)
@ -279,7 +286,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
t.Errorf("#%d: event type want=Deleted, get=%s", i, ev.Type) t.Errorf("#%d: event type want=Deleted, get=%s", i, ev.Type)
return return
} }
rv := ev.Object.(*api.Pod).ResourceVersion rv := ev.Object.(*example.Pod).ResourceVersion
if rv != tt.expRV { if rv != tt.expRV {
t.Errorf("#%d: resource version want=%s, get=%s", i, tt.expRV, rv) t.Errorf("#%d: resource version want=%s, get=%s", i, tt.expRV, rv)
} }
@ -288,11 +295,12 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
} }
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
codec := testapi.Default.Codec() scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
key := "/some/key" key := "/some/key"
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil { if err != nil {
@ -301,8 +309,8 @@ func TestWatch(t *testing.T) {
// watching is explicitly closed below. // watching is explicitly closed below.
// Test normal case // Test normal case
pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
returnObj := &api.Pod{} returnObj := &example.Pod{}
err = h.Create(context.TODO(), key, pod, returnObj, 0) err = h.Create(context.TODO(), key, pod, returnObj, 0)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
@ -330,36 +338,25 @@ func TestWatch(t *testing.T) {
} }
} }
func emptySubsets() []api.EndpointSubset {
return []api.EndpointSubset{}
}
func makeSubsets(ip string, port int) []api.EndpointSubset {
return []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: ip}},
Ports: []api.EndpointPort{{Port: int32(port)}},
}}
}
func TestWatchEtcdState(t *testing.T) { func TestWatchEtcdState(t *testing.T) {
codec := testapi.Default.Codec() scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
key := "/somekey/foo" key := "/somekey/foo"
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
defer watching.Stop() defer watching.Stop()
endpoint := &api.Endpoints{ pod := &example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
} }
err = h.Create(context.TODO(), key, endpoint, endpoint, 0) err = h.Create(context.TODO(), key, pod, pod, 0)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -369,20 +366,21 @@ func TestWatchEtcdState(t *testing.T) {
t.Errorf("Unexpected event %#v", event) t.Errorf("Unexpected event %#v", event)
} }
subset := makeSubsets("127.0.0.1", 9000) pod.ResourceVersion = ""
endpoint.Subsets = subset pod.Status = example.PodStatus{
endpoint.ResourceVersion = "" Phase: example.PodPhase("Running"),
}
// CAS the previous value // CAS the previous value
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
newObj, err := api.Scheme.DeepCopy(endpoint) newObj, err := scheme.DeepCopy(pod)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
return nil, nil, err return nil, nil, err
} }
return newObj.(*api.Endpoints), nil, nil return newObj.(*example.Pod), nil, nil
} }
err = h.GuaranteedUpdate(context.TODO(), key, &api.Endpoints{}, false, nil, updateFn) err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -392,20 +390,21 @@ func TestWatchEtcdState(t *testing.T) {
t.Errorf("Unexpected event %#v", event) t.Errorf("Unexpected event %#v", event)
} }
if e, a := endpoint, event.Object; !apiequality.Semantic.DeepDerivative(e, a) { if e, a := pod, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
t.Errorf("Unexpected error: expected %#v, got %#v", e, a) t.Errorf("Unexpected error: expected %#v, got %#v", e, a)
} }
} }
func TestWatchFromZeroIndex(t *testing.T) { func TestWatchFromZeroIndex(t *testing.T) {
codec := testapi.Default.Codec() scheme, codecs := testScheme(t)
pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/somekey/foo" key := "/somekey/foo"
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
// set before the watch and verify events // set before the watch and verify events
err := h.Create(context.TODO(), key, pod, pod, 0) err := h.Create(context.TODO(), key, pod, pod, 0)
@ -428,11 +427,11 @@ func TestWatchFromZeroIndex(t *testing.T) {
// check for concatenation on watch event with CAS // check for concatenation on watch event with CAS
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
pod := input.(*api.Pod) pod := input.(*example.Pod)
pod.Name = "bar" pod.Name = "bar"
return pod, nil, nil return pod, nil, nil
} }
err = h.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, false, nil, updateFn) err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -451,11 +450,11 @@ func TestWatchFromZeroIndex(t *testing.T) {
pod.Name = "baz" pod.Name = "baz"
updateFn = func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { updateFn = func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
pod := input.(*api.Pod) pod := input.(*example.Pod)
pod.Name = "baz" pod.Name = "baz"
return pod, nil, nil return pod, nil, nil
} }
err = h.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, false, nil, updateFn) err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -471,11 +470,12 @@ func TestWatchFromZeroIndex(t *testing.T) {
} }
func TestWatchListFromZeroIndex(t *testing.T) { func TestWatchListFromZeroIndex(t *testing.T) {
codec := testapi.Default.Codec() scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
prefix := "/some/key" prefix := "/some/key"
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, prefix) h := newEtcdHelper(server.Client, scheme, codec, prefix)
watching, err := h.WatchList(context.TODO(), "/", "0", storage.Everything) watching, err := h.WatchList(context.TODO(), "/", "0", storage.Everything)
if err != nil { if err != nil {
@ -484,7 +484,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
defer watching.Stop() defer watching.Stop()
// creates foo which should trigger the WatchList for "/" // creates foo which should trigger the WatchList for "/"
pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
err = h.Create(context.TODO(), pod.Name, pod, pod, 0) err = h.Create(context.TODO(), pod.Name, pod, pod, 0)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
@ -501,12 +501,13 @@ func TestWatchListFromZeroIndex(t *testing.T) {
} }
func TestWatchListIgnoresRootKey(t *testing.T) { func TestWatchListIgnoresRootKey(t *testing.T) {
codec := testapi.Default.Codec() scheme, codecs := testScheme(t)
pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/some/key" key := "/some/key"
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, key) h := newEtcdHelper(server.Client, scheme, codec, key)
watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything) watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything)
if err != nil { if err != nil {
@ -532,11 +533,12 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
} }
func TestWatchPurposefulShutdown(t *testing.T) { func TestWatchPurposefulShutdown(t *testing.T) {
_, codec := testScheme(t) scheme, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t) defer server.Terminate(t)
key := "/some/key" key := "/some/key"
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
// Test purposeful shutdown // Test purposeful shutdown
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)

View File

@ -24,6 +24,7 @@ go_library(
"//vendor:github.com/coreos/etcd/pkg/types", "//vendor:github.com/coreos/etcd/pkg/types",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
"//vendor:golang.org/x/net/context", "//vendor:golang.org/x/net/context",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/storage/storagebackend", "//vendor:k8s.io/apiserver/pkg/storage/storagebackend",
], ],

View File

@ -27,6 +27,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
@ -310,7 +311,7 @@ func NewUnsecuredEtcdTestClientServer(t *testing.T) *EtcdTestServer {
} }
// NewEtcd3TestClientServer creates a new client and server for testing // NewEtcd3TestClientServer creates a new client and server for testing
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) { func NewUnsecuredEtcd3TestClientServer(t *testing.T, scheme *runtime.Scheme) (*EtcdTestServer, *storagebackend.Config) {
server := &EtcdTestServer{ server := &EtcdTestServer{
v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}), v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
} }
@ -320,6 +321,7 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storageb
Prefix: etcdtest.PathPrefix(), Prefix: etcdtest.PathPrefix(),
ServerList: server.V3Client.Endpoints(), ServerList: server.V3Client.Endpoints(),
DeserializationCacheSize: etcdtest.DeserializationCacheSize, DeserializationCacheSize: etcdtest.DeserializationCacheSize,
Copier: scheme,
} }
return server, config return server, config
} }

View File

@ -44,19 +44,21 @@ go_test(
library = ":go_default_library", library = ":go_default_library",
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api:go_default_library",
"//pkg/api/testapi:go_default_library",
"//vendor:github.com/coreos/etcd/clientv3", "//vendor:github.com/coreos/etcd/clientv3",
"//vendor:github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes", "//vendor:github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes",
"//vendor:github.com/coreos/etcd/integration", "//vendor:github.com/coreos/etcd/integration",
"//vendor:golang.org/x/net/context", "//vendor:golang.org/x/net/context",
"//vendor:k8s.io/apimachinery/pkg/api/testing",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/fields", "//vendor:k8s.io/apimachinery/pkg/fields",
"//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
"//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/apiserver/pkg/apis/example",
"//vendor:k8s.io/apiserver/pkg/apis/example/v1",
"//vendor:k8s.io/apiserver/pkg/storage", "//vendor:k8s.io/apiserver/pkg/storage",
], ],
) )

View File

@ -22,27 +22,38 @@ import (
"sync" "sync"
"testing" "testing"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"github.com/coreos/etcd/integration" "github.com/coreos/etcd/integration"
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/apimachinery/pkg/watch"
) )
var scheme = runtime.NewScheme()
var codecs = serializer.NewCodecFactory(scheme)
func init() {
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
example.AddToScheme(scheme)
examplev1.AddToScheme(scheme)
}
func TestCreate(t *testing.T) { func TestCreate(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
etcdClient := cluster.RandClient() etcdClient := cluster.RandClient()
key := "/testkey" key := "/testkey"
out := &api.Pod{} out := &example.Pod{}
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
// verify that kv pair is empty before set // verify that kv pair is empty before set
getResp, err := etcdClient.KV.Get(ctx, key) getResp, err := etcdClient.KV.Get(ctx, key)
@ -79,10 +90,10 @@ func TestCreateWithTTL(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
input := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/somekey" key := "/somekey"
out := &api.Pod{} out := &example.Pod{}
if err := store.Create(ctx, key, input, out, 1); err != nil { if err := store.Create(ctx, key, input, out, 1); err != nil {
t.Fatalf("Create failed: %v", err) t.Fatalf("Create failed: %v", err)
} }
@ -97,9 +108,9 @@ func TestCreateWithTTL(t *testing.T) {
func TestCreateWithKeyExist(t *testing.T) { func TestCreateWithKeyExist(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key, _ := testPropogateStore(ctx, t, store, obj) key, _ := testPropogateStore(ctx, t, store, obj)
out := &api.Pod{} out := &example.Pod{}
err := store.Create(ctx, key, obj, out, 0) err := store.Create(ctx, key, obj, out, 0)
if err == nil || !storage.IsNodeExist(err) { if err == nil || !storage.IsNodeExist(err) {
t.Errorf("expecting key exists error, but get: %s", err) t.Errorf("expecting key exists error, but get: %s", err)
@ -109,13 +120,13 @@ func TestCreateWithKeyExist(t *testing.T) {
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
tests := []struct { tests := []struct {
key string key string
ignoreNotFound bool ignoreNotFound bool
expectNotFoundErr bool expectNotFoundErr bool
expectedOut *api.Pod expectedOut *example.Pod
}{{ // test get on existing item }{{ // test get on existing item
key: key, key: key,
ignoreNotFound: false, ignoreNotFound: false,
@ -129,11 +140,11 @@ func TestGet(t *testing.T) {
key: "/non-existing", key: "/non-existing",
ignoreNotFound: true, ignoreNotFound: true,
expectNotFoundErr: false, expectNotFoundErr: false,
expectedOut: &api.Pod{}, expectedOut: &example.Pod{},
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.Pod{} out := &example.Pod{}
err := store.Get(ctx, tt.key, "", out, tt.ignoreNotFound) err := store.Get(ctx, tt.key, "", out, tt.ignoreNotFound)
if tt.expectNotFoundErr { if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) { if err == nil || !storage.IsNotFound(err) {
@ -153,11 +164,11 @@ func TestGet(t *testing.T) {
func TestUnconditionalDelete(t *testing.T) { func TestUnconditionalDelete(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
tests := []struct { tests := []struct {
key string key string
expectedObj *api.Pod expectedObj *example.Pod
expectNotFoundErr bool expectNotFoundErr bool
}{{ // test unconditional delete on existing key }{{ // test unconditional delete on existing key
key: key, key: key,
@ -170,7 +181,7 @@ func TestUnconditionalDelete(t *testing.T) {
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.Pod{} // reset out := &example.Pod{} // reset
err := store.Delete(ctx, tt.key, out, nil) err := store.Delete(ctx, tt.key, out, nil)
if tt.expectNotFoundErr { if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) { if err == nil || !storage.IsNotFound(err) {
@ -190,7 +201,7 @@ func TestUnconditionalDelete(t *testing.T) {
func TestConditionalDelete(t *testing.T) { func TestConditionalDelete(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
tests := []struct { tests := []struct {
precondition *storage.Preconditions precondition *storage.Preconditions
@ -204,7 +215,7 @@ func TestConditionalDelete(t *testing.T) {
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.Pod{} out := &example.Pod{}
err := store.Delete(ctx, key, out, tt.precondition) err := store.Delete(ctx, key, out, tt.precondition)
if tt.expectInvalidObjErr { if tt.expectInvalidObjErr {
if err == nil || !storage.IsInvalidObj(err) { if err == nil || !storage.IsInvalidObj(err) {
@ -218,23 +229,23 @@ func TestConditionalDelete(t *testing.T) {
if !reflect.DeepEqual(storedObj, out) { if !reflect.DeepEqual(storedObj, out) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, storedObj, out) t.Errorf("#%d: pod want=%#v, get=%#v", i, storedObj, out)
} }
key, storedObj = testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) key, storedObj = testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
} }
} }
func TestGetToList(t *testing.T) { func TestGetToList(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
tests := []struct { tests := []struct {
key string key string
pred storage.SelectionPredicate pred storage.SelectionPredicate
expectedOut []*api.Pod expectedOut []*example.Pod
}{{ // test GetToList on existing key }{{ // test GetToList on existing key
key: key, key: key,
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*api.Pod{storedObj}, expectedOut: []*example.Pod{storedObj},
}, { // test GetToList on non-existing key }, { // test GetToList on non-existing key
key: "/non-existing", key: "/non-existing",
pred: storage.Everything, pred: storage.Everything,
@ -245,7 +256,7 @@ func TestGetToList(t *testing.T) {
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name), Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil return nil, fields.Set{"metadata.name": pod.Name}, nil
}, },
}, },
@ -253,7 +264,7 @@ func TestGetToList(t *testing.T) {
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.PodList{} out := &example.PodList{}
err := store.GetToList(ctx, tt.key, "", tt.pred, out) err := store.GetToList(ctx, tt.key, "", tt.pred, out)
if err != nil { if err != nil {
t.Fatalf("GetToList failed: %v", err) t.Fatalf("GetToList failed: %v", err)
@ -274,7 +285,7 @@ func TestGetToList(t *testing.T) {
func TestGuaranteedUpdate(t *testing.T) { func TestGuaranteedUpdate(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storeObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
tests := []struct { tests := []struct {
key string key string
@ -328,7 +339,7 @@ func TestGuaranteedUpdate(t *testing.T) {
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.Pod{} out := &example.Pod{}
name := fmt.Sprintf("foo-%d", i) name := fmt.Sprintf("foo-%d", i)
if tt.expectNoUpdate { if tt.expectNoUpdate {
name = storeObj.Name name = storeObj.Name
@ -337,7 +348,7 @@ func TestGuaranteedUpdate(t *testing.T) {
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition, err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
if tt.expectNotFoundErr && tt.ignoreNotFound { if tt.expectNotFoundErr && tt.ignoreNotFound {
if pod := obj.(*api.Pod); pod.Name != "" { if pod := obj.(*example.Pod); pod.Name != "" {
t.Errorf("#%d: expecting zero value, but get=%#v", i, pod) t.Errorf("#%d: expecting zero value, but get=%#v", i, pod)
} }
} }
@ -382,10 +393,10 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
input := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/somekey" key := "/somekey"
out := &api.Pod{} out := &example.Pod{}
err := store.GuaranteedUpdate(ctx, key, out, true, nil, err := store.GuaranteedUpdate(ctx, key, out, true, nil,
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
ttl := uint64(1) ttl := uint64(1)
@ -405,7 +416,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
func TestGuaranteedUpdateWithConflict(t *testing.T) { func TestGuaranteedUpdateWithConflict(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, _ := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, _ := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
errChan := make(chan error, 1) errChan := make(chan error, 1)
var firstToFinish sync.WaitGroup var firstToFinish sync.WaitGroup
@ -414,9 +425,9 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
secondToEnter.Add(1) secondToEnter.Add(1)
go func() { go func() {
err := store.GuaranteedUpdate(ctx, key, &api.Pod{}, false, nil, err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
pod.Name = "foo-1" pod.Name = "foo-1"
secondToEnter.Wait() secondToEnter.Wait()
return pod, nil return pod, nil
@ -426,14 +437,14 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
}() }()
updateCount := 0 updateCount := 0
err := store.GuaranteedUpdate(ctx, key, &api.Pod{}, false, nil, err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
if updateCount == 0 { if updateCount == 0 {
secondToEnter.Done() secondToEnter.Done()
firstToFinish.Wait() firstToFinish.Wait()
} }
updateCount++ updateCount++
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
pod.Name = "foo-2" pod.Name = "foo-2"
return pod, nil return pod, nil
})) }))
@ -450,9 +461,10 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
} }
func TestList(t *testing.T) { func TestList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "") store := newStore(cluster.RandClient(), false, codec, "")
ctx := context.Background() ctx := context.Background()
// Setup storage with the following structure: // Setup storage with the following structure:
@ -468,21 +480,21 @@ func TestList(t *testing.T) {
// - test // - test
preset := []struct { preset := []struct {
key string key string
obj *api.Pod obj *example.Pod
storedObj *api.Pod storedObj *example.Pod
}{{ }{{
key: "/one-level/test", key: "/one-level/test",
obj: &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
}, { }, {
key: "/two-level/1/test", key: "/two-level/1/test",
obj: &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
}, { }, {
key: "/two-level/2/test", key: "/two-level/2/test",
obj: &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
}} }}
for i, ps := range preset { for i, ps := range preset {
preset[i].storedObj = &api.Pod{} preset[i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
if err != nil { if err != nil {
t.Fatalf("Set failed: %v", err) t.Fatalf("Set failed: %v", err)
@ -492,11 +504,11 @@ func TestList(t *testing.T) {
tests := []struct { tests := []struct {
prefix string prefix string
pred storage.SelectionPredicate pred storage.SelectionPredicate
expectedOut []*api.Pod expectedOut []*example.Pod
}{{ // test List on existing key }{{ // test List on existing key
prefix: "/one-level/", prefix: "/one-level/",
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*api.Pod{preset[0].storedObj}, expectedOut: []*example.Pod{preset[0].storedObj},
}, { // test List on non-existing key }, { // test List on non-existing key
prefix: "/non-existing/", prefix: "/non-existing/",
pred: storage.Everything, pred: storage.Everything,
@ -507,7 +519,7 @@ func TestList(t *testing.T) {
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil return nil, fields.Set{"metadata.name": pod.Name}, nil
}, },
}, },
@ -515,11 +527,11 @@ func TestList(t *testing.T) {
}, { // test List with multiple levels of directories and expect flattened result }, { // test List with multiple levels of directories and expect flattened result
prefix: "/two-level/", prefix: "/two-level/",
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*api.Pod{preset[1].storedObj, preset[2].storedObj}, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj},
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.PodList{} out := &example.PodList{}
err := store.List(ctx, tt.prefix, "0", tt.pred, out) err := store.List(ctx, tt.prefix, "0", tt.pred, out)
if err != nil { if err != nil {
t.Fatalf("List failed: %v", err) t.Fatalf("List failed: %v", err)
@ -538,18 +550,19 @@ func TestList(t *testing.T) {
} }
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "") store := newStore(cluster.RandClient(), false, codec, "")
ctx := context.Background() ctx := context.Background()
return ctx, store, cluster return ctx, store, cluster
} }
// testPropogateStore helps propogates store with objects, automates key generation, and returns // testPropogateStore helps propogates store with objects, automates key generation, and returns
// keys and stored objects. // keys and stored objects.
func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *api.Pod) (string, *api.Pod) { func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *example.Pod) (string, *example.Pod) {
// Setup store with a key and grab the output for returning. // Setup store with a key and grab the output for returning.
key := "/testkey" key := "/testkey"
setOutput := &api.Pod{} setOutput := &example.Pod{}
err := store.Create(ctx, key, obj, setOutput, 0) err := store.Create(ctx, key, obj, setOutput, 0)
if err != nil { if err != nil {
t.Fatalf("Set failed: %v", err) t.Fatalf("Set failed: %v", err)

View File

@ -28,6 +28,8 @@ import (
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration" "github.com/coreos/etcd/integration"
"golang.org/x/net/context" "golang.org/x/net/context"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
@ -35,9 +37,9 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
) )
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
@ -55,8 +57,8 @@ func TestWatchList(t *testing.T) {
func testWatch(t *testing.T, recursive bool) { func testWatch(t *testing.T, recursive bool) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
podFoo := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
podBar := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
tests := []struct { tests := []struct {
key string key string
@ -73,7 +75,7 @@ func testWatch(t *testing.T, recursive bool) {
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name=bar"), Field: fields.ParseSelectorOrDie("metadata.name=bar"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil return nil, fields.Set{"metadata.name": pod.Name}, nil
}, },
}, },
@ -88,7 +90,7 @@ func testWatch(t *testing.T, recursive bool) {
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=bar"), Field: fields.ParseSelectorOrDie("metadata.name!=bar"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil return nil, fields.Set{"metadata.name": pod.Name}, nil
}, },
}, },
@ -98,9 +100,9 @@ func testWatch(t *testing.T, recursive bool) {
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
var prevObj *api.Pod var prevObj *example.Pod
for _, watchTest := range tt.watchTests { for _, watchTest := range tt.watchTests {
out := &api.Pod{} out := &example.Pod{}
key := tt.key key := tt.key
if recursive { if recursive {
key = key + "/item" key = key + "/item"
@ -130,12 +132,12 @@ func testWatch(t *testing.T, recursive bool) {
func TestDeleteTriggerWatch(t *testing.T) { func TestDeleteTriggerWatch(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
if err := store.Delete(ctx, key, &api.Pod{}, nil); err != nil { if err := store.Delete(ctx, key, &example.Pod{}, nil); err != nil {
t.Fatalf("Delete failed: %v", err) t.Fatalf("Delete failed: %v", err)
} }
testCheckEventType(t, watch.Deleted, w) testCheckEventType(t, watch.Deleted, w)
@ -147,7 +149,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
func TestWatchFromZero(t *testing.T) { func TestWatchFromZero(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
w, err := store.Watch(ctx, key, "0", storage.Everything) w, err := store.Watch(ctx, key, "0", storage.Everything)
if err != nil { if err != nil {
@ -157,10 +159,10 @@ func TestWatchFromZero(t *testing.T) {
w.Stop() w.Stop()
// Update // Update
out := &api.Pod{} out := &example.Pod{}
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil
})) }))
if err != nil { if err != nil {
t.Fatalf("GuaranteedUpdate failed: %v", err) t.Fatalf("GuaranteedUpdate failed: %v", err)
@ -175,10 +177,10 @@ func TestWatchFromZero(t *testing.T) {
w.Stop() w.Stop()
// Update again // Update again
out = &api.Pod{} out = &example.Pod{}
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil
})) }))
if err != nil { if err != nil {
t.Fatalf("GuaranteedUpdate failed: %v", err) t.Fatalf("GuaranteedUpdate failed: %v", err)
@ -207,33 +209,34 @@ func TestWatchFromZero(t *testing.T) {
func TestWatchFromNoneZero(t *testing.T) { func TestWatchFromNoneZero(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
out := &api.Pod{} out := &example.Pod{}
store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, err return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, err
})) }))
testCheckResult(t, 0, watch.Modified, w, out) testCheckResult(t, 0, watch.Modified, w, out)
} }
func TestWatchError(t *testing.T) { func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), false, &testCodec{testapi.Default.Codec()}, "") invalidStore := newStore(cluster.RandClient(), false, codec, "")
ctx := context.Background() ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
validStore := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "") validStore := newStore(cluster.RandClient(), false, codec, "")
validStore.GuaranteedUpdate(ctx, "/abc", &api.Pod{}, true, nil, storage.SimpleUpdate( validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
})) }))
testCheckEventType(t, watch.Error, w) testCheckEventType(t, watch.Error, w)
} }
@ -286,7 +289,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
if err != nil { if err != nil {
@ -294,12 +297,12 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
} }
etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix()) etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix())
if err := store.Delete(ctx, key, &api.Pod{}, &storage.Preconditions{}); err != nil { if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}); err != nil {
t.Fatalf("Delete failed: %v", err) t.Fatalf("Delete failed: %v", err)
} }
e := <-w.ResultChan() e := <-w.ResultChan()
watchedDeleteObj := e.Object.(*api.Pod) watchedDeleteObj := e.Object.(*example.Pod)
var wres clientv3.WatchResponse var wres clientv3.WatchResponse
wres = <-etcdW wres = <-etcdW
@ -314,7 +317,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
} }
type testWatchStruct struct { type testWatchStruct struct {
obj *api.Pod obj *example.Pod
expectEvent bool expectEvent bool
watchType watch.EventType watchType watch.EventType
} }
@ -338,7 +341,7 @@ func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.I
} }
} }
func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *api.Pod) { func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
select { select {
case res := <-w.ResultChan(): case res := <-w.ResultChan():
if res.Type != expectEventType { if res.Type != expectEventType {
@ -359,8 +362,8 @@ func testCheckStop(t *testing.T, i int, w watch.Interface) {
if ok { if ok {
var obj string var obj string
switch e.Object.(type) { switch e.Object.(type) {
case *api.Pod: case *example.Pod:
obj = e.Object.(*api.Pod).Name obj = e.Object.(*example.Pod).Name
case *metav1.Status: case *metav1.Status:
obj = e.Object.(*metav1.Status).Message obj = e.Object.(*metav1.Status).Message
} }

View File

@ -39,12 +39,13 @@ go_test(
"//vendor:github.com/coreos/etcd/integration", "//vendor:github.com/coreos/etcd/integration",
"//vendor:github.com/coreos/etcd/pkg/transport", "//vendor:github.com/coreos/etcd/pkg/transport",
"//vendor:golang.org/x/net/context", "//vendor:golang.org/x/net/context",
"//vendor:k8s.io/apimachinery/pkg/api/testing",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer", "//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
"//vendor:k8s.io/apiserver/pkg/apis/example",
"//vendor:k8s.io/apiserver/pkg/apis/example/v1",
"//vendor:k8s.io/apiserver/pkg/storage/storagebackend", "//vendor:k8s.io/apiserver/pkg/storage/storagebackend",
"//vendor:k8s.io/client-go/pkg/api",
"//vendor:k8s.io/client-go/pkg/api/v1",
], ],
) )

View File

@ -39,7 +39,7 @@ func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize) s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier)
return s, tr.CloseIdleConnections, nil return s, tr.CloseIdleConnections, nil
} }

View File

@ -23,28 +23,31 @@ import (
"path/filepath" "path/filepath"
"testing" "testing"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/runtime/schema"
runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/storage/storagebackend"
clientapi "k8s.io/client-go/pkg/api"
clientapiv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/kubernetes/pkg/storage/etcd/testing/testingcert"
"github.com/coreos/etcd/integration" "github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/transport"
"golang.org/x/net/context"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/storage/etcd/testing/testingcert"
) )
func TestTLSConnection(t *testing.T) { var scheme = runtime.NewScheme()
scheme := runtime.NewScheme() var codecs = serializer.NewCodecFactory(scheme)
codecs := runtimeserializer.NewCodecFactory(scheme)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
// TODO: use k8s.io/apiserver internal type instead of borrowing it from client-go func init() {
clientapi.AddToScheme(scheme) metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
clientapiv1.AddToScheme(scheme) example.AddToScheme(scheme)
examplev1.AddToScheme(scheme)
}
func TestTLSConnection(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
certFile, keyFile, caFile := configureTLSCerts(t) certFile, keyFile, caFile := configureTLSCerts(t)
defer os.RemoveAll(filepath.Dir(certFile)) defer os.RemoveAll(filepath.Dir(certFile))
@ -68,13 +71,14 @@ func TestTLSConnection(t *testing.T) {
KeyFile: keyFile, KeyFile: keyFile,
CAFile: caFile, CAFile: caFile,
Codec: codec, Codec: codec,
Copier: scheme,
} }
storage, destroyFunc, err := newETCD3Storage(cfg) storage, destroyFunc, err := newETCD3Storage(cfg)
defer destroyFunc() defer destroyFunc()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = storage.Create(context.TODO(), "/abc", &clientapi.Pod{}, nil, 0) err = storage.Create(context.TODO(), "/abc", &example.Pod{}, nil, 0)
if err != nil { if err != nil {
t.Fatalf("Create failed: %v", err) t.Fatalf("Create failed: %v", err)
} }

View File

@ -4,12 +4,14 @@ licenses(["notice"])
load( load(
"@io_bazel_rules_go//go:def.bzl", "@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test", "go_test",
) )
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["cacher_test.go"], srcs = ["cacher_test.go"],
library = ":go_default_library",
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/storage/etcd:go_default_library", "//pkg/storage/etcd:go_default_library",
@ -20,17 +22,19 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/api/equality", "//vendor:k8s.io/apimachinery/pkg/api/equality",
"//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/api/meta", "//vendor:k8s.io/apimachinery/pkg/api/meta",
"//vendor:k8s.io/apimachinery/pkg/api/testing",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/fields", "//vendor:k8s.io/apimachinery/pkg/fields",
"//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
"//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/apiserver/pkg/apis/example",
"//vendor:k8s.io/apiserver/pkg/apis/example/v1",
"//vendor:k8s.io/apiserver/pkg/storage", "//vendor:k8s.io/apiserver/pkg/storage",
"//vendor:k8s.io/client-go/pkg/api",
"//vendor:k8s.io/client-go/pkg/api/install", "//vendor:k8s.io/client-go/pkg/api/install",
"//vendor:k8s.io/client-go/pkg/api/v1",
], ],
) )
@ -46,3 +50,13 @@ filegroup(
srcs = [":package-srcs"], srcs = [":package-srcs"],
tags = ["automanaged"], tags = ["automanaged"],
) )
go_library(
name = "go_default_library",
srcs = ["utils.go"],
tags = ["automanaged"],
deps = [
"//vendor:k8s.io/apiserver/pkg/apis/example",
"//vendor:k8s.io/client-go/pkg/api/install",
],
)

View File

@ -27,6 +27,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
@ -34,9 +35,9 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
@ -44,23 +45,24 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/apimachinery/pkg/runtime/serializer"
_ "k8s.io/client-go/pkg/api/install" _ "k8s.io/client-go/pkg/api/install"
) )
func DeepEqualSafePodSpec() api.PodSpec { var (
grace := int64(30) scheme = runtime.NewScheme()
return api.PodSpec{ codecs = serializer.NewCodecFactory(scheme)
RestartPolicy: api.RestartPolicyAlways, )
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace, func init() {
SecurityContext: &api.PodSecurityContext{}, metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
SchedulerName: api.DefaultSchedulerName, example.AddToScheme(scheme)
} examplev1.AddToScheme(scheme)
} }
// GetAttrs returns labels and fields of a given object for filtering purposes. // GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*api.Pod) pod, ok := obj.(*example.Pod)
if !ok { if !ok {
return nil, nil, fmt.Errorf("not a pod") return nil, nil, fmt.Errorf("not a pod")
} }
@ -69,7 +71,7 @@ func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
// PodToSelectableFields returns a field set that represents the object // PodToSelectableFields returns a field set that represents the object
// TODO: fields are not labels, and the validation rules for them do not apply. // TODO: fields are not labels, and the validation rules for them do not apply.
func PodToSelectableFields(pod *api.Pod) fields.Set { func PodToSelectableFields(pod *example.Pod) fields.Set {
// The purpose of allocation with a given number of elements is to reduce // The purpose of allocation with a given number of elements is to reduce
// amount of allocations needed to create the fields.Set. If you add any // amount of allocations needed to create the fields.Set. If you add any
// field here or the number of object-meta related fields changes, this should // field here or the number of object-meta related fields changes, this should
@ -89,9 +91,9 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
return source return source
} }
func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) { func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme)
storage := etcd3.New(server.V3Client, codec, prefix) storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix)
return server, storage return server, storage
} }
@ -101,39 +103,39 @@ func newTestCacher(s storage.Interface, cap int) *storage.Cacher {
CacheCapacity: cap, CacheCapacity: cap,
Storage: s, Storage: s,
Versioner: etcdstorage.APIObjectVersioner{}, Versioner: etcdstorage.APIObjectVersioner{},
Copier: api.Scheme, Copier: scheme,
Type: &api.Pod{}, Type: &example.Pod{},
ResourcePrefix: prefix, ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: GetAttrs, GetAttrsFunc: GetAttrs,
NewListFunc: func() runtime.Object { return &api.PodList{} }, NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: api.Codecs.LegacyCodec(v1.SchemeGroupVersion), Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
} }
return storage.NewCacherFromConfig(config) return storage.NewCacherFromConfig(config)
} }
func makeTestPod(name string) *api.Pod { func makeTestPod(name string) *example.Pod {
return &api.Pod{ return &example.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
Spec: DeepEqualSafePodSpec(), Spec: DeepEqualSafePodSpec(),
} }
} }
func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod { func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod {
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
newObj, err := api.Scheme.DeepCopy(obj) newObj, err := scheme.DeepCopy(obj)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
return nil, nil, err return nil, nil, err
} }
return newObj.(*api.Pod), nil, nil return newObj.(*example.Pod), nil, nil
} }
key := "pods/" + obj.Namespace + "/" + obj.Name key := "pods/" + obj.Namespace + "/" + obj.Name
if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil { if err := s.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, old == nil, nil, updateFn); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
obj.ResourceVersion = "" obj.ResourceVersion = ""
result := &api.Pod{} result := &example.Pod{}
if err := s.Get(context.TODO(), key, "", result, false); err != nil { if err := s.Get(context.TODO(), key, "", result, false); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -141,7 +143,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
} }
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
@ -150,7 +152,7 @@ func TestGet(t *testing.T) {
fooCreated := updatePod(t, etcdStorage, podFoo, nil) fooCreated := updatePod(t, etcdStorage, podFoo, nil)
// We pass the ResourceVersion from the above Create() operation. // We pass the ResourceVersion from the above Create() operation.
result := &api.Pod{} result := &example.Pod{}
if err := cacher.Get(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, result, true); err != nil { if err := cacher.Get(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, result, true); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
@ -161,7 +163,7 @@ func TestGet(t *testing.T) {
if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, true); err != nil { if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, true); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
emptyPod := api.Pod{} emptyPod := example.Pod{}
if e, a := emptyPod, *result; !reflect.DeepEqual(e, a) { if e, a := emptyPod, *result; !reflect.DeepEqual(e, a) {
t.Errorf("Expected: %#v, got: %#v", e, a) t.Errorf("Expected: %#v, got: %#v", e, a)
} }
@ -172,7 +174,7 @@ func TestGet(t *testing.T) {
} }
func TestList(t *testing.T) { func TestList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
@ -196,20 +198,20 @@ func TestList(t *testing.T) {
podFooNS2.Namespace += "2" podFooNS2.Namespace += "2"
updatePod(t, etcdStorage, podFooNS2, nil) updatePod(t, etcdStorage, podFooNS2, nil)
deleted := api.Pod{} deleted := example.Pod{}
if err := etcdStorage.Delete(context.TODO(), "pods/ns/bar", &deleted, nil); err != nil { if err := etcdStorage.Delete(context.TODO(), "pods/ns/bar", &deleted, nil); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
// We first List directly from etcd by passing empty resourceVersion, // We first List directly from etcd by passing empty resourceVersion,
// to get the current etcd resourceVersion. // to get the current etcd resourceVersion.
rvResult := &api.PodList{} rvResult := &example.PodList{}
if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil { if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
deletedPodRV := rvResult.ListMeta.ResourceVersion deletedPodRV := rvResult.ListMeta.ResourceVersion
result := &api.PodList{} result := &example.PodList{}
// We pass the current etcd ResourceVersion received from the above List() operation, // We pass the current etcd ResourceVersion received from the above List() operation,
// since there is not easy way to get ResourceVersion of barPod deletion operation. // since there is not easy way to get ResourceVersion of barPod deletion operation.
if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil { if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil {
@ -237,7 +239,7 @@ func TestList(t *testing.T) {
t.Errorf("Unexpected namespace: %s", item.Namespace) t.Errorf("Unexpected namespace: %s", item.Namespace)
} }
var expected *api.Pod var expected *example.Pod
switch item.Name { switch item.Name {
case "foo": case "foo":
expected = podFooPrime expected = podFooPrime
@ -253,7 +255,7 @@ func TestList(t *testing.T) {
} }
func TestInfiniteList(t *testing.T) { func TestInfiniteList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
@ -268,7 +270,7 @@ func TestInfiniteList(t *testing.T) {
} }
listRV := strconv.Itoa(int(rv + 10)) listRV := strconv.Itoa(int(rv + 10))
result := &api.PodList{} result := &example.PodList{}
err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result) err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result)
if !errors.IsTimeout(err) { if !errors.IsTimeout(err) {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
@ -307,7 +309,7 @@ func (self *injectListError) List(ctx context.Context, key string, resourceVersi
} }
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
// Inject one list error to make sure we test the relist case. // Inject one list error to make sure we test the relist case.
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage} etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
defer server.Terminate(t) defer server.Terminate(t)
@ -384,7 +386,7 @@ func TestWatch(t *testing.T) {
} }
func TestWatcherTimeout(t *testing.T) { func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
@ -426,7 +428,7 @@ func TestWatcherTimeout(t *testing.T) {
} }
func TestFiltering(t *testing.T) { func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
@ -458,7 +460,7 @@ func TestFiltering(t *testing.T) {
fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered) fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
_ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered) _ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered)
deleted := api.Pod{} deleted := example.Pod{}
if err := etcdStorage.Delete(context.TODO(), "pods/ns/foo", &deleted, nil); err != nil { if err := etcdStorage.Delete(context.TODO(), "pods/ns/foo", &deleted, nil); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
@ -488,7 +490,7 @@ func TestFiltering(t *testing.T) {
} }
func TestStartingResourceVersion(t *testing.T) { func TestStartingResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
@ -520,7 +522,7 @@ func TestStartingResourceVersion(t *testing.T) {
select { select {
case e := <-watcher.ResultChan(): case e := <-watcher.ResultChan():
pod := e.Object.(*api.Pod) pod := e.Object.(*example.Pod)
podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion) podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
@ -536,7 +538,7 @@ func TestStartingResourceVersion(t *testing.T) {
} }
func TestRandomWatchDeliver(t *testing.T) { func TestRandomWatchDeliver(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
@ -569,7 +571,7 @@ func TestRandomWatchDeliver(t *testing.T) {
if !ok { if !ok {
break break
} }
if a, e := event.Object.(*api.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { if a, e := event.Object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
t.Errorf("Unexpected object watched: %s, expected %s", a, e) t.Errorf("Unexpected object watched: %s, expected %s", a, e)
} }
watched++ watched++

View File

@ -0,0 +1,32 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tests
import (
"k8s.io/apiserver/pkg/apis/example"
_ "k8s.io/client-go/pkg/api/install"
)
func DeepEqualSafePodSpec() example.PodSpec {
grace := int64(30)
return example.PodSpec{
RestartPolicy: "Always",
TerminationGracePeriodSeconds: &grace,
SchedulerName: "default-scheduler",
}
}

View File

@ -21,6 +21,7 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
) )
@ -34,13 +35,14 @@ type EtcdOptions struct {
EtcdServersOverrides []string EtcdServersOverrides []string
} }
func NewEtcdOptions() *EtcdOptions { func NewEtcdOptions(scheme *runtime.Scheme) *EtcdOptions {
return &EtcdOptions{ return &EtcdOptions{
StorageConfig: storagebackend.Config{ StorageConfig: storagebackend.Config{
Prefix: DefaultEtcdPathPrefix, Prefix: DefaultEtcdPathPrefix,
// Default cache size to 0 - if unset, its size will be set based on target // Default cache size to 0 - if unset, its size will be set based on target
// memory usage. // memory usage.
DeserializationCacheSize: 0, DeserializationCacheSize: 0,
Copier: scheme,
}, },
} }
} }

View File

@ -44,4 +44,5 @@ type Config struct {
DeserializationCacheSize int DeserializationCacheSize int
Codec runtime.Codec Codec runtime.Codec
Copier runtime.ObjectCopier
} }

View File

@ -311,6 +311,7 @@ func NewMasterConfig() *master.Config {
// prefix code, so please don't change without ensuring // prefix code, so please don't change without ensuring
// sufficient coverage in other ways. // sufficient coverage in other ways.
Prefix: uuid.New(), Prefix: uuid.New(),
Copier: api.Scheme,
} }
info, _ := runtime.SerializerInfoForMediaType(api.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON) info, _ := runtime.SerializerInfoForMediaType(api.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)

1
vendor/BUILD vendored
View File

@ -14025,6 +14025,7 @@ go_library(
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
"//vendor:github.com/spf13/pflag", "//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/net", "//vendor:k8s.io/apimachinery/pkg/util/net",
"//vendor:k8s.io/apiserver/pkg/admission", "//vendor:k8s.io/apiserver/pkg/admission",
"//vendor:k8s.io/apiserver/pkg/authentication/authenticatorfactory", "//vendor:k8s.io/apiserver/pkg/authentication/authenticatorfactory",