diff --git a/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector.go b/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector.go index 2a5d2098758..b33d08032fa 100644 --- a/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector.go +++ b/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector.go @@ -46,8 +46,9 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity return } klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity) - listOptions.ResourceVersion = lastSyncedResourceVersion - listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact + + retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn()) + listOptions = prepareListCallOptions(lastSyncedResourceVersion, listOptions, len(retrievedItems)) var list runtime.Object err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) { list, err = listFn(ctx, listOptions) @@ -69,9 +70,7 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity if err != nil { panic(err) // this should never happen } - listItems := toMetaObjectSliceOrDie(rawListItems) - retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn()) sort.Sort(byUID(listItems)) sort.Sort(byUID(retrievedItems)) @@ -85,24 +84,49 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity // canFormAdditionalListCall ensures that we can form a valid LIST requests // for checking data consistency. -func canFormAdditionalListCall(resourceVersion string, options metav1.ListOptions) bool { +func canFormAdditionalListCall(lastSyncedResourceVersion string, listOptions metav1.ListOptions) bool { // since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact // we need to make sure that the continuation hasn't been set // https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L38 - if len(options.Continue) > 0 { + if len(listOptions.Continue) > 0 { return false } // since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact // we need to make sure that the RV is valid because the validation code forbids RV == "0" // https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L44 - if resourceVersion == "0" { + if lastSyncedResourceVersion == "0" { return false } return true } +// prepareListCallOptions changes the input list options so that +// the list call goes directly to etcd +func prepareListCallOptions(lastSyncedResourceVersion string, listOptions metav1.ListOptions, retrievedItemsCount int) metav1.ListOptions { + // this is our legacy case: + // + // the watch cache skips the Limit if the ResourceVersion was set to "0" + // thus, to compare with data retrieved directly from etcd + // we need to skip the limit to for the list call as well. + // + // note that when the number of retrieved items is less than the request limit, + // it means either the watch cache is disabled, or there is not enough data. + // in both cases, we can use the limit because we will be able to compare + // the data with the items retrieved from etcd. + if listOptions.ResourceVersion == "0" && listOptions.Limit > 0 && int64(retrievedItemsCount) > listOptions.Limit { + listOptions.Limit = 0 + } + + // set the RV and RVM so that we get the snapshot of data + // directly from etcd. + listOptions.ResourceVersion = lastSyncedResourceVersion + listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact + + return listOptions +} + type byUID []metav1.Object func (a byUID) Len() int { return len(a) } diff --git a/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector_test.go b/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector_test.go index a7c6928996e..59682a68abc 100644 --- a/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector_test.go +++ b/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector_test.go @@ -60,6 +60,59 @@ func TestDataConsistencyChecker(t *testing.T) { }, }, + { + name: "legacy, the limit is removed from the list options when it wasn't honored by the watch cache", + listResponse: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "2"}, + Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")}, + }, + requestOptions: metav1.ListOptions{ResourceVersion: "0", Limit: 2}, + retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2"), makePod("p3", "3")}, + expectedListRequests: 1, + expectedRequestOptions: []metav1.ListOptions{ + { + ResourceVersion: "2", + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + }, + }, + }, + + { + name: "the limit is NOT removed from the list options for non-legacy request", + listResponse: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "2"}, + Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")}, + }, + requestOptions: metav1.ListOptions{ResourceVersion: "2", Limit: 2}, + retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2"), makePod("p3", "3")}, + expectedListRequests: 1, + expectedRequestOptions: []metav1.ListOptions{ + { + Limit: 2, + ResourceVersion: "2", + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + }, + }, + }, + + { + name: "legacy, the limit is NOT removed from the list options when the watch cache is disabled", + listResponse: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "2"}, + Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")}, + }, + requestOptions: metav1.ListOptions{ResourceVersion: "0", Limit: 5}, + retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2"), makePod("p3", "3")}, + expectedListRequests: 1, + expectedRequestOptions: []metav1.ListOptions{ + { + Limit: 5, + ResourceVersion: "2", + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + }, + }, + }, + { name: "data consistency check won't panic when there is no data", listResponse: &v1.PodList{