From 866e6a7e3cf35d16a6d3d5175e36d05dfe4b0797 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 31 Aug 2020 15:58:16 +0200 Subject: [PATCH] Allow tracking resource version for reflector store Kubernetes-commit: 4af1328bb8a3b3eb2289bbbe624480548dd39cdc --- tools/cache/reflector.go | 12 ++++++++ tools/cache/reflector_test.go | 56 +++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 9c9a758f..360d7304 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -101,6 +101,15 @@ type Reflector struct { watchErrorHandler WatchErrorHandler } +// ResourceVersionUpdater is an interface that allows store implementation to +// track the current resource version of the reflector. This is especially +// important if storage bookmarks are enabled. +type ResourceVersionUpdater interface { + // UpdateResourceVersion is called each time current resource version of the reflector + // is updated. + UpdateResourceVersion(resourceVersion string) +} + // The WatchErrorHandler is called whenever ListAndWatch drops the // connection with an error. After calling this handler, the informer // will backoff and retry. @@ -507,6 +516,9 @@ loop: } *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) + if rvu, ok := r.store.(ResourceVersionUpdater); ok { + rvu.UpdateResourceVersion(newResourceVersion) + } eventCount++ } } diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 9260d5a6..6a67e59c 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -910,3 +910,59 @@ func TestReflectorSetExpectedType(t *testing.T) { }) } } + +type storeWithRV struct { + Store + + // resourceVersions tracks values passed by UpdateResourceVersion + resourceVersions []string +} + +func (s *storeWithRV) UpdateResourceVersion(resourceVersion string) { + s.resourceVersions = append(s.resourceVersions, resourceVersion) +} + +func newStoreWithRV() *storeWithRV { + return &storeWithRV{ + Store: NewStore(MetaNamespaceKeyFunc), + } +} + +func TestReflectorResourceVersionUpdate(t *testing.T) { + s := newStoreWithRV() + + stopCh := make(chan struct{}) + fw := watch.NewFake() + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + + makePod := func(rv string) *v1.Pod { + return &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: rv}} + } + + go func() { + fw.Action(watch.Added, makePod("10")) + fw.Action(watch.Modified, makePod("20")) + fw.Action(watch.Bookmark, makePod("30")) + fw.Action(watch.Deleted, makePod("40")) + close(stopCh) + }() + + // Initial list should use RV=0 + if err := r.ListAndWatch(stopCh); err != nil { + t.Fatal(err) + } + + expectedRVs := []string{"10", "20", "30", "40"} + if !reflect.DeepEqual(s.resourceVersions, expectedRVs) { + t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions) + } +}