Merge pull request #109971 from stevekuznetsov/skuznets/isolate-continue

storage: move continue token definition to storage
This commit is contained in:
Kubernetes Prow Robot 2022-05-12 14:57:48 -07:00 committed by GitHub
commit 46469f62db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 266 additions and 206 deletions

View File

@ -0,0 +1,85 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"encoding/base64"
"encoding/json"
"fmt"
"path"
"strings"
)
// continueToken is a simple structured object for encoding the state of a continue token.
// TODO: if we change the version of the encoded from, we can't start encoding the new version
// until all other servers are upgraded (i.e. we need to support rolling schema)
// This is a public API struct and cannot change.
type continueToken struct {
APIVersion string `json:"v"`
ResourceVersion int64 `json:"rv"`
StartKey string `json:"start"`
}
// DecodeContinue transforms an encoded predicate from into a versioned struct.
// TODO: return a typed error that instructs clients that they must relist
func DecodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) {
data, err := base64.RawURLEncoding.DecodeString(continueValue)
if err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
var c continueToken
if err := json.Unmarshal(data, &c); err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
switch c.APIVersion {
case "meta.k8s.io/v1":
if c.ResourceVersion == 0 {
return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)")
}
if len(c.StartKey) == 0 {
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)")
}
// defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot
// be at a higher level of the hierarchy, and so when we append the key prefix we will end up with
// continue start key that is fully qualified and cannot range over anything less specific than
// keyPrefix.
key := c.StartKey
if !strings.HasPrefix(key, "/") {
key = "/" + key
}
cleaned := path.Clean(key)
if cleaned != key {
return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey)
}
return keyPrefix + cleaned[1:], c.ResourceVersion, nil
default:
return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion)
}
}
// EncodeContinue returns a string representing the encoded continuation of the current query.
func EncodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) {
nextKey := strings.TrimPrefix(key, keyPrefix)
if nextKey == key {
return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match")
}
out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(out), nil
}

View File

