From ebbf7d7dc321718d6eb867150fc4fe6886492e75 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 30 Apr 2024 12:16:55 +0200 Subject: [PATCH] client-go/tools/record: fix and test Broadcaster shutdown + logging Constructing a Broadcaster already starts a watch which runs in the background. Shutdown must be called to avoid leaking the goroutine. Providing a context was supposed to remove the need to call Shutdown, but that did not actually work because the logic for "must check for cancellation" was accidentally inverted. While at it, structured log output also gets tested together with checking for goroutine leaks. Kubernetes-commit: ff779f1cb56cf896405e52f7923188b99b88bb00 --- tools/record/event.go | 4 +- tools/record/event_test.go | 105 +++++++++++++++++++++++++++++-------- tools/record/main_test.go | 5 +- 3 files changed, 89 insertions(+), 25 deletions(-) diff --git a/tools/record/event.go b/tools/record/event.go index 0745fb4a..0b3b14f8 100644 --- a/tools/record/event.go +++ b/tools/record/event.go @@ -203,8 +203,8 @@ func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster { // - The context was nil. // - The context was context.Background() to begin with. // - // Both cases get checked here. - haveCtxCancelation := ctx.Done() == nil + // Both cases get checked here: we have cancelation if (and only if) there is a channel. + haveCtxCancelation := ctx.Done() != nil eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx) diff --git a/tools/record/event_test.go b/tools/record/event_test.go index f1bdef78..a988a4af 100644 --- a/tools/record/event_test.go +++ b/tools/record/event_test.go @@ -19,6 +19,7 @@ package record import ( "context" "encoding/json" + stderrors "errors" "fmt" "net/http" "strconv" @@ -26,6 +27,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "go.uber.org/goleak" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,6 +38,8 @@ import ( "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" ref "k8s.io/client-go/tools/reference" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" "k8s.io/utils/clock" testclocks "k8s.io/utils/clock/testing" ) @@ -104,13 +110,38 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event) } } +// newBroadcasterForTests creates a new broadcaster which produces per-test log +// output if StartStructuredLogging is used. Will be shut down automatically +// after the test. +func newBroadcasterForTests(tb testing.TB) EventBroadcaster { + _, ctx := ktesting.NewTestContext(tb) + caster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx)) + tb.Cleanup(caster.Shutdown) + return caster +} + +func TestBroadcasterShutdown(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + + // Start a broadcaster with background activity. + caster := NewBroadcaster(WithContext(ctx)) + caster.StartStructuredLogging(0) + + // Stop it. + cancel(stderrors.New("time to stop")) + + // Ensure that the broadcaster goroutine is not left running. + goleak.VerifyNone(t) +} + func TestNonRacyShutdown(t *testing.T) { // Attempt to simulate previously racy conditions, and ensure that no race // occurs: Nominally, calling "Eventf" *followed by* shutdown from the same // thread should be a safe operation, but it's not if we launch recorder.Action // in a goroutine. - caster := NewBroadcasterForTests(0) + caster := newBroadcasterForTests(t) clock := testclocks.NewFakeClock(time.Now()) recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, caster, clock) @@ -151,14 +182,15 @@ func TestEventf(t *testing.T) { t.Fatal(err) } table := []struct { - obj k8sruntime.Object - eventtype string - reason string - messageFmt string - elements []interface{} - expect *v1.Event - expectLog string - expectUpdate bool + obj k8sruntime.Object + eventtype string + reason string + messageFmt string + elements []interface{} + expect *v1.Event + expectLog string + expectStructuredLog string + expectUpdate bool }{ { obj: testRef, @@ -186,7 +218,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: false, }, { @@ -214,7 +248,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="" kind="Pod" apiVersion="v1" type="Normal" reason="Killed" message="some other verbose message: 1" +`, expectUpdate: false, }, { @@ -243,7 +279,9 @@ func TestEventf(t *testing.T) { Count: 2, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: true, }, { @@ -272,7 +310,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: false, }, { @@ -301,7 +341,9 @@ func TestEventf(t *testing.T) { Count: 3, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: true, }, { @@ -330,7 +372,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1" +`, expectUpdate: false, }, { @@ -359,7 +403,9 @@ func TestEventf(t *testing.T) { Count: 2, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1" +`, expectUpdate: true, }, } @@ -377,23 +423,36 @@ func TestEventf(t *testing.T) { }, OnPatch: OnPatchFactory(testCache, patchEvent), } - eventBroadcaster := NewBroadcasterForTests(0) + logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true))) + logSink := logger.GetSink().(ktesting.Underlier) + ctx := klog.NewContext(context.Background(), logger) + eventBroadcaster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx)) + defer eventBroadcaster.Shutdown() sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) clock := testclocks.NewFakeClock(time.Now()) recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) for index, item := range table { clock.Step(1 * time.Second) + //nolint:logcheck // Intentionally testing StartLogging here. logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { t.Errorf("Expected '%v', got '%v'", e, a) } logCalled <- struct{}{} }) + oldEnd := len(logSink.GetBuffer().String()) + structuredLogWatcher := eventBroadcaster.StartStructuredLogging(0) recorder.Eventf(item.obj, item.eventtype, item.reason, item.messageFmt, item.elements...) <-logCalled + // We don't get notified by the structured test logger directly. + // Instead, we periodically check what new output it has produced. + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, item.expectStructuredLog, logSink.GetBuffer().String()[oldEnd:], "new structured log output") + }, time.Minute, time.Millisecond) + // validate event if item.expectUpdate { actualEvent := <-patchEvent @@ -403,6 +462,7 @@ func TestEventf(t *testing.T) { validateEvent(strconv.Itoa(index), actualEvent, item.expect, t) } logWatcher.Stop() + structuredLogWatcher.Stop() } sinkWatcher.Stop() } @@ -561,8 +621,9 @@ func TestLotsOfEvents(t *testing.T) { }, } - eventBroadcaster := NewBroadcasterForTests(0) + eventBroadcaster := newBroadcasterForTests(t) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) + //nolint:logcheck // Intentionally using StartLogging here to get notified. logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { loggerCalled <- struct{}{} }) @@ -658,7 +719,7 @@ func TestEventfNoNamespace(t *testing.T) { }, OnPatch: OnPatchFactory(testCache, patchEvent), } - eventBroadcaster := NewBroadcasterForTests(0) + eventBroadcaster := newBroadcasterForTests(t) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) clock := testclocks.NewFakeClock(time.Now()) @@ -953,7 +1014,7 @@ func TestMultiSinkCache(t *testing.T) { OnPatch: OnPatchFactory(testCache2, patchEvent2), } - eventBroadcaster := NewBroadcasterForTests(0) + eventBroadcaster := newBroadcasterForTests(t) clock := testclocks.NewFakeClock(time.Now()) recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) @@ -971,6 +1032,9 @@ func TestMultiSinkCache(t *testing.T) { validateEvent(strconv.Itoa(index), actualEvent, item.expect, t) } } + // Stop before creating more events, otherwise the On* callbacks above + // get stuck writing to the channel that we don't read from anymore. + sinkWatcher.Stop() // Another StartRecordingToSink call should start to record events with new clean cache. sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2) @@ -988,6 +1052,5 @@ func TestMultiSinkCache(t *testing.T) { } } - sinkWatcher.Stop() sinkWatcher2.Stop() } diff --git a/tools/record/main_test.go b/tools/record/main_test.go index 58f81f74..04df262c 100644 --- a/tools/record/main_test.go +++ b/tools/record/main_test.go @@ -17,10 +17,11 @@ limitations under the License. package record import ( - "os" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - os.Exit(m.Run()) + goleak.VerifyTestMain(m) }