Merge pull request #124635 from pohly/event-broadcaster-shutdown-fix

client-go/tools/record: fix and test Broadcaster shutdown + logging
This commit is contained in:
Kubernetes Prow Robot 2024-05-28 07:03:09 -07:00 committed by GitHub
commit f4ea903712
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 89 additions and 25 deletions

View File

@ -203,8 +203,8 @@ func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
// - The context was nil.
// - The context was context.Background() to begin with.
//
// Both cases get checked here.
haveCtxCancelation := ctx.Done() == nil
// Both cases get checked here: we have cancelation if (and only if) there is a channel.
haveCtxCancelation := ctx.Done() != nil
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)

View File

@ -19,6 +19,7 @@ package record
import (
"context"
"encoding/json"
stderrors "errors"
"fmt"
"net/http"
"strconv"
@ -26,6 +27,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -34,6 +38,8 @@ import (
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
ref "k8s.io/client-go/tools/reference"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/clock"
testclocks "k8s.io/utils/clock/testing"
)
@ -104,13 +110,38 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event)
}
}
// newBroadcasterForTests creates a new broadcaster which produces per-test log
// output if StartStructuredLogging is used. Will be shut down automatically
// after the test.
func newBroadcasterForTests(tb testing.TB) EventBroadcaster {
_, ctx := ktesting.NewTestContext(tb)
caster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx))
tb.Cleanup(caster.Shutdown)
return caster
}
func TestBroadcasterShutdown(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
// Start a broadcaster with background activity.
caster := NewBroadcaster(WithContext(ctx))
caster.StartStructuredLogging(0)
// Stop it.
cancel(stderrors.New("time to stop"))
// Ensure that the broadcaster goroutine is not left running.
goleak.VerifyNone(t)
}
func TestNonRacyShutdown(t *testing.T) {
// Attempt to simulate previously racy conditions, and ensure that no race
// occurs: Nominally, calling "Eventf" *followed by* shutdown from the same
// thread should be a safe operation, but it's not if we launch recorder.Action
// in a goroutine.
caster := NewBroadcasterForTests(0)
caster := newBroadcasterForTests(t)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, caster, clock)
@ -151,14 +182,15 @@ func TestEventf(t *testing.T) {
t.Fatal(err)
}
table := []struct {
obj k8sruntime.Object
eventtype string
reason string
messageFmt string
elements []interface{}
expect *v1.Event
expectLog string
expectUpdate bool
obj k8sruntime.Object
eventtype string
reason string
messageFmt string
elements []interface{}
expect *v1.Event
expectLog string
expectStructuredLog string
expectUpdate bool
}{
{
obj: testRef,
@ -186,7 +218,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: false,
},
{
@ -214,7 +248,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="" kind="Pod" apiVersion="v1" type="Normal" reason="Killed" message="some other verbose message: 1"
`,
expectUpdate: false,
},
{
@ -243,7 +279,9 @@ func TestEventf(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: true,
},
{
@ -272,7 +310,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: false,
},
{
@ -301,7 +341,9 @@ func TestEventf(t *testing.T) {
Count: 3,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: true,
},
{
@ -330,7 +372,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1"
`,
expectUpdate: false,
},
{
@ -359,7 +403,9 @@ func TestEventf(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1"
`,
expectUpdate: true,
},
}
@ -377,23 +423,36 @@ func TestEventf(t *testing.T) {
},
OnPatch: OnPatchFactory(testCache, patchEvent),
}
eventBroadcaster := NewBroadcasterForTests(0)
logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
logSink := logger.GetSink().(ktesting.Underlier)
ctx := klog.NewContext(context.Background(), logger)
eventBroadcaster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx))
defer eventBroadcaster.Shutdown()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for index, item := range table {
clock.Step(1 * time.Second)
//nolint:logcheck // Intentionally testing StartLogging here.
logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
t.Errorf("Expected '%v', got '%v'", e, a)
}
logCalled <- struct{}{}
})
oldEnd := len(logSink.GetBuffer().String())
structuredLogWatcher := eventBroadcaster.StartStructuredLogging(0)
recorder.Eventf(item.obj, item.eventtype, item.reason, item.messageFmt, item.elements...)
<-logCalled
// We don't get notified by the structured test logger directly.
// Instead, we periodically check what new output it has produced.
assert.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, item.expectStructuredLog, logSink.GetBuffer().String()[oldEnd:], "new structured log output")
}, time.Minute, time.Millisecond)
// validate event
if item.expectUpdate {
actualEvent := <-patchEvent
@ -403,6 +462,7 @@ func TestEventf(t *testing.T) {
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
}
logWatcher.Stop()
structuredLogWatcher.Stop()
}
sinkWatcher.Stop()
}
@ -561,8 +621,9 @@ func TestLotsOfEvents(t *testing.T) {
},
}
eventBroadcaster := NewBroadcasterForTests(0)
eventBroadcaster := newBroadcasterForTests(t)
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
//nolint:logcheck // Intentionally using StartLogging here to get notified.
logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
loggerCalled <- struct{}{}
})
@ -658,7 +719,7 @@ func TestEventfNoNamespace(t *testing.T) {
},
OnPatch: OnPatchFactory(testCache, patchEvent),
}
eventBroadcaster := NewBroadcasterForTests(0)
eventBroadcaster := newBroadcasterForTests(t)
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
clock := testclocks.NewFakeClock(time.Now())
@ -953,7 +1014,7 @@ func TestMultiSinkCache(t *testing.T) {
OnPatch: OnPatchFactory(testCache2, patchEvent2),
}
eventBroadcaster := NewBroadcasterForTests(0)
eventBroadcaster := newBroadcasterForTests(t)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
@ -971,6 +1032,9 @@ func TestMultiSinkCache(t *testing.T) {
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
}
}
// Stop before creating more events, otherwise the On* callbacks above
// get stuck writing to the channel that we don't read from anymore.
sinkWatcher.Stop()
// Another StartRecordingToSink call should start to record events with new clean cache.
sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2)
@ -988,6 +1052,5 @@ func TestMultiSinkCache(t *testing.T) {
}
}
sinkWatcher.Stop()
sinkWatcher2.Stop()
}

View File

@ -17,10 +17,11 @@ limitations under the License.
package record
import (
"os"
"testing"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
os.Exit(m.Run())
goleak.VerifyTestMain(m)
}