client-go/consistency-detector: change the signature of checkWatchListConsistencyIfRequested

the signature of the method was tightly connected to the reflector,
making it difficult to use for anything other than a reflector.

this simple refactor makes the method more generic.

Kubernetes-commit: 83c7542abc8c542c01ecb67376f134b2071c5304
This commit is contained in:
Lukasz Szaszkiewicz 2024-04-22 14:01:22 +02:00 committed by Kubernetes Publisher
parent c7396197f3
commit 6bdde7723e
3 changed files with 62 additions and 39 deletions

View File

@ -695,7 +695,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
// we utilize the temporaryStore to ensure independence from the current store implementation. // we utilize the temporaryStore to ensure independence from the current store implementation.
// as of today, the store is implemented as a queue and will be drained by the higher-level // as of today, the store is implemented as a queue and will be drained by the higher-level
// component as soon as it finishes replacing the content. // component as soon as it finishes replacing the content.
checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore) checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List)
if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %v", err) return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
@ -933,6 +933,13 @@ func isWatchErrorRetriable(err error) bool {
return false return false
} }
// wrapListFuncWithContext simply wraps ListFunction into another function that accepts a context and ignores it.
func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
return func(_ context.Context, options metav1.ListOptions) (runtime.Object, error) {
return listFn(options)
}
}
// initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event // initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event
// which marks the end of the watch stream, has not been received within the defined tick interval. // which marks the end of the watch stream, has not been received within the defined tick interval.
// //

View File

