Files
client-go/tools/cache/reflector_test.go
Maru Newby ab4e4fccf4 Add support for type checking Unstructured via GVK in reflector
It was previously possible to instantiate `Reflector` with
`*unstructured.Unstructured` as the expected type but this did not
support checking that event objects were of the correct API
type (e.g. if event object was `v1.Pod` in `Unstructured` form but
`v1.Service` was expected). This commit adds support for providing a
GVK via an `Unstructured` expected type to compare with the GVK of
event objects. The GVK will also be used in reflector log output.

Kubernetes-commit: 237dbfd8ad322dfcad4bd4d5345368480c22d82f
2019-09-24 01:20:40 +00:00

491 lines
14 KiB
Go

/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"errors"
"fmt"
"math/rand"
"reflect"
"strconv"
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
)
var nevererrc chan error
type testLW struct {
ListFunc func(options metav1.ListOptions) (runtime.Object, error)
WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
}
func (t *testLW) List(options metav1.ListOptions) (runtime.Object, error) {
return t.ListFunc(options)
}
func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
return t.WatchFunc(options)
}
func TestCloseWatchChannelOnError(t *testing.T) {
r := NewReflector(&testLW{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
fw := watch.NewFake()
r.listerWatcher = &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
go r.ListAndWatch(wait.NeverStop)
fw.Error(pod)
select {
case _, ok := <-fw.ResultChan():
if ok {
t.Errorf("Watch channel left open after cancellation")
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
break
}
}
func TestRunUntil(t *testing.T) {
stopCh := make(chan struct{})
store := NewStore(MetaNamespaceKeyFunc)
r := NewReflector(&testLW{}, &v1.Pod{}, store, 0)
fw := watch.NewFake()
r.listerWatcher = &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
go r.Run(stopCh)
// Synchronously add a dummy pod into the watch channel so we
// know the RunUntil go routine is in the watch handler.
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
close(stopCh)
select {
case _, ok := <-fw.ResultChan():
if ok {
t.Errorf("Watch channel left open after stopping the watch")
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
break
}
}
func TestReflectorResyncChan(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, time.Millisecond)
a, _ := g.resyncChan()
b := time.After(wait.ForeverTestTimeout)
select {
case <-a:
t.Logf("got timeout as expected")
case <-b:
t.Errorf("resyncChan() is at least 99 milliseconds late??")
}
}
func BenchmarkReflectorResyncChanMany(b *testing.B) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 25*time.Millisecond)
// The improvement to this (calling the timer's Stop() method) makes
// this benchmark about 40% faster.
for i := 0; i < b.N; i++ {
g.resyncPeriod = time.Duration(rand.Float64() * float64(time.Millisecond) * 25)
_, stop := g.resyncChan()
stop()
}
}
func TestReflectorWatchHandlerError(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
fw := watch.NewFake()
go func() {
fw.Stop()
}()
var resumeRV string
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
if err == nil {
t.Errorf("unexpected non-error")
}
}
func TestReflectorWatchHandler(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
fw := watch.NewFake()
s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
go func() {
fw.Add(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "rejected"}})
fw.Delete(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
fw.Modify(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "55"}})
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
fw.Stop()
}()
var resumeRV string
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
if err != nil {
t.Errorf("unexpected error %v", err)
}
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
}
table := []struct {
Pod *v1.Pod
exists bool
}{
{mkPod("foo", ""), false},
{mkPod("rejected", ""), false},
{mkPod("bar", "55"), true},
{mkPod("baz", "32"), true},
}
for _, item := range table {
obj, exists, _ := s.Get(item.Pod)
if e, a := item.exists, exists; e != a {
t.Errorf("%v: expected %v, got %v", item.Pod, e, a)
}
if !exists {
continue
}
if e, a := item.Pod.ResourceVersion, obj.(*v1.Pod).ResourceVersion; e != a {
t.Errorf("%v: expected %v, got %v", item.Pod, e, a)
}
}
// RV should send the last version we see.
if e, a := "32", resumeRV; e != a {
t.Errorf("expected %v, got %v", e, a)
}
// last sync resource version should be the last version synced with store
if e, a := "32", g.LastSyncResourceVersion(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}
func TestReflectorStopWatch(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
fw := watch.NewFake()
var resumeRV string
stopWatch := make(chan struct{}, 1)
stopWatch <- struct{}{}
err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch)
if err != errorStopRequested {
t.Errorf("expected stop error, got %q", err)
}
}
func TestReflectorListAndWatch(t *testing.T) {
createdFakes := make(chan *watch.FakeWatcher)
// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc
// to get called at the beginning of the watch with 1, and again with 3 when we
// inject an error.
expectedRVs := []string{"1", "3"}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
rv := options.ResourceVersion
fw := watch.NewFake()
if e, a := expectedRVs[0], rv; e != a {
t.Errorf("Expected rv %v, but got %v", e, a)
}
expectedRVs = expectedRVs[1:]
// channel is not buffered because the for loop below needs to block. But
// we don't want to block here, so report the new fake via a go routine.
go func() { createdFakes <- fw }()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
s := NewFIFO(MetaNamespaceKeyFunc)
r := NewReflector(lw, &v1.Pod{}, s, 0)
go r.ListAndWatch(wait.NeverStop)
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
var fw *watch.FakeWatcher
for i, id := range ids {
if fw == nil {
fw = <-createdFakes
}
sendingRV := strconv.FormatUint(uint64(i+2), 10)
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}})
if sendingRV == "3" {
// Inject a failure.
fw.Stop()
fw = nil
}
}
// Verify we received the right ids with the right resource versions.
for i, id := range ids {
pod := Pop(s).(*v1.Pod)
if e, a := id, pod.Name; e != a {
t.Errorf("%v: Expected %v, got %v", i, e, a)
}
if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a {
t.Errorf("%v: Expected %v, got %v", i, e, a)
}
}
if len(expectedRVs) != 0 {
t.Error("called watchStarter an unexpected number of times")
}
}
func TestReflectorListAndWatchWithErrors(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
}
mkList := func(rv string, pods ...*v1.Pod) *v1.PodList {
list := &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: rv}}
for _, pod := range pods {
list.Items = append(list.Items, *pod)
}
return list
}
table := []struct {
list *v1.PodList
listErr error
events []watch.Event
watchErr error
}{
{
list: mkList("1"),
events: []watch.Event{
{Type: watch.Added, Object: mkPod("foo", "2")},
{Type: watch.Added, Object: mkPod("bar", "3")},
},
}, {
list: mkList("3", mkPod("foo", "2"), mkPod("bar", "3")),
events: []watch.Event{
{Type: watch.Deleted, Object: mkPod("foo", "4")},
{Type: watch.Added, Object: mkPod("qux", "5")},
},
}, {
listErr: fmt.Errorf("a list error"),
}, {
list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
watchErr: fmt.Errorf("a watch error"),
}, {
list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
events: []watch.Event{
{Type: watch.Added, Object: mkPod("baz", "6")},
},
}, {
list: mkList("6", mkPod("bar", "3"), mkPod("qux", "5"), mkPod("baz", "6")),
},
}
s := NewFIFO(MetaNamespaceKeyFunc)
for line, item := range table {
if item.list != nil {
// Test that the list is what currently exists in the store.
current := s.List()
checkMap := map[string]string{}
for _, item := range current {
pod := item.(*v1.Pod)
checkMap[pod.Name] = pod.ResourceVersion
}
for _, pod := range item.list.Items {
if e, a := pod.ResourceVersion, checkMap[pod.Name]; e != a {
t.Errorf("%v: expected %v, got %v for pod %v", line, e, a, pod.Name)
}
}
if e, a := len(item.list.Items), len(checkMap); e != a {
t.Errorf("%v: expected %v, got %v", line, e, a)
}
}
watchRet, watchErr := item.events, item.watchErr
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if watchErr != nil {
return nil, watchErr
}
watchErr = fmt.Errorf("second watch")
fw := watch.NewFake()
go func() {
for _, e := range watchRet {
fw.Action(e.Type, e.Object)
}
fw.Stop()
}()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return item.list, item.listErr
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
r.ListAndWatch(wait.NeverStop)
}
}
func TestReflectorResync(t *testing.T) {
iteration := 0
stopCh := make(chan struct{})
rerr := errors.New("expected resync reached")
s := &FakeCustomStore{
ResyncFunc: func() error {
iteration++
if iteration == 2 {
return rerr
}
return nil
},
}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "0"}}, nil
},
}
resyncPeriod := 1 * time.Millisecond
r := NewReflector(lw, &v1.Pod{}, s, resyncPeriod)
if err := r.ListAndWatch(stopCh); err != nil {
// error from Resync is not propaged up to here.
t.Errorf("expected error %v", err)
}
if iteration != 2 {
t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
}
}
func TestReflectorWatchListPageSize(t *testing.T) {
stopCh := make(chan struct{})
s := NewStore(MetaNamespaceKeyFunc)
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
close(stopCh)
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if options.Limit != 4 {
t.Fatalf("Expected list Limit of 4 but got %d", options.Limit)
}
pods := make([]v1.Pod, 10)
for i := 0; i < 10; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.Continue {
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
case "C1":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
case "C2":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
default:
t.Fatalf("Unrecognized continue: %s", options.Continue)
}
return nil, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Set the reflector to paginate the list request in 4 item chunks.
r.WatchListPageSize = 4
r.ListAndWatch(stopCh)
results := s.List()
if len(results) != 10 {
t.Errorf("Expected 10 results, got %d", len(results))
}
}
func TestReflectorSetExpectedType(t *testing.T) {
obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{
Group: "mygroup",
Version: "v1",
Kind: "MyKind",
}
obj.SetGroupVersionKind(gvk)
testCases := map[string]struct {
inputType interface{}
expectedTypeName string
expectedType reflect.Type
expectedGVK *schema.GroupVersionKind
}{
"Nil type": {
expectedTypeName: defaultExpectedTypeName,
},
"Normal type": {
inputType: &v1.Pod{},
expectedTypeName: "*v1.Pod",
expectedType: reflect.TypeOf(&v1.Pod{}),
},
"Unstructured type without GVK": {
inputType: &unstructured.Unstructured{},
expectedTypeName: "*unstructured.Unstructured",
expectedType: reflect.TypeOf(&unstructured.Unstructured{}),
},
"Unstructured type with GVK": {
inputType: obj,
expectedTypeName: gvk.String(),
expectedType: reflect.TypeOf(&unstructured.Unstructured{}),
expectedGVK: &gvk,
},
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
r := &Reflector{}
r.setExpectedType(tc.inputType)
if tc.expectedType != r.expectedType {
t.Fatalf("Expected expectedType %v, got %v", tc.expectedType, r.expectedType)
}
if tc.expectedTypeName != r.expectedTypeName {
t.Fatalf("Expected expectedTypeName %v, got %v", tc.expectedTypeName, r.expectedTypeName)
}
gvkNotEqual := (tc.expectedGVK == nil) != (r.expectedGVK == nil)
if tc.expectedGVK != nil && r.expectedGVK != nil {
gvkNotEqual = *tc.expectedGVK != *r.expectedGVK
}
if gvkNotEqual {
t.Fatalf("Expected expectedGVK %v, got %v", tc.expectedGVK, r.expectedGVK)
}
})
}
}