Merge pull request #125146 from p0lyn0mial/upstream-client-go-consistency-detector-move-to-new-package

client-go: move data consistency detector to a new package
This commit is contained in:
Kubernetes Prow Robot 2024-05-27 06:24:08 -07:00 committed by GitHub
commit 9d5db28f5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 263 additions and 209 deletions

View File

@ -1,5 +1,5 @@
/*
Copyright 2023 The Kubernetes Authors.
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -18,19 +18,12 @@ package cache
import (
"context"
"fmt"
"os"
"sort"
"strconv"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/client-go/util/consistencydetector"
)
var dataConsistencyDetectionForWatchListEnabled = false
@ -39,10 +32,6 @@ func init() {
dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR"))
}
type retrieveItemsFunc[U any] func() []U
type listFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
// checkWatchListDataConsistencyIfRequested performs a data consistency check only when
// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
//
@ -52,73 +41,11 @@ type listFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOpt
//
// Note that this function will panic when data inconsistency is detected.
// This is intentional because we want to catch it in the CI.
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], retrieveItemsFn retrieveItemsFunc[U]) {
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
if !dataConsistencyDetectionForWatchListEnabled {
return
}
// for informers we pass an empty ListOptions because
// listFn might be wrapped for filtering during informer construction.
checkDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
}
// checkDataConsistency exists solely for testing purposes.
// we cannot use checkWatchListDataConsistencyIfRequested because
// it is guarded by an environmental variable.
// we cannot manipulate the environmental variable because
// it will affect other tests in this package.
func checkDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], listOptions metav1.ListOptions, retrieveItemsFn retrieveItemsFunc[U]) {
klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity)
listOptions.ResourceVersion = lastSyncedResourceVersion
listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
var list runtime.Object
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) {
list, err = listFn(ctx, listOptions)
if err != nil {
// the consistency check will only be enabled in the CI
// and LIST calls in general will be retired by the client-go library
// if we fail simply log and retry
klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err)
return false, nil
}
return true, nil
})
if err != nil {
klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err)
return
}
rawListItems, err := meta.ExtractListWithAlloc(list)
if err != nil {
panic(err) // this should never happen
}
listItems := toMetaObjectSliceOrDie(rawListItems)
retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
sort.Sort(byUID(listItems))
sort.Sort(byUID(retrievedItems))
if !cmp.Equal(listItems, retrievedItems) {
klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, cmp.Diff(listItems, retrievedItems))
msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity)
panic(msg)
}
}
type byUID []metav1.Object
func (a byUID) Len() int { return len(a) }
func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() }
func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object {
result := make([]metav1.Object, len(s))
for i, v := range s {
m, err := meta.Accessor(v)
if err != nil {
panic(err)
}
result[i] = m
}
return result
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2023 The Kubernetes Authors.
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -18,143 +18,12 @@ package cache
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
)
func TestDataConsistencyChecker(t *testing.T) {
scenarios := []struct {
name string
listResponse *v1.PodList
retrievedItems []*v1.Pod
requestOptions metav1.ListOptions
expectedRequestOptions []metav1.ListOptions
expectedListRequests int
expectPanic bool
}{
{
name: "data consistency check won't panic when data is consistent",
listResponse: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
},
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")},
expectedListRequests: 1,
expectedRequestOptions: []metav1.ListOptions{
{
ResourceVersion: "2",
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
TimeoutSeconds: ptr.To(int64(39)),
},
},
},
{
name: "data consistency check won't panic when there is no data",
listResponse: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
},
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
expectedListRequests: 1,
expectedRequestOptions: []metav1.ListOptions{
{
ResourceVersion: "2",
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
TimeoutSeconds: ptr.To(int64(39)),
},
},
},
{
name: "data consistency panics when data is inconsistent",
listResponse: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")},
},
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")},
expectedListRequests: 1,
expectedRequestOptions: []metav1.ListOptions{
{
ResourceVersion: "2",
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
TimeoutSeconds: ptr.To(int64(39)),
},
},
expectPanic: true,
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
ctx := context.TODO()
fakeLister := &listWrapper{response: scenario.listResponse}
retrievedItemsFunc := func() []*v1.Pod {
return scenario.retrievedItems
}
if scenario.expectPanic {
require.Panics(t, func() {
checkDataConsistency(ctx, "", scenario.listResponse.ResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
})
} else {
checkDataConsistency(ctx, "", scenario.listResponse.ResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
}
require.Equal(t, fakeLister.counter, scenario.expectedListRequests)
require.Equal(t, fakeLister.requestOptions, scenario.expectedRequestOptions)
})
}
}
func TestDriveWatchLisConsistencyIfRequired(t *testing.T) {
ctx := context.TODO()
checkWatchListDataConsistencyIfRequested[runtime.Object, runtime.Object](ctx, "", "", nil, nil)
}
func TestDataConsistencyCheckerRetry(t *testing.T) {
ctx := context.TODO()
retrievedItemsFunc := func() []*v1.Pod {
return nil
}
stopListErrorAfter := 5
fakeErrLister := &errorLister{stopErrorAfter: stopListErrorAfter}
checkDataConsistency(ctx, "", "", fakeErrLister.List, metav1.ListOptions{}, retrievedItemsFunc)
require.Equal(t, fakeErrLister.listCounter, fakeErrLister.stopErrorAfter)
}
type errorLister struct {
listCounter int
stopErrorAfter int
}
func (lw *errorLister) List(_ context.Context, _ metav1.ListOptions) (runtime.Object, error) {
lw.listCounter++
if lw.listCounter == lw.stopErrorAfter {
return &v1.PodList{}, nil
}
return nil, fmt.Errorf("nasty error")
}
type listWrapper struct {
counter int
requestOptions []metav1.ListOptions
response *v1.PodList
}
func (lw *listWrapper) List(_ context.Context, opts metav1.ListOptions) (*v1.PodList, error) {
lw.counter++
lw.requestOptions = append(lw.requestOptions, opts)
return lw.response, nil
}

View File

@ -0,0 +1,98 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consistencydetector
import (
"context"
"fmt"
"sort"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
type RetrieveItemsFunc[U any] func() []U
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
// CheckDataConsistency exists solely for testing purposes.
// we cannot use checkWatchListDataConsistencyIfRequested because
// it is guarded by an environmental variable.
// we cannot manipulate the environmental variable because
// it will affect other tests in this package.
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity)
listOptions.ResourceVersion = lastSyncedResourceVersion
listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
var list runtime.Object
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) {
list, err = listFn(ctx, listOptions)
if err != nil {
// the consistency check will only be enabled in the CI
// and LIST calls in general will be retired by the client-go library
// if we fail simply log and retry
klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err)
return false, nil
}
return true, nil
})
if err != nil {
klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err)
return
}
rawListItems, err := meta.ExtractListWithAlloc(list)
if err != nil {
panic(err) // this should never happen
}
listItems := toMetaObjectSliceOrDie(rawListItems)
retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
sort.Sort(byUID(listItems))
sort.Sort(byUID(retrievedItems))
if !cmp.Equal(listItems, retrievedItems) {
klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, cmp.Diff(listItems, retrievedItems))
msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity)
panic(msg)
}
}
type byUID []metav1.Object
func (a byUID) Len() int { return len(a) }
func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() }
func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object {
result := make([]metav1.Object, len(s))
for i, v := range s {
m, err := meta.Accessor(v)
if err != nil {
panic(err)
}
result[i] = m
}
return result
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consistencydetector
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
)
func TestDataConsistencyChecker(t *testing.T) {
scenarios := []struct {
name string
listResponse *v1.PodList
retrievedItems []*v1.Pod
requestOptions metav1.ListOptions
expectedRequestOptions []metav1.ListOptions
expectedListRequests int
expectPanic bool
}{
{
name: "data consistency check won't panic when data is consistent",
listResponse: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
},
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")},
expectedListRequests: 1,
expectedRequestOptions: []metav1.ListOptions{
{
ResourceVersion: "2",
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
TimeoutSeconds: ptr.To(int64(39)),
},
},
},
{
name: "data consistency check won't panic when there is no data",
listResponse: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
},
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
expectedListRequests: 1,
expectedRequestOptions: []metav1.ListOptions{
{
ResourceVersion: "2",
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
TimeoutSeconds: ptr.To(int64(39)),
},
},
},
{
name: "data consistency panics when data is inconsistent",
listResponse: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "2"},
Items: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")},
},
requestOptions: metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))},
retrievedItems: []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")},
expectedListRequests: 1,
expectedRequestOptions: []metav1.ListOptions{
{
ResourceVersion: "2",
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
TimeoutSeconds: ptr.To(int64(39)),
},
},
expectPanic: true,
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
ctx := context.TODO()
fakeLister := &listWrapper{response: scenario.listResponse}
retrievedItemsFunc := func() []*v1.Pod {
return scenario.retrievedItems
}
if scenario.expectPanic {
require.Panics(t, func() {
CheckDataConsistency(ctx, "", scenario.listResponse.ResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
})
} else {
CheckDataConsistency(ctx, "", scenario.listResponse.ResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
}
require.Equal(t, fakeLister.counter, scenario.expectedListRequests)
require.Equal(t, fakeLister.requestOptions, scenario.expectedRequestOptions)
})
}
}
func TestDataConsistencyCheckerRetry(t *testing.T) {
ctx := context.TODO()
retrievedItemsFunc := func() []*v1.Pod {
return nil
}
stopListErrorAfter := 5
fakeErrLister := &errorLister{stopErrorAfter: stopListErrorAfter}
CheckDataConsistency(ctx, "", "", fakeErrLister.List, metav1.ListOptions{}, retrievedItemsFunc)
require.Equal(t, fakeErrLister.listCounter, fakeErrLister.stopErrorAfter)
}
type errorLister struct {
listCounter int
stopErrorAfter int
}
func (lw *errorLister) List(_ context.Context, _ metav1.ListOptions) (runtime.Object, error) {
lw.listCounter++
if lw.listCounter == lw.stopErrorAfter {
return &v1.PodList{}, nil
}
return nil, fmt.Errorf("nasty error")
}
type listWrapper struct {
counter int
requestOptions []metav1.ListOptions
response *v1.PodList
}
func (lw *listWrapper) List(_ context.Context, opts metav1.ListOptions) (*v1.PodList, error) {
lw.counter++
lw.requestOptions = append(lw.requestOptions, opts)
return lw.response, nil
}
func makePod(name, rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv, UID: types.UID(name)}}
}