mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #80788 from tedyu/stop-thread-unsafe
Rename cacheWatcher#stop
This commit is contained in:
commit
42d7feee28
@ -862,7 +862,8 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
|
||||
// as we don't delete watcher from bookmarkWatchers when it is stopped.
|
||||
for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() {
|
||||
for _, watcher := range watchers {
|
||||
// watcher.stop() is protected by c.Lock()
|
||||
// c.Lock() is held here.
|
||||
// watcher.stopThreadUnsafe() is protected by c.Lock()
|
||||
if watcher.stopped {
|
||||
continue
|
||||
}
|
||||
@ -932,7 +933,7 @@ func (c *Cacher) finishDispatching() {
|
||||
defer c.Unlock()
|
||||
c.dispatching = false
|
||||
for _, watcher := range c.watchersToStop {
|
||||
watcher.stop()
|
||||
watcher.stopThreadUnsafe()
|
||||
}
|
||||
c.watchersToStop = c.watchersToStop[:0]
|
||||
}
|
||||
@ -947,7 +948,7 @@ func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) {
|
||||
if c.dispatching {
|
||||
c.watchersToStop = append(c.watchersToStop, watcher)
|
||||
} else {
|
||||
watcher.stop()
|
||||
watcher.stopThreadUnsafe()
|
||||
}
|
||||
}
|
||||
|
||||
@ -977,7 +978,7 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
|
||||
defer c.Unlock()
|
||||
|
||||
// It's possible that the watcher is already not in the structure (e.g. in case of
|
||||
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stop()
|
||||
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopThreadUnsafe()
|
||||
// on a watcher multiple times.
|
||||
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
|
||||
}
|
||||
@ -1079,8 +1080,8 @@ func (c *errWatcher) Stop() {
|
||||
}
|
||||
|
||||
// cacheWatcher implements watch.Interface
|
||||
// this is not thread-safe
|
||||
type cacheWatcher struct {
|
||||
sync.Mutex
|
||||
input chan *watchCacheEvent
|
||||
result chan watch.Event
|
||||
done chan struct{}
|
||||
@ -1121,12 +1122,8 @@ func (c *cacheWatcher) Stop() {
|
||||
c.forget()
|
||||
}
|
||||
|
||||
// TODO(#73958)
|
||||
// stop() is protected by Cacher.Lock(), rename it to
|
||||
// stopThreadUnsafe and remove the sync.Mutex.
|
||||
func (c *cacheWatcher) stop() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
// we rely on the fact that stopThredUnsafe is actually protected by Cacher.Lock()
|
||||
func (c *cacheWatcher) stopThreadUnsafe() {
|
||||
if !c.stopped {
|
||||
c.stopped = true
|
||||
close(c.done)
|
||||
|
@ -63,7 +63,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
||||
// forget() has to stop the watcher, as only stopping the watcher
|
||||
// triggers stopping the process() goroutine which we are in the
|
||||
// end waiting for in this test.
|
||||
w.stop()
|
||||
w.stopThreadUnsafe()
|
||||
}
|
||||
initEvents := []*watchCacheEvent{
|
||||
{Object: &v1.Pod{}},
|
||||
@ -472,7 +472,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
filter := func(string, labels.Set, fields.Set) bool { return true }
|
||||
forget := func() {
|
||||
w.stop()
|
||||
w.stopThreadUnsafe()
|
||||
done <- struct{}{}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user