mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
Merge pull request #111807 from ncdc/watch-cache-unstructured-details
Watch cache: use resource.group for object type in log messages and metrics
This commit is contained in:
commit
2b4e85024c
@ -56,6 +56,7 @@ func StorageWithCacher() generic.StorageDecorator {
|
|||||||
cacherConfig := cacherstorage.Config{
|
cacherConfig := cacherstorage.Config{
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: storage.APIObjectVersioner{},
|
Versioner: storage.APIObjectVersioner{},
|
||||||
|
GroupResource: storageConfig.GroupResource,
|
||||||
ResourcePrefix: resourcePrefix,
|
ResourcePrefix: resourcePrefix,
|
||||||
KeyFunc: keyFunc,
|
KeyFunc: keyFunc,
|
||||||
NewFunc: newFunc,
|
NewFunc: newFunc,
|
||||||
|
@ -2318,6 +2318,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
|
|||||||
config := cacherstorage.Config{
|
config := cacherstorage.Config{
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: storage.APIObjectVersioner{},
|
Versioner: storage.APIObjectVersioner{},
|
||||||
|
GroupResource: schema.GroupResource{Resource: "pods"},
|
||||||
ResourcePrefix: podPrefix,
|
ResourcePrefix: podPrefix,
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
|
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
|
||||||
GetAttrsFunc: getPodAttrs,
|
GetAttrsFunc: getPodAttrs,
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
@ -69,6 +70,10 @@ type Config struct {
|
|||||||
// An underlying storage.Versioner.
|
// An underlying storage.Versioner.
|
||||||
Versioner storage.Versioner
|
Versioner storage.Versioner
|
||||||
|
|
||||||
|
// The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging
|
||||||
|
// and metrics.
|
||||||
|
GroupResource schema.GroupResource
|
||||||
|
|
||||||
// The Cache will be caching objects of a given Type and assumes that they
|
// The Cache will be caching objects of a given Type and assumes that they
|
||||||
// are all stored under ResourcePrefix directory in the underlying database.
|
// are all stored under ResourcePrefix directory in the underlying database.
|
||||||
ResourcePrefix string
|
ResourcePrefix string
|
||||||
@ -146,13 +151,13 @@ func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cacheWatcher)) {
|
func (i *indexedWatchers) terminateAll(groupResource schema.GroupResource, done func(*cacheWatcher)) {
|
||||||
// note that we don't have to call setDrainInputBufferLocked method on the watchers
|
// note that we don't have to call setDrainInputBufferLocked method on the watchers
|
||||||
// because we take advantage of the default value - stop immediately
|
// because we take advantage of the default value - stop immediately
|
||||||
// also watchers that have had already its draining strategy set
|
// also watchers that have had already its draining strategy set
|
||||||
// are no longer available (they were removed from the allWatchers and the valueWatchers maps)
|
// are no longer available (they were removed from the allWatchers and the valueWatchers maps)
|
||||||
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
|
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
|
||||||
klog.Warningf("Terminating all watchers from cacher %v", objectType)
|
klog.Warningf("Terminating all watchers from cacher %v", groupResource)
|
||||||
}
|
}
|
||||||
i.allWatchers.terminateAll(done)
|
i.allWatchers.terminateAll(done)
|
||||||
for _, watchers := range i.valueWatchers {
|
for _, watchers := range i.valueWatchers {
|
||||||
@ -258,6 +263,8 @@ type Cacher struct {
|
|||||||
|
|
||||||
// Expected type of objects in the underlying cache.
|
// Expected type of objects in the underlying cache.
|
||||||
objectType reflect.Type
|
objectType reflect.Type
|
||||||
|
// Used for logging, to disambiguate *unstructured.Unstructured (CRDs)
|
||||||
|
groupResource schema.GroupResource
|
||||||
|
|
||||||
// "sliding window" of recent changes of objects and the current state.
|
// "sliding window" of recent changes of objects and the current state.
|
||||||
watchCache *watchCache
|
watchCache *watchCache
|
||||||
@ -347,6 +354,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
|
|||||||
ready: newReady(),
|
ready: newReady(),
|
||||||
storage: config.Storage,
|
storage: config.Storage,
|
||||||
objectType: objType,
|
objectType: objType,
|
||||||
|
groupResource: config.GroupResource,
|
||||||
versioner: config.Versioner,
|
versioner: config.Versioner,
|
||||||
newFunc: config.NewFunc,
|
newFunc: config.NewFunc,
|
||||||
indexedTrigger: indexedTrigger,
|
indexedTrigger: indexedTrigger,
|
||||||
@ -377,7 +385,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
watchCache := newWatchCache(
|
watchCache := newWatchCache(
|
||||||
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
|
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
|
||||||
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||||
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
|
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
|
||||||
|
|
||||||
@ -422,8 +430,8 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
|||||||
c.watchCache.SetOnReplace(func() {
|
c.watchCache.SetOnReplace(func() {
|
||||||
successfulList = true
|
successfulList = true
|
||||||
c.ready.set(true)
|
c.ready.set(true)
|
||||||
klog.V(1).Infof("cacher (%v): initialized", c.objectType.String())
|
klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
|
||||||
metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc()
|
metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc()
|
||||||
})
|
})
|
||||||
defer func() {
|
defer func() {
|
||||||
if successfulList {
|
if successfulList {
|
||||||
@ -437,7 +445,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
|||||||
// Also note that startCaching is called in a loop, so there's no need
|
// Also note that startCaching is called in a loop, so there's no need
|
||||||
// to have another loop here.
|
// to have another loop here.
|
||||||
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
|
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
|
||||||
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err)
|
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -508,7 +516,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
|||||||
// given that memory allocation may trigger GC and block the thread.
|
// given that memory allocation may trigger GC and block the thread.
|
||||||
// Also note that emptyFunc is a placeholder, until we will be able
|
// Also note that emptyFunc is a placeholder, until we will be able
|
||||||
// to compute watcher.forget function (which has to happen under lock).
|
// to compute watcher.forget function (which has to happen under lock).
|
||||||
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier)
|
watcher := newCacheWatcher(
|
||||||
|
chanSize,
|
||||||
|
filterWithAttrsFunction(key, pred),
|
||||||
|
emptyFunc,
|
||||||
|
c.versioner,
|
||||||
|
deadline,
|
||||||
|
pred.AllowWatchBookmarks,
|
||||||
|
c.groupResource,
|
||||||
|
identifier,
|
||||||
|
)
|
||||||
|
|
||||||
// We explicitly use thread unsafe version and do locking ourself to ensure that
|
// We explicitly use thread unsafe version and do locking ourself to ensure that
|
||||||
// no new events will be processed in the meantime. The watchCache will be unlocked
|
// no new events will be processed in the meantime. The watchCache will be unlocked
|
||||||
@ -654,7 +671,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
|
|
||||||
trace := utiltrace.New("cacher list",
|
trace := utiltrace.New("cacher list",
|
||||||
utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
|
utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
|
||||||
utiltrace.Field{Key: "type", Value: c.objectType.String()})
|
utiltrace.Field{Key: "type", Value: c.groupResource.String()})
|
||||||
defer trace.LogIfLong(500 * time.Millisecond)
|
defer trace.LogIfLong(500 * time.Millisecond)
|
||||||
|
|
||||||
if err := c.ready.wait(); err != nil {
|
if err := c.ready.wait(); err != nil {
|
||||||
@ -757,7 +774,7 @@ func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bo
|
|||||||
func (c *Cacher) processEvent(event *watchCacheEvent) {
|
func (c *Cacher) processEvent(event *watchCacheEvent) {
|
||||||
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
|
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
|
||||||
// Monitor if this gets backed up, and how much.
|
// Monitor if this gets backed up, and how much.
|
||||||
klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
|
klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.groupResource.String(), curLen)
|
||||||
}
|
}
|
||||||
c.incoming <- *event
|
c.incoming <- *event
|
||||||
}
|
}
|
||||||
@ -788,7 +805,7 @@ func (c *Cacher) dispatchEvents() {
|
|||||||
c.dispatchEvent(&event)
|
c.dispatchEvent(&event)
|
||||||
}
|
}
|
||||||
lastProcessedResourceVersion = event.ResourceVersion
|
lastProcessedResourceVersion = event.ResourceVersion
|
||||||
metrics.EventsCounter.WithLabelValues(c.objectType.String()).Inc()
|
metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()
|
||||||
case <-bookmarkTimer.C():
|
case <-bookmarkTimer.C():
|
||||||
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
|
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
|
||||||
// Never send a bookmark event if we did not see an event here, this is fine
|
// Never send a bookmark event if we did not see an event here, this is fine
|
||||||
@ -1008,7 +1025,7 @@ func (c *Cacher) finishDispatching() {
|
|||||||
func (c *Cacher) terminateAllWatchers() {
|
func (c *Cacher) terminateAllWatchers() {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
c.watchers.terminateAll(c.objectType, c.stopWatcherLocked)
|
c.watchers.terminateAll(c.groupResource, c.stopWatcherLocked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) {
|
func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) {
|
||||||
@ -1176,8 +1193,7 @@ type cacheWatcher struct {
|
|||||||
// save it here to send bookmark events before that.
|
// save it here to send bookmark events before that.
|
||||||
deadline time.Time
|
deadline time.Time
|
||||||
allowWatchBookmarks bool
|
allowWatchBookmarks bool
|
||||||
// Object type of the cache watcher interests
|
groupResource schema.GroupResource
|
||||||
objectType reflect.Type
|
|
||||||
|
|
||||||
// human readable identifier that helps assigning cacheWatcher
|
// human readable identifier that helps assigning cacheWatcher
|
||||||
// instance with request
|
// instance with request
|
||||||
@ -1188,7 +1204,16 @@ type cacheWatcher struct {
|
|||||||
drainInputBuffer bool
|
drainInputBuffer bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher {
|
func newCacheWatcher(
|
||||||
|
chanSize int,
|
||||||
|
filter filterWithAttrsFunc,
|
||||||
|
forget func(bool),
|
||||||
|
versioner storage.Versioner,
|
||||||
|
deadline time.Time,
|
||||||
|
allowWatchBookmarks bool,
|
||||||
|
groupResource schema.GroupResource,
|
||||||
|
identifier string,
|
||||||
|
) *cacheWatcher {
|
||||||
return &cacheWatcher{
|
return &cacheWatcher{
|
||||||
input: make(chan *watchCacheEvent, chanSize),
|
input: make(chan *watchCacheEvent, chanSize),
|
||||||
result: make(chan watch.Event, chanSize),
|
result: make(chan watch.Event, chanSize),
|
||||||
@ -1199,7 +1224,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool)
|
|||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
deadline: deadline,
|
deadline: deadline,
|
||||||
allowWatchBookmarks: allowWatchBookmarks,
|
allowWatchBookmarks: allowWatchBookmarks,
|
||||||
objectType: objectType,
|
groupResource: groupResource,
|
||||||
identifier: identifier,
|
identifier: identifier,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1256,8 +1281,8 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
|
|||||||
// This means that we couldn't send event to that watcher.
|
// This means that we couldn't send event to that watcher.
|
||||||
// Since we don't want to block on it infinitely,
|
// Since we don't want to block on it infinitely,
|
||||||
// we simply terminate it.
|
// we simply terminate it.
|
||||||
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.objectType.String(), c.identifier, len(c.input), len(c.result))
|
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
|
||||||
metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc()
|
metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
|
||||||
c.forget(false)
|
c.forget(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1455,13 +1480,12 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
|
|||||||
initEventCount++
|
initEventCount++
|
||||||
}
|
}
|
||||||
|
|
||||||
objType := c.objectType.String()
|
|
||||||
if initEventCount > 0 {
|
if initEventCount > 0 {
|
||||||
metrics.InitCounter.WithLabelValues(objType).Add(float64(initEventCount))
|
metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount))
|
||||||
}
|
}
|
||||||
processingTime := time.Since(startTime)
|
processingTime := time.Since(startTime)
|
||||||
if processingTime > initProcessThreshold {
|
if processingTime > initProcessThreshold {
|
||||||
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, objType, c.identifier, processingTime)
|
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.process(ctx, resourceVersion)
|
c.process(ctx, resourceVersion)
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
"k8s.io/apimachinery/pkg/util/diff"
|
"k8s.io/apimachinery/pkg/util/diff"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
@ -47,10 +48,6 @@ import (
|
|||||||
testingclock "k8s.io/utils/clock/testing"
|
testingclock "k8s.io/utils/clock/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
objectType = reflect.TypeOf(&v1.Pod{})
|
|
||||||
)
|
|
||||||
|
|
||||||
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
|
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
|
||||||
// the writes to cacheWatcher.result channel is blocked.
|
// the writes to cacheWatcher.result channel is blocked.
|
||||||
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
||||||
@ -74,7 +71,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// set the size of the buffer of w.result to 0, so that the writes to
|
// set the size of the buffer of w.result to 0, so that the writes to
|
||||||
// w.result is blocked.
|
// w.result is blocked.
|
||||||
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
|
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
|
||||||
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
|
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
|
||||||
w.Stop()
|
w.Stop()
|
||||||
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
||||||
@ -194,7 +191,7 @@ TestCase:
|
|||||||
testCase.events[j].ResourceVersion = uint64(j) + 1
|
testCase.events[j].ResourceVersion = uint64(j) + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
|
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
|
||||||
go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
|
go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
|
||||||
|
|
||||||
ch := w.ResultChan()
|
ch := w.ResultChan()
|
||||||
@ -266,6 +263,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
|
|||||||
config := Config{
|
config := Config{
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: testVersioner{},
|
Versioner: testVersioner{},
|
||||||
|
GroupResource: schema.GroupResource{Resource: "pods"},
|
||||||
ResourcePrefix: prefix,
|
ResourcePrefix: prefix,
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
||||||
GetAttrsFunc: storage.DefaultNamespaceScopedAttr,
|
GetAttrsFunc: storage.DefaultNamespaceScopedAttr,
|
||||||
@ -537,7 +535,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
|||||||
// timeout to zero and run the Stop goroutine concurrently.
|
// timeout to zero and run the Stop goroutine concurrently.
|
||||||
// May sure that the watch will not be blocked on Stop.
|
// May sure that the watch will not be blocked on Stop.
|
||||||
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||||
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
|
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
|
||||||
go w.Stop()
|
go w.Stop()
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
@ -549,7 +547,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
|||||||
deadline := time.Now().Add(time.Hour)
|
deadline := time.Now().Add(time.Hour)
|
||||||
// After that, verifies the cacheWatcher.process goroutine works correctly.
|
// After that, verifies the cacheWatcher.process goroutine works correctly.
|
||||||
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||||
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
|
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
|
||||||
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
|
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
|
||||||
ctx, cancel := context.WithDeadline(context.Background(), deadline)
|
ctx, cancel := context.WithDeadline(context.Background(), deadline)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -674,7 +672,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
|
|||||||
forget := func(bool) {}
|
forget := func(bool) {}
|
||||||
|
|
||||||
newWatcher := func(deadline time.Time) *cacheWatcher {
|
newWatcher := func(deadline time.Time) *cacheWatcher {
|
||||||
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType, "")
|
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
clock := testingclock.NewFakeClock(time.Now())
|
clock := testingclock.NewFakeClock(time.Now())
|
||||||
@ -1616,7 +1614,7 @@ func TestCacheWatcherDraining(t *testing.T) {
|
|||||||
makeWatchCacheEvent(5),
|
makeWatchCacheEvent(5),
|
||||||
makeWatchCacheEvent(6),
|
makeWatchCacheEvent(6),
|
||||||
}
|
}
|
||||||
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "")
|
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
|
||||||
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
|
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
|
||||||
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
|
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
|
||||||
t.Fatal("failed adding an even to the watcher")
|
t.Fatal("failed adding an even to the watcher")
|
||||||
@ -1657,7 +1655,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
|
|||||||
makeWatchCacheEvent(5),
|
makeWatchCacheEvent(5),
|
||||||
makeWatchCacheEvent(6),
|
makeWatchCacheEvent(6),
|
||||||
}
|
}
|
||||||
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "")
|
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
|
||||||
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
|
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
|
||||||
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
|
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
|
||||||
t.Fatal("failed adding an even to the watcher")
|
t.Fatal("failed adding an even to the watcher")
|
||||||
|
@ -19,7 +19,6 @@ package cacher
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"reflect"
|
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -28,6 +27,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
||||||
@ -189,8 +189,8 @@ type watchCache struct {
|
|||||||
// An underlying storage.Versioner.
|
// An underlying storage.Versioner.
|
||||||
versioner storage.Versioner
|
versioner storage.Versioner
|
||||||
|
|
||||||
// cacher's objectType.
|
// cacher's group resource
|
||||||
objectType reflect.Type
|
groupResource schema.GroupResource
|
||||||
|
|
||||||
// For testing cache interval invalidation.
|
// For testing cache interval invalidation.
|
||||||
indexValidator indexValidator
|
indexValidator indexValidator
|
||||||
@ -203,7 +203,7 @@ func newWatchCache(
|
|||||||
versioner storage.Versioner,
|
versioner storage.Versioner,
|
||||||
indexers *cache.Indexers,
|
indexers *cache.Indexers,
|
||||||
clock clock.Clock,
|
clock clock.Clock,
|
||||||
objectType reflect.Type) *watchCache {
|
groupResource schema.GroupResource) *watchCache {
|
||||||
wc := &watchCache{
|
wc := &watchCache{
|
||||||
capacity: defaultLowerBoundCapacity,
|
capacity: defaultLowerBoundCapacity,
|
||||||
keyFunc: keyFunc,
|
keyFunc: keyFunc,
|
||||||
@ -219,10 +219,9 @@ func newWatchCache(
|
|||||||
eventHandler: eventHandler,
|
eventHandler: eventHandler,
|
||||||
clock: clock,
|
clock: clock,
|
||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
objectType: objectType,
|
groupResource: groupResource,
|
||||||
}
|
}
|
||||||
objType := objectType.String()
|
metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity))
|
||||||
metrics.WatchCacheCapacity.WithLabelValues(objType).Set(float64(wc.capacity))
|
|
||||||
wc.cond = sync.NewCond(wc.RLocker())
|
wc.cond = sync.NewCond(wc.RLocker())
|
||||||
wc.indexValidator = wc.isIndexValidLocked
|
wc.indexValidator = wc.isIndexValidLocked
|
||||||
|
|
||||||
@ -387,7 +386,7 @@ func (w *watchCache) doCacheResizeLocked(capacity int) {
|
|||||||
newCache[i%capacity] = w.cache[i%w.capacity]
|
newCache[i%capacity] = w.cache[i%w.capacity]
|
||||||
}
|
}
|
||||||
w.cache = newCache
|
w.cache = newCache
|
||||||
metrics.RecordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity)
|
metrics.RecordsWatchCacheCapacityChange(w.groupResource.String(), w.capacity, capacity)
|
||||||
w.capacity = capacity
|
w.capacity = capacity
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,7 +18,6 @@ package cacher
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@ -31,9 +30,9 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
testingclock "k8s.io/utils/clock/testing"
|
testingclock "k8s.io/utils/clock/testing"
|
||||||
@ -112,7 +111,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
|
|||||||
}
|
}
|
||||||
versioner := storage.APIObjectVersioner{}
|
versioner := storage.APIObjectVersioner{}
|
||||||
mockHandler := func(*watchCacheEvent) {}
|
mockHandler := func(*watchCacheEvent) {}
|
||||||
wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{}))
|
wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"})
|
||||||
// To preserve behavior of tests that assume a given capacity,
|
// To preserve behavior of tests that assume a given capacity,
|
||||||
// resize it to th expected size.
|
// resize it to th expected size.
|
||||||
wc.capacity = capacity
|
wc.capacity = capacity
|
||||||
|
@ -121,6 +121,7 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor
|
|||||||
config := cacherstorage.Config{
|
config := cacherstorage.Config{
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: v,
|
Versioner: v,
|
||||||
|
GroupResource: schema.GroupResource{Resource: "pods"},
|
||||||
ResourcePrefix: prefix,
|
ResourcePrefix: prefix,
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
||||||
GetAttrsFunc: GetAttrs,
|
GetAttrsFunc: GetAttrs,
|
||||||
|
Loading…
Reference in New Issue
Block a user