From 63b125d4061d267ad8998ea30e054bb1445b1a5a Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 15 Jul 2022 10:14:26 +0200 Subject: [PATCH] reflector: simplify reading the resourceVersion --- .../k8s.io/client-go/tools/cache/reflector.go | 36 +++++++++---------- .../client-go/tools/cache/reflector_test.go | 11 +++--- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 99646e20acd..cd4e2f5b206 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -253,10 +253,8 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { // It returns error if ListAndWatch didn't even try to initialize watch. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) - var resourceVersion string - var err error - resourceVersion, err = r.list(stopCh) + err := r.list(stopCh) if err != nil { return err } @@ -299,7 +297,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options := metav1.ListOptions{ - ResourceVersion: resourceVersion, + ResourceVersion: r.LastSyncResourceVersion(), // We want to avoid situations of hanging watchers. Stop any watchers that do not // receive any events within the timeout window. TimeoutSeconds: &timeoutSeconds, @@ -325,7 +323,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { return err } - if err := watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, &resourceVersion, resyncerrc, stopCh); err != nil { + if err := watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh); err != nil { if err != errorStopRequested { switch { case isExpiredError(err): @@ -346,9 +344,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } } -// list simply lists all items and gets a resource version obtained from the server at the moment of the call. +// list simply lists all items and records a resource version obtained from the server at the moment of the call. // the resource version can be used for further progress notification (aka. watch). -func (r *Reflector) list(stopCh <-chan struct{}) (string, error) { +func (r *Reflector) list(stopCh <-chan struct{}) error { var resourceVersion string options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} @@ -408,7 +406,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) (string, error) { }() select { case <-stopCh: - return "", nil + return nil case r := <-panicCh: panic(r) case <-listCh: @@ -416,7 +414,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) (string, error) { initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err}) if err != nil { klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err) - return "", fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err) + return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err) } // We check if the list was paginated and if so set the paginatedResult based on that. @@ -436,22 +434,22 @@ func (r *Reflector) list(stopCh <-chan struct{}) (string, error) { r.setIsLastSyncResourceVersionUnavailable(false) // list was successful listMetaInterface, err := meta.ListAccessor(list) if err != nil { - return "", fmt.Errorf("unable to understand list result %#v: %v", list, err) + return fmt.Errorf("unable to understand list result %#v: %v", list, err) } resourceVersion = listMetaInterface.GetResourceVersion() initTrace.Step("Resource version extracted") items, err := meta.ExtractList(list) if err != nil { - return "", fmt.Errorf("unable to understand list result %#v (%v)", list, err) + return fmt.Errorf("unable to understand list result %#v (%v)", list, err) } initTrace.Step("Objects extracted") if err := r.syncWith(items, resourceVersion); err != nil { - return "", fmt.Errorf("unable to sync list result: %v", err) + return fmt.Errorf("unable to sync list result: %v", err) } initTrace.Step("SyncWith done") r.setLastSyncResourceVersion(resourceVersion) initTrace.Step("Resource version updated") - return resourceVersion, nil + return nil } // syncWith replaces the store's items with the given list. @@ -463,7 +461,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err return r.store.Replace(found, resourceVersion) } -// watchHandler watches w and keeps *resourceVersion up to date. +// watchHandler watches w and sets setLastSyncResourceVersion func watchHandler(start time.Time, w watch.Interface, store Store, @@ -473,7 +471,6 @@ func watchHandler(start time.Time, expectedTypeName string, setLastSyncResourceVersion func(string), clock clock.Clock, - resourceVersion *string, errc chan error, stopCh <-chan struct{}, ) error { @@ -514,7 +511,7 @@ loop: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) continue } - newResourceVersion := meta.GetResourceVersion() + resourceVersion := meta.GetResourceVersion() switch event.Type { case watch.Added: err := store.Add(event.Object) @@ -539,10 +536,9 @@ loop: default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) } - *resourceVersion = newResourceVersion - setLastSyncResourceVersion(newResourceVersion) + setLastSyncResourceVersion(resourceVersion) if rvu, ok := store.(ResourceVersionUpdater); ok { - rvu.UpdateResourceVersion(newResourceVersion) + rvu.UpdateResourceVersion(resourceVersion) } eventCount++ } @@ -550,7 +546,7 @@ loop: watchDuration := clock.Since(start) if watchDuration < 1*time.Second && eventCount == 0 { - return fmt.Errorf("very short watch: %s: Unexpected watch close - watch jasted less than a second and no items received", name) + return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name) } klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount) return nil diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 1c75a439f37..cc79b2b7b4f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -138,8 +138,7 @@ func TestReflectorWatchHandlerError(t *testing.T) { go func() { fw.Stop() }() - var resumeRV string - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, &resumeRV, nevererrc, wait.NeverStop) + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) if err == nil { t.Errorf("unexpected non-error") } @@ -158,8 +157,7 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Stop() }() - var resumeRV string - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, &resumeRV, nevererrc, wait.NeverStop) + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) if err != nil { t.Errorf("unexpected error %v", err) } @@ -191,7 +189,7 @@ func TestReflectorWatchHandler(t *testing.T) { } // RV should send the last version we see. - if e, a := "32", resumeRV; e != a { + if e, a := "32", g.LastSyncResourceVersion(); e != a { t.Errorf("expected %v, got %v", e, a) } @@ -205,10 +203,9 @@ func TestReflectorStopWatch(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) fw := watch.NewFake() - var resumeRV string stopWatch := make(chan struct{}, 1) stopWatch <- struct{}{} - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, &resumeRV, nevererrc, stopWatch) + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) if err != errorStopRequested { t.Errorf("expected stop error, got %q", err) }