mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Refactor compaction in etcd3 tests
This commit is contained in:
parent
e287b36cc1
commit
7da7ddd779
@ -253,8 +253,26 @@ func TestListContinuationWithFilter(t *testing.T) {
|
|||||||
storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation)
|
storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
|
||||||
|
return func(ctx context.Context, t *testing.T, resourceVersion string) {
|
||||||
|
versioner := storage.APIObjectVersioner{}
|
||||||
|
rv, err := versioner.ParseResourceVersion(resourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if _, err := etcdClient.KV.Compact(ctx, int64(rv), clientv3.WithCompactPhysical()); err != nil {
|
||||||
|
t.Fatalf("Unable to compact, %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestListInconsistentContinuation(t *testing.T) {
|
func TestListInconsistentContinuation(t *testing.T) {
|
||||||
ctx, store, client := testSetup(t)
|
ctx, store, client := testSetup(t)
|
||||||
|
compaction := compactStorage(client)
|
||||||
|
|
||||||
|
if compaction == nil {
|
||||||
|
t.Skipf("compaction callback not provided")
|
||||||
|
}
|
||||||
|
|
||||||
// Setup storage with the following structure:
|
// Setup storage with the following structure:
|
||||||
// /
|
// /
|
||||||
@ -342,15 +360,8 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// compact to latest revision.
|
// compact to latest revision.
|
||||||
versioner := storage.APIObjectVersioner{}
|
|
||||||
lastRVString := preset[2].storedObj.ResourceVersion
|
lastRVString := preset[2].storedObj.ResourceVersion
|
||||||
lastRV, err := versioner.ParseResourceVersion(lastRVString)
|
compaction(ctx, t, lastRVString)
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if _, err := client.KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil {
|
|
||||||
t.Fatalf("Unable to compact, %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// The old continue token should have expired
|
// The old continue token should have expired
|
||||||
options = storage.ListOptions{
|
options = storage.ListOptions{
|
||||||
@ -358,7 +369,7 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||||||
Predicate: pred(0, continueFromSecondItem),
|
Predicate: pred(0, continueFromSecondItem),
|
||||||
Recursive: true,
|
Recursive: true,
|
||||||
}
|
}
|
||||||
err = store.GetList(ctx, "/", options, out)
|
err := store.GetList(ctx, "/", options, out)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("unexpected no error")
|
t.Fatalf("unexpected no error")
|
||||||
}
|
}
|
||||||
|
@ -54,6 +54,8 @@ func TestDeleteTriggerWatch(t *testing.T) {
|
|||||||
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
||||||
func TestWatchFromZero(t *testing.T) {
|
func TestWatchFromZero(t *testing.T) {
|
||||||
ctx, store, client := testSetup(t)
|
ctx, store, client := testSetup(t)
|
||||||
|
compaction := compactStorage(client)
|
||||||
|
|
||||||
key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
@ -81,6 +83,10 @@ func TestWatchFromZero(t *testing.T) {
|
|||||||
storagetesting.TestCheckResult(t, watch.Added, w, out)
|
storagetesting.TestCheckResult(t, watch.Added, w, out)
|
||||||
w.Stop()
|
w.Stop()
|
||||||
|
|
||||||
|
if compaction == nil {
|
||||||
|
t.Skip("compaction callback not provided")
|
||||||
|
}
|
||||||
|
|
||||||
// Update again
|
// Update again
|
||||||
out = &example.Pod{}
|
out = &example.Pod{}
|
||||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||||
@ -92,14 +98,7 @@ func TestWatchFromZero(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Compact previous versions
|
// Compact previous versions
|
||||||
revToCompact, err := store.versioner.ParseResourceVersion(out.ResourceVersion)
|
compaction(ctx, t, out.ResourceVersion)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
|
|
||||||
}
|
|
||||||
_, err = client.Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical())
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Error compacting: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure we can still watch from 0 and receive an ADDED event
|
// Make sure we can still watch from 0 and receive an ADDED event
|
||||||
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
|
@ -1410,6 +1410,8 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Compaction func(ctx context.Context, t *testing.T, resourceVersion string)
|
||||||
|
|
||||||
type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer
|
type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer
|
||||||
|
|
||||||
type InterfaceWithPrefixTransformer interface {
|
type InterfaceWithPrefixTransformer interface {
|
||||||
|
Loading…
Reference in New Issue
Block a user