diff --git a/go.mod b/go.mod index 765cf947..38f2f48e 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( golang.org/x/term v0.13.0 golang.org/x/time v0.3.0 google.golang.org/protobuf v1.31.0 - k8s.io/api v0.0.0-20231020231154-1535dfa58aa1 + k8s.io/api v0.0.0-20231020231155-fe172d7dd4b6 k8s.io/apimachinery v0.0.0-20231020230052-c047e325a432 k8s.io/klog/v2 v2.100.1 k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 @@ -60,6 +60,6 @@ require ( ) replace ( - k8s.io/api => k8s.io/api v0.0.0-20231020231154-1535dfa58aa1 + k8s.io/api => k8s.io/api v0.0.0-20231020231155-fe172d7dd4b6 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20231020230052-c047e325a432 ) diff --git a/go.sum b/go.sum index 6ea0b5dd..03edf4d0 100644 --- a/go.sum +++ b/go.sum @@ -147,8 +147,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.0.0-20231020231154-1535dfa58aa1 h1:qfLikakw9JxZoptlrycHCEd9rcAGdLMTg9ulMi0VrD0= -k8s.io/api v0.0.0-20231020231154-1535dfa58aa1/go.mod h1:mgYOiLIgrQcsuVxrBI6Pplk91r3sl5ZJ7eUx7UBMTkY= +k8s.io/api v0.0.0-20231020231155-fe172d7dd4b6 h1:L4jlSzYt2s6+MTDB3gKmik1CNSx9Dpzf3wGnSFV1pqk= +k8s.io/api v0.0.0-20231020231155-fe172d7dd4b6/go.mod h1:mgYOiLIgrQcsuVxrBI6Pplk91r3sl5ZJ7eUx7UBMTkY= k8s.io/apimachinery v0.0.0-20231020230052-c047e325a432 h1:TWovhSGZGPhiGaOsd06sIch/R3NwKrbnIj5leHo2OCM= k8s.io/apimachinery v0.0.0-20231020230052-c047e325a432/go.mod h1:mdlGhJWO1mhVzQXm1Lx7D1BvvBIVKlRVy0vvl1LwGjg= k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 2cf4723d..c1ea13de 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -674,6 +674,12 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { // "k8s.io/initial-events-end" bookmark. initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())}) r.setIsLastSyncResourceVersionUnavailable(false) + + // we utilize the temporaryStore to ensure independence from the current store implementation. + // as of today, the store is implemented as a queue and will be drained by the higher-level + // component as soon as it finishes replacing the content. + checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore) + if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { return nil, fmt.Errorf("unable to sync watch-list result: %v", err) } diff --git a/tools/cache/reflector_data_consistency_detector.go b/tools/cache/reflector_data_consistency_detector.go new file mode 100644 index 00000000..aa3027d7 --- /dev/null +++ b/tools/cache/reflector_data_consistency_detector.go @@ -0,0 +1,119 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + "os" + "sort" + "strconv" + "time" + + "github.com/google/go-cmp/cmp" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" +) + +var dataConsistencyDetectionEnabled = false + +func init() { + dataConsistencyDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) +} + +// checkWatchListConsistencyIfRequested performs a data consistency check only when +// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup. +// +// The consistency check is meant to be enforced only in the CI, not in production. +// The check ensures that data retrieved by the watch-list api call +// is exactly the same as data received by the standard list api call. +// +// Note that this function will panic when data inconsistency is detected. +// This is intentional because we want to catch it in the CI. +func checkWatchListConsistencyIfRequested(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) { + if !dataConsistencyDetectionEnabled { + return + } + checkWatchListConsistency(stopCh, identity, lastSyncedResourceVersion, listerWatcher, store) +} + +// checkWatchListConsistency exists solely for testing purposes. +// we cannot use checkWatchListConsistencyIfRequested because +// it is guarded by an environmental variable. +// we cannot manipulate the environmental variable because +// it will affect other tests in this package. +func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) { + klog.Warningf("%s: data consistency check for the watch-list feature is enabled, this will result in an additional call to the API server.", identity) + opts := metav1.ListOptions{ + ResourceVersion: lastSyncedResourceVersion, + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + } + var list runtime.Object + err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), time.Second, true, func(_ context.Context) (done bool, err error) { + list, err = listerWatcher.List(opts) + if err != nil { + // the consistency check will only be enabled in the CI + // and LIST calls in general will be retired by the client-go library + // if we fail simply log and retry + klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err) + return false, nil + } + return true, nil + }) + if err != nil { + klog.Errorf("failed to list data from the server, the watch-list consistency check won't be performed, stopCh was closed, err: %v", err) + return + } + + rawListItems, err := meta.ExtractListWithAlloc(list) + if err != nil { + panic(err) // this should never happen + } + + listItems := toMetaObjectSliceOrDie(rawListItems) + storeItems := toMetaObjectSliceOrDie(store.List()) + + sort.Sort(byUID(listItems)) + sort.Sort(byUID(storeItems)) + + if !cmp.Equal(listItems, storeItems) { + klog.Infof("%s: data received by the new watch-list api call is different than received by the standard list api call, diff: %v", identity, cmp.Diff(listItems, storeItems)) + msg := "data inconsistency detected for the watch-list feature, panicking!" + panic(msg) + } +} + +type byUID []metav1.Object + +func (a byUID) Len() int { return len(a) } +func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() } +func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object { + result := make([]metav1.Object, len(s)) + for i, v := range s { + m, err := meta.Accessor(v) + if err != nil { + panic(err) + } + result[i] = m + } + return result +} diff --git a/tools/cache/reflector_data_consistency_detector_test.go b/tools/cache/reflector_data_consistency_detector_test.go new file mode 100644 index 00000000..3c7eda7d --- /dev/null +++ b/tools/cache/reflector_data_consistency_detector_test.go @@ -0,0 +1,143 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" +) + +func TestWatchListConsistency(t *testing.T) { + scenarios := []struct { + name string + + podList *v1.PodList + storeContent []*v1.Pod + + expectedRequestOptions []metav1.ListOptions + expectedListRequests int + expectPanic bool + }{ + { + name: "watchlist consistency check won't panic when data is consistent", + podList: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "2"}, + Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")}, + }, + storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, + expectedListRequests: 1, + expectedRequestOptions: []metav1.ListOptions{ + { + ResourceVersion: "2", + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + }, + }, + }, + + { + name: "watchlist consistency check won't panic when there is no data", + podList: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "2"}, + }, + expectedListRequests: 1, + expectedRequestOptions: []metav1.ListOptions{ + { + ResourceVersion: "2", + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + }, + }, + }, + + { + name: "watchlist consistency panics when data is inconsistent", + podList: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "2"}, + Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")}, + }, + storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, + expectedListRequests: 1, + expectedRequestOptions: []metav1.ListOptions{ + { + ResourceVersion: "2", + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + }, + }, + expectPanic: true, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + listWatcher, store, _, stopCh := testData() + for _, obj := range scenario.storeContent { + require.NoError(t, store.Add(obj)) + } + listWatcher.customListResponse = scenario.podList + + if scenario.expectPanic { + require.Panics(t, func() { checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) }) + } else { + checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) + } + + verifyListCounter(t, listWatcher, scenario.expectedListRequests) + verifyRequestOptions(t, listWatcher, scenario.expectedRequestOptions) + }) + } +} + +func TestDriveWatchLisConsistencyIfRequired(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + checkWatchListConsistencyIfRequested(stopCh, "", "", nil, nil) +} + +func TestWatchListConsistencyRetry(t *testing.T) { + store := NewStore(MetaNamespaceKeyFunc) + stopCh := make(chan struct{}) + defer close(stopCh) + + stopListErrorAfter := 5 + errLister := &errorLister{stopErrorAfter: stopListErrorAfter} + + checkWatchListConsistency(stopCh, "", "", errLister, store) + require.Equal(t, errLister.listCounter, errLister.stopErrorAfter) +} + +type errorLister struct { + listCounter int + stopErrorAfter int +} + +func (lw *errorLister) List(_ metav1.ListOptions) (runtime.Object, error) { + lw.listCounter++ + if lw.listCounter == lw.stopErrorAfter { + return &v1.PodList{}, nil + } + return nil, fmt.Errorf("nasty error") +} + +func (lw *errorLister) Watch(_ metav1.ListOptions) (watch.Interface, error) { + panic("not implemented") +} diff --git a/tools/cache/reflector_watchlist_test.go b/tools/cache/reflector_watchlist_test.go index 93741178..c43db073 100644 --- a/tools/cache/reflector_watchlist_test.go +++ b/tools/cache/reflector_watchlist_test.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/utils/pointer" ) @@ -491,7 +492,7 @@ func verifyStore(t *testing.T, s Store, expectedPods []v1.Pod) { } func makePod(name, rv string) *v1.Pod { - return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv}} + return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv, UID: types.UID(name)}} } func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) {