client-go/consistencydetector: handles the watch cache legacy case

Kubernetes-commit: fe8a2d222cc4208ee8f89fd2912d35cf58ec5941
This commit is contained in:
Lukasz Szaszkiewicz
2024-06-04 18:37:37 +02:00
committed by Kubernetes Publisher
parent 7c6e307a72
commit c7d706847a
2 changed files with 84 additions and 7 deletions

View File

@@ -46,8 +46,9 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
return
}
klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity)
listOptions.ResourceVersion = lastSyncedResourceVersion
listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
listOptions = prepareListCallOptions(lastSyncedResourceVersion, listOptions, len(retrievedItems))
var list runtime.Object
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) {
list, err = listFn(ctx, listOptions)
@@ -69,9 +70,7 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
if err != nil {
panic(err) // this should never happen
}
listItems := toMetaObjectSliceOrDie(rawListItems)
retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
sort.Sort(byUID(listItems))
sort.Sort(byUID(retrievedItems))
@@ -85,24 +84,49 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
// canFormAdditionalListCall ensures that we can form a valid LIST requests
// for checking data consistency.
func canFormAdditionalListCall(resourceVersion string, options metav1.ListOptions) bool {
func canFormAdditionalListCall(lastSyncedResourceVersion string, listOptions metav1.ListOptions) bool {
// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
// we need to make sure that the continuation hasn't been set
// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L38
if len(options.Continue) > 0 {
if len(listOptions.Continue) > 0 {
return false
}
// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
// we need to make sure that the RV is valid because the validation code forbids RV == "0"
// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L44
if resourceVersion == "0" {
if lastSyncedResourceVersion == "0" {
return false
}
return true
}
// prepareListCallOptions changes the input list options so that
// the list call goes directly to etcd
func prepareListCallOptions(lastSyncedResourceVersion string, listOptions metav1.ListOptions, retrievedItemsCount int) metav1.ListOptions {
// this is our legacy case:
//
// the watch cache skips the Limit if the ResourceVersion was set to "0"
// thus, to compare with data retrieved directly from etcd
// we need to skip the limit to for the list call as well.
//
// note that when the number of retrieved items is less than the request limit,
// it means either the watch cache is disabled, or there is not enough data.
// in both cases, we can use the limit because we will be able to compare
// the data with the items retrieved from etcd.
if listOptions.ResourceVersion == "0" && listOptions.Limit > 0 && int64(retrievedItemsCount) > listOptions.Limit {
listOptions.Limit = 0
}
// set the RV and RVM so that we get the snapshot of data
// directly from etcd.
listOptions.ResourceVersion = lastSyncedResourceVersion
listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
return listOptions
}
type byUID []metav1.Object
func (a byUID) Len() int { return len(a) }