diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector.go b/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector.go index 0aacee4a067..bd857de7a51 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Kubernetes Authors. +Copyright 2024 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,19 +18,12 @@ package cache import ( "context" - "fmt" "os" - "sort" "strconv" - "time" - "github.com/google/go-cmp/cmp" - - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" + "k8s.io/client-go/util/consistencydetector" ) var dataConsistencyDetectionForWatchListEnabled = false @@ -39,10 +32,6 @@ func init() { dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) } -type retrieveItemsFunc[U any] func() []U - -type listFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error) - // checkWatchListDataConsistencyIfRequested performs a data consistency check only when // the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup. // @@ -52,73 +41,11 @@ type listFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOpt // // Note that this function will panic when data inconsistency is detected. // This is intentional because we want to catch it in the CI. 
-func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], retrieveItemsFn retrieveItemsFunc[U]) { +func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) { if !dataConsistencyDetectionForWatchListEnabled { return } // for informers we pass an empty ListOptions because // listFn might be wrapped for filtering during informer construction. - checkDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn) -} - -// checkDataConsistency exists solely for testing purposes. -// we cannot use checkWatchListDataConsistencyIfRequested because -// it is guarded by an environmental variable. -// we cannot manipulate the environmental variable because -// it will affect other tests in this package. 
-func checkDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], listOptions metav1.ListOptions, retrieveItemsFn retrieveItemsFunc[U]) { - klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity) - listOptions.ResourceVersion = lastSyncedResourceVersion - listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact - var list runtime.Object - err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) { - list, err = listFn(ctx, listOptions) - if err != nil { - // the consistency check will only be enabled in the CI - // and LIST calls in general will be retired by the client-go library - // if we fail simply log and retry - klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err) - return false, nil - } - return true, nil - }) - if err != nil { - klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err) - return - } - - rawListItems, err := meta.ExtractListWithAlloc(list) - if err != nil { - panic(err) // this should never happen - } - - listItems := toMetaObjectSliceOrDie(rawListItems) - retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn()) - - sort.Sort(byUID(listItems)) - sort.Sort(byUID(retrievedItems)) - - if !cmp.Equal(listItems, retrievedItems) { - klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, cmp.Diff(listItems, retrievedItems)) - msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity) - panic(msg) - } -} - -type byUID []metav1.Object - -func (a byUID) Len() int { return len(a) } -func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() } -func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 
- -func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object { - result := make([]metav1.Object, len(s)) - for i, v := range s { - m, err := meta.Accessor(v) - if err != nil { - panic(err) - } - result[i] = m - } - return result + consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn) } diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go index 81affb75d76..7d0d8bb624d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Kubernetes Authors. +Copyright 2024 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,143 +18,12 @@ package cache import ( "context" - "fmt" "testing" - "github.com/stretchr/testify/require" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/utils/ptr" ) -func TestDataConsistencyChecker(t *testing.T) { - scenarios := []struct { - name string - - listResponse *v1.PodList - retrievedItems []*v1.Pod - requestOptions metav1.ListOptions - - expectedRequestOptions []metav1.ListOptions - expectedListRequests int - expectPanic bool - }{ - { - name: "data consistency check won't panic when data is consistent", - listResponse: &v1.PodList{ - ListMeta: metav1.ListMeta{ResourceVersion: "2"}, - Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")}, - }, - requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, - retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, - expectedListRequests: 1, - expectedRequestOptions: []metav1.ListOptions{ - { - ResourceVersion: "2", - 
ResourceVersionMatch: metav1.ResourceVersionMatchExact, - TimeoutSeconds: ptr.To(int64(39)), - }, - }, - }, - - { - name: "data consistency check won't panic when there is no data", - listResponse: &v1.PodList{ - ListMeta: metav1.ListMeta{ResourceVersion: "2"}, - }, - requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, - expectedListRequests: 1, - expectedRequestOptions: []metav1.ListOptions{ - { - ResourceVersion: "2", - ResourceVersionMatch: metav1.ResourceVersionMatchExact, - TimeoutSeconds: ptr.To(int64(39)), - }, - }, - }, - - { - name: "data consistency panics when data is inconsistent", - listResponse: &v1.PodList{ - ListMeta: metav1.ListMeta{ResourceVersion: "2"}, - Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")}, - }, - requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, - retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, - expectedListRequests: 1, - expectedRequestOptions: []metav1.ListOptions{ - { - ResourceVersion: "2", - ResourceVersionMatch: metav1.ResourceVersionMatchExact, - TimeoutSeconds: ptr.To(int64(39)), - }, - }, - expectPanic: true, - }, - } - - for _, scenario := range scenarios { - t.Run(scenario.name, func(t *testing.T) { - ctx := context.TODO() - fakeLister := &listWrapper{response: scenario.listResponse} - retrievedItemsFunc := func() []*v1.Pod { - return scenario.retrievedItems - } - - if scenario.expectPanic { - require.Panics(t, func() { - checkDataConsistency(ctx, "", scenario.listResponse.ResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc) - }) - } else { - checkDataConsistency(ctx, "", scenario.listResponse.ResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc) - } - - require.Equal(t, fakeLister.counter, scenario.expectedListRequests) - require.Equal(t, fakeLister.requestOptions, scenario.expectedRequestOptions) - }) - } -} - func TestDriveWatchLisConsistencyIfRequired(t *testing.T) { ctx := 
context.TODO() checkWatchListDataConsistencyIfRequested[runtime.Object, runtime.Object](ctx, "", "", nil, nil) } - -func TestDataConsistencyCheckerRetry(t *testing.T) { - ctx := context.TODO() - retrievedItemsFunc := func() []*v1.Pod { - return nil - } - stopListErrorAfter := 5 - fakeErrLister := &errorLister{stopErrorAfter: stopListErrorAfter} - - checkDataConsistency(ctx, "", "", fakeErrLister.List, metav1.ListOptions{}, retrievedItemsFunc) - require.Equal(t, fakeErrLister.listCounter, fakeErrLister.stopErrorAfter) -} - -type errorLister struct { - listCounter int - stopErrorAfter int -} - -func (lw *errorLister) List(_ context.Context, _ metav1.ListOptions) (runtime.Object, error) { - lw.listCounter++ - if lw.listCounter == lw.stopErrorAfter { - return &v1.PodList{}, nil - } - return nil, fmt.Errorf("nasty error") -} - -type listWrapper struct { - counter int - requestOptions []metav1.ListOptions - response *v1.PodList -} - -func (lw *listWrapper) List(_ context.Context, opts metav1.ListOptions) (*v1.PodList, error) { - lw.counter++ - lw.requestOptions = append(lw.requestOptions, opts) - return lw.response, nil -} diff --git a/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector.go b/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector.go new file mode 100644 index 00000000000..64288eb865f --- /dev/null +++ b/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector.go @@ -0,0 +1,98 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package consistencydetector + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/google/go-cmp/cmp" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" +) + +type RetrieveItemsFunc[U any] func() []U + +type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error) + +// CheckDataConsistency exists solely for testing purposes. +// we cannot use checkWatchListDataConsistencyIfRequested because +// it is guarded by an environment variable. +// we cannot manipulate the environment variable because +// it will affect other tests in this package. +func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) { + klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity) + listOptions.ResourceVersion = lastSyncedResourceVersion + listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact + var list runtime.Object + err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) { + list, err = listFn(ctx, listOptions) + if err != nil { + // the consistency check will only be enabled in the CI + // and LIST calls in general will be retried by the client-go library + // if we fail simply log and retry + klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err) + return false, nil + } + return true, nil + }) + if err != nil { + klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err) + return + } + +
rawListItems, err := meta.ExtractListWithAlloc(list) + if err != nil { + panic(err) // this should never happen + } + + listItems := toMetaObjectSliceOrDie(rawListItems) + retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn()) + + sort.Sort(byUID(listItems)) + sort.Sort(byUID(retrievedItems)) + + if !cmp.Equal(listItems, retrievedItems) { + klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, cmp.Diff(listItems, retrievedItems)) + msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity) + panic(msg) + } +} + +type byUID []metav1.Object + +func (a byUID) Len() int { return len(a) } +func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() } +func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object { + result := make([]metav1.Object, len(s)) + for i, v := range s { + m, err := meta.Accessor(v) + if err != nil { + panic(err) + } + result[i] = m + } + return result +} diff --git a/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector_test.go b/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector_test.go new file mode 100644 index 00000000000..c6d21754e0d --- /dev/null +++ b/staging/src/k8s.io/client-go/util/consistencydetector/data_consistency_detector_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package consistencydetector + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" +) + +func TestDataConsistencyChecker(t *testing.T) { + scenarios := []struct { + name string + + listResponse *v1.PodList + retrievedItems []*v1.Pod + requestOptions metav1.ListOptions + + expectedRequestOptions []metav1.ListOptions + expectedListRequests int + expectPanic bool + }{ + { + name: "data consistency check won't panic when data is consistent", + listResponse: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "2"}, + Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")}, + }, + requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, + retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, + expectedListRequests: 1, + expectedRequestOptions: []metav1.ListOptions{ + { + ResourceVersion: "2", + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + TimeoutSeconds: ptr.To(int64(39)), + }, + }, + }, + + { + name: "data consistency check won't panic when there is no data", + listResponse: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "2"}, + }, + requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, + expectedListRequests: 1, + expectedRequestOptions: []metav1.ListOptions{ + { + ResourceVersion: "2", + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + TimeoutSeconds: ptr.To(int64(39)), + }, + }, + }, + + { + name: "data consistency panics when data is inconsistent", + listResponse: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "2"}, + Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")}, + }, + requestOptions: metav1.ListOptions{TimeoutSeconds: 
ptr.To(int64(39))}, + retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, + expectedListRequests: 1, + expectedRequestOptions: []metav1.ListOptions{ + { + ResourceVersion: "2", + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + TimeoutSeconds: ptr.To(int64(39)), + }, + }, + expectPanic: true, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + ctx := context.TODO() + fakeLister := &listWrapper{response: scenario.listResponse} + retrievedItemsFunc := func() []*v1.Pod { + return scenario.retrievedItems + } + + if scenario.expectPanic { + require.Panics(t, func() { + CheckDataConsistency(ctx, "", scenario.listResponse.ResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc) + }) + } else { + CheckDataConsistency(ctx, "", scenario.listResponse.ResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc) + } + + require.Equal(t, fakeLister.counter, scenario.expectedListRequests) + require.Equal(t, fakeLister.requestOptions, scenario.expectedRequestOptions) + }) + } +} + +func TestDataConsistencyCheckerRetry(t *testing.T) { + ctx := context.TODO() + retrievedItemsFunc := func() []*v1.Pod { + return nil + } + stopListErrorAfter := 5 + fakeErrLister := &errorLister{stopErrorAfter: stopListErrorAfter} + + CheckDataConsistency(ctx, "", "", fakeErrLister.List, metav1.ListOptions{}, retrievedItemsFunc) + require.Equal(t, fakeErrLister.listCounter, fakeErrLister.stopErrorAfter) +} + +type errorLister struct { + listCounter int + stopErrorAfter int +} + +func (lw *errorLister) List(_ context.Context, _ metav1.ListOptions) (runtime.Object, error) { + lw.listCounter++ + if lw.listCounter == lw.stopErrorAfter { + return &v1.PodList{}, nil + } + return nil, fmt.Errorf("nasty error") +} + +type listWrapper struct { + counter int + requestOptions []metav1.ListOptions + response *v1.PodList +} + +func (lw *listWrapper) List(_ context.Context, opts metav1.ListOptions) 
(*v1.PodList, error) { + lw.counter++ + lw.requestOptions = append(lw.requestOptions, opts) + return lw.response, nil +} + +func makePod(name, rv string) *v1.Pod { + return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv, UID: types.UID(name)}} +}