Merge pull request #36711 from hongchaodeng/e4

Automatic merge from submit-queue (batch tested with PRs 38181, 38128, 36711)

etcd2: have prefix always prepended

The prefix issue is discussed in #36290.

This is fixing etcd2 behavior separately.

**release note**:
```
etcd2: have prefix always prepended
```
This commit is contained in:
Kubernetes Submit Queue 2016-12-07 03:09:34 -08:00 committed by GitHub
commit 34c873a748
4 changed files with 35 additions and 72 deletions

View File

@ -63,7 +63,6 @@ go_test(
"//pkg/storage/testing:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/coreos/etcd/client",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:golang.org/x/net/context",
],
)

View File

@ -21,7 +21,6 @@ import (
"fmt"
"path"
"reflect"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
@ -97,7 +96,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key)
key = path.Join(h.pathPrefix, key)
data, err := runtime.Encode(h.codec, obj)
trace.Step("Object encoded")
if err != nil {
@ -148,7 +147,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key)
key = path.Join(h.pathPrefix, key)
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
@ -210,7 +209,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
if err != nil {
return nil, err
}
key = h.prefixEtcdKey(key)
key = path.Join(h.pathPrefix, key)
w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
@ -225,7 +224,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
if err != nil {
return nil, err
}
key = h.prefixEtcdKey(key)
key = path.Join(h.pathPrefix, key)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
@ -236,7 +235,7 @@ func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key)
key = path.Join(h.pathPrefix, key)
_, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
return err
}
@ -306,7 +305,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
key = path.Join(h.pathPrefix, key)
startTime := time.Now()
trace.Step("About to read etcd node")
@ -389,7 +388,7 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
key = path.Join(h.pathPrefix, key)
startTime := time.Now()
trace.Step("About to list etcd node")
nodes, index, err := h.listEtcdNode(ctx, key)
@ -446,7 +445,7 @@ func (h *etcdHelper) GuaranteedUpdate(
// Panic is appropriate, because this is a programming error.
panic("need ptr to type")
}
key = h.prefixEtcdKey(key)
key = path.Join(h.pathPrefix, key)
for {
obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
@ -539,13 +538,6 @@ func (h *etcdHelper) GuaranteedUpdate(
}
}
func (h *etcdHelper) prefixEtcdKey(key string) string {
if strings.HasPrefix(key, h.pathPrefix) {
return key
}
return path.Join(h.pathPrefix, key)
}
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.

View File

