Merge pull request #110040 from astoycos/fix-panic

Fix additional panic
This commit is contained in:
Kubernetes Prow Robot 2022-05-17 06:20:27 -07:00 committed by GitHub
commit ad2c625162
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 5 deletions

View File

@ -46,7 +46,7 @@ type Broadcaster struct {
distributing sync.WaitGroup distributing sync.WaitGroup
// incomingBlock allows us to ensure we don't race and end up sending events // incomingBlock allows us to ensure we don't race and end up sending events
// to a closed channel following a brodcaster shutdown. // to a closed channel following a broadcaster shutdown.
incomingBlock sync.Mutex incomingBlock sync.Mutex
incoming chan Event incoming chan Event
stopped chan struct{} stopped chan struct{}
@ -115,6 +115,8 @@ func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
// won't ever see that event, and will always see any event after they are // won't ever see that event, and will always see any event after they are
// added. // added.
func (m *Broadcaster) blockQueue(f func()) { func (m *Broadcaster) blockQueue(f func()) {
m.incomingBlock.Lock()
defer m.incomingBlock.Unlock()
select { select {
case <-m.stopped: case <-m.stopped:
return return
@ -252,8 +254,6 @@ func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool,
// have received the data yet as it can remain sitting in the buffered // have received the data yet as it can remain sitting in the buffered
// channel. It will block until the broadcaster stop request is actually executed // channel. It will block until the broadcaster stop request is actually executed
func (m *Broadcaster) Shutdown() { func (m *Broadcaster) Shutdown() {
m.incomingBlock.Lock()
defer m.incomingBlock.Unlock()
m.blockQueue(func() { m.blockQueue(func() {
close(m.stopped) close(m.stopped)
close(m.incoming) close(m.incoming)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package watch_test package watch
import ( import (
"reflect" "reflect"
@ -26,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
. "k8s.io/apimachinery/pkg/watch"
) )
type myType struct { type myType struct {
@ -246,3 +245,47 @@ func TestBroadcasterSendEventAfterShutdown(t *testing.T) {
assert.Equal(t, sendOnClosed, false, "ActionOrDrop should return false if broadcaster is already shutdown") assert.Equal(t, sendOnClosed, false, "ActionOrDrop should return false if broadcaster is already shutdown")
assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown") assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown")
} }
// Test this since we see usage patterns where the broadcaster and watchers are
// stopped simultaneously leading to races.
func TestBroadcasterShutdownRace(t *testing.T) {
m := NewBroadcaster(1, WaitIfChannelFull)
stopCh := make(chan struct{})
// Add a bunch of watchers
const testWatchers = 2
for i := 0; i < testWatchers; i++ {
i := i
_, err := m.Watch()
if err != nil {
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
}
// This is how we force the watchers to close down independently of the
// eventbroadcaster, see real usage pattern in startRecordingEvents()
go func() {
<-stopCh
t.Log("Stopping Watchers")
m.stopWatching(int64(i))
}()
}
event := Event{Type: Added, Object: &myType{"foo", "hello world"}}
err := m.Action(event.Type, event.Object)
if err != nil {
t.Fatalf("error sending event: %v", err)
}
// Manually simulate m.Shutdown() but change it to force a race scenario
// 1. Close watcher stopchannel, so watchers are closed independently of the
// eventBroadcaster
// 2. Shutdown the m.incoming slightly Before m.stopped so that the watcher's
// call of Blockqueue can pass the m.stopped check.
m.blockQueue(func() {
close(stopCh)
close(m.incoming)
time.Sleep(1 * time.Millisecond)
close(m.stopped)
})
m.distributing.Wait()
}