diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index fc97c2af3b5..1f5e71ad593 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -43,6 +43,7 @@ import ( "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" + "k8s.io/apiserver/pkg/storage/cacher/progress" etcdfeature "k8s.io/apiserver/pkg/storage/feature" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" @@ -420,7 +421,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { return nil, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", eventFreshDuration, DefaultEventFreshDuration) } - progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata) + progressRequester := progress.NewConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata) watchCache := newWatchCache( config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, eventFreshDuration, config.GroupResource, progressRequester) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 0b0dab1504f..27c36cc2b62 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -2815,13 +2815,6 @@ func TestWatchStreamSeparation(t *testing.T) { expectBookmarkOnEtcd: true, expectBookmarkOnWatchCache: true, }, - { - name: "common RPC & watch cache context > both get bookmarks", - separateCacheWatchRPC: false, - useWatchCacheContextMetadata: true, - expectBookmarkOnEtcd: true, - expectBookmarkOnWatchCache: true, - }, { name: "separate RPC > only etcd gets bookmarks", separateCacheWatchRPC: true, @@ -2877,7 +2870,7 @@ func TestWatchStreamSeparation(t *testing.T) { var contextMetadata metadata.MD if tc.useWatchCacheContextMetadata { - contextMetadata = cacher.watchCache.waitingUntilFresh.contextMetadata + contextMetadata = metadata.New(map[string]string{"source": "cache"}) } // For the first 100ms from watch creation, watch progress requests are ignored. time.Sleep(200 * time.Millisecond) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress.go similarity index 86% rename from staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go rename to staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress.go index 087fb14e546..76fe73f1365 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cacher +package progress import ( "context" @@ -36,8 +36,8 @@ const ( progressRequestPeriod = 100 * time.Millisecond ) -func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester { - pr := &conditionalProgressRequester{ +func NewConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *ConditionalProgressRequester { + pr := &ConditionalProgressRequester{ clock: clock, requestWatchProgress: requestWatchProgress, contextMetadata: contextMetadata, @@ -52,9 +52,9 @@ type TickerFactory interface { NewTimer(time.Duration) clock.Timer } -// conditionalProgressRequester will request progress notification if there +// ConditionalProgressRequester will request progress notification if there // is a request waiting for watch cache to be fresh. -type conditionalProgressRequester struct { +type ConditionalProgressRequester struct { clock TickerFactory requestWatchProgress WatchProgressRequester contextMetadata metadata.MD @@ -65,7 +65,7 @@ type conditionalProgressRequester struct { stopped bool } -func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) { +func (pr *ConditionalProgressRequester) Run(stopCh <-chan struct{}) { ctx := wait.ContextForChannel(stopCh) if pr.contextMetadata != nil { ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata) @@ -115,14 +115,14 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) { } } -func (pr *conditionalProgressRequester) Add() { +func (pr *ConditionalProgressRequester) Add() { pr.mux.Lock() defer pr.mux.Unlock() pr.waiting += 1 pr.cond.Signal() } -func (pr *conditionalProgressRequester) Remove() { +func (pr *ConditionalProgressRequester) Remove() { pr.mux.Lock() defer pr.mux.Unlock() pr.waiting -= 1 diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress_test.go similarity index 97% rename from staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go rename to staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress_test.go index 57c1f23d926..8fe71df7f7a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cacher +package progress import ( "context" @@ -115,12 +115,12 @@ func TestConditionalProgressRequester(t *testing.T) { func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester { pr := &testConditionalProgressRequester{} - pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock, nil) + pr.ConditionalProgressRequester = NewConditionalProgressRequester(pr.RequestWatchProgress, clock, nil) return pr } type testConditionalProgressRequester struct { - *conditionalProgressRequester + *ConditionalProgressRequester progressRequestsSentCount atomic.Int32 } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 63783acf51c..835840ba7aa 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -33,6 +33,7 @@ import ( "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" + "k8s.io/apiserver/pkg/storage/cacher/progress" etcdfeature "k8s.io/apiserver/pkg/storage/feature" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" @@ -150,7 +151,7 @@ type watchCache struct { // Requests progress notification if there are requests waiting for watch // to be fresh - waitingUntilFresh *conditionalProgressRequester + waitingUntilFresh *progress.ConditionalProgressRequester // Stores previous snapshots of orderedLister to allow serving requests from previous revisions. snapshots *storeSnapshotter @@ -165,7 +166,7 @@ func newWatchCache( clock clock.WithTicker, eventFreshDuration time.Duration, groupResource schema.GroupResource, - progressRequester *conditionalProgressRequester) *watchCache { + progressRequester *progress.ConditionalProgressRequester) *watchCache { wc := &watchCache{ capacity: defaultLowerBoundCapacity, keyFunc: keyFunc, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index ebc3e8ad774..a6a59ed1873 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -40,6 +40,7 @@ import ( "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" + "k8s.io/apiserver/pkg/storage/cacher/progress" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -128,7 +129,7 @@ func newTestWatchCache(capacity int, eventFreshDuration time.Duration, indexers wc := &testWatchCache{} wc.bookmarkRevision = make(chan int64, 1) wc.stopCh = make(chan struct{}) - pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil) + pr := progress.NewConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil) go pr.Run(wc.stopCh) wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), eventFreshDuration, schema.GroupResource{Resource: "pods"}, pr) // To preserve behavior of tests that assume a given capacity,