reflector: refactor watchHandler

Kubernetes-commit: e9e26068b746315b616a26b91ff2613f98e934bc
This commit is contained in:
Lukasz Szaszkiewicz 2022-06-24 13:44:32 +02:00 committed by Kubernetes Publisher
parent 163ee0b6d7
commit 4e28921c86
2 changed files with 36 additions and 24 deletions

View File

@ -426,7 +426,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
return err
}
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err := watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
@ -457,7 +457,19 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
}
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
func watchHandler(start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
clock clock.Clock,
resourceVersion *string,
errc chan error,
stopCh <-chan struct{},
) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
@ -478,62 +490,62 @@ loop:
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
if expectedGVK != nil {
if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
continue
}
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object)
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(newResourceVersion)
}
eventCount++
}
}
watchDuration := r.clock.Since(start)
watchDuration := clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch jasted less than a second and no items received", name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
return nil
}

View File

@ -139,7 +139,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
fw.Stop()
}()
var resumeRV string
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, &resumeRV, nevererrc, wait.NeverStop)
if err == nil {
t.Errorf("unexpected non-error")
}
@ -159,7 +159,7 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Stop()
}()
var resumeRV string
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, &resumeRV, nevererrc, wait.NeverStop)
if err != nil {
t.Errorf("unexpected error %v", err)
}
@ -208,7 +208,7 @@ func TestReflectorStopWatch(t *testing.T) {
var resumeRV string
stopWatch := make(chan struct{}, 1)
stopWatch <- struct{}{}
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch)
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, &resumeRV, nevererrc, stopWatch)
if err != errorStopRequested {
t.Errorf("expected stop error, got %q", err)
}