mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Improve logging in cacheWatcher
This commit is contained in:
parent
c66576b212
commit
b78f0b3106
@ -487,11 +487,14 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
|||||||
|
|
||||||
// Determine watch timeout('0' means deadline is not set, ignore checking)
|
// Determine watch timeout('0' means deadline is not set, ignore checking)
|
||||||
deadline, _ := ctx.Deadline()
|
deadline, _ := ctx.Deadline()
|
||||||
|
|
||||||
|
identifier := fmt.Sprintf("key: %q, labels: %q, fields: %q", key, pred.Label, pred.Field)
|
||||||
|
|
||||||
// Create a watcher here to reduce memory allocations under lock,
|
// Create a watcher here to reduce memory allocations under lock,
|
||||||
// given that memory allocation may trigger GC and block the thread.
|
// given that memory allocation may trigger GC and block the thread.
|
||||||
// Also note that emptyFunc is a placeholder, until we will be able
|
// Also note that emptyFunc is a placeholder, until we will be able
|
||||||
// to compute watcher.forget function (which has to happen under lock).
|
// to compute watcher.forget function (which has to happen under lock).
|
||||||
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)
|
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier)
|
||||||
|
|
||||||
// We explicitly use thread unsafe version and do locking ourself to ensure that
|
// We explicitly use thread unsafe version and do locking ourself to ensure that
|
||||||
// no new events will be processed in the meantime. The watchCache will be unlocked
|
// no new events will be processed in the meantime. The watchCache will be unlocked
|
||||||
@ -1194,9 +1197,13 @@ type cacheWatcher struct {
|
|||||||
allowWatchBookmarks bool
|
allowWatchBookmarks bool
|
||||||
// Object type of the cache watcher interests
|
// Object type of the cache watcher interests
|
||||||
objectType reflect.Type
|
objectType reflect.Type
|
||||||
|
|
||||||
|
// human readable identifier that helps assigning cacheWatcher
|
||||||
|
// instance with request
|
||||||
|
identifier string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type) *cacheWatcher {
|
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher {
|
||||||
return &cacheWatcher{
|
return &cacheWatcher{
|
||||||
input: make(chan *watchCacheEvent, chanSize),
|
input: make(chan *watchCacheEvent, chanSize),
|
||||||
result: make(chan watch.Event, chanSize),
|
result: make(chan watch.Event, chanSize),
|
||||||
@ -1208,6 +1215,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), ve
|
|||||||
deadline: deadline,
|
deadline: deadline,
|
||||||
allowWatchBookmarks: allowWatchBookmarks,
|
allowWatchBookmarks: allowWatchBookmarks,
|
||||||
objectType: objectType,
|
objectType: objectType,
|
||||||
|
identifier: identifier,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1250,7 +1258,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
|
|||||||
// This means that we couldn't send event to that watcher.
|
// This means that we couldn't send event to that watcher.
|
||||||
// Since we don't want to block on it infinitely,
|
// Since we don't want to block on it infinitely,
|
||||||
// we simply terminate it.
|
// we simply terminate it.
|
||||||
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
|
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v", c.objectType.String(), c.identifier)
|
||||||
terminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc()
|
terminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc()
|
||||||
c.forget()
|
c.forget()
|
||||||
}
|
}
|
||||||
@ -1402,7 +1410,7 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
|
|||||||
}
|
}
|
||||||
processingTime := time.Since(startTime)
|
processingTime := time.Since(startTime)
|
||||||
if processingTime > initProcessThreshold {
|
if processingTime > initProcessThreshold {
|
||||||
klog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
|
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer close(c.result)
|
defer close(c.result)
|
||||||
|
@ -70,7 +70,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// set the size of the buffer of w.result to 0, so that the writes to
|
// set the size of the buffer of w.result to 0, so that the writes to
|
||||||
// w.result is blocked.
|
// w.result is blocked.
|
||||||
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType)
|
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
|
||||||
go w.process(context.Background(), initEvents, 0)
|
go w.process(context.Background(), initEvents, 0)
|
||||||
w.Stop()
|
w.Stop()
|
||||||
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
||||||
@ -190,7 +190,7 @@ TestCase:
|
|||||||
testCase.events[j].ResourceVersion = uint64(j) + 1
|
testCase.events[j].ResourceVersion = uint64(j) + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType)
|
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
|
||||||
go w.process(context.Background(), testCase.events, 0)
|
go w.process(context.Background(), testCase.events, 0)
|
||||||
|
|
||||||
ch := w.ResultChan()
|
ch := w.ResultChan()
|
||||||
@ -527,7 +527,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
|||||||
// timeout to zero and run the Stop goroutine concurrently.
|
// timeout to zero and run the Stop goroutine concurrently.
|
||||||
// May sure that the watch will not be blocked on Stop.
|
// May sure that the watch will not be blocked on Stop.
|
||||||
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||||
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType)
|
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
|
||||||
go w.Stop()
|
go w.Stop()
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
@ -539,7 +539,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
|||||||
deadline := time.Now().Add(time.Hour)
|
deadline := time.Now().Add(time.Hour)
|
||||||
// After that, verifies the cacheWatcher.process goroutine works correctly.
|
// After that, verifies the cacheWatcher.process goroutine works correctly.
|
||||||
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||||
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType)
|
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
|
||||||
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
|
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
|
||||||
ctx, _ := context.WithDeadline(context.Background(), deadline)
|
ctx, _ := context.WithDeadline(context.Background(), deadline)
|
||||||
go w.process(ctx, nil, 0)
|
go w.process(ctx, nil, 0)
|
||||||
@ -598,7 +598,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
|
|||||||
forget := func() {}
|
forget := func() {}
|
||||||
|
|
||||||
newWatcher := func(deadline time.Time) *cacheWatcher {
|
newWatcher := func(deadline time.Time) *cacheWatcher {
|
||||||
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType)
|
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
clock := clock.NewFakeClock(time.Now())
|
clock := clock.NewFakeClock(time.Now())
|
||||||
|
Loading…
Reference in New Issue
Block a user