Write Unit test to imitate Panic

There was a race creating a panic with shutting down
an eventbroadcaster and it's associated watchers. This
test exposes it.

Signed-off-by: Andrew Stoycos <astoycos@redhat.com>
This commit is contained in:
Andrew Stoycos 2022-05-13 15:26:04 -04:00
parent 30adcd0b6c
commit 2d614a182c

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package watch_test package watch
import ( import (
"reflect" "reflect"
@ -26,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
. "k8s.io/apimachinery/pkg/watch"
) )
type myType struct { type myType struct {
@ -246,3 +245,47 @@ func TestBroadcasterSendEventAfterShutdown(t *testing.T) {
assert.Equal(t, sendOnClosed, false, "ActionOrDrop should return false if broadcaster is already shutdown") assert.Equal(t, sendOnClosed, false, "ActionOrDrop should return false if broadcaster is already shutdown")
assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown") assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown")
} }
// Test this since we see usage patterns where the broadcaster and watchers are
// stopped simultaneously leading to races.
func TestBroadcasterShutdownRace(t *testing.T) {
m := NewBroadcaster(1, WaitIfChannelFull)
stopCh := make(chan struct{})
// Add a bunch of watchers
const testWatchers = 2
for i := 0; i < testWatchers; i++ {
i := i
_, err := m.Watch()
if err != nil {
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
}
// This is how we force the watchers to close down independently of the
// eventbroadcaster, see real usage pattern in startRecordingEvents()
go func() {
<-stopCh
t.Log("Stopping Watchers")
m.stopWatching(int64(i))
}()
}
event := Event{Type: Added, Object: &myType{"foo", "hello world"}}
err := m.Action(event.Type, event.Object)
if err != nil {
t.Fatalf("error sending event: %v", err)
}
// Manually simulate m.Shutdown() but change it to force a race scenario
// 1. Close watcher stopchannel, so watchers are closed independently of the
// eventBroadcaster
// 2. Shutdown the m.incoming slightly Before m.stopped so that the watcher's
// call of Blockqueue can pass the m.stopped check.
m.blockQueue(func() {
close(stopCh)
close(m.incoming)
time.Sleep(1 * time.Millisecond)
close(m.stopped)
})
m.distributing.Wait()
}