mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-14 06:15:45 +00:00
cacher avoid double locking (#117410)
* cacher: remove locking from watcherBookmarkTimeBuckets it turns out that the watcherBookmarkTimeBuckets is called from only three places/methods: startDispatching, finishDispatching and Watch. All these methods acquire c.Lock() before touching watcherBookmarkTimeBuckets. Thus we could remove explicit locking in watcherBookmarkTimeBuckets since the access is already synced. * cacher: rename watcherBookmarkTimeBuckets methods to indicate that proper synchronisation must be used
This commit is contained in:
parent
0d41d28ea2
commit
eab66a687b
@ -359,30 +359,30 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
|
|||||||
clock := testingclock.NewFakeClock(time.Now())
|
clock := testingclock.NewFakeClock(time.Now())
|
||||||
watchers := newTimeBucketWatchers(clock, defaultBookmarkFrequency)
|
watchers := newTimeBucketWatchers(clock, defaultBookmarkFrequency)
|
||||||
now := clock.Now()
|
now := clock.Now()
|
||||||
watchers.addWatcher(newWatcher(now.Add(10 * time.Second)))
|
watchers.addWatcherThreadUnsafe(newWatcher(now.Add(10 * time.Second)))
|
||||||
watchers.addWatcher(newWatcher(now.Add(20 * time.Second)))
|
watchers.addWatcherThreadUnsafe(newWatcher(now.Add(20 * time.Second)))
|
||||||
watchers.addWatcher(newWatcher(now.Add(20 * time.Second)))
|
watchers.addWatcherThreadUnsafe(newWatcher(now.Add(20 * time.Second)))
|
||||||
|
|
||||||
if len(watchers.watchersBuckets) != 2 {
|
if len(watchers.watchersBuckets) != 2 {
|
||||||
t.Errorf("unexpected bucket size: %#v", watchers.watchersBuckets)
|
t.Errorf("unexpected bucket size: %#v", watchers.watchersBuckets)
|
||||||
}
|
}
|
||||||
watchers0 := watchers.popExpiredWatchers()
|
watchers0 := watchers.popExpiredWatchersThreadUnsafe()
|
||||||
if len(watchers0) != 0 {
|
if len(watchers0) != 0 {
|
||||||
t.Errorf("unexpected bucket size: %#v", watchers0)
|
t.Errorf("unexpected bucket size: %#v", watchers0)
|
||||||
}
|
}
|
||||||
|
|
||||||
clock.Step(10 * time.Second)
|
clock.Step(10 * time.Second)
|
||||||
watchers1 := watchers.popExpiredWatchers()
|
watchers1 := watchers.popExpiredWatchersThreadUnsafe()
|
||||||
if len(watchers1) != 1 || len(watchers1[0]) != 1 {
|
if len(watchers1) != 1 || len(watchers1[0]) != 1 {
|
||||||
t.Errorf("unexpected bucket size: %v", watchers1)
|
t.Errorf("unexpected bucket size: %v", watchers1)
|
||||||
}
|
}
|
||||||
watchers1 = watchers.popExpiredWatchers()
|
watchers1 = watchers.popExpiredWatchersThreadUnsafe()
|
||||||
if len(watchers1) != 0 {
|
if len(watchers1) != 0 {
|
||||||
t.Errorf("unexpected bucket size: %#v", watchers1)
|
t.Errorf("unexpected bucket size: %#v", watchers1)
|
||||||
}
|
}
|
||||||
|
|
||||||
clock.Step(12 * time.Second)
|
clock.Step(12 * time.Second)
|
||||||
watchers2 := watchers.popExpiredWatchers()
|
watchers2 := watchers.popExpiredWatchersThreadUnsafe()
|
||||||
if len(watchers2) != 1 || len(watchers2[0]) != 2 {
|
if len(watchers2) != 1 || len(watchers2[0]) != 2 {
|
||||||
t.Errorf("unexpected bucket size: %#v", watchers2)
|
t.Errorf("unexpected bucket size: %#v", watchers2)
|
||||||
}
|
}
|
||||||
@ -603,49 +603,49 @@ func TestBookmarkAfterResourceVersionWatchers(t *testing.T) {
|
|||||||
|
|
||||||
clock := testingclock.NewFakeClock(time.Now())
|
clock := testingclock.NewFakeClock(time.Now())
|
||||||
target := newTimeBucketWatchers(clock, defaultBookmarkFrequency)
|
target := newTimeBucketWatchers(clock, defaultBookmarkFrequency)
|
||||||
if !target.addWatcher(newWatcher("1", clock.Now().Add(2*time.Minute))) {
|
if !target.addWatcherThreadUnsafe(newWatcher("1", clock.Now().Add(2*time.Minute))) {
|
||||||
t.Fatal("failed adding an even to the watcher")
|
t.Fatal("failed adding an even to the watcher")
|
||||||
}
|
}
|
||||||
|
|
||||||
// the watcher is immediately expired (it's waiting for bookmark, so it is scheduled immediately)
|
// the watcher is immediately expired (it's waiting for bookmark, so it is scheduled immediately)
|
||||||
ret := target.popExpiredWatchers()
|
ret := target.popExpiredWatchersThreadUnsafe()
|
||||||
if len(ret) != 1 || len(ret[0]) != 1 {
|
if len(ret) != 1 || len(ret[0]) != 1 {
|
||||||
t.Fatalf("expected only one watcher to be expired")
|
t.Fatalf("expected only one watcher to be expired")
|
||||||
}
|
}
|
||||||
if !target.addWatcher(ret[0][0]) {
|
if !target.addWatcherThreadUnsafe(ret[0][0]) {
|
||||||
t.Fatal("failed adding an even to the watcher")
|
t.Fatal("failed adding an even to the watcher")
|
||||||
}
|
}
|
||||||
|
|
||||||
// after one second time the watcher is still expired
|
// after one second time the watcher is still expired
|
||||||
clock.Step(1 * time.Second)
|
clock.Step(1 * time.Second)
|
||||||
ret = target.popExpiredWatchers()
|
ret = target.popExpiredWatchersThreadUnsafe()
|
||||||
if len(ret) != 1 || len(ret[0]) != 1 {
|
if len(ret) != 1 || len(ret[0]) != 1 {
|
||||||
t.Fatalf("expected only one watcher to be expired")
|
t.Fatalf("expected only one watcher to be expired")
|
||||||
}
|
}
|
||||||
if !target.addWatcher(ret[0][0]) {
|
if !target.addWatcherThreadUnsafe(ret[0][0]) {
|
||||||
t.Fatal("failed adding an even to the watcher")
|
t.Fatal("failed adding an even to the watcher")
|
||||||
}
|
}
|
||||||
|
|
||||||
// after 29 seconds the watcher is still expired
|
// after 29 seconds the watcher is still expired
|
||||||
clock.Step(29 * time.Second)
|
clock.Step(29 * time.Second)
|
||||||
ret = target.popExpiredWatchers()
|
ret = target.popExpiredWatchersThreadUnsafe()
|
||||||
if len(ret) != 1 || len(ret[0]) != 1 {
|
if len(ret) != 1 || len(ret[0]) != 1 {
|
||||||
t.Fatalf("expected only one watcher to be expired")
|
t.Fatalf("expected only one watcher to be expired")
|
||||||
}
|
}
|
||||||
|
|
||||||
// after confirming the watcher is not expired immediately
|
// after confirming the watcher is not expired immediately
|
||||||
ret[0][0].markBookmarkAfterRvAsReceived(&watchCacheEvent{Type: watch.Bookmark, ResourceVersion: 10, Object: &v1.Pod{}})
|
ret[0][0].markBookmarkAfterRvAsReceived(&watchCacheEvent{Type: watch.Bookmark, ResourceVersion: 10, Object: &v1.Pod{}})
|
||||||
if !target.addWatcher(ret[0][0]) {
|
if !target.addWatcherThreadUnsafe(ret[0][0]) {
|
||||||
t.Fatal("failed adding an even to the watcher")
|
t.Fatal("failed adding an even to the watcher")
|
||||||
}
|
}
|
||||||
clock.Step(30 * time.Second)
|
clock.Step(30 * time.Second)
|
||||||
ret = target.popExpiredWatchers()
|
ret = target.popExpiredWatchersThreadUnsafe()
|
||||||
if len(ret) != 0 {
|
if len(ret) != 0 {
|
||||||
t.Fatalf("didn't expect any watchers to be expired")
|
t.Fatalf("didn't expect any watchers to be expired")
|
||||||
}
|
}
|
||||||
|
|
||||||
clock.Step(30 * time.Second)
|
clock.Step(30 * time.Second)
|
||||||
ret = target.popExpiredWatchers()
|
ret = target.popExpiredWatchersThreadUnsafe()
|
||||||
if len(ret) != 1 || len(ret[0]) != 1 {
|
if len(ret) != 1 || len(ret[0]) != 1 {
|
||||||
t.Fatalf("expected only one watcher to be expired")
|
t.Fatalf("expected only one watcher to be expired")
|
||||||
}
|
}
|
||||||
|
@ -184,7 +184,6 @@ func (i *indexedWatchers) terminateAll(groupResource schema.GroupResource, done
|
|||||||
// second in a bucket, and pop up them once at the timeout. To be more specific,
|
// second in a bucket, and pop up them once at the timeout. To be more specific,
|
||||||
// if you set fire time at X, you can get the bookmark within (X-1,X+1) period.
|
// if you set fire time at X, you can get the bookmark within (X-1,X+1) period.
|
||||||
type watcherBookmarkTimeBuckets struct {
|
type watcherBookmarkTimeBuckets struct {
|
||||||
lock sync.Mutex
|
|
||||||
// the key of watcherBuckets is the number of seconds since createTime
|
// the key of watcherBuckets is the number of seconds since createTime
|
||||||
watchersBuckets map[int64][]*cacheWatcher
|
watchersBuckets map[int64][]*cacheWatcher
|
||||||
createTime time.Time
|
createTime time.Time
|
||||||
@ -205,7 +204,7 @@ func newTimeBucketWatchers(clock clock.Clock, bookmarkFrequency time.Duration) *
|
|||||||
|
|
||||||
// adds a watcher to the bucket, if the deadline is before the start, it will be
|
// adds a watcher to the bucket, if the deadline is before the start, it will be
|
||||||
// added to the first one.
|
// added to the first one.
|
||||||
func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
|
func (t *watcherBookmarkTimeBuckets) addWatcherThreadUnsafe(w *cacheWatcher) bool {
|
||||||
// note that the returned time can be before t.createTime,
|
// note that the returned time can be before t.createTime,
|
||||||
// especially in cases when the nextBookmarkTime method
|
// especially in cases when the nextBookmarkTime method
|
||||||
// give us the zero value of type Time
|
// give us the zero value of type Time
|
||||||
@ -215,8 +214,6 @@ func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
bucketID := int64(nextTime.Sub(t.createTime) / time.Second)
|
bucketID := int64(nextTime.Sub(t.createTime) / time.Second)
|
||||||
t.lock.Lock()
|
|
||||||
defer t.lock.Unlock()
|
|
||||||
if bucketID < t.startBucketID {
|
if bucketID < t.startBucketID {
|
||||||
bucketID = t.startBucketID
|
bucketID = t.startBucketID
|
||||||
}
|
}
|
||||||
@ -225,12 +222,10 @@ func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher {
|
func (t *watcherBookmarkTimeBuckets) popExpiredWatchersThreadUnsafe() [][]*cacheWatcher {
|
||||||
currentBucketID := int64(t.clock.Since(t.createTime) / time.Second)
|
currentBucketID := int64(t.clock.Since(t.createTime) / time.Second)
|
||||||
// There should be one or two elements in almost all cases
|
// There should be one or two elements in almost all cases
|
||||||
expiredWatchers := make([][]*cacheWatcher, 0, 2)
|
expiredWatchers := make([][]*cacheWatcher, 0, 2)
|
||||||
t.lock.Lock()
|
|
||||||
defer t.lock.Unlock()
|
|
||||||
for ; t.startBucketID <= currentBucketID; t.startBucketID++ {
|
for ; t.startBucketID <= currentBucketID; t.startBucketID++ {
|
||||||
if watchers, ok := t.watchersBuckets[t.startBucketID]; ok {
|
if watchers, ok := t.watchersBuckets[t.startBucketID]; ok {
|
||||||
delete(t.watchersBuckets, t.startBucketID)
|
delete(t.watchersBuckets, t.startBucketID)
|
||||||
@ -328,6 +323,7 @@ type Cacher struct {
|
|||||||
// dispatching that event to avoid race with closing channels in watchers.
|
// dispatching that event to avoid race with closing channels in watchers.
|
||||||
watchersToStop []*cacheWatcher
|
watchersToStop []*cacheWatcher
|
||||||
// Maintain a timeout queue to send the bookmark event before the watcher times out.
|
// Maintain a timeout queue to send the bookmark event before the watcher times out.
|
||||||
|
// Note that this field when accessed MUST be protected by the Cacher.lock.
|
||||||
bookmarkWatchers *watcherBookmarkTimeBuckets
|
bookmarkWatchers *watcherBookmarkTimeBuckets
|
||||||
// expiredBookmarkWatchers is a list of watchers that were expired and need to be schedule for a next bookmark event
|
// expiredBookmarkWatchers is a list of watchers that were expired and need to be schedule for a next bookmark event
|
||||||
expiredBookmarkWatchers []*cacheWatcher
|
expiredBookmarkWatchers []*cacheWatcher
|
||||||
@ -647,7 +643,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
|||||||
|
|
||||||
// Add it to the queue only when the client support watch bookmarks.
|
// Add it to the queue only when the client support watch bookmarks.
|
||||||
if watcher.allowWatchBookmarks {
|
if watcher.allowWatchBookmarks {
|
||||||
c.bookmarkWatchers.addWatcher(watcher)
|
c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
|
||||||
}
|
}
|
||||||
c.watcherIdx++
|
c.watcherIdx++
|
||||||
}()
|
}()
|
||||||
@ -927,8 +923,12 @@ func (c *Cacher) dispatchEvents() {
|
|||||||
// Never send a bookmark event if we did not see an event here, this is fine
|
// Never send a bookmark event if we did not see an event here, this is fine
|
||||||
// because we don't provide any guarantees on sending bookmarks.
|
// because we don't provide any guarantees on sending bookmarks.
|
||||||
if lastProcessedResourceVersion == 0 {
|
if lastProcessedResourceVersion == 0 {
|
||||||
// pop expired watchers in case there has been no update
|
func() {
|
||||||
c.bookmarkWatchers.popExpiredWatchers()
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
// pop expired watchers in case there has been no update
|
||||||
|
c.bookmarkWatchers.popExpiredWatchersThreadUnsafe()
|
||||||
|
}()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
bookmarkEvent := &watchCacheEvent{
|
bookmarkEvent := &watchCacheEvent{
|
||||||
@ -1050,7 +1050,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
|||||||
func (c *Cacher) startDispatchingBookmarkEventsLocked() {
|
func (c *Cacher) startDispatchingBookmarkEventsLocked() {
|
||||||
// Pop already expired watchers. However, explicitly ignore stopped ones,
|
// Pop already expired watchers. However, explicitly ignore stopped ones,
|
||||||
// as we don't delete watcher from bookmarkWatchers when it is stopped.
|
// as we don't delete watcher from bookmarkWatchers when it is stopped.
|
||||||
for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() {
|
for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() {
|
||||||
for _, watcher := range watchers {
|
for _, watcher := range watchers {
|
||||||
// c.Lock() is held here.
|
// c.Lock() is held here.
|
||||||
// watcher.stopThreadUnsafe() is protected by c.Lock()
|
// watcher.stopThreadUnsafe() is protected by c.Lock()
|
||||||
@ -1155,7 +1155,7 @@ func (c *Cacher) finishDispatching() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// requeue the watcher for the next bookmark if needed.
|
// requeue the watcher for the next bookmark if needed.
|
||||||
c.bookmarkWatchers.addWatcher(watcher)
|
c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
|
||||||
}
|
}
|
||||||
c.expiredBookmarkWatchers = c.expiredBookmarkWatchers[:0]
|
c.expiredBookmarkWatchers = c.expiredBookmarkWatchers[:0]
|
||||||
}
|
}
|
||||||
|
@ -686,7 +686,9 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
|
|||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
cacher.bookmarkWatchers.popExpiredWatchers()
|
cacher.Lock()
|
||||||
|
cacher.bookmarkWatchers.popExpiredWatchersThreadUnsafe()
|
||||||
|
cacher.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -700,9 +702,9 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
|
|||||||
|
|
||||||
// wait out the expiration period and pop expired watchers
|
// wait out the expiration period and pop expired watchers
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
cacher.bookmarkWatchers.popExpiredWatchers()
|
cacher.Lock()
|
||||||
cacher.bookmarkWatchers.lock.Lock()
|
defer cacher.Unlock()
|
||||||
defer cacher.bookmarkWatchers.lock.Unlock()
|
cacher.bookmarkWatchers.popExpiredWatchersThreadUnsafe()
|
||||||
if len(cacher.bookmarkWatchers.watchersBuckets) != 0 {
|
if len(cacher.bookmarkWatchers.watchersBuckets) != 0 {
|
||||||
numWatchers := 0
|
numWatchers := 0
|
||||||
for bucketID, v := range cacher.bookmarkWatchers.watchersBuckets {
|
for bucketID, v := range cacher.bookmarkWatchers.watchersBuckets {
|
||||||
|
Loading…
Reference in New Issue
Block a user