mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 10:20:51 +00:00
Merge pull request #113370 from wojtek-t/refactor_storage_tests_3
Refactor storage tests - part 3
This commit is contained in:
commit
819cadca66
@ -22,9 +22,6 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
@ -33,10 +30,7 @@ import (
|
||||
"google.golang.org/grpc/grpclog"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
@ -253,296 +247,41 @@ func TestListContinuationWithFilter(t *testing.T) {
|
||||
storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation)
|
||||
}
|
||||
|
||||
func TestListInconsistentContinuation(t *testing.T) {
|
||||
ctx, store, client := testSetup(t)
|
||||
|
||||
// Setup storage with the following structure:
|
||||
// /
|
||||
// - one-level/
|
||||
// | - test
|
||||
// |
|
||||
// - two-level/
|
||||
// - 1/
|
||||
// | - test
|
||||
// |
|
||||
// - 2/
|
||||
// - test
|
||||
//
|
||||
preset := []struct {
|
||||
key string
|
||||
obj *example.Pod
|
||||
storedObj *example.Pod
|
||||
}{
|
||||
{
|
||||
key: "/one-level/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||
},
|
||||
{
|
||||
key: "/two-level/1/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||
},
|
||||
{
|
||||
key: "/two-level/2/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
|
||||
},
|
||||
}
|
||||
|
||||
for i, ps := range preset {
|
||||
preset[i].storedObj = &example.Pod{}
|
||||
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
|
||||
return storage.SelectionPredicate{
|
||||
Limit: limit,
|
||||
Continue: continueValue,
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
out := &example.PodList{}
|
||||
options := storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: pred(1, ""),
|
||||
Recursive: true,
|
||||
}
|
||||
if err := store.GetList(ctx, "/", options, out); err != nil {
|
||||
t.Fatalf("Unable to get initial list: %v", err)
|
||||
}
|
||||
if len(out.Continue) == 0 {
|
||||
t.Fatalf("No continuation token set")
|
||||
}
|
||||
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
||||
|
||||
continueFromSecondItem := out.Continue
|
||||
|
||||
// update /two-level/2/test/bar
|
||||
oldName := preset[2].obj.Name
|
||||
newPod := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: oldName,
|
||||
Labels: map[string]string{
|
||||
"state": "new",
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil,
|
||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
return newPod, nil, nil
|
||||
}, newPod); err != nil {
|
||||
t.Fatalf("update failed: %v", err)
|
||||
}
|
||||
|
||||
// compact to latest revision.
|
||||
func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
|
||||
return func(ctx context.Context, t *testing.T, resourceVersion string) {
|
||||
versioner := storage.APIObjectVersioner{}
|
||||
lastRVString := preset[2].storedObj.ResourceVersion
|
||||
lastRV, err := versioner.ParseResourceVersion(lastRVString)
|
||||
rv, err := versioner.ParseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := client.KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil {
|
||||
if _, err := etcdClient.KV.Compact(ctx, int64(rv), clientv3.WithCompactPhysical()); err != nil {
|
||||
t.Fatalf("Unable to compact, %v", err)
|
||||
}
|
||||
|
||||
// The old continue token should have expired
|
||||
options = storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: pred(0, continueFromSecondItem),
|
||||
Recursive: true,
|
||||
}
|
||||
err = store.GetList(ctx, "/", options, out)
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected no error")
|
||||
}
|
||||
if !strings.Contains(err.Error(), inconsistentContinue) {
|
||||
t.Fatalf("unexpected error message %v", err)
|
||||
}
|
||||
status, ok := err.(apierrors.APIStatus)
|
||||
if !ok {
|
||||
t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err))
|
||||
}
|
||||
inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue
|
||||
if len(inconsistentContinueFromSecondItem) == 0 {
|
||||
t.Fatalf("expect non-empty continue token")
|
||||
}
|
||||
|
||||
out = &example.PodList{}
|
||||
options = storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: pred(1, inconsistentContinueFromSecondItem),
|
||||
Recursive: true,
|
||||
}
|
||||
if err := store.GetList(ctx, "/", options, out); err != nil {
|
||||
t.Fatalf("Unable to get second page: %v", err)
|
||||
}
|
||||
if len(out.Continue) == 0 {
|
||||
t.Fatalf("No continuation token set")
|
||||
}
|
||||
validateResourceVersion := storagetesting.ResourceVersionNotOlderThan(lastRVString)
|
||||
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
||||
if err := validateResourceVersion(out.ResourceVersion); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
continueFromThirdItem := out.Continue
|
||||
resolvedResourceVersionFromThirdItem := out.ResourceVersion
|
||||
out = &example.PodList{}
|
||||
options = storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: pred(1, continueFromThirdItem),
|
||||
Recursive: true,
|
||||
}
|
||||
if err := store.GetList(ctx, "/", options, out); err != nil {
|
||||
t.Fatalf("Unable to get second page: %v", err)
|
||||
}
|
||||
if len(out.Continue) != 0 {
|
||||
t.Fatalf("Unexpected continuation token set")
|
||||
}
|
||||
storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
||||
if out.ResourceVersion != resolvedResourceVersionFromThirdItem {
|
||||
t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func newTestLeaseManagerConfig() LeaseManagerConfig {
|
||||
cfg := NewDefaultLeaseManagerConfig()
|
||||
// As 30s is the default timeout for testing in global configuration,
|
||||
// we cannot wait longer than that in a single time: change it to 1s
|
||||
// for testing purposes. See wait.ForeverTestTimeout
|
||||
cfg.ReuseDurationSeconds = 1
|
||||
return cfg
|
||||
func TestListInconsistentContinuation(t *testing.T) {
|
||||
ctx, store, client := testSetup(t)
|
||||
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client))
|
||||
}
|
||||
|
||||
func newTestTransformer() value.Transformer {
|
||||
return storagetesting.NewPrefixTransformer([]byte(defaultTestPrefix), false)
|
||||
func TestConsistentList(t *testing.T) {
|
||||
ctx, store, _ := testSetup(t)
|
||||
storagetesting.RunTestConsistentList(ctx, t, &storeWithPrefixTransformer{store})
|
||||
}
|
||||
|
||||
type clientRecorder struct {
|
||||
reads uint64
|
||||
clientv3.KV
|
||||
func TestCount(t *testing.T) {
|
||||
ctx, store, _ := testSetup(t)
|
||||
storagetesting.RunTestCount(ctx, t, store)
|
||||
}
|
||||
|
||||
func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
|
||||
atomic.AddUint64(&r.reads, 1)
|
||||
return r.KV.Get(ctx, key, opts...)
|
||||
}
|
||||
|
||||
func (r *clientRecorder) GetReadsAndReset() uint64 {
|
||||
return atomic.SwapUint64(&r.reads, 0)
|
||||
}
|
||||
|
||||
type setupOptions struct {
|
||||
client func(*testing.T) *clientv3.Client
|
||||
codec runtime.Codec
|
||||
newFunc func() runtime.Object
|
||||
prefix string
|
||||
groupResource schema.GroupResource
|
||||
transformer value.Transformer
|
||||
pagingEnabled bool
|
||||
leaseConfig LeaseManagerConfig
|
||||
|
||||
recorderEnabled bool
|
||||
}
|
||||
|
||||
type setupOption func(*setupOptions)
|
||||
|
||||
func withClient(client *clientv3.Client) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.client = func(t *testing.T) *clientv3.Client {
|
||||
return client
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func withClientConfig(config *embed.Config) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.client = func(t *testing.T) *clientv3.Client {
|
||||
return testserver.RunEtcd(t, config)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func withCodec(codec runtime.Codec) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.codec = codec
|
||||
}
|
||||
}
|
||||
|
||||
func withPrefix(prefix string) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.prefix = prefix
|
||||
}
|
||||
}
|
||||
|
||||
func withoutPaging() setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.pagingEnabled = false
|
||||
}
|
||||
}
|
||||
|
||||
func withTransformer(transformer value.Transformer) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.transformer = transformer
|
||||
}
|
||||
}
|
||||
|
||||
func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.leaseConfig = leaseConfig
|
||||
}
|
||||
}
|
||||
|
||||
func withRecorder() setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.recorderEnabled = true
|
||||
}
|
||||
}
|
||||
|
||||
func withDefaults(options *setupOptions) {
|
||||
options.client = func(t *testing.T) *clientv3.Client {
|
||||
return testserver.RunEtcd(t, nil)
|
||||
}
|
||||
options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
options.newFunc = newPod
|
||||
options.prefix = ""
|
||||
options.groupResource = schema.GroupResource{Resource: "pods"}
|
||||
options.transformer = newTestTransformer()
|
||||
options.pagingEnabled = true
|
||||
options.leaseConfig = newTestLeaseManagerConfig()
|
||||
}
|
||||
|
||||
var _ setupOption = withDefaults
|
||||
|
||||
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *store, *clientv3.Client) {
|
||||
setupOpts := setupOptions{}
|
||||
opts = append([]setupOption{withDefaults}, opts...)
|
||||
for _, opt := range opts {
|
||||
opt(&setupOpts)
|
||||
}
|
||||
client := setupOpts.client(t)
|
||||
if setupOpts.recorderEnabled {
|
||||
client.KV = &clientRecorder{KV: client.KV}
|
||||
}
|
||||
store := newStore(
|
||||
client,
|
||||
setupOpts.codec,
|
||||
setupOpts.newFunc,
|
||||
setupOpts.prefix,
|
||||
setupOpts.groupResource,
|
||||
setupOpts.transformer,
|
||||
setupOpts.pagingEnabled,
|
||||
setupOpts.leaseConfig,
|
||||
)
|
||||
ctx := context.Background()
|
||||
return ctx, store, client
|
||||
}
|
||||
// =======================================================================
|
||||
// Implementation-specific tests are following.
|
||||
// The following tests are exercising the details of the implementation
|
||||
// not the actual user-facing contract of storage interface.
|
||||
// As such, they may focus e.g. on non-functional aspects like performance
|
||||
// impact.
|
||||
// =======================================================================
|
||||
|
||||
func TestPrefix(t *testing.T) {
|
||||
testcases := map[string]string{
|
||||
@ -651,119 +390,6 @@ func Test_growSlice(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// fancyTransformer creates next object on each call to
|
||||
// TransformFromStorage call.
|
||||
type fancyTransformer struct {
|
||||
transformer value.Transformer
|
||||
store *store
|
||||
|
||||
lock sync.Mutex
|
||||
index int
|
||||
}
|
||||
|
||||
func (t *fancyTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||
if err := t.createObject(ctx); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return t.transformer.TransformFromStorage(ctx, data, dataCtx)
|
||||
}
|
||||
|
||||
func (t *fancyTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||
return t.transformer.TransformToStorage(ctx, data, dataCtx)
|
||||
}
|
||||
|
||||
func (t *fancyTransformer) createObject(ctx context.Context) error {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
t.index++
|
||||
key := fmt.Sprintf("pod-%d", t.index)
|
||||
obj := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: key,
|
||||
Labels: map[string]string{
|
||||
"even": strconv.FormatBool(t.index%2 == 0),
|
||||
},
|
||||
},
|
||||
}
|
||||
out := &example.Pod{}
|
||||
return t.store.Create(ctx, key, obj, out, 0)
|
||||
}
|
||||
|
||||
func TestConsistentList(t *testing.T) {
|
||||
transformer := &fancyTransformer{
|
||||
transformer: newTestTransformer(),
|
||||
}
|
||||
ctx, store, _ := testSetup(t, withTransformer(transformer))
|
||||
transformer.store = store
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
if err := transformer.createObject(ctx); err != nil {
|
||||
t.Fatalf("failed to create object: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod, ok := obj.(*example.Pod)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid object")
|
||||
}
|
||||
return labels.Set(pod.Labels), nil, nil
|
||||
}
|
||||
predicate := storage.SelectionPredicate{
|
||||
Label: labels.Set{"even": "true"}.AsSelector(),
|
||||
GetAttrs: getAttrs,
|
||||
Limit: 4,
|
||||
}
|
||||
|
||||
result1 := example.PodList{}
|
||||
options := storage.ListOptions{
|
||||
Predicate: predicate,
|
||||
Recursive: true,
|
||||
}
|
||||
if err := store.GetList(ctx, "/", options, &result1); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
// List objects from the returned resource version.
|
||||
options = storage.ListOptions{
|
||||
Predicate: predicate,
|
||||
ResourceVersion: result1.ResourceVersion,
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
|
||||
Recursive: true,
|
||||
}
|
||||
|
||||
result2 := example.PodList{}
|
||||
if err := store.GetList(ctx, "/", options, &result2); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
storagetesting.ExpectNoDiff(t, "incorrect lists", result1, result2)
|
||||
|
||||
// Now also verify the ResourceVersionMatchNotOlderThan.
|
||||
options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
|
||||
|
||||
result3 := example.PodList{}
|
||||
if err := store.GetList(ctx, "/", options, &result3); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
options.ResourceVersion = result3.ResourceVersion
|
||||
options.ResourceVersionMatch = metav1.ResourceVersionMatchExact
|
||||
|
||||
result4 := example.PodList{}
|
||||
if err := store.GetList(ctx, "/", options, &result4); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
storagetesting.ExpectNoDiff(t, "incorrect lists", result3, result4)
|
||||
}
|
||||
|
||||
func TestCount(t *testing.T) {
|
||||
ctx, store, _ := testSetup(t)
|
||||
storagetesting.RunTestCount(ctx, t, store)
|
||||
}
|
||||
|
||||
func TestLeaseMaxObjectCount(t *testing.T) {
|
||||
ctx, store, _ := testSetup(t, withLeaseConfig(LeaseManagerConfig{
|
||||
ReuseDurationSeconds: defaultLeaseReuseDurationSeconds,
|
||||
@ -803,3 +429,134 @@ func TestLeaseMaxObjectCount(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===================================================
|
||||
// Test-setup related function are following.
|
||||
// ===================================================
|
||||
|
||||
func newTestLeaseManagerConfig() LeaseManagerConfig {
|
||||
cfg := NewDefaultLeaseManagerConfig()
|
||||
// As 30s is the default timeout for testing in global configuration,
|
||||
// we cannot wait longer than that in a single time: change it to 1s
|
||||
// for testing purposes. See wait.ForeverTestTimeout
|
||||
cfg.ReuseDurationSeconds = 1
|
||||
return cfg
|
||||
}
|
||||
|
||||
func newTestTransformer() value.Transformer {
|
||||
return storagetesting.NewPrefixTransformer([]byte(defaultTestPrefix), false)
|
||||
}
|
||||
|
||||
type clientRecorder struct {
|
||||
reads uint64
|
||||
clientv3.KV
|
||||
}
|
||||
|
||||
func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
|
||||
atomic.AddUint64(&r.reads, 1)
|
||||
return r.KV.Get(ctx, key, opts...)
|
||||
}
|
||||
|
||||
func (r *clientRecorder) GetReadsAndReset() uint64 {
|
||||
return atomic.SwapUint64(&r.reads, 0)
|
||||
}
|
||||
|
||||
type setupOptions struct {
|
||||
client func(*testing.T) *clientv3.Client
|
||||
codec runtime.Codec
|
||||
newFunc func() runtime.Object
|
||||
prefix string
|
||||
groupResource schema.GroupResource
|
||||
transformer value.Transformer
|
||||
pagingEnabled bool
|
||||
leaseConfig LeaseManagerConfig
|
||||
|
||||
recorderEnabled bool
|
||||
}
|
||||
|
||||
type setupOption func(*setupOptions)
|
||||
|
||||
func withClient(client *clientv3.Client) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.client = func(t *testing.T) *clientv3.Client {
|
||||
return client
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func withClientConfig(config *embed.Config) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.client = func(t *testing.T) *clientv3.Client {
|
||||
return testserver.RunEtcd(t, config)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func withCodec(codec runtime.Codec) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.codec = codec
|
||||
}
|
||||
}
|
||||
|
||||
func withPrefix(prefix string) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.prefix = prefix
|
||||
}
|
||||
}
|
||||
|
||||
func withoutPaging() setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.pagingEnabled = false
|
||||
}
|
||||
}
|
||||
|
||||
func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.leaseConfig = leaseConfig
|
||||
}
|
||||
}
|
||||
|
||||
func withRecorder() setupOption {
|
||||
return func(options *setupOptions) {
|
||||
options.recorderEnabled = true
|
||||
}
|
||||
}
|
||||
|
||||
func withDefaults(options *setupOptions) {
|
||||
options.client = func(t *testing.T) *clientv3.Client {
|
||||
return testserver.RunEtcd(t, nil)
|
||||
}
|
||||
options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
options.newFunc = newPod
|
||||
options.prefix = ""
|
||||
options.groupResource = schema.GroupResource{Resource: "pods"}
|
||||
options.transformer = newTestTransformer()
|
||||
options.pagingEnabled = true
|
||||
options.leaseConfig = newTestLeaseManagerConfig()
|
||||
}
|
||||
|
||||
var _ setupOption = withDefaults
|
||||
|
||||
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *store, *clientv3.Client) {
|
||||
setupOpts := setupOptions{}
|
||||
opts = append([]setupOption{withDefaults}, opts...)
|
||||
for _, opt := range opts {
|
||||
opt(&setupOpts)
|
||||
}
|
||||
client := setupOpts.client(t)
|
||||
if setupOpts.recorderEnabled {
|
||||
client.KV = &clientRecorder{KV: client.KV}
|
||||
}
|
||||
store := newStore(
|
||||
client,
|
||||
setupOpts.codec,
|
||||
setupOpts.newFunc,
|
||||
setupOpts.prefix,
|
||||
setupOpts.groupResource,
|
||||
setupOpts.transformer,
|
||||
setupOpts.pagingEnabled,
|
||||
setupOpts.leaseConfig,
|
||||
)
|
||||
ctx := context.Background()
|
||||
return ctx, store, client
|
||||
}
|
||||
|
@ -49,64 +49,9 @@ func TestDeleteTriggerWatch(t *testing.T) {
|
||||
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
|
||||
}
|
||||
|
||||
// TestWatchFromZero tests that
|
||||
// - watch from 0 should sync up and grab the object added before
|
||||
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
||||
func TestWatchFromZero(t *testing.T) {
|
||||
ctx, store, client := testSetup(t)
|
||||
key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
storagetesting.TestCheckResult(t, watch.Added, w, storedObj)
|
||||
w.Stop()
|
||||
|
||||
// Update
|
||||
out := &example.Pod{}
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil
|
||||
}), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||
}
|
||||
|
||||
// Make sure when we watch from 0 we receive an ADDED event
|
||||
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
storagetesting.TestCheckResult(t, watch.Added, w, out)
|
||||
w.Stop()
|
||||
|
||||
// Update again
|
||||
out = &example.Pod{}
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil
|
||||
}), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||
}
|
||||
|
||||
// Compact previous versions
|
||||
revToCompact, err := store.versioner.ParseResourceVersion(out.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
|
||||
}
|
||||
_, err = client.Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical())
|
||||
if err != nil {
|
||||
t.Fatalf("Error compacting: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we can still watch from 0 and receive an ADDED event
|
||||
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
storagetesting.TestCheckResult(t, watch.Added, w, out)
|
||||
storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client))
|
||||
}
|
||||
|
||||
// TestWatchFromNoneZero tests that
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"math"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
@ -1410,6 +1411,161 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store
|
||||
}
|
||||
}
|
||||
|
||||
type Compaction func(ctx context.Context, t *testing.T, resourceVersion string)
|
||||
|
||||
func RunTestListInconsistentContinuation(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) {
|
||||
if compaction == nil {
|
||||
t.Skipf("compaction callback not provided")
|
||||
}
|
||||
|
||||
// Setup storage with the following structure:
|
||||
// /
|
||||
// - one-level/
|
||||
// | - test
|
||||
// |
|
||||
// - two-level/
|
||||
// - 1/
|
||||
// | - test
|
||||
// |
|
||||
// - 2/
|
||||
// - test
|
||||
//
|
||||
preset := []struct {
|
||||
key string
|
||||
obj *example.Pod
|
||||
storedObj *example.Pod
|
||||
}{
|
||||
{
|
||||
key: "/one-level/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||
},
|
||||
{
|
||||
key: "/two-level/1/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||
},
|
||||
{
|
||||
key: "/two-level/2/test",
|
||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
|
||||
},
|
||||
}
|
||||
|
||||
for i, ps := range preset {
|
||||
preset[i].storedObj = &example.Pod{}
|
||||
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
|
||||
return storage.SelectionPredicate{
|
||||
Limit: limit,
|
||||
Continue: continueValue,
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod := obj.(*example.Pod)
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
out := &example.PodList{}
|
||||
options := storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: pred(1, ""),
|
||||
Recursive: true,
|
||||
}
|
||||
if err := store.GetList(ctx, "/", options, out); err != nil {
|
||||
t.Fatalf("Unable to get initial list: %v", err)
|
||||
}
|
||||
if len(out.Continue) == 0 {
|
||||
t.Fatalf("No continuation token set")
|
||||
}
|
||||
ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
||||
|
||||
continueFromSecondItem := out.Continue
|
||||
|
||||
// update /two-level/2/test/bar
|
||||
oldName := preset[2].obj.Name
|
||||
newPod := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: oldName,
|
||||
Labels: map[string]string{
|
||||
"state": "new",
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil,
|
||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
return newPod, nil, nil
|
||||
}, newPod); err != nil {
|
||||
t.Fatalf("update failed: %v", err)
|
||||
}
|
||||
|
||||
// compact to latest revision.
|
||||
lastRVString := preset[2].storedObj.ResourceVersion
|
||||
compaction(ctx, t, lastRVString)
|
||||
|
||||
// The old continue token should have expired
|
||||
options = storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: pred(0, continueFromSecondItem),
|
||||
Recursive: true,
|
||||
}
|
||||
err := store.GetList(ctx, "/", options, out)
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected no error")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "The provided continue parameter is too old ") {
|
||||
t.Fatalf("unexpected error message %v", err)
|
||||
}
|
||||
status, ok := err.(apierrors.APIStatus)
|
||||
if !ok {
|
||||
t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err))
|
||||
}
|
||||
inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue
|
||||
if len(inconsistentContinueFromSecondItem) == 0 {
|
||||
t.Fatalf("expect non-empty continue token")
|
||||
}
|
||||
|
||||
out = &example.PodList{}
|
||||
options = storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: pred(1, inconsistentContinueFromSecondItem),
|
||||
Recursive: true,
|
||||
}
|
||||
if err := store.GetList(ctx, "/", options, out); err != nil {
|
||||
t.Fatalf("Unable to get second page: %v", err)
|
||||
}
|
||||
if len(out.Continue) == 0 {
|
||||
t.Fatalf("No continuation token set")
|
||||
}
|
||||
validateResourceVersion := ResourceVersionNotOlderThan(lastRVString)
|
||||
ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
||||
if err := validateResourceVersion(out.ResourceVersion); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
continueFromThirdItem := out.Continue
|
||||
resolvedResourceVersionFromThirdItem := out.ResourceVersion
|
||||
out = &example.PodList{}
|
||||
options = storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: pred(1, continueFromThirdItem),
|
||||
Recursive: true,
|
||||
}
|
||||
if err := store.GetList(ctx, "/", options, out); err != nil {
|
||||
t.Fatalf("Unable to get second page: %v", err)
|
||||
}
|
||||
if len(out.Continue) != 0 {
|
||||
t.Fatalf("Unexpected continuation token set")
|
||||
}
|
||||
ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
||||
if out.ResourceVersion != resolvedResourceVersionFromThirdItem {
|
||||
t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion)
|
||||
}
|
||||
}
|
||||
|
||||
type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer
|
||||
|
||||
type InterfaceWithPrefixTransformer interface {
|
||||
@ -1418,6 +1574,94 @@ type InterfaceWithPrefixTransformer interface {
|
||||
UpdatePrefixTransformer(PrefixTransformerModifier) func()
|
||||
}
|
||||
|
||||
func RunTestConsistentList(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
|
||||
nextPod := func(index uint32) (string, *example.Pod) {
|
||||
key := fmt.Sprintf("pod-%d", index)
|
||||
obj := &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: key,
|
||||
Labels: map[string]string{
|
||||
"even": strconv.FormatBool(index%2 == 0),
|
||||
},
|
||||
},
|
||||
}
|
||||
return key, obj
|
||||
}
|
||||
|
||||
transformer := &reproducingTransformer{
|
||||
store: store,
|
||||
nextObject: nextPod,
|
||||
}
|
||||
|
||||
revertTransformer := store.UpdatePrefixTransformer(
|
||||
func(previousTransformer *PrefixTransformer) value.Transformer {
|
||||
transformer.wrapped = previousTransformer
|
||||
return transformer
|
||||
})
|
||||
defer revertTransformer()
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
if err := transformer.createObject(ctx); err != nil {
|
||||
t.Fatalf("failed to create object: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod, ok := obj.(*example.Pod)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid object")
|
||||
}
|
||||
return labels.Set(pod.Labels), nil, nil
|
||||
}
|
||||
predicate := storage.SelectionPredicate{
|
||||
Label: labels.Set{"even": "true"}.AsSelector(),
|
||||
GetAttrs: getAttrs,
|
||||
Limit: 4,
|
||||
}
|
||||
|
||||
result1 := example.PodList{}
|
||||
options := storage.ListOptions{
|
||||
Predicate: predicate,
|
||||
Recursive: true,
|
||||
}
|
||||
if err := store.GetList(ctx, "/", options, &result1); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
// List objects from the returned resource version.
|
||||
options = storage.ListOptions{
|
||||
Predicate: predicate,
|
||||
ResourceVersion: result1.ResourceVersion,
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
|
||||
Recursive: true,
|
||||
}
|
||||
|
||||
result2 := example.PodList{}
|
||||
if err := store.GetList(ctx, "/", options, &result2); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
ExpectNoDiff(t, "incorrect lists", result1, result2)
|
||||
|
||||
// Now also verify the ResourceVersionMatchNotOlderThan.
|
||||
options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
|
||||
|
||||
result3 := example.PodList{}
|
||||
if err := store.GetList(ctx, "/", options, &result3); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
options.ResourceVersion = result3.ResourceVersion
|
||||
options.ResourceVersionMatch = metav1.ResourceVersionMatchExact
|
||||
|
||||
result4 := example.PodList{}
|
||||
if err := store.GetList(ctx, "/", options, &result4); err != nil {
|
||||
t.Fatalf("failed to list objects: %v", err)
|
||||
}
|
||||
|
||||
ExpectNoDiff(t, "incorrect lists", result3, result4)
|
||||
}
|
||||
|
||||
func RunTestGuaranteedUpdate(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer, validation KeyValidation) {
|
||||
key := "/testkey"
|
||||
|
||||
|
@ -235,3 +235,33 @@ func (p *PrefixTransformer) TransformToStorage(ctx context.Context, data []byte,
|
||||
func (p *PrefixTransformer) GetReadsAndReset() uint64 {
|
||||
return atomic.SwapUint64(&p.reads, 0)
|
||||
}
|
||||
|
||||
// reproducingTransformer is a custom test-only transformer used purely
|
||||
// for testing consistency.
|
||||
// It allows for creating predefined objects on TransformFromStorage operations,
|
||||
// which allows for precise in time injection of new objects in the middle of
|
||||
// read operations.
|
||||
type reproducingTransformer struct {
|
||||
wrapped value.Transformer
|
||||
store storage.Interface
|
||||
|
||||
index uint32
|
||||
nextObject func(uint32) (string, *example.Pod)
|
||||
}
|
||||
|
||||
func (rt *reproducingTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||
if err := rt.createObject(ctx); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return rt.wrapped.TransformFromStorage(ctx, data, dataCtx)
|
||||
}
|
||||
|
||||
func (rt *reproducingTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||
return rt.wrapped.TransformToStorage(ctx, data, dataCtx)
|
||||
}
|
||||
|
||||
func (rt *reproducingTransformer) createObject(ctx context.Context) error {
|
||||
key, obj := rt.nextObject(atomic.AddUint32(&rt.index, 1))
|
||||
out := &example.Pod{}
|
||||
return rt.store.Create(ctx, key, obj, out, 0)
|
||||
}
|
||||
|
@ -121,6 +121,62 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur
|
||||
}
|
||||
}
|
||||
|
||||
// RunTestWatchFromZero tests that
|
||||
// - watch from 0 should sync up and grab the object added before
|
||||
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
||||
func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) {
|
||||
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
TestCheckResult(t, watch.Added, w, storedObj)
|
||||
w.Stop()
|
||||
|
||||
// Update
|
||||
out := &example.Pod{}
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil
|
||||
}), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||
}
|
||||
|
||||
// Make sure when we watch from 0 we receive an ADDED event
|
||||
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
TestCheckResult(t, watch.Added, w, out)
|
||||
w.Stop()
|
||||
|
||||
if compaction == nil {
|
||||
t.Skip("compaction callback not provided")
|
||||
}
|
||||
|
||||
// Update again
|
||||
out = &example.Pod{}
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil
|
||||
}), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||
}
|
||||
|
||||
// Compact previous versions
|
||||
compaction(ctx, t, out.ResourceVersion)
|
||||
|
||||
// Make sure we can still watch from 0 and receive an ADDED event
|
||||
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
TestCheckResult(t, watch.Added, w, out)
|
||||
}
|
||||
|
||||
func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||
|
Loading…
Reference in New Issue
Block a user