@ -0,0 +1,71 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"encoding/base64"
"encoding/json"
"testing"
)
func encodeContinueOrDie(apiVersion string, resourceVersion int64, nextKey string) string {
out, err := json.Marshal(&continueToken{APIVersion: apiVersion, ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
panic(err)
}
return base64.RawURLEncoding.EncodeToString(out)
}
func Test_decodeContinue(t *testing.T) {
type args struct {
continueValue string
keyPrefix string
}
tests := []struct {
name string
args args
wantFromKey string
wantRv int64
wantErr bool
}{
{name: "valid", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"},
{name: "root path", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "/"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/"},
{name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotFromKey, gotRv, err := DecodeContinue(tt.args.continueValue, tt.args.keyPrefix)
if (err != nil) != tt.wantErr {
t.Errorf("decodeContinue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if gotFromKey != tt.wantFromKey {
t.Errorf("decodeContinue() gotFromKey = %v, want %v", gotFromKey, tt.wantFromKey)
}
if gotRv != tt.wantRv {
t.Errorf("decodeContinue() gotRv = %v, want %v", gotRv, tt.wantRv)
}
})
}
}

View File

@ -18,6 +18,7 @@ package etcd3
import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apiserver/pkg/storage"
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -60,7 +61,7 @@ func handleCompactedErrorForPaging(continueKey, keyPrefix string) error {
// continueToken.ResoureVersion=-1 means that the apiserver can
// continue the list at the latest resource version. We don't use rv=0
// for this purpose to distinguish from a bad token that has empty rv.
newToken, err := encodeContinue(continueKey, keyPrefix, -1)
newToken, err := storage.EncodeContinue(continueKey, keyPrefix, -1)
if err != nil {
utilruntime.HandleError(err)
return errors.NewResourceExpired(continueExpired)

View File

@ -19,8 +19,6 @@ package etcd3
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"path"
@ -519,66 +517,6 @@ func (s *store) Count(key string) (int64, error) {
return getResp.Count, nil
}
// continueToken is a simple structured object for encoding the state of a continue token.
// TODO: if we change the version of the encoded from, we can't start encoding the new version
// until all other servers are upgraded (i.e. we need to support rolling schema)
// This is a public API struct and cannot change.
type continueToken struct {
APIVersion string `json:"v"`
ResourceVersion int64 `json:"rv"`
StartKey string `json:"start"`
}
// parseFrom transforms an encoded predicate from into a versioned struct.
// TODO: return a typed error that instructs clients that they must relist
func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) {
data, err := base64.RawURLEncoding.DecodeString(continueValue)
if err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
var c continueToken
if err := json.Unmarshal(data, &c); err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
switch c.APIVersion {
case "meta.k8s.io/v1":
if c.ResourceVersion == 0 {
return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)")
}
if len(c.StartKey) == 0 {
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)")
}
// defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot
// be at a higher level of the hierarchy, and so when we append the key prefix we will end up with
// continue start key that is fully qualified and cannot range over anything less specific than
// keyPrefix.
key := c.StartKey
if !strings.HasPrefix(key, "/") {
key = "/" + key
}
cleaned := path.Clean(key)
if cleaned != key {
return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey)
}
return keyPrefix + cleaned[1:], c.ResourceVersion, nil
default:
return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion)
}
}
// encodeContinue returns a string representing the encoded continuation of the current query.
func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) {
nextKey := strings.TrimPrefix(key, keyPrefix)
if nextKey == key {
return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match")
}
out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(out), nil
}
// GetList implements storage.Interface.
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
recursive := opts.Recursive
@ -637,7 +575,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
var continueKey string
switch {
case recursive && s.pagingEnabled && len(pred.Continue) > 0:
continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
continueKey, continueRV, err = storage.DecodeContinue(pred.Continue, keyPrefix)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
@ -798,7 +736,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// we never return a key that the client wouldn't be allowed to see
if hasMore {
// we want to start immediately after the last key
next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
next, err := storage.EncodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
if err != nil {
return err
}

View File

@ -19,8 +19,6 @@ package etcd3
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"math"
@ -597,7 +595,7 @@ func TestList(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
continueRV, _ := strconv.Atoi(list.ResourceVersion)
secondContinuation, err := encodeContinue("/two-level/2", "/two-level/", int64(continueRV))
secondContinuation, err := storage.EncodeContinue("/two-level/2", "/two-level/", int64(continueRV))
if err != nil {
t.Fatal(err)
}
@ -927,7 +925,7 @@ func TestList(t *testing.T) {
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 2,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3"),
Continue: storagetesting.EncodeContinueOrDie("z-level/3", int64(continueRV)),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
@ -938,7 +936,7 @@ func TestList(t *testing.T) {
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 1,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"),
Continue: storagetesting.EncodeContinueOrDie("z-level/3/test-2", int64(continueRV)),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
@ -949,7 +947,7 @@ func TestList(t *testing.T) {
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 2,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"),
Continue: storagetesting.EncodeContinueOrDie("z-level/3/test-2", int64(continueRV)),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
@ -1134,7 +1132,7 @@ func TestListContinuation(t *testing.T) {
if len(out.Continue) != 0 {
t.Fatalf("Unexpected continuation token set")
}
key, rv, err := decodeContinue(continueFromSecondItem, "/")
key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/")
t.Logf("continue token was %d %s %v", rv, key, err)
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items)
if transformer.reads != 2 {
@ -1654,54 +1652,6 @@ func TestPrefix(t *testing.T) {
}
}
func encodeContinueOrDie(apiVersion string, resourceVersion int64, nextKey string) string {
out, err := json.Marshal(&continueToken{APIVersion: apiVersion, ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
panic(err)
}
return base64.RawURLEncoding.EncodeToString(out)
}
func Test_decodeContinue(t *testing.T) {
type args struct {
continueValue string
keyPrefix string
}
tests := []struct {
name string
args args
wantFromKey string
wantRv int64
wantErr bool
}{
{name: "valid", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"},
{name: "root path", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "/"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/"},
{name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotFromKey, gotRv, err := decodeContinue(tt.args.continueValue, tt.args.keyPrefix)
if (err != nil) != tt.wantErr {
t.Errorf("decodeContinue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if gotFromKey != tt.wantFromKey {
t.Errorf("decodeContinue() gotFromKey = %v, want %v", gotFromKey, tt.wantFromKey)
}
if gotRv != tt.wantRv {
t.Errorf("decodeContinue() gotRv = %v, want %v", gotRv, tt.wantRv)
}
})
}
}
func Test_growSlice(t *testing.T) {
type args struct {
initialCapacity int

View File

@ -21,12 +21,10 @@ import (
"errors"
"fmt"
"math"
"reflect"
"strconv"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@ -701,28 +699,6 @@ func RunTestGuaranteedUpdateWithSuggestionAndConflict(ctx context.Context, t *te
}
}
// TestPropogateStore helps propagates store with objects, automates key generation, and returns
// keys and stored objects.
func TestPropogateStore(ctx context.Context, t *testing.T, store storage.Interface, obj *example.Pod) (string, *example.Pod) {
// Setup store with a key and grab the output for returning.
key := "/testkey"
return key, TestPropogateStoreWithKey(ctx, t, store, key, obj)
}
// TestPropogateStoreWithKey helps propagate store with objects, the given object will be stored at the specified key.
func TestPropogateStoreWithKey(ctx context.Context, t *testing.T, store storage.Interface, key string, obj *example.Pod) *example.Pod {
// Setup store with the specified key and grab the output for returning.
err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil)
if err != nil && !storage.IsNotFound(err) {
t.Fatalf("Cleanup failed: %v", err)
}
setOutput := &example.Pod{}
if err := store.Create(ctx, key, obj, setOutput, 0); err != nil {
t.Fatalf("Set failed: %v", err)
}
return setOutput
}
func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) {
resourceA := "/foo.bar.io/abc"
@ -758,14 +734,3 @@ func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) {
t.Fatalf("store.Count for resource %s: expected %d but got %d", resourceA, resourceACountExpected, resourceACountGot)
}
}
func ExpectNoDiff(t *testing.T, msg string, expected, got interface{}) {
t.Helper()
if !reflect.DeepEqual(expected, got) {
if diff := cmp.Diff(expected, got); diff != "" {
t.Errorf("%s: %s", msg, diff)
} else {
t.Errorf("%s:\nexpected: %#v\ngot: %#v", msg, expected, got)
}
}
}

View File

@ -19,9 +19,16 @@ package testing
import (
"context"
"path"
"reflect"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/storage"
)
@ -70,3 +77,97 @@ func DeepEqualSafePodSpec() example.PodSpec {
SchedulerName: "default-scheduler",
}
}
// TestPropogateStore helps propagates store with objects, automates key generation, and returns
// keys and stored objects.
func TestPropogateStore(ctx context.Context, t *testing.T, store storage.Interface, obj *example.Pod) (string, *example.Pod) {
// Setup store with a key and grab the output for returning.
key := "/testkey"
return key, TestPropogateStoreWithKey(ctx, t, store, key, obj)
}
// TestPropogateStoreWithKey helps propagate store with objects, the given object will be stored at the specified key.
func TestPropogateStoreWithKey(ctx context.Context, t *testing.T, store storage.Interface, key string, obj *example.Pod) *example.Pod {
// Setup store with the specified key and grab the output for returning.
err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil)
if err != nil && !storage.IsNotFound(err) {
t.Fatalf("Cleanup failed: %v", err)
}
setOutput := &example.Pod{}
if err := store.Create(ctx, key, obj, setOutput, 0); err != nil {
t.Fatalf("Set failed: %v", err)
}
return setOutput
}
func ExpectNoDiff(t *testing.T, msg string, expected, got interface{}) {
t.Helper()
if !reflect.DeepEqual(expected, got) {
if diff := cmp.Diff(expected, got); diff != "" {
t.Errorf("%s: %s", msg, diff)
} else {
t.Errorf("%s:\nexpected: %#v\ngot: %#v", msg, expected, got)
}
}
}
const dummyPrefix = "adapter"
func EncodeContinueOrDie(key string, resourceVersion int64) string {
token, err := storage.EncodeContinue(dummyPrefix+key, dummyPrefix, resourceVersion)
if err != nil {
panic(err)
}
return token
}
func TestCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) {
select {
case res := <-w.ResultChan():
if res.Type != expectEventType {
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
}
}
func TestCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
TestCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error {
ExpectNoDiff(t, "incorrect object", expectObj, object)
return nil
})
}
func TestCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch.Interface, check func(object runtime.Object) error) {
select {
case res := <-w.ResultChan():
if res.Type != expectEventType {
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
return
}
if err := check(res.Object); err != nil {
t.Error(err)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
}
}
func TestCheckStop(t *testing.T, w watch.Interface) {
select {
case e, ok := <-w.ResultChan():
if ok {
var obj string
switch e.Object.(type) {
case *example.Pod:
obj = e.Object.(*example.Pod).Name
case *v1.Status:
obj = e.Object.(*v1.Status).Message
}
t.Errorf("ResultChan should have been closed. Event: %s. Object: %s", e.Type, obj)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("time out after waiting 1s on ResultChan")
}
}

View File

@ -190,54 +190,3 @@ type testWatchStruct struct {
expectEvent bool
watchType watch.EventType
}
func TestCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) {
select {
case res := <-w.ResultChan():
if res.Type != expectEventType {
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
}
}
func TestCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
TestCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error {
ExpectNoDiff(t, "incorrect object", expectObj, object)
return nil
})
}
func TestCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch.Interface, check func(object runtime.Object) error) {
select {
case res := <-w.ResultChan():
if res.Type != expectEventType {
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
return
}
if err := check(res.Object); err != nil {
t.Error(err)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
}
}
func TestCheckStop(t *testing.T, w watch.Interface) {
select {
case e, ok := <-w.ResultChan():
if ok {
var obj string
switch e.Object.(type) {
case *example.Pod:
obj = e.Object.(*example.Pod).Name
case *metav1.Status:
obj = e.Object.(*metav1.Status).Message
}
t.Errorf("ResultChan should have been closed. Event: %s. Object: %s", e.Type, obj)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("time out after waiting 1s on ResultChan")
}
}