diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 7b081606..a617147b 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -695,7 +695,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { // we utilize the temporaryStore to ensure independence from the current store implementation. // as of today, the store is implemented as a queue and will be drained by the higher-level // component as soon as it finishes replacing the content. - checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore) + checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { return nil, fmt.Errorf("unable to sync watch-list result: %v", err) @@ -933,6 +933,13 @@ func isWatchErrorRetriable(err error) bool { return false } +// wrapListFuncWithContext adapts a ListFunc to a function that additionally accepts a context; the context is ignored and only the options reach listFn. +func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + return func(_ context.Context, options metav1.ListOptions) (runtime.Object, error) { + return listFn(options) + } +} + // initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event // which marks the end of the watch stream, has not been received within the defined tick interval.
// diff --git a/tools/cache/reflector_data_consistency_detector.go b/tools/cache/reflector_data_consistency_detector.go index aa3027d7..0aacee4a 100644 --- a/tools/cache/reflector_data_consistency_detector.go +++ b/tools/cache/reflector_data_consistency_detector.go @@ -18,6 +18,7 @@ package cache import ( "context" + "fmt" "os" "sort" "strconv" @@ -32,42 +33,46 @@ import ( "k8s.io/klog/v2" ) -var dataConsistencyDetectionEnabled = false +var dataConsistencyDetectionForWatchListEnabled = false func init() { - dataConsistencyDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) + dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) } -// checkWatchListConsistencyIfRequested performs a data consistency check only when +type retrieveItemsFunc[U any] func() []U + +type listFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error) + +// checkWatchListDataConsistencyIfRequested performs a data consistency check only when // the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup. // // The consistency check is meant to be enforced only in the CI, not in production. // The check ensures that data retrieved by the watch-list api call -// is exactly the same as data received by the standard list api call. +// is exactly the same as data received by the standard list api call against etcd. // // Note that this function will panic when data inconsistency is detected. // This is intentional because we want to catch it in the CI.
-func checkWatchListConsistencyIfRequested(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) { - if !dataConsistencyDetectionEnabled { +func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], retrieveItemsFn retrieveItemsFunc[U]) { + if !dataConsistencyDetectionForWatchListEnabled { return } - checkWatchListConsistency(stopCh, identity, lastSyncedResourceVersion, listerWatcher, store) + // for informers we pass an empty ListOptions because + // listFn might be wrapped for filtering during informer construction. + checkDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn) } -// checkWatchListConsistency exists solely for testing purposes. -// we cannot use checkWatchListConsistencyIfRequested because +// checkDataConsistency does the actual comparison; it is a separate function solely for testing purposes. +// we cannot use checkWatchListDataConsistencyIfRequested because // it is guarded by an environmental variable. // we cannot manipulate the environmental variable because // it will affect other tests in this package.
-func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) { - klog.Warningf("%s: data consistency check for the watch-list feature is enabled, this will result in an additional call to the API server.", identity) - opts := metav1.ListOptions{ - ResourceVersion: lastSyncedResourceVersion, - ResourceVersionMatch: metav1.ResourceVersionMatchExact, - } +func checkDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], listOptions metav1.ListOptions, retrieveItemsFn retrieveItemsFunc[U]) { + klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity) + listOptions.ResourceVersion = lastSyncedResourceVersion + listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact var list runtime.Object - err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), time.Second, true, func(_ context.Context) (done bool, err error) { - list, err = listerWatcher.List(opts) + err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) { + list, err = listFn(ctx, listOptions) if err != nil { // the consistency check will only be enabled in the CI // and LIST calls in general will be retired by the client-go library @@ -78,7 +83,7 @@ func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSync return true, nil }) if err != nil { - klog.Errorf("failed to list data from the server, the watch-list consistency check won't be performed, stopCh was closed, err: %v", err) + klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, the context was cancelled, err: %v", identity, err) return } @@ -88,14 +93,14 @@ func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSync } listItems := toMetaObjectSliceOrDie(rawListItems) - storeItems
:= toMetaObjectSliceOrDie(store.List()) + retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn()) sort.Sort(byUID(listItems)) - sort.Sort(byUID(storeItems)) + sort.Sort(byUID(retrievedItems)) - if !cmp.Equal(listItems, storeItems) { - klog.Infof("%s: data received by the new watch-list api call is different than received by the standard list api call, diff: %v", identity, cmp.Diff(listItems, storeItems)) - msg := "data inconsistency detected for the watch-list feature, panicking!" + if !cmp.Equal(listItems, retrievedItems) { + klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, cmp.Diff(listItems, retrievedItems)) + msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity) panic(msg) } } diff --git a/tools/cache/reflector_data_consistency_detector_test.go b/tools/cache/reflector_data_consistency_detector_test.go index 3c7eda7d..a878e488 100644 --- a/tools/cache/reflector_data_consistency_detector_test.go +++ b/tools/cache/reflector_data_consistency_detector_test.go @@ -17,6 +17,7 @@ limitations under the License.
package cache import ( + "context" "fmt" "testing" @@ -25,62 +26,71 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/utils/ptr" ) -func TestWatchListConsistency(t *testing.T) { +func TestDataConsistencyChecker(t *testing.T) { scenarios := []struct { name string - podList *v1.PodList - storeContent []*v1.Pod + podList *v1.PodList + storeContent []*v1.Pod + requestOptions metav1.ListOptions expectedRequestOptions []metav1.ListOptions expectedListRequests int expectPanic bool }{ { - name: "watchlist consistency check won't panic when data is consistent", + name: "data consistency check won't panic when data is consistent", podList: &v1.PodList{ ListMeta: metav1.ListMeta{ResourceVersion: "2"}, Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")}, }, + requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, expectedListRequests: 1, expectedRequestOptions: []metav1.ListOptions{ { ResourceVersion: "2", ResourceVersionMatch: metav1.ResourceVersionMatchExact, + TimeoutSeconds: ptr.To(int64(39)), }, }, }, { - name: "watchlist consistency check won't panic when there is no data", + name: "data consistency check won't panic when there is no data", podList: &v1.PodList{ ListMeta: metav1.ListMeta{ResourceVersion: "2"}, }, + requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, expectedListRequests: 1, expectedRequestOptions: []metav1.ListOptions{ { ResourceVersion: "2", ResourceVersionMatch: metav1.ResourceVersionMatchExact, + TimeoutSeconds: ptr.To(int64(39)), }, }, }, { - name: "watchlist consistency panics when data is inconsistent", + name: "data consistency panics when data is inconsistent", podList: &v1.PodList{ ListMeta: metav1.ListMeta{ResourceVersion: "2"}, Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"),
*makePod("p3", "3")}, }, + requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, expectedListRequests: 1, expectedRequestOptions: []metav1.ListOptions{ { ResourceVersion: "2", ResourceVersionMatch: metav1.ResourceVersionMatchExact, + TimeoutSeconds: ptr.To(int64(39)), }, }, expectPanic: true, @@ -90,15 +100,18 @@ func TestWatchListConsistency(t *testing.T) { for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { listWatcher, store, _, stopCh := testData() + ctx := wait.ContextForChannel(stopCh) for _, obj := range scenario.storeContent { require.NoError(t, store.Add(obj)) } listWatcher.customListResponse = scenario.podList if scenario.expectPanic { - require.Panics(t, func() { checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) }) + require.Panics(t, func() { + checkDataConsistency(ctx, "", scenario.podList.ResourceVersion, wrapListFuncWithContext(listWatcher.List), scenario.requestOptions, store.List) + }) } else { - checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) + checkDataConsistency(ctx, "", scenario.podList.ResourceVersion, wrapListFuncWithContext(listWatcher.List), scenario.requestOptions, store.List) } verifyListCounter(t, listWatcher, scenario.expectedListRequests) @@ -108,20 +121,18 @@ func TestWatchListConsistency(t *testing.T) { } func TestDriveWatchLisConsistencyIfRequired(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) - checkWatchListConsistencyIfRequested(stopCh, "", "", nil, nil) + ctx := context.Background() + checkWatchListDataConsistencyIfRequested[runtime.Object, runtime.Object](ctx, "", "", nil, nil) } -func TestWatchListConsistencyRetry(t *testing.T) { +func TestDataConsistencyCheckerRetry(t *testing.T) { store := NewStore(MetaNamespaceKeyFunc) - stopCh := make(chan struct{}) - defer close(stopCh) + ctx := context.Background() stopListErrorAfter
:= 5 errLister := &errorLister{stopErrorAfter: stopListErrorAfter} - checkWatchListConsistency(stopCh, "", "", errLister, store) + checkDataConsistency(ctx, "", "", wrapListFuncWithContext(errLister.List), metav1.ListOptions{}, store.List) require.Equal(t, errLister.listCounter, errLister.stopErrorAfter) }