Repository: k3s-io/kubernetes (mirror of https://github.com/k3s-io/kubernetes.git)
Merge pull request #105483 from MadhavJivrajani/watch-cache-contention

cacher: Use watchCacheInterval to reduce lock contention

Commit: 5340ae0bae
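What the diff below does: watchCache.GetAllEventsSinceThreadUnsafe no longer materializes a []*watchCacheEvent while the watchCache lock is held; it returns a *watchCacheInterval instead, and cacheWatcher.processEvents becomes processInterval, which pulls the initial events lazily through cacheInterval.Next() until the interval is drained (nil event) or invalidated (error). The following is a minimal, self-contained sketch of that pull model; the event and interval types and the indexer field are hypothetical stand-ins for illustration, not the real watchCacheInterval API.

package main

import "fmt"

// event stands in for watchCacheEvent; the real type also carries the object,
// labels, fields and key.
type event struct{ rv uint64 }

// interval is a hypothetical stand-in for watchCacheInterval: instead of a
// pre-copied slice, it holds an indexer closure plus bounds and hands out one
// event per Next() call. Next returns (nil, nil) once the interval is drained;
// the real type can additionally return an error when the underlying cache
// window has been invalidated.
type interval struct {
    startIndex, endIndex int
    indexer              func(i int) *event
}

func (it *interval) Next() (*event, error) {
    if it.startIndex >= it.endIndex {
        return nil, nil // drained
    }
    e := it.indexer(it.startIndex)
    it.startIndex++
    return e, nil
}

func main() {
    // Buffered events, standing in for the watch cache's ring buffer.
    cache := []*event{{rv: 1}, {rv: 2}, {rv: 3}}

    // Constructing the interval is cheap (done under the lock in the real
    // code); consuming it happens afterwards, mirroring processInterval.
    it := &interval{startIndex: 0, endIndex: len(cache), indexer: func(i int) *event { return cache[i] }}

    var lastRV uint64
    count := 0
    for {
        e, err := it.Next()
        if err != nil {
            fmt.Println("interval invalidated:", err)
            return
        }
        if e == nil {
            break // all buffered events served
        }
        lastRV = e.rv // track progress so later events aren't replayed
        count++
    }
    fmt.Printf("served %d init events, resuming watch from rv=%d\n", count, lastRV)
}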
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -509,7 +509,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
     // underlying watchCache is calling processEvent under its lock.
     c.watchCache.RLock()
     defer c.watchCache.RUnlock()
-    initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
+    cacheInterval, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
     if err != nil {
         // To match the uncached watch implementation, once we have passed authn/authz/admission,
         // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
@@ -531,7 +531,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
         c.watcherIdx++
     }()

-    go watcher.processEvents(ctx, initEvents, watchRV)
+    go watcher.processInterval(ctx, cacheInterval, watchRV)
     return watcher, nil
 }

@@ -1333,7 +1333,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
     // would give us non-determinism.
     // At the same time, we don't want to block infinitely on putting
     // to c.result, when c.done is already closed.
-
+    //
     // This ensures that with c.done already close, we at most once go
     // into the next select after this. With that, no matter which
     // statement we choose there, we will deliver only consecutive
@@ -1350,8 +1350,10 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
     }
 }

-func (c *cacheWatcher) processEvents(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
+func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
     defer utilruntime.HandleCrash()
+    defer close(c.result)
+    defer c.Stop()

     // Check how long we are processing initEvents.
     // As long as these are not processed, we are not processing
@@ -1368,22 +1370,46 @@ func (c *cacheWatcher) processEvents(ctx context.Context, initEvents []*watchCac
     // consider increase size of result buffer in those cases.
     const initProcessThreshold = 500 * time.Millisecond
     startTime := time.Now()
-    for _, event := range initEvents {
+
+    initEventCount := 0
+    for {
+        event, err := cacheInterval.Next()
+        if err != nil {
+            // An error indicates that the cache interval
+            // has been invalidated and can no longer serve
+            // events.
+            //
+            // Initially we considered sending an "out-of-history"
+            // Error event in this case, but because historically
+            // such events weren't sent out of the watchCache, we
+            // decided not to. This is still ok, because on watch
+            // closure, the watcher will try to re-instantiate the
+            // watch and then will get an explicit "out-of-history"
+            // window. There is potential for optimization, but for
+            // now, in order to be on the safe side and not break
+            // custom clients, the cost of it is something that we
+            // are fully accepting.
+            klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
+            return
+        }
+        if event == nil {
+            break
+        }
         c.sendWatchCacheEvent(event)
+        // With some events already sent, update resourceVersion so that
+        // events that were buffered and not yet processed won't be delivered
+        // to this watcher second time causing going back in time.
+        resourceVersion = event.ResourceVersion
+        initEventCount++
     }

     objType := c.objectType.String()
-    if len(initEvents) > 0 {
-        initCounter.WithLabelValues(objType).Add(float64(len(initEvents)))
-        // With some events already sent, update resourceVersion
-        // so that events that were buffered and not yet processed
-        // won't be delivered to this watcher second time causing
-        // going back in time.
-        resourceVersion = initEvents[len(initEvents)-1].ResourceVersion
+    if initEventCount > 0 {
+        initCounter.WithLabelValues(objType).Add(float64(initEventCount))
     }
     processingTime := time.Since(startTime)
     if processingTime > initProcessThreshold {
-        klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
+        klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, objType, c.identifier, processingTime)
     }

     c.process(ctx, resourceVersion)
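The comment block above spells out the chosen failure mode: when Next() reports that the interval was invalidated, processInterval simply returns, and the newly deferred close(c.result) and c.Stop() end the watch; the client only sees an explicit "too old resource version" error once it re-establishes the watch. A hedged sketch of that client-side contract, assuming a caller-supplied newWatch helper (hypothetical, not part of this commit); watch.Interface and meta.Accessor are the real apimachinery APIs:

package watchretry

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/apimachinery/pkg/watch"
)

// consume drains a watch until its result channel closes, then re-establishes
// it from the last observed resourceVersion. newWatch is an assumed helper
// supplied by the caller (for example, wrapping a client-go Watch call).
func consume(newWatch func(rv string) (watch.Interface, error), rv string) error {
    for {
        w, err := newWatch(rv)
        if err != nil {
            // For example an explicit "too old resource version" error from
            // the watch cache; a real client would re-list here.
            return fmt.Errorf("re-establishing watch failed: %w", err)
        }
        for ev := range w.ResultChan() {
            // Track progress so the next watch starts after this event.
            if obj, accErr := meta.Accessor(ev.Object); accErr == nil {
                rv = obj.GetResourceVersion()
            }
        }
        // Result channel closed: the server ended the watch (for instance
        // because the cache interval was invalidated); loop and re-watch.
    }
}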
@@ -1398,8 +1424,6 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
     // process, but we're leaving this to the tuning phase.
     utilflowcontrol.WatchInitialized(ctx)

-    defer close(c.result)
-    defer c.Stop()
     for {
         select {
         case event, ok := <-c.input:
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -74,7 +74,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
     // set the size of the buffer of w.result to 0, so that the writes to
     // w.result is blocked.
     w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
-    go w.processEvents(context.Background(), initEvents, 0)
+    go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
     w.Stop()
     if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
         lock.RLock()
@@ -194,7 +194,7 @@ TestCase:
         }

         w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
-        go w.processEvents(context.Background(), testCase.events, 0)
+        go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)

         ch := w.ResultChan()
         for j, event := range testCase.expected {
@@ -542,7 +542,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
         w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
         w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
         ctx, _ := context.WithDeadline(context.Background(), deadline)
-        go w.processEvents(ctx, nil, 0)
+        go w.processInterval(ctx, intervalFromEvents(nil), 0)
         select {
         case <-w.ResultChan():
         case <-time.After(time.Second):
@@ -1412,3 +1412,85 @@ func TestCachingObjects(t *testing.T) {
     t.Run("single watcher", func(t *testing.T) { testCachingObjects(t, 1) })
     t.Run("many watcher", func(t *testing.T) { testCachingObjects(t, 3) })
 }
+
+func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
+    backingStorage := &dummyStorage{}
+    cacher, _, err := newTestCacher(backingStorage)
+    if err != nil {
+        t.Fatalf("Couldn't create cacher: %v", err)
+    }
+    defer cacher.Stop()
+
+    // Wait until cacher is initialized.
+    cacher.ready.wait()
+    // Ensure there is enough budget for slow processing since
+    // the entire watch cache is going to be served through the
+    // interval and events won't be popped from the cacheWatcher's
+    // input channel until much later.
+    cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
+
+    // We define a custom index validator such that the interval is
+    // able to serve the first bufferSize elements successfully, but
+    // on trying to fill it's buffer again, the indexValidator simulates
+    // an invalidation leading to the watch being closed and the number
+    // of events we actually process to be bufferSize, each event of
+    // type watch.Added.
+    valid := true
+    invalidateCacheInterval := func() {
+        valid = false
+    }
+    once := sync.Once{}
+    indexValidator := func(index int) bool {
+        isValid := valid && (index >= cacher.watchCache.startIndex)
+        once.Do(invalidateCacheInterval)
+        return isValid
+    }
+    cacher.watchCache.indexValidator = indexValidator
+
+    makePod := func(i int) *examplev1.Pod {
+        return &examplev1.Pod{
+            ObjectMeta: metav1.ObjectMeta{
+                Name:            fmt.Sprintf("pod-%d", 1000+i),
+                Namespace:       "ns",
+                ResourceVersion: fmt.Sprintf("%d", 1000+i),
+            },
+        }
+    }
+
+    // 250 is arbitrary, point is to have enough elements such that
+    // it generates more than bufferSize number of events allowing
+    // us to simulate the invalidation of the cache interval.
+    totalPods := 250
+    for i := 0; i < totalPods; i++ {
+        err := cacher.watchCache.Add(makePod(i))
+        if err != nil {
+            t.Errorf("error: %v", err)
+        }
+    }
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{
+        ResourceVersion: "999",
+        Predicate:       storage.Everything,
+    })
+    if err != nil {
+        t.Fatalf("Failed to create watch: %v", err)
+    }
+    defer w.Stop()
+
+    received := 0
+    resChan := w.ResultChan()
+    for event := range resChan {
+        received++
+        t.Logf("event type: %v, events received so far: %d", event.Type, received)
+        if event.Type != watch.Added {
+            t.Errorf("unexpected event type, expected: %s, got: %s, event: %v", watch.Added, event.Type, event)
+        }
+    }
+    // Since the watch is stopped after the interval is invalidated,
+    // we should have processed exactly bufferSize number of elements.
+    if received != bufferSize {
+        t.Errorf("unexpected number of events received, expected: %d, got: %d", bufferSize+1, received)
+    }
+}
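The test above injects an indexValidator that trips after its first use: the validity flag is read before once.Do flips it, so the first buffer fill passes validation and every later refill fails, which closes the watch after exactly bufferSize events. A small, runnable illustration of that one-shot invalidation idiom (names here are illustrative, not taken from the test):

package main

import (
    "fmt"
    "sync"
)

func main() {
    valid := true
    var once sync.Once

    // validate mirrors the test's injected indexValidator: it evaluates the
    // flag first and only then flips it, so the very first check succeeds
    // and all subsequent checks report the interval as invalid.
    validate := func(index int) bool {
        isValid := valid && index >= 0
        once.Do(func() { valid = false })
        return isValid
    }

    for i := 0; i < 3; i++ {
        fmt.Printf("check %d valid=%v\n", i, validate(i))
    }
    // Prints: check 0 valid=true, then valid=false for the rest.
}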
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
@@ -189,6 +189,9 @@ type watchCache struct {

     // cacher's objectType.
     objectType reflect.Type
+
+    // For testing cache interval invalidation.
+    indexValidator indexValidator
 }

 func newWatchCache(
@@ -219,6 +222,8 @@ func newWatchCache(
     objType := objectType.String()
     watchCacheCapacity.WithLabelValues(objType).Set(float64(wc.capacity))
     wc.cond = sync.NewCond(wc.RLocker())
+    wc.indexValidator = wc.isIndexValidLocked
+
     return wc
 }

@@ -568,7 +573,7 @@ func (w *watchCache) SetOnReplace(onReplace func()) {
     w.onReplace = onReplace
 }

-func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
+func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) (*watchCacheInterval, error) {
     size := w.endIndex - w.startIndex
     var oldest uint64
     switch {
@@ -594,27 +599,11 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
         // current state and only then start watching from that point.
         //
         // TODO: In v2 api, we should stop returning the current state - #13969.
-        allItems := w.store.List()
-        result := make([]*watchCacheEvent, len(allItems))
-        for i, item := range allItems {
-            elem, ok := item.(*storeElement)
-            if !ok {
-                return nil, fmt.Errorf("not a storeElement: %v", elem)
-            }
-            objLabels, objFields, err := w.getAttrsFunc(elem.Object)
-            if err != nil {
-                return nil, err
-            }
-            result[i] = &watchCacheEvent{
-                Type:            watch.Added,
-                Object:          elem.Object,
-                ObjLabels:       objLabels,
-                ObjFields:       objFields,
-                Key:             elem.Key,
-                ResourceVersion: w.resourceVersion,
-            }
+        ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
+        if err != nil {
+            return nil, err
         }
-        return result, nil
+        return ci, nil
     }
     if resourceVersion < oldest-1 {
         return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
@@ -625,11 +614,11 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
         return w.cache[(w.startIndex+i)%w.capacity].ResourceVersion > resourceVersion
     }
     first := sort.Search(size, f)
-    result := make([]*watchCacheEvent, size-first)
-    for i := 0; i < size-first; i++ {
-        result[i] = w.cache[(w.startIndex+first+i)%w.capacity]
+    indexerFunc := func(i int) *watchCacheEvent {
+        return w.cache[i%w.capacity]
     }
-    return result, nil
+    ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex)
+    return ci, nil
 }

 func (w *watchCache) Resync() error {
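In the rewritten tail of GetAllEventsSinceThreadUnsafe, the events between first and endIndex are no longer copied into a fresh slice under the lock; the interval captures an indexer closure into the ring buffer, the indexValidator, and the cache's RWMutex, and events are fetched (and validated) only when the watcher asks for them. Below is a self-contained toy of that ring-buffer/indexer/validator idea; the names (ring, at, valid) are illustrative only and the real watchCacheInterval additionally buffers events and synchronizes with the cache's lock.

package main

import "fmt"

// ring is a toy fixed-capacity event buffer.
type ring struct {
    buf        []string
    startIndex int // oldest still-buffered index
    endIndex   int // next index to be written
}

// at mirrors the diff's indexerFunc: entries are addressed modulo capacity.
func (r *ring) at(i int) string { return r.buf[i%len(r.buf)] }

// valid mirrors isIndexValidLocked: an index is servable only while it has
// not been overwritten, i.e. it is still >= startIndex.
func (r *ring) valid(i int) bool { return i >= r.startIndex }

func main() {
    r := &ring{buf: make([]string, 4)} // capacity 4
    write := func(s string) {
        r.buf[r.endIndex%len(r.buf)] = s
        r.endIndex++
        if r.endIndex-r.startIndex > len(r.buf) {
            r.startIndex++ // oldest entry dropped
        }
    }

    for i := 0; i < 3; i++ {
        write(fmt.Sprintf("event-%d", i))
    }

    // An interval created now would start serving at index 0.
    next := 0
    fmt.Println("serve", r.at(next)) // event-0 is still valid
    next++

    // Meanwhile the cache keeps filling up and wraps around.
    for i := 3; i < 7; i++ {
        write(fmt.Sprintf("event-%d", i))
    }

    // Index 1 has since been overwritten (startIndex moved past it), so a
    // real interval reports invalidation instead of serving a wrong event.
    if !r.valid(next) {
        fmt.Printf("index %d invalidated, stop serving\n", next)
    }
}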
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
@@ -72,9 +72,31 @@ type testWatchCache struct {
 }

 func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
-    w.watchCache.RLock()
-    defer w.watchCache.RUnlock()
-    return w.watchCache.GetAllEventsSinceThreadUnsafe(resourceVersion)
+    cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion)
+    if err != nil {
+        return nil, err
+    }
+
+    result := []*watchCacheEvent{}
+    for {
+        event, err := cacheInterval.Next()
+        if err != nil {
+            return nil, err
+        }
+        if event == nil {
+            break
+        }
+        result = append(result, event)
+    }
+
+    return result, nil
+}
+
+func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64) (*watchCacheInterval, error) {
+    w.RLock()
+    defer w.RUnlock()
+
+    return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
 }

 // newTestWatchCache just adds a fake clock.