Merge pull request #130412 from serathius/watchcache-progress

Move watch progress to separate package.
This commit is contained in:
Kubernetes Prow Robot 2025-02-26 00:16:41 -08:00 committed by GitHub
commit 3d9fcb7c01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 19 additions and 23 deletions

View File

@ -43,6 +43,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
"k8s.io/apiserver/pkg/storage/cacher/progress"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
@ -420,7 +421,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
return nil, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", eventFreshDuration, DefaultEventFreshDuration)
}
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
progressRequester := progress.NewConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers,
config.Clock, eventFreshDuration, config.GroupResource, progressRequester)

View File

@ -2815,13 +2815,6 @@ func TestWatchStreamSeparation(t *testing.T) {
expectBookmarkOnEtcd: true,
expectBookmarkOnWatchCache: true,
},
{
name: "common RPC & watch cache context > both get bookmarks",
separateCacheWatchRPC: false,
useWatchCacheContextMetadata: true,
expectBookmarkOnEtcd: true,
expectBookmarkOnWatchCache: true,
},
{
name: "separate RPC > only etcd gets bookmarks",
separateCacheWatchRPC: true,
@ -2877,7 +2870,7 @@ func TestWatchStreamSeparation(t *testing.T) {
var contextMetadata metadata.MD
if tc.useWatchCacheContextMetadata {
contextMetadata = cacher.watchCache.waitingUntilFresh.contextMetadata
contextMetadata = metadata.New(map[string]string{"source": "cache"})
}
// For the first 100ms from watch creation, watch progress requests are ignored.
time.Sleep(200 * time.Millisecond)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
package progress
import (
"context"
@ -36,8 +36,8 @@ const (
progressRequestPeriod = 100 * time.Millisecond
)
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester {
pr := &conditionalProgressRequester{
func NewConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *ConditionalProgressRequester {
pr := &ConditionalProgressRequester{
clock: clock,
requestWatchProgress: requestWatchProgress,
contextMetadata: contextMetadata,
@ -52,9 +52,9 @@ type TickerFactory interface {
NewTimer(time.Duration) clock.Timer
}
// conditionalProgressRequester will request progress notification if there
// ConditionalProgressRequester will request progress notification if there
// is a request waiting for watch cache to be fresh.
type conditionalProgressRequester struct {
type ConditionalProgressRequester struct {
clock TickerFactory
requestWatchProgress WatchProgressRequester
contextMetadata metadata.MD
@ -65,7 +65,7 @@ type conditionalProgressRequester struct {
stopped bool
}
func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
func (pr *ConditionalProgressRequester) Run(stopCh <-chan struct{}) {
ctx := wait.ContextForChannel(stopCh)
if pr.contextMetadata != nil {
ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata)
@ -115,14 +115,14 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
}
}
func (pr *conditionalProgressRequester) Add() {
func (pr *ConditionalProgressRequester) Add() {
pr.mux.Lock()
defer pr.mux.Unlock()
pr.waiting += 1
pr.cond.Signal()
}
func (pr *conditionalProgressRequester) Remove() {
func (pr *ConditionalProgressRequester) Remove() {
pr.mux.Lock()
defer pr.mux.Unlock()
pr.waiting -= 1

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
package progress
import (
"context"
@ -115,12 +115,12 @@ func TestConditionalProgressRequester(t *testing.T) {
func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester {
pr := &testConditionalProgressRequester{}
pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock, nil)
pr.ConditionalProgressRequester = NewConditionalProgressRequester(pr.RequestWatchProgress, clock, nil)
return pr
}
type testConditionalProgressRequester struct {
*conditionalProgressRequester
*ConditionalProgressRequester
progressRequestsSentCount atomic.Int32
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
"k8s.io/apiserver/pkg/storage/cacher/progress"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
@ -150,7 +151,7 @@ type watchCache struct {
// Requests progress notification if there are requests waiting for watch
// to be fresh
waitingUntilFresh *conditionalProgressRequester
waitingUntilFresh *progress.ConditionalProgressRequester
// Stores previous snapshots of orderedLister to allow serving requests from previous revisions.
snapshots *storeSnapshotter
@ -165,7 +166,7 @@ func newWatchCache(
clock clock.WithTicker,
eventFreshDuration time.Duration,
groupResource schema.GroupResource,
progressRequester *conditionalProgressRequester) *watchCache {
progressRequester *progress.ConditionalProgressRequester) *watchCache {
wc := &watchCache{
capacity: defaultLowerBoundCapacity,
keyFunc: keyFunc,

View File

@ -40,6 +40,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
"k8s.io/apiserver/pkg/storage/cacher/progress"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
@ -128,7 +129,7 @@ func newTestWatchCache(capacity int, eventFreshDuration time.Duration, indexers
wc := &testWatchCache{}
wc.bookmarkRevision = make(chan int64, 1)
wc.stopCh = make(chan struct{})
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
pr := progress.NewConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
go pr.Run(wc.stopCh)
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), eventFreshDuration, schema.GroupResource{Resource: "pods"}, pr)
// To preserve behavior of tests that assume a given capacity,