@ -24,7 +24,6 @@ import (
"time"
etcd "github.com/coreos/etcd/client"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
@ -99,8 +98,7 @@ func createPodList(t *testing.T, helper etcdHelper, list *api.PodList) error {
func TestList(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := etcdtest.AddPrefix("/some/key")
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), key)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
list := api.PodList{
Items: []api.Pod{
@ -123,7 +121,7 @@ func TestList(t *testing.T) {
var got api.PodList
// TODO: a sorted filter function could be applied such implied
// ordering on the returned list doesn't matter.
err := helper.List(context.TODO(), key, "", storage.Everything, &got)
err := helper.List(context.TODO(), "/", "", storage.Everything, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
@ -136,8 +134,7 @@ func TestList(t *testing.T) {
func TestListFiltered(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := etcdtest.AddPrefix("/some/key")
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), key)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
list := api.PodList{
Items: []api.Pod{
@ -167,7 +164,7 @@ func TestListFiltered(t *testing.T) {
},
}
var got api.PodList
err := helper.List(context.TODO(), key, "", p, &got)
err := helper.List(context.TODO(), "/", "", p, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
@ -181,13 +178,10 @@ func TestListFiltered(t *testing.T) {
func TestListAcrossDirectories(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
rootkey := etcdtest.AddPrefix("/some/key")
key1 := etcdtest.AddPrefix("/some/key/directory1")
key2 := etcdtest.AddPrefix("/some/key/directory2")
roothelper := newEtcdHelper(server.Client, testapi.Default.Codec(), rootkey)
helper1 := newEtcdHelper(server.Client, testapi.Default.Codec(), key1)
helper2 := newEtcdHelper(server.Client, testapi.Default.Codec(), key2)
roothelper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
helper1 := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()+"/dir1")
helper2 := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()+"/dir2")
list := api.PodList{
Items: []api.Pod{
@ -217,7 +211,7 @@ func TestListAcrossDirectories(t *testing.T) {
list.Items[2] = *returnedObj
var got api.PodList
err := roothelper.List(context.TODO(), rootkey, "", storage.Everything, &got)
err := roothelper.List(context.TODO(), "/", "", storage.Everything, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
@ -229,8 +223,8 @@ func TestListAcrossDirectories(t *testing.T) {
func TestGet(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := etcdtest.AddPrefix("/some/key")
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), key)
key := "/some/key"
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
expect := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: apitesting.DeepEqualSafePodSpec(),
@ -251,14 +245,13 @@ func TestGet(t *testing.T) {
func TestGetNotFoundErr(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := etcdtest.AddPrefix("/some/key")
boguskey := etcdtest.AddPrefix("/some/boguskey")
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), key)
boguskey := "/some/boguskey"
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
var got api.Pod
err := helper.Get(context.TODO(), boguskey, "", &got, false)
if !storage.IsNotFound(err) {
t.Errorf("Unexpected reponse on key=%v, err=%v", key, err)
t.Errorf("Unexpected reponse on key=%v, err=%v", boguskey, err)
}
}
@ -304,8 +297,8 @@ func TestGuaranteedUpdate(t *testing.T) {
_, codec := testScheme(t)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := etcdtest.AddPrefix("/some/key")
helper := newEtcdHelper(server.Client, codec, key)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
obj := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
@ -349,8 +342,8 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
_, codec := testScheme(t)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := etcdtest.AddPrefix("/some/key")
helper := newEtcdHelper(server.Client, codec, key)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
obj := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
@ -379,8 +372,8 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
_, codec := testScheme(t)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := etcdtest.AddPrefix("/some/key")
helper := newEtcdHelper(server.Client, codec, key)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
// Create a new node.
obj := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
@ -406,7 +399,7 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
_, codec := testScheme(t)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := etcdtest.AddPrefix("/some/key")
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
const concurrency = 10
@ -471,27 +464,6 @@ func TestGuaranteedUpdateUIDMismatch(t *testing.T) {
}
}
func TestPrefixEtcdKey(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix)
baseKey := "/some/key"
// Verify prefix is added
keyBefore := baseKey
keyAfter := helper.prefixEtcdKey(keyBefore)
assert.Equal(t, keyAfter, path.Join(prefix, baseKey), "Prefix incorrectly added by EtcdHelper")
// Verify prefix is not added
keyBefore = path.Join(prefix, baseKey)
keyAfter = helper.prefixEtcdKey(keyBefore)
assert.Equal(t, keyBefore, keyAfter, "Prefix incorrectly added by EtcdHelper")
}
func TestDeleteUIDMismatch(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)

View File

@ -341,7 +341,7 @@ func makeSubsets(ip string, port int) []api.EndpointSubset {
func TestWatchEtcdState(t *testing.T) {
codec := testapi.Default.Codec()
key := etcdtest.AddPrefix("/somekey/foo")
key := "/somekey/foo"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
@ -399,7 +399,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
codec := testapi.Default.Codec()
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key := etcdtest.AddPrefix("/somekey/foo")
key := "/somekey/foo"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
@ -470,18 +470,18 @@ func TestWatchFromZeroIndex(t *testing.T) {
func TestWatchListFromZeroIndex(t *testing.T) {
codec := testapi.Default.Codec()
key := etcdtest.AddPrefix("/some/key")
prefix := "/some/key"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, key)
h := newEtcdHelper(server.Client, codec, prefix)
watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything)
watching, err := h.WatchList(context.TODO(), "/", "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
// creates key/foo which should trigger the WatchList for "key"
// creates foo which should trigger the WatchList for "/"
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
err = h.Create(context.TODO(), pod.Name, pod, pod, 0)
if err != nil {
@ -501,7 +501,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
func TestWatchListIgnoresRootKey(t *testing.T) {
codec := testapi.Default.Codec()
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key := etcdtest.AddPrefix("/some/key")
key := "/some/key"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, key)