mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-19 09:38:39 +00:00
Merge pull request #121363 from p0lyn0mial/upstream-reflector-consistency-check
client-go/reflector: introduce a data consistency check for the watch-list feature. Kubernetes-commit: 0dc900cebe079efaf2087adc6fee6eb88d176020
This commit is contained in:
commit
5f697abdfd
4
go.mod
4
go.mod
@ -24,7 +24,7 @@ require (
|
||||
golang.org/x/term v0.13.0
|
||||
golang.org/x/time v0.3.0
|
||||
google.golang.org/protobuf v1.31.0
|
||||
k8s.io/api v0.0.0-20231020231154-1535dfa58aa1
|
||||
k8s.io/api v0.0.0-20231020231155-fe172d7dd4b6
|
||||
k8s.io/apimachinery v0.0.0-20231020230052-c047e325a432
|
||||
k8s.io/klog/v2 v2.100.1
|
||||
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00
|
||||
@ -60,6 +60,6 @@ require (
|
||||
)
|
||||
|
||||
replace (
|
||||
k8s.io/api => k8s.io/api v0.0.0-20231020231154-1535dfa58aa1
|
||||
k8s.io/api => k8s.io/api v0.0.0-20231020231155-fe172d7dd4b6
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20231020230052-c047e325a432
|
||||
)
|
||||
|
4
go.sum
4
go.sum
@ -147,8 +147,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
k8s.io/api v0.0.0-20231020231154-1535dfa58aa1 h1:qfLikakw9JxZoptlrycHCEd9rcAGdLMTg9ulMi0VrD0=
|
||||
k8s.io/api v0.0.0-20231020231154-1535dfa58aa1/go.mod h1:mgYOiLIgrQcsuVxrBI6Pplk91r3sl5ZJ7eUx7UBMTkY=
|
||||
k8s.io/api v0.0.0-20231020231155-fe172d7dd4b6 h1:L4jlSzYt2s6+MTDB3gKmik1CNSx9Dpzf3wGnSFV1pqk=
|
||||
k8s.io/api v0.0.0-20231020231155-fe172d7dd4b6/go.mod h1:mgYOiLIgrQcsuVxrBI6Pplk91r3sl5ZJ7eUx7UBMTkY=
|
||||
k8s.io/apimachinery v0.0.0-20231020230052-c047e325a432 h1:TWovhSGZGPhiGaOsd06sIch/R3NwKrbnIj5leHo2OCM=
|
||||
k8s.io/apimachinery v0.0.0-20231020230052-c047e325a432/go.mod h1:mdlGhJWO1mhVzQXm1Lx7D1BvvBIVKlRVy0vvl1LwGjg=
|
||||
k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg=
|
||||
|
6
tools/cache/reflector.go
vendored
6
tools/cache/reflector.go
vendored
@ -674,6 +674,12 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
|
||||
// "k8s.io/initial-events-end" bookmark.
|
||||
initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
|
||||
r.setIsLastSyncResourceVersionUnavailable(false)
|
||||
|
||||
// we utilize the temporaryStore to ensure independence from the current store implementation.
|
||||
// as of today, the store is implemented as a queue and will be drained by the higher-level
|
||||
// component as soon as it finishes replacing the content.
|
||||
checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore)
|
||||
|
||||
if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
|
||||
return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
|
||||
}
|
||||
|
119
tools/cache/reflector_data_consistency_detector.go
vendored
Normal file
119
tools/cache/reflector_data_consistency_detector.go
vendored
Normal file
@ -0,0 +1,119 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
var dataConsistencyDetectionEnabled = false
|
||||
|
||||
func init() {
|
||||
dataConsistencyDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR"))
|
||||
}
|
||||
|
||||
// checkWatchListConsistencyIfRequested performs a data consistency check only when
|
||||
// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
|
||||
//
|
||||
// The consistency check is meant to be enforced only in the CI, not in production.
|
||||
// The check ensures that data retrieved by the watch-list api call
|
||||
// is exactly the same as data received by the standard list api call.
|
||||
//
|
||||
// Note that this function will panic when data inconsistency is detected.
|
||||
// This is intentional because we want to catch it in the CI.
|
||||
func checkWatchListConsistencyIfRequested(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) {
|
||||
if !dataConsistencyDetectionEnabled {
|
||||
return
|
||||
}
|
||||
checkWatchListConsistency(stopCh, identity, lastSyncedResourceVersion, listerWatcher, store)
|
||||
}
|
||||
|
||||
// checkWatchListConsistency exists solely for testing purposes.
|
||||
// we cannot use checkWatchListConsistencyIfRequested because
|
||||
// it is guarded by an environmental variable.
|
||||
// we cannot manipulate the environmental variable because
|
||||
// it will affect other tests in this package.
|
||||
func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) {
|
||||
klog.Warningf("%s: data consistency check for the watch-list feature is enabled, this will result in an additional call to the API server.", identity)
|
||||
opts := metav1.ListOptions{
|
||||
ResourceVersion: lastSyncedResourceVersion,
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
|
||||
}
|
||||
var list runtime.Object
|
||||
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), time.Second, true, func(_ context.Context) (done bool, err error) {
|
||||
list, err = listerWatcher.List(opts)
|
||||
if err != nil {
|
||||
// the consistency check will only be enabled in the CI
|
||||
// and LIST calls in general will be retired by the client-go library
|
||||
// if we fail simply log and retry
|
||||
klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("failed to list data from the server, the watch-list consistency check won't be performed, stopCh was closed, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
rawListItems, err := meta.ExtractListWithAlloc(list)
|
||||
if err != nil {
|
||||
panic(err) // this should never happen
|
||||
}
|
||||
|
||||
listItems := toMetaObjectSliceOrDie(rawListItems)
|
||||
storeItems := toMetaObjectSliceOrDie(store.List())
|
||||
|
||||
sort.Sort(byUID(listItems))
|
||||
sort.Sort(byUID(storeItems))
|
||||
|
||||
if !cmp.Equal(listItems, storeItems) {
|
||||
klog.Infof("%s: data received by the new watch-list api call is different than received by the standard list api call, diff: %v", identity, cmp.Diff(listItems, storeItems))
|
||||
msg := "data inconsistency detected for the watch-list feature, panicking!"
|
||||
panic(msg)
|
||||
}
|
||||
}
|
||||
|
||||
type byUID []metav1.Object
|
||||
|
||||
func (a byUID) Len() int { return len(a) }
|
||||
func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() }
|
||||
func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object {
|
||||
result := make([]metav1.Object, len(s))
|
||||
for i, v := range s {
|
||||
m, err := meta.Accessor(v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
result[i] = m
|
||||
}
|
||||
return result
|
||||
}
|
143
tools/cache/reflector_data_consistency_detector_test.go
vendored
Normal file
143
tools/cache/reflector_data_consistency_detector_test.go
vendored
Normal file
@ -0,0 +1,143 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
func TestWatchListConsistency(t *testing.T) {
|
||||
scenarios := []struct {
|
||||
name string
|
||||
|
||||
podList *v1.PodList
|
||||
storeContent []*v1.Pod
|
||||
|
||||
expectedRequestOptions []metav1.ListOptions
|
||||
expectedListRequests int
|
||||
expectPanic bool
|
||||
}{
|
||||
{
|
||||
name: "watchlist consistency check won't panic when data is consistent",
|
||||
podList: &v1.PodList{
|
||||
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
|
||||
Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
|
||||
},
|
||||
storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")},
|
||||
expectedListRequests: 1,
|
||||
expectedRequestOptions: []metav1.ListOptions{
|
||||
{
|
||||
ResourceVersion: "2",
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
name: "watchlist consistency check won't panic when there is no data",
|
||||
podList: &v1.PodList{
|
||||
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
|
||||
},
|
||||
expectedListRequests: 1,
|
||||
expectedRequestOptions: []metav1.ListOptions{
|
||||
{
|
||||
ResourceVersion: "2",
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
name: "watchlist consistency panics when data is inconsistent",
|
||||
podList: &v1.PodList{
|
||||
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
|
||||
Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")},
|
||||
},
|
||||
storeContent: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")},
|
||||
expectedListRequests: 1,
|
||||
expectedRequestOptions: []metav1.ListOptions{
|
||||
{
|
||||
ResourceVersion: "2",
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
|
||||
},
|
||||
},
|
||||
expectPanic: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, scenario := range scenarios {
|
||||
t.Run(scenario.name, func(t *testing.T) {
|
||||
listWatcher, store, _, stopCh := testData()
|
||||
for _, obj := range scenario.storeContent {
|
||||
require.NoError(t, store.Add(obj))
|
||||
}
|
||||
listWatcher.customListResponse = scenario.podList
|
||||
|
||||
if scenario.expectPanic {
|
||||
require.Panics(t, func() { checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) })
|
||||
} else {
|
||||
checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store)
|
||||
}
|
||||
|
||||
verifyListCounter(t, listWatcher, scenario.expectedListRequests)
|
||||
verifyRequestOptions(t, listWatcher, scenario.expectedRequestOptions)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDriveWatchLisConsistencyIfRequired(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
checkWatchListConsistencyIfRequested(stopCh, "", "", nil, nil)
|
||||
}
|
||||
|
||||
func TestWatchListConsistencyRetry(t *testing.T) {
|
||||
store := NewStore(MetaNamespaceKeyFunc)
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
stopListErrorAfter := 5
|
||||
errLister := &errorLister{stopErrorAfter: stopListErrorAfter}
|
||||
|
||||
checkWatchListConsistency(stopCh, "", "", errLister, store)
|
||||
require.Equal(t, errLister.listCounter, errLister.stopErrorAfter)
|
||||
}
|
||||
|
||||
type errorLister struct {
|
||||
listCounter int
|
||||
stopErrorAfter int
|
||||
}
|
||||
|
||||
func (lw *errorLister) List(_ metav1.ListOptions) (runtime.Object, error) {
|
||||
lw.listCounter++
|
||||
if lw.listCounter == lw.stopErrorAfter {
|
||||
return &v1.PodList{}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("nasty error")
|
||||
}
|
||||
|
||||
func (lw *errorLister) Watch(_ metav1.ListOptions) (watch.Interface, error) {
|
||||
panic("not implemented")
|
||||
}
|
3
tools/cache/reflector_watchlist_test.go
vendored
3
tools/cache/reflector_watchlist_test.go
vendored
@ -30,6 +30,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
@ -491,7 +492,7 @@ func verifyStore(t *testing.T, s Store, expectedPods []v1.Pod) {
|
||||
}
|
||||
|
||||
func makePod(name, rv string) *v1.Pod {
|
||||
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv}}
|
||||
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv, UID: types.UID(name)}}
|
||||
}
|
||||
|
||||
func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) {
|
||||
|
Loading…
Reference in New Issue
Block a user