mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-14 21:53:52 +00:00
Merge pull request #19368 from krousey/record_race
Fix data race by eliminating concurrency in test
This commit is contained in:
@@ -109,38 +109,42 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
|
||||
eventCorrelator := NewEventCorrelator(util.RealClock{})
|
||||
return eventBroadcaster.StartEventWatcher(
|
||||
func(event *api.Event) {
|
||||
// Make a copy before modification, because there could be multiple listeners.
|
||||
// Events are safe to copy like this.
|
||||
eventCopy := *event
|
||||
event = &eventCopy
|
||||
result, err := eventCorrelator.EventCorrelate(event)
|
||||
if err != nil {
|
||||
util.HandleError(err)
|
||||
}
|
||||
if result.Skip {
|
||||
return
|
||||
}
|
||||
tries := 0
|
||||
for {
|
||||
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
|
||||
break
|
||||
}
|
||||
tries++
|
||||
if tries >= maxTriesPerEvent {
|
||||
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
|
||||
break
|
||||
}
|
||||
// Randomize the first sleep so that various clients won't all be
|
||||
// synced up if the master goes down.
|
||||
if tries == 1 {
|
||||
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
|
||||
} else {
|
||||
time.Sleep(sleepDuration)
|
||||
}
|
||||
}
|
||||
recordToSink(sink, event, eventCorrelator, randGen)
|
||||
})
|
||||
}
|
||||
|
||||
func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand) {
|
||||
// Make a copy before modification, because there could be multiple listeners.
|
||||
// Events are safe to copy like this.
|
||||
eventCopy := *event
|
||||
event = &eventCopy
|
||||
result, err := eventCorrelator.EventCorrelate(event)
|
||||
if err != nil {
|
||||
util.HandleError(err)
|
||||
}
|
||||
if result.Skip {
|
||||
return
|
||||
}
|
||||
tries := 0
|
||||
for {
|
||||
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
|
||||
break
|
||||
}
|
||||
tries++
|
||||
if tries >= maxTriesPerEvent {
|
||||
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
|
||||
break
|
||||
}
|
||||
// Randomize the first sleep so that various clients won't all be
|
||||
// synced up if the master goes down.
|
||||
if tries == 1 {
|
||||
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
|
||||
} else {
|
||||
time.Sleep(sleepDuration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isKeyNotFoundError(err error) bool {
|
||||
statusErr, _ := err.(*errors.StatusError)
|
||||
// At the moment the server is returning 500 instead of a more specific
|
||||
|
@@ -19,7 +19,7 @@ package record
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -382,16 +382,8 @@ func recorderWithFakeClock(eventSource api.EventSource, eventBroadcaster EventBr
|
||||
}
|
||||
|
||||
func TestWriteEventError(t *testing.T) {
|
||||
ref := &api.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
}
|
||||
type entry struct {
|
||||
timesToSendError int
|
||||
attemptsMade int
|
||||
attemptsWanted int
|
||||
err error
|
||||
}
|
||||
@@ -422,42 +414,25 @@ func TestWriteEventError(t *testing.T) {
|
||||
err: fmt.Errorf("A weird error"),
|
||||
},
|
||||
}
|
||||
done := make(chan struct{})
|
||||
|
||||
eventBroadcaster := NewBroadcaster()
|
||||
defer eventBroadcaster.StartRecordingToSink(
|
||||
&testEventSink{
|
||||
eventCorrelator := NewEventCorrelator(util.RealClock{})
|
||||
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
for caseName, ent := range table {
|
||||
attempts := 0
|
||||
sink := &testEventSink{
|
||||
OnCreate: func(event *api.Event) (*api.Event, error) {
|
||||
if event.Message == "finished" {
|
||||
close(done)
|
||||
return event, nil
|
||||
}
|
||||
item, ok := table[event.Message]
|
||||
if !ok {
|
||||
t.Errorf("Unexpected event: %#v", event)
|
||||
return event, nil
|
||||
}
|
||||
item.attemptsMade++
|
||||
if item.attemptsMade < item.timesToSendError {
|
||||
return nil, item.err
|
||||
attempts++
|
||||
if attempts < ent.timesToSendError {
|
||||
return nil, ent.err
|
||||
}
|
||||
return event, nil
|
||||
},
|
||||
},
|
||||
).Stop()
|
||||
clock := &util.FakeClock{time.Now()}
|
||||
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
|
||||
for caseName := range table {
|
||||
clock.Step(1 * time.Second)
|
||||
recorder.Event(ref, api.EventTypeNormal, "Reason", caseName)
|
||||
runtime.Gosched()
|
||||
}
|
||||
recorder.Event(ref, api.EventTypeNormal, "Reason", "finished")
|
||||
<-done
|
||||
|
||||
for caseName, item := range table {
|
||||
if e, a := item.attemptsWanted, item.attemptsMade; e != a {
|
||||
t.Errorf("case %v: wanted %v, got %v attempts", caseName, e, a)
|
||||
}
|
||||
ev := &api.Event{}
|
||||
recordToSink(sink, ev, eventCorrelator, randGen)
|
||||
if attempts != ent.attemptsWanted {
|
||||
t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user