mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Refactor storage tests using compaction
This commit is contained in:
parent
7da7ddd779
commit
b02f172cbd
@ -23,7 +23,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
@ -33,7 +32,6 @@ import (
|
|||||||
"google.golang.org/grpc/grpclog"
|
"google.golang.org/grpc/grpclog"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
"k8s.io/apimachinery/pkg/api/apitesting"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
@ -268,158 +266,7 @@ func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
|
|||||||
|
|
||||||
func TestListInconsistentContinuation(t *testing.T) {
|
func TestListInconsistentContinuation(t *testing.T) {
|
||||||
ctx, store, client := testSetup(t)
|
ctx, store, client := testSetup(t)
|
||||||
compaction := compactStorage(client)
|
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client))
|
||||||
|
|
||||||
if compaction == nil {
|
|
||||||
t.Skipf("compaction callback not provided")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup storage with the following structure:
|
|
||||||
// /
|
|
||||||
// - one-level/
|
|
||||||
// | - test
|
|
||||||
// |
|
|
||||||
// - two-level/
|
|
||||||
// - 1/
|
|
||||||
// | - test
|
|
||||||
// |
|
|
||||||
// - 2/
|
|
||||||
// - test
|
|
||||||
//
|
|
||||||
preset := []struct {
|
|
||||||
key string
|
|
||||||
obj *example.Pod
|
|
||||||
storedObj *example.Pod
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
key: "/one-level/test",
|
|
||||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
key: "/two-level/1/test",
|
|
||||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
key: "/two-level/2/test",
|
|
||||||
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, ps := range preset {
|
|
||||||
preset[i].storedObj = &example.Pod{}
|
|
||||||
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Set failed: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
|
|
||||||
return storage.SelectionPredicate{
|
|
||||||
Limit: limit,
|
|
||||||
Continue: continueValue,
|
|
||||||
Label: labels.Everything(),
|
|
||||||
Field: fields.Everything(),
|
|
||||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
out := &example.PodList{}
|
|
||||||
options := storage.ListOptions{
|
|
||||||
ResourceVersion: "0",
|
|
||||||
Predicate: pred(1, ""),
|
|
||||||
Recursive: true,
|
|
||||||
}
|
|
||||||
if err := store.GetList(ctx, "/", options, out); err != nil {
|
|
||||||
t.Fatalf("Unable to get initial list: %v", err)
|
|
||||||
}
|
|
||||||
if len(out.Continue) == 0 {
|
|
||||||
t.Fatalf("No continuation token set")
|
|
||||||
}
|
|
||||||
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
|
||||||
|
|
||||||
continueFromSecondItem := out.Continue
|
|
||||||
|
|
||||||
// update /two-level/2/test/bar
|
|
||||||
oldName := preset[2].obj.Name
|
|
||||||
newPod := &example.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: oldName,
|
|
||||||
Labels: map[string]string{
|
|
||||||
"state": "new",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil,
|
|
||||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
|
||||||
return newPod, nil, nil
|
|
||||||
}, newPod); err != nil {
|
|
||||||
t.Fatalf("update failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// compact to latest revision.
|
|
||||||
lastRVString := preset[2].storedObj.ResourceVersion
|
|
||||||
compaction(ctx, t, lastRVString)
|
|
||||||
|
|
||||||
// The old continue token should have expired
|
|
||||||
options = storage.ListOptions{
|
|
||||||
ResourceVersion: "0",
|
|
||||||
Predicate: pred(0, continueFromSecondItem),
|
|
||||||
Recursive: true,
|
|
||||||
}
|
|
||||||
err := store.GetList(ctx, "/", options, out)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("unexpected no error")
|
|
||||||
}
|
|
||||||
if !strings.Contains(err.Error(), inconsistentContinue) {
|
|
||||||
t.Fatalf("unexpected error message %v", err)
|
|
||||||
}
|
|
||||||
status, ok := err.(apierrors.APIStatus)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err))
|
|
||||||
}
|
|
||||||
inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue
|
|
||||||
if len(inconsistentContinueFromSecondItem) == 0 {
|
|
||||||
t.Fatalf("expect non-empty continue token")
|
|
||||||
}
|
|
||||||
|
|
||||||
out = &example.PodList{}
|
|
||||||
options = storage.ListOptions{
|
|
||||||
ResourceVersion: "0",
|
|
||||||
Predicate: pred(1, inconsistentContinueFromSecondItem),
|
|
||||||
Recursive: true,
|
|
||||||
}
|
|
||||||
if err := store.GetList(ctx, "/", options, out); err != nil {
|
|
||||||
t.Fatalf("Unable to get second page: %v", err)
|
|
||||||
}
|
|
||||||
if len(out.Continue) == 0 {
|
|
||||||
t.Fatalf("No continuation token set")
|
|
||||||
}
|
|
||||||
validateResourceVersion := storagetesting.ResourceVersionNotOlderThan(lastRVString)
|
|
||||||
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
|
||||||
if err := validateResourceVersion(out.ResourceVersion); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
continueFromThirdItem := out.Continue
|
|
||||||
resolvedResourceVersionFromThirdItem := out.ResourceVersion
|
|
||||||
out = &example.PodList{}
|
|
||||||
options = storage.ListOptions{
|
|
||||||
ResourceVersion: "0",
|
|
||||||
Predicate: pred(1, continueFromThirdItem),
|
|
||||||
Recursive: true,
|
|
||||||
}
|
|
||||||
if err := store.GetList(ctx, "/", options, out); err != nil {
|
|
||||||
t.Fatalf("Unable to get second page: %v", err)
|
|
||||||
}
|
|
||||||
if len(out.Continue) != 0 {
|
|
||||||
t.Fatalf("Unexpected continuation token set")
|
|
||||||
}
|
|
||||||
storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
|
||||||
if out.ResourceVersion != resolvedResourceVersionFromThirdItem {
|
|
||||||
t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestLeaseManagerConfig() LeaseManagerConfig {
|
func newTestLeaseManagerConfig() LeaseManagerConfig {
|
||||||
|
@ -49,63 +49,9 @@ func TestDeleteTriggerWatch(t *testing.T) {
|
|||||||
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
|
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWatchFromZero tests that
|
|
||||||
// - watch from 0 should sync up and grab the object added before
|
|
||||||
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
|
||||||
func TestWatchFromZero(t *testing.T) {
|
func TestWatchFromZero(t *testing.T) {
|
||||||
ctx, store, client := testSetup(t)
|
ctx, store, client := testSetup(t)
|
||||||
compaction := compactStorage(client)
|
storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client))
|
||||||
|
|
||||||
key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Watch failed: %v", err)
|
|
||||||
}
|
|
||||||
storagetesting.TestCheckResult(t, watch.Added, w, storedObj)
|
|
||||||
w.Stop()
|
|
||||||
|
|
||||||
// Update
|
|
||||||
out := &example.Pod{}
|
|
||||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
|
||||||
func(runtime.Object) (runtime.Object, error) {
|
|
||||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil
|
|
||||||
}), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure when we watch from 0 we receive an ADDED event
|
|
||||||
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Watch failed: %v", err)
|
|
||||||
}
|
|
||||||
storagetesting.TestCheckResult(t, watch.Added, w, out)
|
|
||||||
w.Stop()
|
|
||||||
|
|
||||||
if compaction == nil {
|
|
||||||
t.Skip("compaction callback not provided")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update again
|
|
||||||
out = &example.Pod{}
|
|
||||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
|
||||||
func(runtime.Object) (runtime.Object, error) {
|
|
||||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil
|
|
||||||
}), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compact previous versions
|
|
||||||
compaction(ctx, t, out.ResourceVersion)
|
|
||||||
|
|
||||||
// Make sure we can still watch from 0 and receive an ADDED event
|
|
||||||
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Watch failed: %v", err)
|
|
||||||
}
|
|
||||||
storagetesting.TestCheckResult(t, watch.Added, w, out)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWatchFromNoneZero tests that
|
// TestWatchFromNoneZero tests that
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -1412,6 +1413,159 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store
|
|||||||
|
|
||||||
type Compaction func(ctx context.Context, t *testing.T, resourceVersion string)
|
type Compaction func(ctx context.Context, t *testing.T, resourceVersion string)
|
||||||
|
|
||||||
|
func RunTestListInconsistentContinuation(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) {
|
||||||
|
if compaction == nil {
|
||||||
|
t.Skipf("compaction callback not provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup storage with the following structure:
|
||||||
|
// /
|
||||||
|
// - one-level/
|
||||||
|
// | - test
|
||||||
|
// |
|
||||||
|
// - two-level/
|
||||||
|
// - 1/
|
||||||
|
// | - test
|
||||||
|
// |
|
||||||
|
// - 2/
|
||||||
|
// - test
|
||||||
|
//
|
||||||
|
preset := []struct {
|
||||||
|
key string
|
||||||
|
obj *example.Pod
|
||||||
|
storedObj *example.Pod
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
key: "/one-level/test",
|
||||||
|
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: "/two-level/1/test",
|
||||||
|
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: "/two-level/2/test",
|
||||||
|
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, ps := range preset {
|
||||||
|
preset[i].storedObj = &example.Pod{}
|
||||||
|
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Set failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
|
||||||
|
return storage.SelectionPredicate{
|
||||||
|
Limit: limit,
|
||||||
|
Continue: continueValue,
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.Everything(),
|
||||||
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out := &example.PodList{}
|
||||||
|
options := storage.ListOptions{
|
||||||
|
ResourceVersion: "0",
|
||||||
|
Predicate: pred(1, ""),
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
if err := store.GetList(ctx, "/", options, out); err != nil {
|
||||||
|
t.Fatalf("Unable to get initial list: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.Continue) == 0 {
|
||||||
|
t.Fatalf("No continuation token set")
|
||||||
|
}
|
||||||
|
ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
||||||
|
|
||||||
|
continueFromSecondItem := out.Continue
|
||||||
|
|
||||||
|
// update /two-level/2/test/bar
|
||||||
|
oldName := preset[2].obj.Name
|
||||||
|
newPod := &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: oldName,
|
||||||
|
Labels: map[string]string{
|
||||||
|
"state": "new",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil,
|
||||||
|
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
return newPod, nil, nil
|
||||||
|
}, newPod); err != nil {
|
||||||
|
t.Fatalf("update failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// compact to latest revision.
|
||||||
|
lastRVString := preset[2].storedObj.ResourceVersion
|
||||||
|
compaction(ctx, t, lastRVString)
|
||||||
|
|
||||||
|
// The old continue token should have expired
|
||||||
|
options = storage.ListOptions{
|
||||||
|
ResourceVersion: "0",
|
||||||
|
Predicate: pred(0, continueFromSecondItem),
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
err := store.GetList(ctx, "/", options, out)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("unexpected no error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "The provided continue parameter is too old ") {
|
||||||
|
t.Fatalf("unexpected error message %v", err)
|
||||||
|
}
|
||||||
|
status, ok := err.(apierrors.APIStatus)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err))
|
||||||
|
}
|
||||||
|
inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue
|
||||||
|
if len(inconsistentContinueFromSecondItem) == 0 {
|
||||||
|
t.Fatalf("expect non-empty continue token")
|
||||||
|
}
|
||||||
|
|
||||||
|
out = &example.PodList{}
|
||||||
|
options = storage.ListOptions{
|
||||||
|
ResourceVersion: "0",
|
||||||
|
Predicate: pred(1, inconsistentContinueFromSecondItem),
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
if err := store.GetList(ctx, "/", options, out); err != nil {
|
||||||
|
t.Fatalf("Unable to get second page: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.Continue) == 0 {
|
||||||
|
t.Fatalf("No continuation token set")
|
||||||
|
}
|
||||||
|
validateResourceVersion := ResourceVersionNotOlderThan(lastRVString)
|
||||||
|
ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
||||||
|
if err := validateResourceVersion(out.ResourceVersion); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
continueFromThirdItem := out.Continue
|
||||||
|
resolvedResourceVersionFromThirdItem := out.ResourceVersion
|
||||||
|
out = &example.PodList{}
|
||||||
|
options = storage.ListOptions{
|
||||||
|
ResourceVersion: "0",
|
||||||
|
Predicate: pred(1, continueFromThirdItem),
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
if err := store.GetList(ctx, "/", options, out); err != nil {
|
||||||
|
t.Fatalf("Unable to get second page: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.Continue) != 0 {
|
||||||
|
t.Fatalf("Unexpected continuation token set")
|
||||||
|
}
|
||||||
|
ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
||||||
|
if out.ResourceVersion != resolvedResourceVersionFromThirdItem {
|
||||||
|
t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer
|
type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer
|
||||||
|
|
||||||
type InterfaceWithPrefixTransformer interface {
|
type InterfaceWithPrefixTransformer interface {
|
||||||
|
@ -121,6 +121,62 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RunTestWatchFromZero tests that
|
||||||
|
// - watch from 0 should sync up and grab the object added before
|
||||||
|
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
||||||
|
func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) {
|
||||||
|
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
||||||
|
|
||||||
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
TestCheckResult(t, watch.Added, w, storedObj)
|
||||||
|
w.Stop()
|
||||||
|
|
||||||
|
// Update
|
||||||
|
out := &example.Pod{}
|
||||||
|
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||||
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
|
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil
|
||||||
|
}), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure when we watch from 0 we receive an ADDED event
|
||||||
|
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
TestCheckResult(t, watch.Added, w, out)
|
||||||
|
w.Stop()
|
||||||
|
|
||||||
|
if compaction == nil {
|
||||||
|
t.Skip("compaction callback not provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update again
|
||||||
|
out = &example.Pod{}
|
||||||
|
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||||
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
|
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil
|
||||||
|
}), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compact previous versions
|
||||||
|
compaction(ctx, t, out.ResourceVersion)
|
||||||
|
|
||||||
|
// Make sure we can still watch from 0 and receive an ADDED event
|
||||||
|
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
TestCheckResult(t, watch.Added, w, out)
|
||||||
|
}
|
||||||
|
|
||||||
func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||||
|
Loading…
Reference in New Issue
Block a user