mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Rename cacheWatcher#stop
This commit is contained in:
parent
bb496d626b
commit
0b3c07a98c
@ -856,7 +856,8 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
|
|||||||
// as we don't delete watcher from bookmarkWatchers when it is stopped.
|
// as we don't delete watcher from bookmarkWatchers when it is stopped.
|
||||||
for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() {
|
for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() {
|
||||||
for _, watcher := range watchers {
|
for _, watcher := range watchers {
|
||||||
// watcher.stop() is protected by c.Lock()
|
// c.Lock() is held here.
|
||||||
|
// watcher.stopThreadUnsafe() is protected by c.Lock()
|
||||||
if watcher.stopped {
|
if watcher.stopped {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -926,7 +927,7 @@ func (c *Cacher) finishDispatching() {
|
|||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
c.dispatching = false
|
c.dispatching = false
|
||||||
for _, watcher := range c.watchersToStop {
|
for _, watcher := range c.watchersToStop {
|
||||||
watcher.stop()
|
watcher.stopThreadUnsafe()
|
||||||
}
|
}
|
||||||
c.watchersToStop = c.watchersToStop[:0]
|
c.watchersToStop = c.watchersToStop[:0]
|
||||||
}
|
}
|
||||||
@ -941,7 +942,7 @@ func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) {
|
|||||||
if c.dispatching {
|
if c.dispatching {
|
||||||
c.watchersToStop = append(c.watchersToStop, watcher)
|
c.watchersToStop = append(c.watchersToStop, watcher)
|
||||||
} else {
|
} else {
|
||||||
watcher.stop()
|
watcher.stopThreadUnsafe()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -971,7 +972,7 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
|
|||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
// It's possible that the watcher is already not in the structure (e.g. in case of
|
// It's possible that the watcher is already not in the structure (e.g. in case of
|
||||||
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stop()
|
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopThreadUnsafe()
|
||||||
// on a watcher multiple times.
|
// on a watcher multiple times.
|
||||||
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
|
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
|
||||||
}
|
}
|
||||||
@ -1073,8 +1074,8 @@ func (c *errWatcher) Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// cacheWatcher implements watch.Interface
|
// cacheWatcher implements watch.Interface
|
||||||
|
// this is not thread-safe
|
||||||
type cacheWatcher struct {
|
type cacheWatcher struct {
|
||||||
sync.Mutex
|
|
||||||
input chan *watchCacheEvent
|
input chan *watchCacheEvent
|
||||||
result chan watch.Event
|
result chan watch.Event
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
@ -1115,12 +1116,8 @@ func (c *cacheWatcher) Stop() {
|
|||||||
c.forget()
|
c.forget()
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(#73958)
|
// we rely on the fact that stopThredUnsafe is actually protected by Cacher.Lock()
|
||||||
// stop() is protected by Cacher.Lock(), rename it to
|
func (c *cacheWatcher) stopThreadUnsafe() {
|
||||||
// stopThreadUnsafe and remove the sync.Mutex.
|
|
||||||
func (c *cacheWatcher) stop() {
|
|
||||||
c.Lock()
|
|
||||||
defer c.Unlock()
|
|
||||||
if !c.stopped {
|
if !c.stopped {
|
||||||
c.stopped = true
|
c.stopped = true
|
||||||
close(c.done)
|
close(c.done)
|
||||||
|
@ -63,7 +63,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
|||||||
// forget() has to stop the watcher, as only stopping the watcher
|
// forget() has to stop the watcher, as only stopping the watcher
|
||||||
// triggers stopping the process() goroutine which we are in the
|
// triggers stopping the process() goroutine which we are in the
|
||||||
// end waiting for in this test.
|
// end waiting for in this test.
|
||||||
w.stop()
|
w.stopThreadUnsafe()
|
||||||
}
|
}
|
||||||
initEvents := []*watchCacheEvent{
|
initEvents := []*watchCacheEvent{
|
||||||
{Object: &v1.Pod{}},
|
{Object: &v1.Pod{}},
|
||||||
@ -472,7 +472,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
|||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
filter := func(string, labels.Set, fields.Set) bool { return true }
|
filter := func(string, labels.Set, fields.Set) bool { return true }
|
||||||
forget := func() {
|
forget := func() {
|
||||||
w.stop()
|
w.stopThreadUnsafe()
|
||||||
done <- struct{}{}
|
done <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user