client-go/tools/record: fix and test Broadcaster shutdown + logging

Constructing a Broadcaster already starts a watch which runs in the
background. Shutdown must be called to avoid leaking the goroutine.  Providing
a context was supposed to remove the need to call Shutdown, but that did not
actually work because the logic for "must check for cancellation" was
accidentally inverted.

While at it, structured log output also gets tested together with checking for
goroutine leaks.

Kubernetes-commit: ff779f1cb56cf896405e52f7923188b99b88bb00
This commit is contained in:
Patrick Ohly 2024-04-30 12:16:55 +02:00 committed by Kubernetes Publisher
parent f53839a9ad
commit ebbf7d7dc3
3 changed files with 89 additions and 25 deletions

View File

@ -203,8 +203,8 @@ func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
// - The context was nil.
// - The context was context.Background() to begin with.
//
// Both cases get checked here.
haveCtxCancelation := ctx.Done() == nil
// Both cases get checked here: we have cancelation if (and only if) there is a channel.
haveCtxCancelation := ctx.Done() != nil
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)

View File

@ -19,6 +19,7 @@ package record
import (
"context"
"encoding/json"
stderrors "errors"
"fmt"
"net/http"
"strconv"
@ -26,6 +27,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -34,6 +38,8 @@ import (
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
ref "k8s.io/client-go/tools/reference"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/clock"
testclocks "k8s.io/utils/clock/testing"
)
@ -104,13 +110,38 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event)
}
}
// newBroadcasterForTests creates a new broadcaster which produces per-test log
// output if StartStructuredLogging is used. Will be shut down automatically
// after the test.
func newBroadcasterForTests(tb testing.TB) EventBroadcaster {
_, ctx := ktesting.NewTestContext(tb)
caster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx))
tb.Cleanup(caster.Shutdown)
return caster
}
func TestBroadcasterShutdown(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
// Start a broadcaster with background activity.
caster := NewBroadcaster(WithContext(ctx))
caster.StartStructuredLogging(0)
// Stop it.
cancel(stderrors.New("time to stop"))
// Ensure that the broadcaster goroutine is not left running.
goleak.VerifyNone(t)
}
func TestNonRacyShutdown(t *testing.T) {
// Attempt to simulate previously racy conditions, and ensure that no race
// occurs: Nominally, calling "Eventf" *followed by* shutdown from the same
// thread should be a safe operation, but it's not if we launch recorder.Action
// in a goroutine.
caster := NewBroadcasterForTests(0)
caster := newBroadcasterForTests(t)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, caster, clock)
@ -151,14 +182,15 @@ func TestEventf(t *testing.T) {
t.Fatal(err)
}
table := []struct {
obj k8sruntime.Object
eventtype string
reason string
messageFmt string
elements []interface{}
expect *v1.Event
expectLog string
expectUpdate bool
obj k8sruntime.Object
eventtype string
reason string
messageFmt string
elements []interface{}
expect *v1.Event
expectLog string
expectStructuredLog string
expectUpdate bool
}{
{
obj: testRef,
@ -186,7 +218,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: false,
},
{
@ -214,7 +248,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="" kind="Pod" apiVersion="v1" type="Normal" reason="Killed" message="some other verbose message: 1"
`,
expectUpdate: false,
},
{
@ -243,7 +279,9 @@ func TestEventf(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: true,
},
{
@ -272,7 +310,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: false,
},
{
@ -301,7 +341,9 @@ func TestEventf(t *testing.T) {
Count: 3,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: true,
},
{
@ -330,7 +372,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1"
`,
expectUpdate: false,
},
{
@ -359,7 +403,9 @@ func TestEventf(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1"
`,
expectUpdate: true,
},
}
@ -377,23 +423,36 @@ func TestEventf(t *testing.T) {
},
OnPatch: OnPatchFactory(testCache, patchEvent),
}
eventBroadcaster := NewBroadcasterForTests(0)
logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
logSink := logger.GetSink().(ktesting.Underlier)
ctx := klog.NewContext(context.Background(), logger)
eventBroadcaster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx))
defer eventBroadcaster.Shutdown()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for index, item := range table {
clock.Step(1 * time.Second)
//nolint:logcheck // Intentionally testing StartLogging here.
logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
t.Errorf("Expected '%v', got '%v'", e, a)
}
logCalled <- struct{}{}
})
oldEnd := len(logSink.GetBuffer().String())
structuredLogWatcher := eventBroadcaster.StartStructuredLogging(0)
recorder.Eventf(item.obj, item.eventtype, item.reason, item.messageFmt, item.elements...)
<-logCalled
// We don't get notified by the structured test logger directly.
// Instead, we periodically check what new output it has produced.
assert.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, item.expectStructuredLog, logSink.GetBuffer().String()[oldEnd:], "new structured log output")
}, time.Minute, time.Millisecond)
// validate event
if item.expectUpdate {
actualEvent := <-patchEvent
@ -403,6 +462,7 @@ func TestEventf(t *testing.T) {
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
}
logWatcher.Stop()
structuredLogWatcher.Stop()
}
sinkWatcher.Stop()
}
@ -561,8 +621,9 @@ func TestLotsOfEvents(t *testing.T) {
},
}
eventBroadcaster := NewBroadcasterForTests(0)
eventBroadcaster := newBroadcasterForTests(t)
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
//nolint:logcheck // Intentionally using StartLogging here to get notified.
logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
loggerCalled <- struct{}{}
})
@ -658,7 +719,7 @@ func TestEventfNoNamespace(t *testing.T) {
},
OnPatch: OnPatchFactory(testCache, patchEvent),
}
eventBroadcaster := NewBroadcasterForTests(0)
eventBroadcaster := newBroadcasterForTests(t)
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
clock := testclocks.NewFakeClock(time.Now())
@ -953,7 +1014,7 @@ func TestMultiSinkCache(t *testing.T) {
OnPatch: OnPatchFactory(testCache2, patchEvent2),
}
eventBroadcaster := NewBroadcasterForTests(0)
eventBroadcaster := newBroadcasterForTests(t)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
@ -971,6 +1032,9 @@ func TestMultiSinkCache(t *testing.T) {
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
}
}
// Stop before creating more events, otherwise the On* callbacks above
// get stuck writing to the channel that we don't read from anymore.
sinkWatcher.Stop()
// Another StartRecordingToSink call should start to record events with new clean cache.
sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2)
@ -988,6 +1052,5 @@ func TestMultiSinkCache(t *testing.T) {
}
}
sinkWatcher.Stop()
sinkWatcher2.Stop()
}

View File

@ -17,10 +17,11 @@ limitations under the License.
package record
import (
"os"
"testing"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
os.Exit(m.Run())
goleak.VerifyTestMain(m)
}