diff --git a/tools/cache/reflector_data_consistency_detector.go b/tools/cache/reflector_data_consistency_detector.go new file mode 100644 index 00000000..bd857de7 --- /dev/null +++ b/tools/cache/reflector_data_consistency_detector.go @@ -0,0 +1,51 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + "os" + "strconv" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/consistencydetector" +) + +var dataConsistencyDetectionForWatchListEnabled = false + +func init() { + dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) +} + +// checkWatchListDataConsistencyIfRequested performs a data consistency check only when +// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup. +// +// The consistency check is meant to be enforced only in the CI, not in production. +// The check ensures that data retrieved by the watch-list api call +// is exactly the same as data received by the standard list api call against etcd. +// +// Note that this function will panic when data inconsistency is detected. +// This is intentional because we want to catch it in the CI. +func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) { + if !dataConsistencyDetectionForWatchListEnabled { + return + } + // for informers we pass an empty ListOptions because + // listFn might be wrapped for filtering during informer construction. + consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn) +} diff --git a/tools/cache/reflector_data_consistency_detector_test.go b/tools/cache/reflector_data_consistency_detector_test.go new file mode 100644 index 00000000..7d0d8bb6 --- /dev/null +++ b/tools/cache/reflector_data_consistency_detector_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + "testing" + + "k8s.io/apimachinery/pkg/runtime" +) + +func TestDriveWatchLisConsistencyIfRequired(t *testing.T) { + ctx := context.TODO() + checkWatchListDataConsistencyIfRequested[runtime.Object, runtime.Object](ctx, "", "", nil, nil) +} diff --git a/util/consistencydetector/data_consistency_detector.go b/util/consistencydetector/data_consistency_detector.go index 084f1322..64288eb8 100644 --- a/util/consistencydetector/data_consistency_detector.go +++ b/util/consistencydetector/data_consistency_detector.go @@ -19,9 +19,7 @@ package consistencydetector import ( "context" "fmt" - "os" "sort" - "strconv" "time" "github.com/google/go-cmp/cmp" @@ -33,34 +31,10 @@ import ( "k8s.io/klog/v2" ) -var dataConsistencyDetectionForWatchListEnabled = false - -func init() { - dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) -} - type RetrieveItemsFunc[U any] func() []U type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error) -// checkWatchListDataConsistencyIfRequested performs a data consistency check only when -// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup. -// -// The consistency check is meant to be enforced only in the CI, not in production. -// The check ensures that data retrieved by the watch-list api call -// is exactly the same as data received by the standard list api call against etcd. -// -// Note that this function will panic when data inconsistency is detected. -// This is intentional because we want to catch it in the CI. -func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], retrieveItemsFn RetrieveItemsFunc[U]) { - if !dataConsistencyDetectionForWatchListEnabled { - return - } - // for informers we pass an empty ListOptions because - // listFn might be wrapped for filtering during informer construction. - CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn) -} - // CheckDataConsistency exists solely for testing purposes. // we cannot use checkWatchListDataConsistencyIfRequested because // it is guarded by an environmental variable. diff --git a/util/consistencydetector/data_consistency_detector_test.go b/util/consistencydetector/data_consistency_detector_test.go index 9cbe036e..c6d21754 100644 --- a/util/consistencydetector/data_consistency_detector_test.go +++ b/util/consistencydetector/data_consistency_detector_test.go @@ -118,11 +118,6 @@ func TestDataConsistencyChecker(t *testing.T) { } } -func TestDriveWatchLisConsistencyIfRequired(t *testing.T) { - ctx := context.TODO() - checkWatchListDataConsistencyIfRequested[runtime.Object, runtime.Object](ctx, "", "", nil, nil) -} - func TestDataConsistencyCheckerRetry(t *testing.T) { ctx := context.TODO() retrievedItemsFunc := func() []*v1.Pod {