mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-27 15:39:39 +00:00
Add support for type checking Unstructured via GVK in reflector
It was previously possible to instantiate `Reflector` with `*unstructured.Unstructured` as the expected type but this did not support checking that event objects were of the correct API type (e.g. if event object was `v1.Pod` in `Unstructured` form but `v1.Service` was expected). This commit adds support for providing a GVK via an `Unstructured` expected type to compare with the GVK of event objects. The GVK will also be used in reflector log output. Kubernetes-commit: 237dbfd8ad322dfcad4bd4d5345368480c22d82f
This commit is contained in:
parent
0e97bf0202
commit
ab4e4fccf4
60
tools/cache/reflector.go
vendored
60
tools/cache/reflector.go
vendored
@ -29,7 +29,9 @@ import (
|
|||||||
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/naming"
|
"k8s.io/apimachinery/pkg/util/naming"
|
||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||||
@ -41,13 +43,22 @@ import (
|
|||||||
"k8s.io/utils/trace"
|
"k8s.io/utils/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const defaultExpectedTypeName = "<unspecified>"
|
||||||
|
|
||||||
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
||||||
type Reflector struct {
|
type Reflector struct {
|
||||||
// name identifies this reflector. By default it will be a file:line if possible.
|
// name identifies this reflector. By default it will be a file:line if possible.
|
||||||
name string
|
name string
|
||||||
|
|
||||||
|
// The name of the type we expect to place in the store. The name
|
||||||
|
// will be the stringification of expectedGVK if provided, and the
|
||||||
|
// stringification of expectedType otherwise. It is for display
|
||||||
|
// only, and should not be used for parsing or comparison.
|
||||||
|
expectedTypeName string
|
||||||
// The type of object we expect to place in the store.
|
// The type of object we expect to place in the store.
|
||||||
expectedType reflect.Type
|
expectedType reflect.Type
|
||||||
|
// The GVK of the object we expect to place in the store if unstructured.
|
||||||
|
expectedGVK *schema.GroupVersionKind
|
||||||
// The destination to sync up with the watch source
|
// The destination to sync up with the watch source
|
||||||
store Store
|
store Store
|
||||||
// listerWatcher is used to perform lists and watches.
|
// listerWatcher is used to perform lists and watches.
|
||||||
@ -100,14 +111,35 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
|
|||||||
name: name,
|
name: name,
|
||||||
listerWatcher: lw,
|
listerWatcher: lw,
|
||||||
store: store,
|
store: store,
|
||||||
expectedType: reflect.TypeOf(expectedType),
|
|
||||||
period: time.Second,
|
period: time.Second,
|
||||||
resyncPeriod: resyncPeriod,
|
resyncPeriod: resyncPeriod,
|
||||||
clock: &clock.RealClock{},
|
clock: &clock.RealClock{},
|
||||||
}
|
}
|
||||||
|
r.setExpectedType(expectedType)
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Reflector) setExpectedType(expectedType interface{}) {
|
||||||
|
r.expectedType = reflect.TypeOf(expectedType)
|
||||||
|
if r.expectedType == nil {
|
||||||
|
r.expectedTypeName = defaultExpectedTypeName
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.expectedTypeName = r.expectedType.String()
|
||||||
|
|
||||||
|
if obj, ok := expectedType.(*unstructured.Unstructured); ok {
|
||||||
|
// Use gvk to check that watch event objects are of the desired type.
|
||||||
|
gvk := obj.GroupVersionKind()
|
||||||
|
if gvk.Empty() {
|
||||||
|
klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.expectedGVK = &gvk
|
||||||
|
r.expectedTypeName = gvk.String()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
|
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
|
||||||
// call chains to NewReflector, so they'd be low entropy names for reflectors
|
// call chains to NewReflector, so they'd be low entropy names for reflectors
|
||||||
var internalPackages = []string{"client-go/tools/cache/"}
|
var internalPackages = []string{"client-go/tools/cache/"}
|
||||||
@ -115,7 +147,7 @@ var internalPackages = []string{"client-go/tools/cache/"}
|
|||||||
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
||||||
// Run will exit when stopCh is closed.
|
// Run will exit when stopCh is closed.
|
||||||
func (r *Reflector) Run(stopCh <-chan struct{}) {
|
func (r *Reflector) Run(stopCh <-chan struct{}) {
|
||||||
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
|
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
|
||||||
wait.Until(func() {
|
wait.Until(func() {
|
||||||
if err := r.ListAndWatch(stopCh); err != nil {
|
if err := r.ListAndWatch(stopCh); err != nil {
|
||||||
utilruntime.HandleError(err)
|
utilruntime.HandleError(err)
|
||||||
@ -150,7 +182,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
|
|||||||
// and then use the resource version to watch.
|
// and then use the resource version to watch.
|
||||||
// It returns error if ListAndWatch didn't even try to initialize watch.
|
// It returns error if ListAndWatch didn't even try to initialize watch.
|
||||||
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||||
klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
|
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
|
||||||
var resourceVersion string
|
var resourceVersion string
|
||||||
|
|
||||||
// Explicitly set "0" as resource version - it's fine for the List()
|
// Explicitly set "0" as resource version - it's fine for the List()
|
||||||
@ -191,7 +223,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
case <-listCh:
|
case <-listCh:
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
|
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
|
||||||
}
|
}
|
||||||
initTrace.Step("Objects listed")
|
initTrace.Step("Objects listed")
|
||||||
listMetaInterface, err := meta.ListAccessor(list)
|
listMetaInterface, err := meta.ListAccessor(list)
|
||||||
@ -270,9 +302,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
case io.EOF:
|
case io.EOF:
|
||||||
// watch closed normally
|
// watch closed normally
|
||||||
case io.ErrUnexpectedEOF:
|
case io.ErrUnexpectedEOF:
|
||||||
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
|
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
|
||||||
default:
|
default:
|
||||||
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
|
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
|
||||||
}
|
}
|
||||||
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
||||||
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
||||||
@ -289,9 +321,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
if err != errorStopRequested {
|
if err != errorStopRequested {
|
||||||
switch {
|
switch {
|
||||||
case apierrs.IsResourceExpired(err):
|
case apierrs.IsResourceExpired(err):
|
||||||
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
|
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||||
default:
|
default:
|
||||||
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
|
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -331,10 +363,18 @@ loop:
|
|||||||
if event.Type == watch.Error {
|
if event.Type == watch.Error {
|
||||||
return apierrs.FromObject(event.Object)
|
return apierrs.FromObject(event.Object)
|
||||||
}
|
}
|
||||||
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
|
if r.expectedType != nil {
|
||||||
|
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
|
||||||
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
|
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if r.expectedGVK != nil {
|
||||||
|
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
meta, err := meta.Accessor(event.Object)
|
meta, err := meta.Accessor(event.Object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
|
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
|
||||||
@ -375,7 +415,7 @@ loop:
|
|||||||
if watchDuration < 1*time.Second && eventCount == 0 {
|
if watchDuration < 1*time.Second && eventCount == 0 {
|
||||||
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
|
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
|
||||||
}
|
}
|
||||||
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
|
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
58
tools/cache/reflector_test.go
vendored
58
tools/cache/reflector_test.go
vendored
@ -20,13 +20,16 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
@ -430,3 +433,58 @@ func TestReflectorWatchListPageSize(t *testing.T) {
|
|||||||
t.Errorf("Expected 10 results, got %d", len(results))
|
t.Errorf("Expected 10 results, got %d", len(results))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflectorSetExpectedType(t *testing.T) {
|
||||||
|
obj := &unstructured.Unstructured{}
|
||||||
|
gvk := schema.GroupVersionKind{
|
||||||
|
Group: "mygroup",
|
||||||
|
Version: "v1",
|
||||||
|
Kind: "MyKind",
|
||||||
|
}
|
||||||
|
obj.SetGroupVersionKind(gvk)
|
||||||
|
testCases := map[string]struct {
|
||||||
|
inputType interface{}
|
||||||
|
expectedTypeName string
|
||||||
|
expectedType reflect.Type
|
||||||
|
expectedGVK *schema.GroupVersionKind
|
||||||
|
}{
|
||||||
|
"Nil type": {
|
||||||
|
expectedTypeName: defaultExpectedTypeName,
|
||||||
|
},
|
||||||
|
"Normal type": {
|
||||||
|
inputType: &v1.Pod{},
|
||||||
|
expectedTypeName: "*v1.Pod",
|
||||||
|
expectedType: reflect.TypeOf(&v1.Pod{}),
|
||||||
|
},
|
||||||
|
"Unstructured type without GVK": {
|
||||||
|
inputType: &unstructured.Unstructured{},
|
||||||
|
expectedTypeName: "*unstructured.Unstructured",
|
||||||
|
expectedType: reflect.TypeOf(&unstructured.Unstructured{}),
|
||||||
|
},
|
||||||
|
"Unstructured type with GVK": {
|
||||||
|
inputType: obj,
|
||||||
|
expectedTypeName: gvk.String(),
|
||||||
|
expectedType: reflect.TypeOf(&unstructured.Unstructured{}),
|
||||||
|
expectedGVK: &gvk,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for testName, tc := range testCases {
|
||||||
|
t.Run(testName, func(t *testing.T) {
|
||||||
|
r := &Reflector{}
|
||||||
|
r.setExpectedType(tc.inputType)
|
||||||
|
if tc.expectedType != r.expectedType {
|
||||||
|
t.Fatalf("Expected expectedType %v, got %v", tc.expectedType, r.expectedType)
|
||||||
|
}
|
||||||
|
if tc.expectedTypeName != r.expectedTypeName {
|
||||||
|
t.Fatalf("Expected expectedTypeName %v, got %v", tc.expectedTypeName, r.expectedTypeName)
|
||||||
|
}
|
||||||
|
gvkNotEqual := (tc.expectedGVK == nil) != (r.expectedGVK == nil)
|
||||||
|
if tc.expectedGVK != nil && r.expectedGVK != nil {
|
||||||
|
gvkNotEqual = *tc.expectedGVK != *r.expectedGVK
|
||||||
|
}
|
||||||
|
if gvkNotEqual {
|
||||||
|
t.Fatalf("Expected expectedGVK %v, got %v", tc.expectedGVK, r.expectedGVK)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user