client-go/tools/cache/reflector_test.go
Karl Isenberg 921628ecd6 chore: Add VeryShortWatchError typed error
- Add a new VeryShortWatchError struct for error matching, returned
  by `handleAnyWatch`, up through `Reflector.ListAndWatch`.
- Update test expectations to match exact errors.

Kubernetes-commit: 6eff9db0f10db72f2c64390e106a80621d136439
2025-04-15 11:48:57 -07:00

2249 lines
69 KiB
Go

/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"reflect"
goruntime "runtime"
"strconv"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
)
var nevererrc chan error
func TestCloseWatchChannelOnError(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
r := NewReflector(&ListWatch{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
fw := watch.NewFake()
r.listerWatcher = &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
go func() { assert.NoError(t, r.ListAndWatchWithContext(ctx)) }()
fw.Error(pod)
select {
case _, ok := <-fw.ResultChan():
if ok {
t.Errorf("Watch channel left open after cancellation")
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
break
}
}
func TestRunUntil(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
store := NewStore(MetaNamespaceKeyFunc)
r := NewReflector(&ListWatch{}, &v1.Pod{}, store, 0)
fw := watch.NewFake()
r.listerWatcher = &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
r.RunWithContext(ctx)
}()
// Synchronously add a dummy pod into the watch channel so we
// know the RunUntil go routine is in the watch handler.
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
cancel(errors.New("done"))
resultCh := fw.ResultChan()
for {
select {
case <-doneCh:
if resultCh == nil {
return // both closed
}
doneCh = nil
case _, ok := <-resultCh:
if ok {
t.Fatalf("Watch channel left open after stopping the watch")
}
if doneCh == nil {
return // both closed
}
resultCh = nil
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
}
}
}
func TestReflectorResyncChan(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, time.Millisecond)
a, _ := g.resyncChan()
b := time.After(wait.ForeverTestTimeout)
select {
case <-a:
t.Logf("got timeout as expected")
case <-b:
t.Errorf("resyncChan() is at least 99 milliseconds late??")
}
}
// TestReflectorWatchStoppedBefore ensures that neither List nor Watch are
// called if the stop channel is closed before Reflector.watch is called.
func TestReflectorWatchStoppedBefore(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
cancel(errors.New("don't run"))
lw := &ListWatch{
ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
t.Fatal("ListFunc called unexpectedly")
return nil, nil
},
WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
// If WatchFunc is never called, the watcher it returns doesn't need to be stopped.
t.Fatal("WatchFunc called unexpectedly")
return nil, nil
},
}
target := NewReflector(lw, &v1.Pod{}, nil, 0)
err := target.watch(ctx, nil, nil)
require.NoError(t, err)
}
// TestReflectorWatchStoppedAfter ensures that Reflector.watch always stops
// the watcher when the stop channel is closed.
func TestReflectorWatchStoppedAfter(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
var watchers []*watch.FakeWatcher
lw := &ListWatch{
ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
t.Fatal("ListFunc called unexpectedly")
return nil, nil
},
WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
// Simulate the stop channel being closed after watching has started
go func() {
time.Sleep(10 * time.Millisecond)
cancel(errors.New("10ms timeout reached"))
}()
// Use a fake watcher that never sends events
w := watch.NewFake()
watchers = append(watchers, w)
return w, nil
},
}
target := NewReflector(lw, &v1.Pod{}, nil, 0)
err := target.watch(ctx, nil, nil)
require.NoError(t, err)
require.Len(t, watchers, 1)
require.True(t, watchers[0].IsStopped())
}
func BenchmarkReflectorResyncChanMany(b *testing.B) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 25*time.Millisecond)
// The improvement to this (calling the timer's Stop() method) makes
// this benchmark about 40% faster.
for i := 0; i < b.N; i++ {
g.resyncPeriod = time.Duration(rand.Float64() * float64(time.Millisecond) * 25)
_, stop := g.resyncChan()
stop()
}
}
// TestReflectorHandleWatchStoppedBefore ensures that handleWatch returns when
// stopCh is already closed before handleWatch was called. It also ensures that
// ResultChan and Stop are both called once.
func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
// Simulate the context being canceled before the watchHandler is called
cancel(errors.New("don't run"))
var calls []string
resultCh := make(chan watch.Event)
fw := &watch.MockWatcher{
StopFunc: func() {
calls = append(calls, "Stop")
close(resultCh)
},
ResultChanFunc: func() <-chan watch.Event {
calls = append(calls, "ResultChan")
return resultCh
},
}
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
require.Equal(t, err, errorStopRequested)
// Ensure handleWatch calls ResultChan and Stop
assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
}
// TestReflectorHandleWatchStoppedAfter ensures that handleWatch returns when
// stopCh is closed after handleWatch was called. It also ensures that
// ResultChan and Stop are both called once.
func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
var calls []string
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
resultCh := make(chan watch.Event)
fw := &watch.MockWatcher{
StopFunc: func() {
calls = append(calls, "Stop")
close(resultCh)
},
ResultChanFunc: func() <-chan watch.Event {
calls = append(calls, "ResultChan")
resultCh = make(chan watch.Event)
// Simulate the watch handler being stopped asynchronously by the
// caller, after watching has started.
go func() {
time.Sleep(10 * time.Millisecond)
cancel(errors.New("10ms timeout reached"))
}()
return resultCh
},
}
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
require.Equal(t, err, errorStopRequested)
// Ensure handleWatch calls ResultChan and Stop
assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
}
// TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch
// returns when the result channel is closed before handleWatch was called.
// It also ensures that ResultChan and Stop are both called once.
func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t)
var calls []string
resultCh := make(chan watch.Event)
fw := &watch.MockWatcher{
StopFunc: func() {
calls = append(calls, "Stop")
},
ResultChanFunc: func() <-chan watch.Event {
calls = append(calls, "ResultChan")
return resultCh
},
}
// Simulate the result channel being closed by the producer before handleWatch is called.
close(resultCh)
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
require.Equal(t, &VeryShortWatchError{Name: g.name}, err)
// Ensure handleWatch calls ResultChan and Stop
assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
}
// TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch
// returns when the result channel is closed after handleWatch has started
// watching. It also ensures that ResultChan and Stop are both called once.
func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t)
var calls []string
resultCh := make(chan watch.Event)
fw := &watch.MockWatcher{
StopFunc: func() {
calls = append(calls, "Stop")
},
ResultChanFunc: func() <-chan watch.Event {
calls = append(calls, "ResultChan")
resultCh = make(chan watch.Event)
// Simulate the result channel being closed by the producer, after
// watching has started.
go func() {
time.Sleep(10 * time.Millisecond)
close(resultCh)
}()
return resultCh
},
}
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
require.Equal(t, &VeryShortWatchError{Name: g.name}, err)
// Ensure handleWatch calls ResultChan and Stop
assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
}
func TestReflectorWatchHandler(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
// Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop
// watching after all the events have been consumed.
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
setLastSyncResourceVersion := func(rv string) {
g.setLastSyncResourceVersion(rv)
if rv == "32" {
cancel(errors.New("LastSyncResourceVersion is 32"))
}
}
fw := watch.NewFake()
s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
go func() {
fw.Add(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "rejected"}})
fw.Delete(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
fw.Modify(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "55"}})
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
// Stop means that the consumer is done reading events.
// So let handleWatch call fw.Stop, after the Context is cancelled.
}()
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc)
require.Equal(t, err, errorStopRequested)
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
}
// Validate that the Store was updated by the events
table := []struct {
Pod *v1.Pod
exists bool
}{
{mkPod("foo", ""), false},
{mkPod("rejected", ""), false},
{mkPod("bar", "55"), true},
{mkPod("baz", "32"), true},
}
for _, item := range table {
obj, exists, _ := s.Get(item.Pod)
if e, a := item.exists, exists; e != a {
t.Errorf("%v: expected %v, got %v", item.Pod, e, a)
}
if !exists {
continue
}
if e, a := item.Pod.ResourceVersion, obj.(*v1.Pod).ResourceVersion; e != a {
t.Errorf("%v: expected %v, got %v", item.Pod, e, a)
}
}
// Validate that setLastSyncResourceVersion was called with the RV from the last event.
if e, a := "32", g.LastSyncResourceVersion(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}
func TestReflectorStopWatch(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
fw := watch.NewFake()
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
cancel(errors.New("don't run"))
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
require.Equal(t, err, errorStopRequested)
}
func TestReflectorListAndWatch(t *testing.T) {
type listResult struct {
Object runtime.Object
Error error
}
table := []struct {
name string
useWatchList bool
listResults []listResult
watchEvents []watch.Event
expectedListOptions []metav1.ListOptions
expectedWatchOptions []metav1.ListOptions
expectedStore []metav1.Object
}{
{
name: "UseWatchList enabled",
useWatchList: true,
watchEvents: []watch.Event{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}},
},
{
Type: watch.Bookmark,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
}},
},
{
Type: watch.Modified,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
expectedWatchOptions: []metav1.ListOptions{
{
AllowWatchBookmarks: true,
ResourceVersion: "",
// ResourceVersionMatch defaults to "NotOlderThan" when
// ResourceVersion and Limit are empty.
ResourceVersionMatch: "NotOlderThan",
SendInitialEvents: ptr.To(true),
},
},
expectedStore: []metav1.Object{
// Pod "foo" with rv "1" is de-duped by rv 2
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
{
name: "UseWatchList disabled",
useWatchList: false,
listResults: []listResult{
{
Object: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
Items: []v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}},
},
},
},
},
watchEvents: []watch.Event{
{
Type: watch.Modified,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
expectedListOptions: []metav1.ListOptions{
{
AllowWatchBookmarks: false,
ResourceVersion: "0",
// ResourceVersionMatch defaults to "NotOlderThan" when
// ResourceVersion is set and non-zero.
Limit: 500,
SendInitialEvents: nil,
},
},
expectedWatchOptions: []metav1.ListOptions{
{
AllowWatchBookmarks: true,
ResourceVersion: "1",
// ResourceVersionMatch is not used by Watch calls
SendInitialEvents: nil,
},
},
expectedStore: []metav1.Object{
// Pod "foo" with rv "1" is de-duped by rv 2
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
}
for _, tc := range table {
t.Run(tc.name, func(t *testing.T) {
watcherCh := make(chan *watch.FakeWatcher)
var listOpts, watchOpts []metav1.ListOptions
// The ListFunc will never be called. So we expect Watch to only be called
// with options.ResourceVersion="" to start the WatchList.
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
watchOpts = append(watchOpts, options)
if len(watchOpts) > len(tc.expectedWatchOptions) {
return nil, fmt.Errorf("Expected ListerWatcher.Watch to only be called %d times",
len(tc.expectedWatchOptions))
}
w := watch.NewFake()
// Enqueue for event producer to use
go func() { watcherCh <- w }()
t.Log("Watcher Started")
return w, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listOpts = append(listOpts, options)
if len(listOpts) > len(tc.listResults) {
return nil, fmt.Errorf("Expected ListerWatcher.List to only be called %d times",
len(tc.listResults))
}
listResult := tc.listResults[len(listOpts)-1]
return listResult.Object, listResult.Error
},
}
s := NewFIFO(MetaNamespaceKeyFunc)
r := NewReflector(lw, &v1.Pod{}, s, 0)
r.UseWatchList = ptr.To(tc.useWatchList)
// Start ListAndWatch in the background.
// When it returns, it will send an error or nil on the error
// channel and close the error channel.
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
errCh := make(chan error)
go func() {
defer close(errCh)
errCh <- r.ListAndWatchWithContext(ctx)
}()
// Stop ListAndWatch and wait for the error channel to close.
// Validate it didn't error in Cleanup, not a defer.
t.Cleanup(func() {
cancel()
for err := range errCh {
assert.NoError(t, err)
}
})
// Send watch events
var fw *watch.FakeWatcher
for _, event := range tc.watchEvents {
if fw == nil {
// Wait for ListerWatcher.Watch to be called
fw = <-watcherCh
}
obj := event.Object.(metav1.Object)
t.Logf("Sending %s event: name=%s, resourceVersion=%s",
event.Type, obj.GetName(), obj.GetResourceVersion())
fw.Action(event.Type, event.Object)
}
// Verify we received the right objects with the right resource versions.
for _, expectedObj := range tc.expectedStore {
storeObj := Pop(s).(metav1.Object)
assert.Equal(t, expectedObj.GetName(), storeObj.GetName())
assert.Equal(t, expectedObj.GetResourceVersion(), storeObj.GetResourceVersion())
}
// Verify we received the right number of List & Watch calls,
// with the expected options.
diffOpts := cmpopts.IgnoreFields(metav1.ListOptions{}, "TimeoutSeconds")
if diff := cmp.Diff(tc.expectedListOptions, listOpts, diffOpts); diff != "" {
t.Errorf("Unexpected List calls by ListAndWatch:\n%s", diff)
}
if diff := cmp.Diff(tc.expectedWatchOptions, watchOpts, diffOpts); diff != "" {
t.Errorf("Unexpected Watch calls by ListAndWatch:\n%s", diff)
}
})
}
}
func TestReflectorListAndWatchWithErrors(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
}
mkList := func(rv string, pods ...*v1.Pod) *v1.PodList {
list := &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: rv}}
for _, pod := range pods {
list.Items = append(list.Items, *pod)
}
return list
}
table := []struct {
list *v1.PodList
listErr error
events []watch.Event
watchErr error
}{
{
list: mkList("1"),
events: []watch.Event{
{Type: watch.Added, Object: mkPod("foo", "2")},
{Type: watch.Added, Object: mkPod("bar", "3")},
},
}, {
list: mkList("3", mkPod("foo", "2"), mkPod("bar", "3")),
events: []watch.Event{
{Type: watch.Deleted, Object: mkPod("foo", "4")},
{Type: watch.Added, Object: mkPod("qux", "5")},
},
}, {
listErr: fmt.Errorf("a list error"),
}, {
list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
watchErr: fmt.Errorf("a watch error"),
}, {
list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
events: []watch.Event{
{Type: watch.Added, Object: mkPod("baz", "6")},
},
}, {
list: mkList("6", mkPod("bar", "3"), mkPod("qux", "5"), mkPod("baz", "6")),
},
}
s := NewFIFO(MetaNamespaceKeyFunc)
for line, item := range table {
if item.list != nil {
// Test that the list is what currently exists in the store.
current := s.List()
checkMap := map[string]string{}
for _, item := range current {
pod := item.(*v1.Pod)
checkMap[pod.Name] = pod.ResourceVersion
}
for _, pod := range item.list.Items {
if e, a := pod.ResourceVersion, checkMap[pod.Name]; e != a {
t.Errorf("%v: expected %v, got %v for pod %v", line, e, a, pod.Name)
}
}
if e, a := len(item.list.Items), len(checkMap); e != a {
t.Errorf("%v: expected %v, got %v", line, e, a)
}
}
watchRet, watchErr := item.events, item.watchErr
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if watchErr != nil {
return nil, watchErr
}
watchErr = fmt.Errorf("second watch")
fw := watch.NewFake()
go func() {
for _, e := range watchRet {
fw.Action(e.Type, e.Object)
}
// Because FakeWatcher doesn't buffer events, it's safe to
// close the stop channel immediately without missing events.
// But usually, the event producer would instead close the
// result channel, and wait for the consumer to stop the
// watcher, to avoid race conditions.
// TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh)
cancel(errors.New("done"))
}()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return item.list, item.listErr
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
err := r.ListAndWatchWithContext(ctx)
if item.listErr != nil && !errors.Is(err, item.listErr) {
t.Errorf("unexpected ListAndWatch error: %v", err)
}
if item.watchErr != nil && !errors.Is(err, item.watchErr) {
t.Errorf("unexpected ListAndWatch error: %v", err)
}
if item.listErr == nil && item.watchErr == nil {
assert.NoError(t, err)
}
}
}
func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
maxBackoff := 50 * time.Millisecond
table := []struct {
numConnFails int
expLowerBound time.Duration
expUpperBound time.Duration
}{
{5, 32 * time.Millisecond, 64 * time.Millisecond}, // case where maxBackoff is not hit, time should grow exponentially
{40, 35 * 2 * maxBackoff, 40 * 2 * maxBackoff}, // case where maxBoff is hit, backoff time should flatten
}
for _, test := range table {
t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1<<test.numConnFails),
func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
connFails := test.numConnFails
fakeClock := testingclock.NewFakeClock(time.Unix(0, 0))
bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock)
done := make(chan struct{})
defer close(done)
go func() {
i := 0
for {
select {
case <-done:
return
default:
}
if fakeClock.HasWaiters() {
step := (1 << (i + 1)) * time.Millisecond
if step > maxBackoff*2 {
step = maxBackoff * 2
}
fakeClock.Step(step)
i++
}
time.Sleep(100 * time.Microsecond)
}
}()
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if connFails > 0 {
connFails--
return nil, syscall.ECONNREFUSED
}
cancel(errors.New("done"))
return watch.NewFake(), nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
r := &Reflector{
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm,
clock: fakeClock,
watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
}
start := fakeClock.Now()
err := r.ListAndWatchWithContext(ctx)
elapsed := fakeClock.Since(start)
if err != nil {
t.Errorf("unexpected error %v", err)
}
if elapsed < (test.expLowerBound) {
t.Errorf("expected lower bound of ListAndWatch: %v, got %v", test.expLowerBound, elapsed)
}
if elapsed > (test.expUpperBound) {
t.Errorf("expected upper bound of ListAndWatch: %v, got %v", test.expUpperBound, elapsed)
}
})
}
}
type fakeBackoff struct {
clock clock.Clock
calls int
}
func (f *fakeBackoff) Backoff() clock.Timer {
f.calls++
return f.clock.NewTimer(time.Duration(0))
}
func TestBackoffOnTooManyRequests(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
err := apierrors.NewTooManyRequests("too many requests", 1)
clock := &clock.RealClock{}
bm := &fakeBackoff{clock: clock}
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
switch bm.calls {
case 0:
return nil, err
case 1:
w := watch.NewFakeWithChanSize(1, false)
status := err.Status()
w.Error(&status)
return w, nil
default:
w := watch.NewFake()
w.Stop()
return w, nil
}
},
}
r := &Reflector{
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm,
clock: clock,
watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
}
stopCh := make(chan struct{})
if err := r.ListAndWatchWithContext(ctx); err != nil {
t.Fatal(err)
}
close(stopCh)
if bm.calls != 2 {
t.Errorf("unexpected watch backoff calls: %d", bm.calls)
}
}
func TestNoRelistOnTooManyRequests(t *testing.T) {
err := apierrors.NewTooManyRequests("too many requests", 1)
clock := &clock.RealClock{}
bm := &fakeBackoff{clock: clock}
listCalls, watchCalls := 0, 0
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCalls++
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
watchCalls++
if watchCalls < 5 {
return nil, err
}
w := watch.NewFake()
w.Stop()
return w, nil
},
}
r := &Reflector{
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm,
clock: clock,
watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
}
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
if err := r.ListAndWatchWithContext(ctx); err != nil {
t.Fatal(err)
}
cancel(errors.New("done"))
if listCalls != 1 {
t.Errorf("unexpected list calls: %d", listCalls)
}
if watchCalls != 5 {
t.Errorf("unexpected watch calls: %d", watchCalls)
}
}
func TestRetryInternalError(t *testing.T) {
testCases := []struct {
name string
maxInternalDuration time.Duration
rewindTime int
wantRetries int
}{
{
name: "retries off",
maxInternalDuration: time.Duration(0),
wantRetries: 0,
},
{
name: "retries on, all calls fail",
maxInternalDuration: time.Second * 30,
wantRetries: 31,
},
{
name: "retries on, one call successful",
maxInternalDuration: time.Second * 30,
rewindTime: 10,
wantRetries: 40,
},
}
for _, tc := range testCases {
err := apierrors.NewInternalError(fmt.Errorf("etcdserver: no leader"))
fakeClock := testingclock.NewFakeClock(time.Now())
bm := &fakeBackoff{clock: fakeClock}
counter := 0
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
counter = counter + 1
t.Logf("Counter: %v", counter)
if counter == tc.rewindTime {
t.Logf("Rewinding")
fakeClock.Step(time.Minute)
}
fakeClock.Step(time.Second)
w := watch.NewFakeWithChanSize(1, false)
status := err.Status()
w.Error(&status)
return w, nil
},
}
r := &Reflector{
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm,
clock: fakeClock,
watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
}
r.MaxInternalErrorRetryDuration = tc.maxInternalDuration
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
require.NoError(t, r.ListAndWatchWithContext(ctx))
cancel(errors.New("done"))
if counter-1 != tc.wantRetries {
t.Errorf("%v unexpected number of retries: %d", tc, counter-1)
}
}
}
func TestReflectorResync(t *testing.T) {
iteration := 0
_, ctx := ktesting.NewTestContext(t)
rerr := errors.New("expected resync reached")
s := &FakeCustomStore{
ResyncFunc: func() error {
iteration++
if iteration == 2 {
return rerr
}
return nil
},
}
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "0"}}, nil
},
}
resyncPeriod := 1 * time.Millisecond
r := NewReflector(lw, &v1.Pod{}, s, resyncPeriod)
if err := r.ListAndWatchWithContext(ctx); err != nil {
// error from Resync is not propaged up to here.
t.Errorf("expected error %v", err)
}
if iteration != 2 {
t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
}
}
func TestReflectorWatchListPageSize(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc)
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done"))
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if options.Limit != 4 {
t.Fatalf("Expected list Limit of 4 but got %d", options.Limit)
}
pods := make([]v1.Pod, 10)
for i := 0; i < 10; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.Continue {
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
case "C1":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
case "C2":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
default:
t.Fatalf("Unrecognized continue: %s", options.Continue)
}
return nil, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Set resource version to test pagination also for not consistent reads.
r.setLastSyncResourceVersion("10")
// Set the reflector to paginate the list request in 4 item chunks.
r.WatchListPageSize = 4
require.NoError(t, r.ListAndWatchWithContext(ctx))
results := s.List()
if len(results) != 10 {
t.Errorf("Expected 10 results, got %d", len(results))
}
}
func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc)
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done"))
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if options.ResourceVersion != "10" {
t.Fatalf("Expected ResourceVersion: \"10\", got: %s", options.ResourceVersion)
}
if options.Limit != 0 {
t.Fatalf("Expected list Limit of 0 but got %d", options.Limit)
}
pods := make([]v1.Pod, 10)
for i := 0; i < 10; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods}, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
r.setLastSyncResourceVersion("10")
require.NoError(t, r.ListAndWatchWithContext(ctx))
results := s.List()
if len(results) != 10 {
t.Errorf("Expected 10 results, got %d", len(results))
}
}
func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
var cancel func(error)
s := NewStore(MetaNamespaceKeyFunc)
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done"))
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
// Check that default pager limit is set.
if options.Limit != 500 {
t.Fatalf("Expected list Limit of 500 but got %d", options.Limit)
}
pods := make([]v1.Pod, 10)
for i := 0; i < 10; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.Continue {
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
case "C1":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
case "C2":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
default:
t.Fatalf("Unrecognized continue: %s", options.Continue)
}
return nil, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should initialize paginatedResult in the reflector.
var cancelCtx context.Context
cancelCtx, cancel = context.WithCancelCause(ctx)
require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
if results := s.List(); len(results) != 10 {
t.Errorf("Expected 10 results, got %d", len(results))
}
// Since initial list for ResourceVersion="0" was paginated, the subsequent
// ones should also be paginated.
cancelCtx, cancel = context.WithCancelCause(ctx)
require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
if results := s.List(); len(results) != 10 {
t.Errorf("Expected 10 results, got %d", len(results))
}
}
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or
// etcd that is partitioned and serving older data than the reflector has already processed.
func TestReflectorResyncWithResourceVersion(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
cancelCtx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{}
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done"))
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCallRVs = append(listCallRVs, options.ResourceVersion)
pods := make([]v1.Pod, 8)
for i := 0; i < 8; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.ResourceVersion {
case "0":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
case "10":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
default:
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
}
return nil, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0
require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
results := s.List()
if len(results) != 4 {
t.Errorf("Expected 4 results, got %d", len(results))
}
// relist should use lastSyncResourceVersions (RV=10)
cancelCtx, cancel = context.WithCancelCause(ctx)
require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
results = s.List()
if len(results) != 8 {
t.Errorf("Expected 8 results, got %d", len(results))
}
expectedRVs := []string{"0", "10"}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
}
}
// TestReflectorExpiredExactResourceVersion tests that a reflector handles the behavior of kubernetes 1.16 an earlier
// where if the exact ResourceVersion requested is not available for a List request for a non-zero ResourceVersion,
// an "Expired" error is returned if the ResourceVersion has expired (etcd has compacted it).
// (In kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than
// the requested ResourceVersion).
func TestReflectorExpiredExactResourceVersion(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
var cancelCtx context.Context
var cancel func(error)
s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{}
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done"))
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCallRVs = append(listCallRVs, options.ResourceVersion)
pods := make([]v1.Pod, 8)
for i := 0; i < 8; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.ResourceVersion {
case "0":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
case "10":
// When watch cache is disabled, if the exact ResourceVersion requested is not available, a "Expired" error is returned.
return nil, apierrors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
default:
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
}
return nil, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0
cancelCtx, cancel = context.WithCancelCause(ctx)
require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
results := s.List()
if len(results) != 4 {
t.Errorf("Expected 4 results, got %d", len(results))
}
// relist should use lastSyncResourceVersions (RV=10) and since RV=10 is expired, it should retry with RV="".
cancelCtx, cancel = context.WithCancelCause(ctx)
require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
results = s.List()
if len(results) != 8 {
t.Errorf("Expected 8 results, got %d", len(results))
}
expectedRVs := []string{"0", "10", ""}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
}
}
func TestReflectorFullListIfExpired(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
var cancelCtx context.Context
var cancel func(error)
s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{}
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done"))
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCallRVs = append(listCallRVs, options.ResourceVersion)
pods := make([]v1.Pod, 8)
for i := 0; i < 8; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
rvContinueLimit := func(rv, c string, l int64) metav1.ListOptions {
return metav1.ListOptions{ResourceVersion: rv, Continue: c, Limit: l}
}
switch rvContinueLimit(options.ResourceVersion, options.Continue, options.Limit) {
// initial limited list
case rvContinueLimit("0", "", 4):
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
// first page of the rv=10 list
case rvContinueLimit("10", "", 4):
return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
// second page of the above list
case rvContinueLimit("", "C1", 4):
return nil, apierrors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
// rv=10 unlimited list
case rvContinueLimit("10", "", 0):
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
default:
err := fmt.Errorf("unexpected list options: %#v", options)
t.Error(err)
return nil, err
}
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
r.WatchListPageSize = 4
// Initial list should use RV=0
cancelCtx, cancel = context.WithCancelCause(ctx)
if err := r.ListAndWatchWithContext(cancelCtx); err != nil {
t.Fatal(err)
}
results := s.List()
if len(results) != 4 {
t.Errorf("Expected 4 results, got %d", len(results))
}
// relist should use lastSyncResourceVersions (RV=10) and since second page of that expired, it should full list with RV=10
cancelCtx, cancel = context.WithCancelCause(ctx)
if err := r.ListAndWatchWithContext(cancelCtx); err != nil {
t.Fatal(err)
}
results = s.List()
if len(results) != 8 {
t.Errorf("Expected 8 results, got %d", len(results))
}
expectedRVs := []string{"0", "10", "", "10"}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource versiosn of %#v but got: %#v", expectedRVs, listCallRVs)
}
}
func TestReflectorFullListIfTooLarge(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
var cancelCtx context.Context
var cancel func(error)
s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{}
version := 30
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done"))
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCallRVs = append(listCallRVs, options.ResourceVersion)
resourceVersion := strconv.Itoa(version)
switch options.ResourceVersion {
// initial list
case "0":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil
// relist after the initial list
case "20":
err := apierrors.NewTimeoutError("too large resource version", 1)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
return nil, err
// relist after the initial list (covers the error format used in api server 1.17.0-1.18.5)
case "30":
err := apierrors.NewTimeoutError("too large resource version", 1)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: "Too large resource version"}}
return nil, err
// relist after the initial list (covers the error format used in api server before 1.17.0)
case "40":
err := apierrors.NewTimeoutError("Too large resource version", 1)
return nil, err
// relist from etcd after "too large" error
case "":
version += 10
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: resourceVersion}}, nil
default:
return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
}
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0
cancelCtx, cancel = context.WithCancelCause(ctx)
if err := r.ListAndWatchWithContext(cancelCtx); err != nil {
t.Fatal(err)
}
// Relist from the future version.
// This may happen, as watchcache is initialized from "current global etcd resource version"
// when kube-apiserver is starting and if no objects are changing after that each kube-apiserver
// may be synced to a different version and they will never converge.
// TODO: We should use etcd progress-notify feature to avoid this behavior but until this is
// done we simply try to relist from now to avoid continuous errors on relists.
for i := 1; i <= 3; i++ {
// relist twice to cover the two variants of TooLargeResourceVersion api errors
cancelCtx, cancel = context.WithCancelCause(ctx)
if err := r.ListAndWatchWithContext(cancelCtx); err != nil {
t.Fatal(err)
}
}
expectedRVs := []string{"0", "20", "", "30", "", "40", ""}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
}
}
func TestGetTypeDescriptionFromObject(t *testing.T) {
obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{
Group: "mygroup",
Version: "v1",
Kind: "MyKind",
}
obj.SetGroupVersionKind(gvk)
testCases := map[string]struct {
inputType interface{}
expectedTypeDescription string
}{
"Nil type": {
expectedTypeDescription: defaultExpectedTypeName,
},
"Normal type": {
inputType: &v1.Pod{},
expectedTypeDescription: "*v1.Pod",
},
"Unstructured type without GVK": {
inputType: &unstructured.Unstructured{},
expectedTypeDescription: "*unstructured.Unstructured",
},
"Unstructured type with GVK": {
inputType: obj,
expectedTypeDescription: gvk.String(),
},
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
typeDescription := getTypeDescriptionFromObject(tc.inputType)
if tc.expectedTypeDescription != typeDescription {
t.Fatalf("Expected typeDescription %v, got %v", tc.expectedTypeDescription, typeDescription)
}
})
}
}
func TestGetExpectedGVKFromObject(t *testing.T) {
obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{
Group: "mygroup",
Version: "v1",
Kind: "MyKind",
}
obj.SetGroupVersionKind(gvk)
testCases := map[string]struct {
inputType interface{}
expectedGVK *schema.GroupVersionKind
}{
"Nil type": {},
"Some non Unstructured type": {
inputType: &v1.Pod{},
},
"Unstructured type without GVK": {
inputType: &unstructured.Unstructured{},
},
"Unstructured type with GVK": {
inputType: obj,
expectedGVK: &gvk,
},
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
expectedGVK := getExpectedGVKFromObject(tc.inputType)
gvkNotEqual := (tc.expectedGVK == nil) != (expectedGVK == nil)
if tc.expectedGVK != nil && expectedGVK != nil {
gvkNotEqual = *tc.expectedGVK != *expectedGVK
}
if gvkNotEqual {
t.Fatalf("Expected expectedGVK %v, got %v", tc.expectedGVK, expectedGVK)
}
})
}
}
func TestWatchTimeout(t *testing.T) {
testCases := []struct {
name string
minWatchTimeout time.Duration
expectedMinTimeoutSeconds int64
}{
{
name: "no timeout",
expectedMinTimeoutSeconds: 5 * 60,
},
{
name: "small timeout not honored",
minWatchTimeout: time.Second,
expectedMinTimeoutSeconds: 5 * 60,
},
{
name: "30m timeout",
minWatchTimeout: 30 * time.Minute,
expectedMinTimeoutSeconds: 30 * 60,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc)
var gotTimeoutSeconds int64
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if options.TimeoutSeconds != nil {
gotTimeoutSeconds = *options.TimeoutSeconds
}
// Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done"))
return watch.NewFake(), nil
},
}
opts := ReflectorOptions{
MinWatchTimeout: tc.minWatchTimeout,
}
r := NewReflectorWithOptions(lw, &v1.Pod{}, s, opts)
if err := r.ListAndWatchWithContext(ctx); err != nil {
t.Fatal(err)
}
minExpected := tc.expectedMinTimeoutSeconds
maxExpected := 2 * tc.expectedMinTimeoutSeconds
if gotTimeoutSeconds < minExpected || gotTimeoutSeconds > maxExpected {
t.Errorf("unexpected TimeoutSecond, got %v, expected in [%v, %v]", gotTimeoutSeconds, minExpected, maxExpected)
}
})
}
}
type storeWithRV struct {
Store
// resourceVersions tracks values passed by UpdateResourceVersion
resourceVersions []string
}
func (s *storeWithRV) UpdateResourceVersion(resourceVersion string) {
s.resourceVersions = append(s.resourceVersions, resourceVersion)
}
func newStoreWithRV() *storeWithRV {
return &storeWithRV{
Store: NewStore(MetaNamespaceKeyFunc),
}
}
func TestReflectorResourceVersionUpdate(t *testing.T) {
s := newStoreWithRV()
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
fw := watch.NewFake()
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
makePod := func(rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: rv}}
}
go func() {
fw.Action(watch.Added, makePod("10"))
fw.Action(watch.Modified, makePod("20"))
fw.Action(watch.Bookmark, makePod("30"))
fw.Action(watch.Deleted, makePod("40"))
cancel(errors.New("done"))
}()
// Initial list should use RV=0
if err := r.ListAndWatchWithContext(ctx); err != nil {
t.Fatal(err)
}
expectedRVs := []string{"10", "20", "30", "40"}
if !reflect.DeepEqual(s.resourceVersions, expectedRVs) {
t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions)
}
}
const (
fakeItemsNum = 100
exemptObjectIndex = fakeItemsNum / 4
pageNum = 3
)
func getPodListItems(start int, numItems int) (string, string, *v1.PodList) {
out := &v1.PodList{
Items: make([]v1.Pod, numItems),
}
for i := 0; i < numItems; i++ {
out.Items[i] = v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", i+start),
Namespace: "default",
Labels: map[string]string{
"label-key-1": "label-value-1",
},
Annotations: map[string]string{
"annotations-key-1": "annotations-value-1",
},
},
Spec: v1.PodSpec{
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
v1.ResourceMemory: resource.MustParse("8"),
},
NodeSelector: map[string]string{
"foo": "bar",
"baz": "quux",
},
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}},
},
},
PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
{Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}}},
},
},
},
TopologySpreadConstraints: []v1.TopologySpreadConstraint{
{TopologyKey: `foo`},
},
HostAliases: []v1.HostAlias{
{IP: "1.1.1.1"},
{IP: "2.2.2.2"},
},
ImagePullSecrets: []v1.LocalObjectReference{
{Name: "secret1"},
{Name: "secret2"},
},
Containers: []v1.Container{
{
Name: "foobar",
Image: "alpine",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("2"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10"),
},
},
},
{
Name: "foobar2",
Image: "alpine",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("4"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("12"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("8"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("24"),
},
},
},
},
InitContainers: []v1.Container{
{
Name: "small-init",
Image: "alpine",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
},
},
},
{
Name: "big-init",
Image: "alpine",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("40"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("120"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("80"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("240"),
},
},
},
},
Hostname: fmt.Sprintf("node-%d", i),
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
ContainerID: "docker://numbers",
Image: "alpine",
Name: "foobar",
Ready: false,
},
{
ContainerID: "docker://numbers",
Image: "alpine",
Name: "foobar2",
Ready: false,
},
},
InitContainerStatuses: []v1.ContainerStatus{
{
ContainerID: "docker://numbers",
Image: "alpine",
Name: "small-init",
Ready: false,
},
{
ContainerID: "docker://numbers",
Image: "alpine",
Name: "big-init",
Ready: false,
},
},
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
Reason: "successfully",
Message: "sync pod successfully",
LastProbeTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
},
},
},
}
}
return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
}
func getConfigmapListItems(start int, numItems int) (string, string, *v1.ConfigMapList) {
out := &v1.ConfigMapList{
Items: make([]v1.ConfigMap, numItems),
}
for i := 0; i < numItems; i++ {
out.Items[i] = v1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("cm-%d", i+start),
Namespace: "default",
Labels: map[string]string{
"label-key-1": "label-value-1",
},
Annotations: map[string]string{
"annotations-key-1": "annotations-value-1",
},
},
Data: map[string]string{
"data-1": "value-1",
"data-2": "value-2",
},
}
}
return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
}
type TestPagingPodsLW struct {
totalPageCount int
fetchedPageCount int
detectedObjectNameList []string
exemptObjectNameList []string
}
func newPageTestLW(totalPageNum int) *TestPagingPodsLW {
return &TestPagingPodsLW{
totalPageCount: totalPageNum,
fetchedPageCount: 0,
}
}
func (t *TestPagingPodsLW) List(options metav1.ListOptions) (runtime.Object, error) {
firstPodName, exemptPodName, list := getPodListItems(t.fetchedPageCount*fakeItemsNum, fakeItemsNum)
t.detectedObjectNameList = append(t.detectedObjectNameList, firstPodName)
t.exemptObjectNameList = append(t.exemptObjectNameList, exemptPodName)
t.fetchedPageCount++
if t.fetchedPageCount >= t.totalPageCount {
return list, nil
}
list.SetContinue("true")
return list, nil
}
func (t *TestPagingPodsLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
return nil, nil
}
func TestReflectorListExtract(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
store := NewStore(func(obj interface{}) (string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return "", fmt.Errorf("expect *v1.Pod, but got %T", obj)
}
return pod.GetName(), nil
})
lw := newPageTestLW(5)
reflector := NewReflector(lw, &v1.Pod{}, store, 0)
reflector.WatchListPageSize = fakeItemsNum
// execute list to fill store
if err := reflector.list(ctx); err != nil {
t.Fatal(err)
}
// We will not delete exemptPod,
// in order to see if the existence of this Pod causes other Pods that are not used to be unable to properly clear.
for _, podName := range lw.exemptObjectNameList {
_, exist, err := store.GetByKey(podName)
if err != nil || !exist {
t.Fatalf("%s should exist in pod store", podName)
}
}
// we will pay attention to whether the memory occupied by the first Pod is released
// Golang's can only be SetFinalizer for the first element of the array,
// so pod-0 will be the object of our attention
detectedPodAlreadyBeCleared := make(chan struct{}, len(lw.detectedObjectNameList))
for _, firstPodName := range lw.detectedObjectNameList {
_, exist, err := store.GetByKey(firstPodName)
if err != nil || !exist {
t.Fatalf("%s should exist in pod store", firstPodName)
}
firstPod, exist, err := store.GetByKey(firstPodName)
if err != nil || !exist {
t.Fatalf("%s should exist in pod store", firstPodName)
}
goruntime.SetFinalizer(firstPod, func(obj interface{}) {
t.Logf("%s already be gc\n", obj.(*v1.Pod).GetName())
detectedPodAlreadyBeCleared <- struct{}{}
})
}
storedObjectKeys := store.ListKeys()
for _, k := range storedObjectKeys {
// delete all Pods except the exempted Pods.
if sets.NewString(lw.exemptObjectNameList...).Has(k) {
continue
}
obj, exist, err := store.GetByKey(k)
if err != nil || !exist {
t.Fatalf("%s should exist in pod store", k)
}
if err := store.Delete(obj); err != nil {
t.Fatalf("delete object: %v", err)
}
goruntime.GC()
}
clearedNum := 0
for {
select {
case <-detectedPodAlreadyBeCleared:
clearedNum++
if clearedNum == len(lw.detectedObjectNameList) {
return
}
}
}
}
func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv}}
}
mkList := func(rv string, pods ...*v1.Pod) *v1.PodList {
list := &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: rv}}
for _, pod := range pods {
list.Items = append(list.Items, *pod)
}
return list
}
makeStatus := func() *metav1.Status {
return &metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusInternalServerError,
Reason: metav1.StatusReasonStoreReadError,
Message: "failed to prepare current and previous objects: corrupt object has been deleted",
}
}
// these pods preexist and never get updated/deleted
preExisting := mkPod("foo-1", "1")
pods := []*v1.Pod{preExisting, mkPod("foo-2", "2"), mkPod("foo-3", "3")}
lastExpectedRV := "5"
lists := []*v1.PodList{
mkList("3", pods...), // initial list
mkList(lastExpectedRV, pods...), // re-list due to watch error
}
corruptObj := mkPod("foo", "4")
events := []watch.Event{
{Type: watch.Added, Object: corruptObj},
// the object becomes corrupt, and it gets unsafe-deleted, and
// watch sends the following Error event, note the RV has
// advanced to "5" in the storage due to the delete operation
{Type: watch.Error, Object: makeStatus()},
}
s := NewFIFO(MetaNamespaceKeyFunc)
var replaceInvoked atomic.Int32
store := &fakeStore{
Store: s,
beforeReplace: func(list []interface{}, rv string) {
// interested in the Replace call that happens after the Error event
if rv == lastExpectedRV {
replaceInvoked.Add(1)
_, exists, err := s.Get(corruptObj)
if err != nil || !exists {
t.Errorf("expected the object to exist in the store, exists: %t, err: %v", exists, err)
}
_, exists, err = s.Get(preExisting)
if err != nil || !exists {
t.Errorf("expected the pre-existing object to be in the store, exists: %t, err: %v", exists, err)
}
}
},
afterReplace: func(rv string, err error) {
if rv == lastExpectedRV {
replaceInvoked.Add(1)
if err != nil {
t.Errorf("expected Replace to have succeeded, but got error: %v", err)
}
_, exists, err := s.Get(corruptObj)
if err != nil || exists {
t.Errorf("expected the object to have been removed from the store, exists: %t, err: %v", exists, err)
}
// show that a pre-existing pod is still in the cache
_, exists, err = s.Get(preExisting)
if err != nil || !exists {
t.Errorf("expected the pre-existing object to be in the store, exists: %t, err: %v", exists, err)
}
}
},
}
var once sync.Once
lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
go func() {
once.Do(func() {
for _, e := range events {
fw.Action(e.Type, e.Object)
}
})
}()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
var list runtime.Object
if len(lists) > 0 {
list = lists[0]
lists = lists[1:]
}
return list, nil
},
}
r := NewReflector(lw, &v1.Pod{}, store, 0)
doneCh, stopCh := make(chan struct{}), make(chan struct{})
go func() {
defer close(doneCh)
//nolint:logcheck // Intentionally uses the old API.
r.Run(stopCh)
}()
// wait for the RV to sync to the version returned by the final list
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
}
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
}
if want, got := 2, int(replaceInvoked.Load()); want != got {
t.Errorf("expected store Delete hooks to be invoked %d times, but got: %d", want, got)
}
if want, got := len(pods), len(s.List()); want != got {
t.Errorf("expected the store to have %d objects, but got: %d", want, got)
}
close(stopCh)
select {
case <-doneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for Run to return")
}
}
type fakeStore struct {
Store
beforeReplace func(list []interface{}, s string)
afterReplace func(rv string, err error)
}
func (f *fakeStore) Replace(list []interface{}, rv string) error {
f.beforeReplace(list, rv)
err := f.Store.Replace(list, rv)
f.afterReplace(rv, err)
return err
}
func BenchmarkExtractList(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
tests := []struct {
name string
list runtime.Object
}{
{
name: "PodList",
list: podList,
},
{
name: "ConfigMapList",
list: configMapList,
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := meta.ExtractList(tc.list)
if err != nil {
b.Errorf("extract list: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkEachListItem(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
tests := []struct {
name string
list runtime.Object
}{
{
name: "PodList",
list: podList,
},
{
name: "ConfigMapList",
list: configMapList,
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := meta.EachListItem(tc.list, func(object runtime.Object) error {
return nil
})
if err != nil {
b.Errorf("each list: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkExtractListWithAlloc(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
tests := []struct {
name string
list runtime.Object
}{
{
name: "PodList",
list: podList,
},
{
name: "ConfigMapList",
list: configMapList,
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := meta.ExtractListWithAlloc(tc.list)
if err != nil {
b.Errorf("extract list with alloc: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkEachListItemWithAlloc(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
tests := []struct {
name string
list runtime.Object
}{
{
name: "PodList",
list: podList,
},
{
name: "ConfigMapList",
list: configMapList,
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := meta.EachListItemWithAlloc(tc.list, func(object runtime.Object) error {
return nil
})
if err != nil {
b.Errorf("each list with alloc: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkReflectorList(b *testing.B) {
store := NewStore(func(obj interface{}) (string, error) {
o, err := meta.Accessor(obj)
if err != nil {
return "", err
}
return o.GetName(), nil
})
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
tests := []struct {
name string
sample func() interface{}
list runtime.Object
}{
{
name: "PodList",
sample: func() interface{} {
return v1.Pod{}
},
list: podList,
},
{
name: "ConfigMapList",
sample: func() interface{} {
return v1.ConfigMap{}
},
list: configMapList,
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
_, ctx := ktesting.NewTestContext(b)
sample := tc.sample()
reflector := NewReflector(newPageTestLW(pageNum), &sample, store, 0)
reflector.WatchListPageSize = fakeItemsNum
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := reflector.list(ctx)
if err != nil {
b.Fatalf("reflect list: %v", err)
}
}
b.StopTimer()
})
}
}