Merge pull request #130417 from serathius/watchcache-compact

Separate compactWatchCache from compactStore
This commit is contained in:
Kubernetes Prow Robot 2025-03-06 06:53:45 -08:00 committed by GitHub
commit 07e65dac2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 37 additions and 28 deletions

View File

@ -182,7 +182,7 @@ func TestList(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true)
storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true)
})
}
}
@ -192,7 +192,7 @@ func TestConsistentList(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, consistentRead)
storagetesting.RunTestConsistentList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, consistentRead)
})
}
}
@ -203,7 +203,7 @@ func TestGetListNonRecursive(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher)
storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(server.V3Client.Client), cacher)
})
}
}
@ -297,7 +297,7 @@ func TestWatch(t *testing.T) {
func TestWatchFromZero(t *testing.T) {
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client))
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactWatchCache(cacher, server.V3Client.Client))
}
func TestDeleteTriggerWatch(t *testing.T) {

View File

@ -78,7 +78,7 @@ func computePodKey(obj *example.Pod) string {
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
}
func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction {
func compactWatchCache(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction {
return func(ctx context.Context, t *testing.T, resourceVersion string) {
versioner := storage.APIObjectVersioner{}
rv, err := versioner.ParseResourceVersion(resourceVersion)
@ -117,12 +117,16 @@ func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.C
c.cacher.watchCache.startIndex++
}
c.cacher.watchCache.listResourceVersion = rv
if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil {
t.Fatalf("Could not update compact_rev_key: %v", err)
}
if _, err = client.Compact(ctx, int64(rv)); err != nil {
if _, err := client.Compact(ctx, int64(rv)); err != nil {
t.Fatalf("Could not compact: %v", err)
}
}
}
func increaseRV(client *clientv3.Client) storagetesting.IncreaseRVFunc {
return func(ctx context.Context, t *testing.T) {
if _, err := client.KV.Put(ctx, "increaseRV", "ok"); err != nil {
t.Fatalf("Could not update increaseRV: %v", err)
}
}
}

View File

@ -172,7 +172,7 @@ func TestListPaging(t *testing.T) {
func TestGetListNonRecursive(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client.Client), store)
storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(client.Client), store)
}
func TestGetListRecursivePrefix(t *testing.T) {
@ -249,12 +249,12 @@ func TestTransformationFailure(t *testing.T) {
func TestList(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestList(ctx, t, store, compactStorage(client.Client), false)
storagetesting.RunTestList(ctx, t, store, increaseRV(client.Client), false)
}
func TestConsistentList(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client.Client), false, true)
storagetesting.RunTestConsistentList(ctx, t, store, increaseRV(client.Client), false, true)
}
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
@ -313,19 +313,27 @@ func TestNamespaceScopedList(t *testing.T) {
storagetesting.RunTestNamespaceScopedList(ctx, t, store)
}
func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
func compactStorage(client *clientv3.Client) storagetesting.Compaction {
return func(ctx context.Context, t *testing.T, resourceVersion string) {
versioner := storage.APIObjectVersioner{}
rv, err := versioner.ParseResourceVersion(resourceVersion)
if err != nil {
t.Fatal(err)
}
if _, _, err = compact(ctx, etcdClient, 0, int64(rv)); err != nil {
if _, err = client.Compact(ctx, int64(rv)); err != nil {
t.Fatalf("Unable to compact, %v", err)
}
}
}
func increaseRV(client *clientv3.Client) storagetesting.IncreaseRVFunc {
return func(ctx context.Context, t *testing.T) {
if _, err := client.KV.Put(ctx, "increaseRV", "ok"); err != nil {
t.Fatalf("Could not update increaseRV: %v", err)
}
}
}
func TestListInconsistentContinuation(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client.Client))

View File

@ -622,7 +622,7 @@ func RunTestPreconditionalDeleteWithOnlySuggestionPass(ctx context.Context, t *t
expectNoDiff(t, "incorrect pod:", updatedPod, out)
}
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, ignoreWatchCacheTests bool) {
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, ignoreWatchCacheTests bool) {
initialRV, createdPods, updatedPod, err := seedMultiLevelData(ctx, store)
if err != nil {
t.Fatal(err)
@ -648,10 +648,8 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name, "spec.nodeName": pod.Spec.NodeName}, nil
}
// Use compact to increase etcd global revision without changes to any resources.
// The increase in resources version comes from Kubernetes compaction updating hidden key.
// Used to test consistent List to confirm it returns latest etcd revision.
compaction(ctx, t, initialRV)
// Increase RV to test consistent List.
increaseRV(ctx, t)
currentRV := fmt.Sprintf("%d", continueRV+1)
tests := []struct {
@ -1591,7 +1589,7 @@ func ExpectContinueMatches(t *testing.T, expect, got string) {
t.Errorf("expected continue token: %s, got: %s", expectDecoded, gotDecoded)
}
func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, cacheEnabled, consistentReadsSupported bool) {
func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, cacheEnabled, consistentReadsSupported bool) {
outPod := &example.Pod{}
inPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}}
err := store.Create(ctx, computePodKey(inPod), inPod, outPod, 0)
@ -1599,7 +1597,7 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte
t.Errorf("Unexpected error: %v", err)
}
lastObjecRV := outPod.ResourceVersion
compaction(ctx, t, outPod.ResourceVersion)
increaseRV(ctx, t)
parsedRV, _ := strconv.Atoi(outPod.ResourceVersion)
currentRV := fmt.Sprintf("%d", parsedRV+1)
@ -1609,7 +1607,7 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte
}
secondNonConsistentReadRV := lastObjecRV
if consistentReadsSupported {
if !cacheEnabled || consistentReadsSupported {
secondNonConsistentReadRV = currentRV
}
@ -1749,7 +1747,7 @@ func seedMultiLevelData(ctx context.Context, store storage.Interface) (initialRV
return initialRV, created, updated, nil
}
func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, compaction Compaction, store storage.Interface) {
func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, increaseRV IncreaseRVFunc, store storage.Interface) {
key, prevStoredObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}})
prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
@ -1763,10 +1761,8 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, compaction Co
t.Fatalf("update failed: %v", err)
}
objRV, _ := strconv.Atoi(storedObj.ResourceVersion)
// Use compact to increase etcd global revision without changes to any resources.
// The increase in resources version comes from Kubernetes compaction updating hidden key.
// Used to test consistent List to confirm it returns latest etcd revision.
compaction(ctx, t, prevStoredObj.ResourceVersion)
// Increase RV to test consistent List.
increaseRV(ctx, t)
tests := []struct {
name string
@ -2320,6 +2316,7 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store
}
type Compaction func(ctx context.Context, t *testing.T, resourceVersion string)
type IncreaseRVFunc func(ctx context.Context, t *testing.T)
func RunTestListInconsistentContinuation(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) {
if compaction == nil {