Mirror of https://github.com/k3s-io/kubernetes.git
watch cache: metrics: objectType -> group resource
Use the group resource instead of objectType in watch cache metrics: with objectType, all CustomResources are lumped together under *unstructured.Unstructured instead of getting one entry per type.

Signed-off-by: Andy Goldstein <andy.goldstein@redhat.com>
commit d08b69e8d3
parent 397533a4c2
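For context (not part of the commit itself), here is a minimal Go sketch of why a reflect.Type-based metric label cannot tell CustomResources apart while a schema.GroupResource can. The Widget/Gadget kinds and the example.com group are made up for illustration.

package main

import (
	"fmt"
	"reflect"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// Two different CustomResources both decode to *unstructured.Unstructured,
	// so a metric label derived from reflect.Type is identical for both.
	widget := &unstructured.Unstructured{}
	widget.SetGroupVersionKind(schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "Widget"})

	gadget := &unstructured.Unstructured{}
	gadget.SetGroupVersionKind(schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "Gadget"})

	fmt.Println(reflect.TypeOf(widget)) // *unstructured.Unstructured
	fmt.Println(reflect.TypeOf(gadget)) // *unstructured.Unstructured (same label)

	// A GroupResource stays distinct per resource, which is what the
	// watch cache metrics switch to in this commit.
	fmt.Println(schema.GroupResource{Group: "example.com", Resource: "widgets"}) // widgets.example.com
	fmt.Println(schema.GroupResource{Group: "example.com", Resource: "gadgets"}) // gadgets.example.com
}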
@@ -70,7 +70,8 @@ type Config struct {
 	// An underlying storage.Versioner.
 	Versioner storage.Versioner
 
-	// The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging.
+	// The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging
+	// and metrics.
 	GroupResource schema.GroupResource
 
 	// The Cache will be caching objects of a given Type and assumes that they
@@ -384,7 +385,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 	}
 
 	watchCache := newWatchCache(
-		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
+		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
 	listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
 	reflectorName := "storage/cacher.go:" + config.ResourcePrefix
 
@@ -430,7 +431,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
 		successfulList = true
 		c.ready.set(true)
 		klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
-		metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc()
+		metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc()
 	})
 	defer func() {
 		if successfulList {
@@ -804,7 +805,7 @@ func (c *Cacher) dispatchEvents() {
 				c.dispatchEvent(&event)
 			}
 			lastProcessedResourceVersion = event.ResourceVersion
-			metrics.EventsCounter.WithLabelValues(c.objectType.String()).Inc()
+			metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()
 		case <-bookmarkTimer.C():
 			bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
 			// Never send a bookmark event if we did not see an event here, this is fine
@@ -1281,7 +1282,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
 		// Since we don't want to block on it infinitely,
 		// we simply terminate it.
 		klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
-		metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc()
+		metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
 		c.forget(false)
 	}
 
@@ -1479,9 +1480,8 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
 		initEventCount++
 	}
 
-	objType := c.objectType.String()
 	if initEventCount > 0 {
-		metrics.InitCounter.WithLabelValues(objType).Add(float64(initEventCount))
+		metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount))
 	}
 	processingTime := time.Since(startTime)
 	if processingTime > initProcessThreshold {
@@ -48,10 +48,6 @@ import (
 	testingclock "k8s.io/utils/clock/testing"
 )
 
-var (
-	objectType = reflect.TypeOf(&v1.Pod{})
-)
-
 // verifies the cacheWatcher.process goroutine is properly cleaned up even if
 // the writes to cacheWatcher.result channel is blocked.
 func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
@@ -19,7 +19,6 @@ package cacher
 import (
 	"fmt"
 	"math"
-	"reflect"
 	"sort"
 	"sync"
 	"time"
@@ -28,6 +27,7 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/cacher/metrics"
@@ -189,8 +189,8 @@ type watchCache struct {
 	// An underlying storage.Versioner.
 	versioner storage.Versioner
 
-	// cacher's objectType.
-	objectType reflect.Type
+	// cacher's group resource
+	groupResource schema.GroupResource
 
 	// For testing cache interval invalidation.
 	indexValidator indexValidator
@@ -203,7 +203,7 @@ func newWatchCache(
 	versioner storage.Versioner,
 	indexers *cache.Indexers,
 	clock clock.Clock,
-	objectType reflect.Type) *watchCache {
+	groupResource schema.GroupResource) *watchCache {
 	wc := &watchCache{
 		capacity: defaultLowerBoundCapacity,
 		keyFunc: keyFunc,
@@ -219,10 +219,9 @@ func newWatchCache(
 		eventHandler: eventHandler,
 		clock: clock,
 		versioner: versioner,
-		objectType: objectType,
+		groupResource: groupResource,
 	}
-	objType := objectType.String()
-	metrics.WatchCacheCapacity.WithLabelValues(objType).Set(float64(wc.capacity))
+	metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity))
 	wc.cond = sync.NewCond(wc.RLocker())
 	wc.indexValidator = wc.isIndexValidLocked
 
@@ -387,7 +386,7 @@ func (w *watchCache) doCacheResizeLocked(capacity int) {
 		newCache[i%capacity] = w.cache[i%w.capacity]
 	}
 	w.cache = newCache
-	metrics.RecordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity)
+	metrics.RecordsWatchCacheCapacityChange(w.groupResource.String(), w.capacity, capacity)
 	w.capacity = capacity
 }
 
@@ -18,7 +18,6 @@ package cacher
 
 import (
 	"fmt"
-	"reflect"
 	"strconv"
 	"strings"
 	"testing"
@@ -31,9 +30,9 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
-	"k8s.io/apiserver/pkg/apis/example"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/client-go/tools/cache"
 	testingclock "k8s.io/utils/clock/testing"
@@ -112,7 +111,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
 	}
 	versioner := storage.APIObjectVersioner{}
 	mockHandler := func(*watchCacheEvent) {}
-	wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{}))
+	wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"})
 	// To preserve behavior of tests that assume a given capacity,
 	// resize it to th expected size.
 	wc.capacity = capacity
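As a follow-on sketch (illustrative only, not code from this commit), this is roughly what keying a counter by the group resource buys: two CRDs now increment two distinct series instead of sharing a single *unstructured.Unstructured one. The counter name and the direct use of prometheus/client_golang are assumptions; the real counters live in k8s.io/apiserver/pkg/storage/cacher/metrics and are registered through the component-base metrics machinery.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// exampleEventsCounter mimics the shape of the cacher counters touched in the
// diff above; the metric name and registry are illustrative only.
var exampleEventsCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "example_watch_cache_events_total",
		Help: "Watch cache events, keyed by group resource (illustrative).",
	},
	[]string{"resource"},
)

func main() {
	prometheus.MustRegister(exampleEventsCounter)

	// With the group resource as the label value, each CRD gets its own series.
	for _, gr := range []schema.GroupResource{
		{Group: "example.com", Resource: "widgets"},
		{Group: "example.com", Resource: "gadgets"},
	} {
		exampleEventsCounter.WithLabelValues(gr.String()).Inc()
	}

	fmt.Println("series labels:", "widgets.example.com,", "gadgets.example.com")
}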