@ -18,6 +18,7 @@ package cache
import ( import (
"context" "context"
"fmt"
"os" "os"
"sort" "sort"
"strconv" "strconv"
@ -32,42 +33,46 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
var dataConsistencyDetectionEnabled = false var dataConsistencyDetectionForWatchListEnabled = false
func init() { func init() {
dataConsistencyDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR"))
} }
// checkWatchListConsistencyIfRequested performs a data consistency check only when type retrieveItemsFunc[U any] func() []U
type listFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
// checkWatchListDataConsistencyIfRequested performs a data consistency check only when
// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup. // the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
// //
// The consistency check is meant to be enforced only in the CI, not in production. // The consistency check is meant to be enforced only in the CI, not in production.
// The check ensures that data retrieved by the watch-list api call // The check ensures that data retrieved by the watch-list api call
// is exactly the same as data received by the standard list api call. // is exactly the same as data received by the standard list api call against etcd.
// //
// Note that this function will panic when data inconsistency is detected. // Note that this function will panic when data inconsistency is detected.
// This is intentional because we want to catch it in the CI. // This is intentional because we want to catch it in the CI.
func checkWatchListConsistencyIfRequested(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) { func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], retrieveItemsFn retrieveItemsFunc[U]) {
if !dataConsistencyDetectionEnabled { if !dataConsistencyDetectionForWatchListEnabled {
return return
} }
checkWatchListConsistency(stopCh, identity, lastSyncedResourceVersion, listerWatcher, store) // for informers we pass an empty ListOptions because
// listFn might be wrapped for filtering during informer construction.
checkDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
} }
// checkWatchListConsistency exists solely for testing purposes. // checkDataConsistency exists solely for testing purposes.
// we cannot use checkWatchListConsistencyIfRequested because // we cannot use checkWatchListDataConsistencyIfRequested because
// it is guarded by an environmental variable. // it is guarded by an environmental variable.
// we cannot manipulate the environmental variable because // we cannot manipulate the environmental variable because
// it will affect other tests in this package. // it will affect other tests in this package.
func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) { func checkDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], listOptions metav1.ListOptions, retrieveItemsFn retrieveItemsFunc[U]) {
klog.Warningf("%s: data consistency check for the watch-list feature is enabled, this will result in an additional call to the API server.", identity) klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity)
opts := metav1.ListOptions{ listOptions.ResourceVersion = lastSyncedResourceVersion
ResourceVersion: lastSyncedResourceVersion, listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
}
var list runtime.Object var list runtime.Object
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), time.Second, true, func(_ context.Context) (done bool, err error) { err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) {
list, err = listerWatcher.List(opts) list, err = listFn(ctx, listOptions)
if err != nil { if err != nil {
// the consistency check will only be enabled in the CI // the consistency check will only be enabled in the CI
// and LIST calls in general will be retired by the client-go library // and LIST calls in general will be retired by the client-go library
@ -78,7 +83,7 @@ func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSync
return true, nil return true, nil
}) })
if err != nil { if err != nil {
klog.Errorf("failed to list data from the server, the watch-list consistency check won't be performed, stopCh was closed, err: %v", err) klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err)
return return
} }
@ -88,14 +93,14 @@ func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSync
} }
listItems := toMetaObjectSliceOrDie(rawListItems) listItems := toMetaObjectSliceOrDie(rawListItems)
storeItems := toMetaObjectSliceOrDie(store.List()) retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
sort.Sort(byUID(listItems)) sort.Sort(byUID(listItems))
sort.Sort(byUID(storeItems)) sort.Sort(byUID(retrievedItems))
if !cmp.Equal(listItems, storeItems) { if !cmp.Equal(listItems, retrievedItems) {
klog.Infof("%s: data received by the new watch-list api call is different than received by the standard list api call, diff: %v", identity, cmp.Diff(listItems, storeItems)) klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, cmp.Diff(listItems, retrievedItems))
msg := "data inconsistency detected for the watch-list feature, panicking!" msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity)
panic(msg) panic(msg)
} }
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"context"
"fmt" "fmt"
"testing" "testing"
@ -25,62 +26,71 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/utils/ptr"
) )
func TestWatchListConsistency(t *testing.T) { func TestDataConsistencyChecker(t *testing.T) {
scenarios := []struct { scenarios := []struct {
name string name string
podList *v1.PodList podList *v1.PodList
storeContent []*v1.Pod storeContent []*v1.Pod
requestOptions metav1.ListOptions
expectedRequestOptions []metav1.ListOptions expectedRequestOptions []metav1.ListOptions
expectedListRequests int expectedListRequests int
expectPanic bool expectPanic bool
}{ }{
{ {
name: "watchlist consistency check won't panic when data is consistent", name: "data consistency check won't panic when data is consistent",
podList: &v1.PodList{ podList: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "2"}, ListMeta: metav1.ListMeta{ResourceVersion: "2"},
Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")}, Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
}, },
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")},
expectedListRequests: 1, expectedListRequests: 1,
expectedRequestOptions: []metav1.ListOptions{ expectedRequestOptions: []metav1.ListOptions{
{ {
ResourceVersion: "2", ResourceVersion: "2",
ResourceVersionMatch: metav1.ResourceVersionMatchExact, ResourceVersionMatch: metav1.ResourceVersionMatchExact,
TimeoutSeconds: ptr.To(int64(39)),
}, },
}, },
}, },
{ {
name: "watchlist consistency check won't panic when there is no data", name: "data consistency check won't panic when there is no data",
podList: &v1.PodList{ podList: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "2"}, ListMeta: metav1.ListMeta{ResourceVersion: "2"},
}, },
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
expectedListRequests: 1, expectedListRequests: 1,
expectedRequestOptions: []metav1.ListOptions{ expectedRequestOptions: []metav1.ListOptions{
{ {
ResourceVersion: "2", ResourceVersion: "2",
ResourceVersionMatch: metav1.ResourceVersionMatchExact, ResourceVersionMatch: metav1.ResourceVersionMatchExact,
TimeoutSeconds: ptr.To(int64(39)),
}, },
}, },
}, },
{ {
name: "watchlist consistency panics when data is inconsistent", name: "data consistency panics when data is inconsistent",
podList: &v1.PodList{ podList: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "2"}, ListMeta: metav1.ListMeta{ResourceVersion: "2"},
Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")}, Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")},
}, },
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")},
expectedListRequests: 1, expectedListRequests: 1,
expectedRequestOptions: []metav1.ListOptions{ expectedRequestOptions: []metav1.ListOptions{
{ {
ResourceVersion: "2", ResourceVersion: "2",
ResourceVersionMatch: metav1.ResourceVersionMatchExact, ResourceVersionMatch: metav1.ResourceVersionMatchExact,
TimeoutSeconds: ptr.To(int64(39)),
}, },
}, },
expectPanic: true, expectPanic: true,
@ -90,15 +100,18 @@ func TestWatchListConsistency(t *testing.T) {
for _, scenario := range scenarios { for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) { t.Run(scenario.name, func(t *testing.T) {
listWatcher, store, _, stopCh := testData() listWatcher, store, _, stopCh := testData()
ctx := wait.ContextForChannel(stopCh)
for _, obj := range scenario.storeContent { for _, obj := range scenario.storeContent {
require.NoError(t, store.Add(obj)) require.NoError(t, store.Add(obj))
} }
listWatcher.customListResponse = scenario.podList listWatcher.customListResponse = scenario.podList
if scenario.expectPanic { if scenario.expectPanic {
require.Panics(t, func() { checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) }) require.Panics(t, func() {
checkDataConsistency(ctx, "", scenario.podList.ResourceVersion, wrapListFuncWithContext(listWatcher.List), scenario.requestOptions, store.List)
})
} else { } else {
checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) checkDataConsistency(ctx, "", scenario.podList.ResourceVersion, wrapListFuncWithContext(listWatcher.List), scenario.requestOptions, store.List)
} }
verifyListCounter(t, listWatcher, scenario.expectedListRequests) verifyListCounter(t, listWatcher, scenario.expectedListRequests)
@ -108,20 +121,18 @@ func TestWatchListConsistency(t *testing.T) {
} }
func TestDriveWatchLisConsistencyIfRequired(t *testing.T) { func TestDriveWatchLisConsistencyIfRequired(t *testing.T) {
stopCh := make(chan struct{}) ctx := context.TODO()
defer close(stopCh) checkWatchListDataConsistencyIfRequested[runtime.Object, runtime.Object](ctx, "", "", nil, nil)
checkWatchListConsistencyIfRequested(stopCh, "", "", nil, nil)
} }
func TestWatchListConsistencyRetry(t *testing.T) { func TestDataConsistencyCheckerRetry(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc) store := NewStore(MetaNamespaceKeyFunc)
stopCh := make(chan struct{}) ctx := context.TODO()
defer close(stopCh)
stopListErrorAfter := 5 stopListErrorAfter := 5
errLister := &errorLister{stopErrorAfter: stopListErrorAfter} errLister := &errorLister{stopErrorAfter: stopListErrorAfter}
checkWatchListConsistency(stopCh, "", "", errLister, store) checkDataConsistency(ctx, "", "", wrapListFuncWithContext(errLister.List), metav1.ListOptions{}, store.List)
require.Equal(t, errLister.listCounter, errLister.stopErrorAfter) require.Equal(t, errLister.listCounter, errLister.stopErrorAfter)
} }