From ff779f1cb56cf896405e52f7923188b99b88bb00 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 30 Apr 2024 12:16:55 +0200 Subject: [PATCH] client-go/tools/record: fix and test Broadcaster shutdown + logging Constructing a Broadcaster already starts a watch which runs in the background. Shutdown must be called to avoid leaking the goroutine. Providing a context was supposed to remove the need to call Shutdown, but that did not actually work because the logic for "must check for cancellation" was accidentally inverted. While at it, structured log output also gets tested together with checking for goroutine leaks. --- .../k8s.io/client-go/tools/record/event.go | 4 +- .../client-go/tools/record/event_test.go | 105 ++++++++++++++---- .../client-go/tools/record/main_test.go | 5 +- 3 files changed, 89 insertions(+), 25 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/record/event.go b/staging/src/k8s.io/client-go/tools/record/event.go index 0745fb4a357..0b3b14f8da0 100644 --- a/staging/src/k8s.io/client-go/tools/record/event.go +++ b/staging/src/k8s.io/client-go/tools/record/event.go @@ -203,8 +203,8 @@ func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster { // - The context was nil. // - The context was context.Background() to begin with. // - // Both cases get checked here. - haveCtxCancelation := ctx.Done() == nil + // Both cases get checked here: we have cancelation if (and only if) there is a channel. + haveCtxCancelation := ctx.Done() != nil eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx) diff --git a/staging/src/k8s.io/client-go/tools/record/event_test.go b/staging/src/k8s.io/client-go/tools/record/event_test.go index f1bdef78e3a..a988a4afc9e 100644 --- a/staging/src/k8s.io/client-go/tools/record/event_test.go +++ b/staging/src/k8s.io/client-go/tools/record/event_test.go @@ -19,6 +19,7 @@ package record import ( "context" "encoding/json" + stderrors "errors" "fmt" "net/http" "strconv" @@ -26,6 +27,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "go.uber.org/goleak" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,6 +38,8 @@ import ( "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" ref "k8s.io/client-go/tools/reference" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" "k8s.io/utils/clock" testclocks "k8s.io/utils/clock/testing" ) @@ -104,13 +110,38 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event) } } +// newBroadcasterForTests creates a new broadcaster which produces per-test log +// output if StartStructuredLogging is used. Will be shut down automatically +// after the test. +func newBroadcasterForTests(tb testing.TB) EventBroadcaster { + _, ctx := ktesting.NewTestContext(tb) + caster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx)) + tb.Cleanup(caster.Shutdown) + return caster +} + +func TestBroadcasterShutdown(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + + // Start a broadcaster with background activity. + caster := NewBroadcaster(WithContext(ctx)) + caster.StartStructuredLogging(0) + + // Stop it. + cancel(stderrors.New("time to stop")) + + // Ensure that the broadcaster goroutine is not left running. + goleak.VerifyNone(t) +} + func TestNonRacyShutdown(t *testing.T) { // Attempt to simulate previously racy conditions, and ensure that no race // occurs: Nominally, calling "Eventf" *followed by* shutdown from the same // thread should be a safe operation, but it's not if we launch recorder.Action // in a goroutine. - caster := NewBroadcasterForTests(0) + caster := newBroadcasterForTests(t) clock := testclocks.NewFakeClock(time.Now()) recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, caster, clock) @@ -151,14 +182,15 @@ func TestEventf(t *testing.T) { t.Fatal(err) } table := []struct { - obj k8sruntime.Object - eventtype string - reason string - messageFmt string - elements []interface{} - expect *v1.Event - expectLog string - expectUpdate bool + obj k8sruntime.Object + eventtype string + reason string + messageFmt string + elements []interface{} + expect *v1.Event + expectLog string + expectStructuredLog string + expectUpdate bool }{ { obj: testRef, @@ -186,7 +218,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: false, }, { @@ -214,7 +248,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="" kind="Pod" apiVersion="v1" type="Normal" reason="Killed" message="some other verbose message: 1" +`, expectUpdate: false, }, { @@ -243,7 +279,9 @@ func TestEventf(t *testing.T) { Count: 2, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: true, }, { @@ -272,7 +310,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: false, }, { @@ -301,7 +341,9 @@ func TestEventf(t *testing.T) { Count: 3, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: true, }, { @@ -330,7 +372,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1" +`, expectUpdate: false, }, { @@ -359,7 +403,9 @@ func TestEventf(t *testing.T) { Count: 2, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1" +`, expectUpdate: true, }, } @@ -377,23 +423,36 @@ func TestEventf(t *testing.T) { }, OnPatch: OnPatchFactory(testCache, patchEvent), } - eventBroadcaster := NewBroadcasterForTests(0) + logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true))) + logSink := logger.GetSink().(ktesting.Underlier) + ctx := klog.NewContext(context.Background(), logger) + eventBroadcaster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx)) + defer eventBroadcaster.Shutdown() sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) clock := testclocks.NewFakeClock(time.Now()) recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) for index, item := range table { clock.Step(1 * time.Second) + //nolint:logcheck // Intentionally testing StartLogging here. logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { t.Errorf("Expected '%v', got '%v'", e, a) } logCalled <- struct{}{} }) + oldEnd := len(logSink.GetBuffer().String()) + structuredLogWatcher := eventBroadcaster.StartStructuredLogging(0) recorder.Eventf(item.obj, item.eventtype, item.reason, item.messageFmt, item.elements...) <-logCalled + // We don't get notified by the structured test logger directly. + // Instead, we periodically check what new output it has produced. + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, item.expectStructuredLog, logSink.GetBuffer().String()[oldEnd:], "new structured log output") + }, time.Minute, time.Millisecond) + // validate event if item.expectUpdate { actualEvent := <-patchEvent @@ -403,6 +462,7 @@ func TestEventf(t *testing.T) { validateEvent(strconv.Itoa(index), actualEvent, item.expect, t) } logWatcher.Stop() + structuredLogWatcher.Stop() } sinkWatcher.Stop() } @@ -561,8 +621,9 @@ func TestLotsOfEvents(t *testing.T) { }, } - eventBroadcaster := NewBroadcasterForTests(0) + eventBroadcaster := newBroadcasterForTests(t) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) + //nolint:logcheck // Intentionally using StartLogging here to get notified. logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { loggerCalled <- struct{}{} }) @@ -658,7 +719,7 @@ func TestEventfNoNamespace(t *testing.T) { }, OnPatch: OnPatchFactory(testCache, patchEvent), } - eventBroadcaster := NewBroadcasterForTests(0) + eventBroadcaster := newBroadcasterForTests(t) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) clock := testclocks.NewFakeClock(time.Now()) @@ -953,7 +1014,7 @@ func TestMultiSinkCache(t *testing.T) { OnPatch: OnPatchFactory(testCache2, patchEvent2), } - eventBroadcaster := NewBroadcasterForTests(0) + eventBroadcaster := newBroadcasterForTests(t) clock := testclocks.NewFakeClock(time.Now()) recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) @@ -971,6 +1032,9 @@ func TestMultiSinkCache(t *testing.T) { validateEvent(strconv.Itoa(index), actualEvent, item.expect, t) } } + // Stop before creating more events, otherwise the On* callbacks above + // get stuck writing to the channel that we don't read from anymore. + sinkWatcher.Stop() // Another StartRecordingToSink call should start to record events with new clean cache. sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2) @@ -988,6 +1052,5 @@ func TestMultiSinkCache(t *testing.T) { } } - sinkWatcher.Stop() sinkWatcher2.Stop() } diff --git a/staging/src/k8s.io/client-go/tools/record/main_test.go b/staging/src/k8s.io/client-go/tools/record/main_test.go index 58f81f74910..04df262ceec 100644 --- a/staging/src/k8s.io/client-go/tools/record/main_test.go +++ b/staging/src/k8s.io/client-go/tools/record/main_test.go @@ -17,10 +17,11 @@ limitations under the License. package record import ( - "os" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - os.Exit(m.Run()) + goleak.VerifyTestMain(m) }