mirror of
https://github.com/kubernetes/client-go.git
synced 2025-08-11 04:01:42 +00:00
161 lines
6.3 KiB
Go
161 lines
6.3 KiB
Go
/*
|
|
Copyright 2023 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package consistencydetector
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"reflect"
|
|
"sort"
|
|
"strconv"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/util/diff"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/klog/v2"
|
|
)
|
|
|
|
// dataConsistencyDetectionForWatchListEnabled is read once at package
// initialization from the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable.
var dataConsistencyDetectionForWatchListEnabled bool

func init() {
	// strconv.ParseBool yields false together with its error for unset or
	// unparsable input, so ignoring the error simply leaves the detector disabled.
	enabled, _ := strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR"))
	dataConsistencyDetectionForWatchListEnabled = enabled
}
|
|
|
|
// IsDataConsistencyDetectionForWatchListEnabled returns true when
|
|
// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
|
|
func IsDataConsistencyDetectionForWatchListEnabled() bool {
|
|
return dataConsistencyDetectionForWatchListEnabled
|
|
}
|
|
|
|
type RetrieveItemsFunc[U any] func() []U
|
|
|
|
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
|
|
|
|
// CheckDataConsistency exists solely for testing purposes.
|
|
// we cannot use checkWatchListDataConsistencyIfRequested because
|
|
// it is guarded by an environmental variable.
|
|
// we cannot manipulate the environmental variable because
|
|
// it will affect other tests in this package.
|
|
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
|
|
if !canFormAdditionalListCall(lastSyncedResourceVersion, listOptions) {
|
|
klog.V(4).Infof("data consistency check for %s is enabled but the parameters (RV, ListOptions) doesn't allow for creating a valid LIST request. Skipping the data consistency check.", identity)
|
|
return
|
|
}
|
|
klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity)
|
|
|
|
retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
|
|
listOptions = prepareListCallOptions(lastSyncedResourceVersion, listOptions, len(retrievedItems))
|
|
var list runtime.Object
|
|
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) {
|
|
list, err = listFn(ctx, listOptions)
|
|
if err != nil {
|
|
// the consistency check will only be enabled in the CI
|
|
// and LIST calls in general will be retired by the client-go library
|
|
// if we fail simply log and retry
|
|
klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err)
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err)
|
|
return
|
|
}
|
|
|
|
rawListItems, err := meta.ExtractListWithAlloc(list)
|
|
if err != nil {
|
|
panic(err) // this should never happen
|
|
}
|
|
listItems := toMetaObjectSliceOrDie(rawListItems)
|
|
|
|
sort.Sort(byUID(listItems))
|
|
sort.Sort(byUID(retrievedItems))
|
|
|
|
if !reflect.DeepEqual(listItems, retrievedItems) {
|
|
klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, diff.Diff(listItems, retrievedItems))
|
|
msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity)
|
|
panic(msg)
|
|
}
|
|
}
|
|
|
|
// canFormAdditionalListCall ensures that we can form a valid LIST requests
|
|
// for checking data consistency.
|
|
func canFormAdditionalListCall(lastSyncedResourceVersion string, listOptions metav1.ListOptions) bool {
|
|
// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
|
|
// we need to make sure that the continuation hasn't been set
|
|
// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L38
|
|
if len(listOptions.Continue) > 0 {
|
|
return false
|
|
}
|
|
|
|
// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
|
|
// we need to make sure that the RV is valid because the validation code forbids RV == "0"
|
|
// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L44
|
|
if lastSyncedResourceVersion == "0" {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// prepareListCallOptions changes the input list options so that
|
|
// the list call goes directly to etcd
|
|
func prepareListCallOptions(lastSyncedResourceVersion string, listOptions metav1.ListOptions, retrievedItemsCount int) metav1.ListOptions {
|
|
// this is our legacy case:
|
|
//
|
|
// the watch cache skips the Limit if the ResourceVersion was set to "0"
|
|
// thus, to compare with data retrieved directly from etcd
|
|
// we need to skip the limit to for the list call as well.
|
|
//
|
|
// note that when the number of retrieved items is less than the request limit,
|
|
// it means either the watch cache is disabled, or there is not enough data.
|
|
// in both cases, we can use the limit because we will be able to compare
|
|
// the data with the items retrieved from etcd.
|
|
if listOptions.ResourceVersion == "0" && listOptions.Limit > 0 && int64(retrievedItemsCount) > listOptions.Limit {
|
|
listOptions.Limit = 0
|
|
}
|
|
|
|
// set the RV and RVM so that we get the snapshot of data
|
|
// directly from etcd.
|
|
listOptions.ResourceVersion = lastSyncedResourceVersion
|
|
listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
|
|
|
|
return listOptions
|
|
}
|
|
|
|
type byUID []metav1.Object
|
|
|
|
func (a byUID) Len() int { return len(a) }
|
|
func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() }
|
|
func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
|
|
func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object {
|
|
result := make([]metav1.Object, len(s))
|
|
for i, v := range s {
|
|
m, err := meta.Accessor(v)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
result[i] = m
|
|
}
|
|
return result
|
|
}
|