diff --git a/go.mod b/go.mod index 11557bfb..e8590000 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/peterbourgon/diskv v2.0.1+incompatible github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 + go.uber.org/goleak v1.3.0 golang.org/x/net v0.25.0 golang.org/x/oauth2 v0.20.0 golang.org/x/term v0.20.0 diff --git a/go.sum b/go.sum index 585c0ad4..17248f85 100644 --- a/go.sum +++ b/go.sum @@ -96,6 +96,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/tools/record/event.go b/tools/record/event.go index 0745fb4a..0b3b14f8 100644 --- a/tools/record/event.go +++ b/tools/record/event.go @@ -203,8 +203,8 @@ func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster { // - The context was nil. // - The context was context.Background() to begin with. // - // Both cases get checked here. - haveCtxCancelation := ctx.Done() == nil + // Both cases get checked here: we have cancelation if (and only if) there is a channel. + haveCtxCancelation := ctx.Done() != nil eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx) diff --git a/tools/record/event_test.go b/tools/record/event_test.go index f1bdef78..a988a4af 100644 --- a/tools/record/event_test.go +++ b/tools/record/event_test.go @@ -19,6 +19,7 @@ package record import ( "context" "encoding/json" + stderrors "errors" "fmt" "net/http" "strconv" @@ -26,6 +27,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "go.uber.org/goleak" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,6 +38,8 @@ import ( "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" ref "k8s.io/client-go/tools/reference" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" "k8s.io/utils/clock" testclocks "k8s.io/utils/clock/testing" ) @@ -104,13 +110,38 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event) } } +// newBroadcasterForTests creates a new broadcaster which produces per-test log +// output if StartStructuredLogging is used. Will be shut down automatically +// after the test. +func newBroadcasterForTests(tb testing.TB) EventBroadcaster { + _, ctx := ktesting.NewTestContext(tb) + caster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx)) + tb.Cleanup(caster.Shutdown) + return caster +} + +func TestBroadcasterShutdown(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + + // Start a broadcaster with background activity. + caster := NewBroadcaster(WithContext(ctx)) + caster.StartStructuredLogging(0) + + // Stop it. + cancel(stderrors.New("time to stop")) + + // Ensure that the broadcaster goroutine is not left running. + goleak.VerifyNone(t) +} + func TestNonRacyShutdown(t *testing.T) { // Attempt to simulate previously racy conditions, and ensure that no race // occurs: Nominally, calling "Eventf" *followed by* shutdown from the same // thread should be a safe operation, but it's not if we launch recorder.Action // in a goroutine. - caster := NewBroadcasterForTests(0) + caster := newBroadcasterForTests(t) clock := testclocks.NewFakeClock(time.Now()) recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, caster, clock) @@ -151,14 +182,15 @@ func TestEventf(t *testing.T) { t.Fatal(err) } table := []struct { - obj k8sruntime.Object - eventtype string - reason string - messageFmt string - elements []interface{} - expect *v1.Event - expectLog string - expectUpdate bool + obj k8sruntime.Object + eventtype string + reason string + messageFmt string + elements []interface{} + expect *v1.Event + expectLog string + expectStructuredLog string + expectUpdate bool }{ { obj: testRef, @@ -186,7 +218,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: false, }, { @@ -214,7 +248,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="" kind="Pod" apiVersion="v1" type="Normal" reason="Killed" message="some other verbose message: 1" +`, expectUpdate: false, }, { @@ -243,7 +279,9 @@ func TestEventf(t *testing.T) { Count: 2, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: true, }, { @@ -272,7 +310,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: false, }, { @@ -301,7 +341,9 @@ func TestEventf(t *testing.T) { Count: 3, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1" +`, expectUpdate: true, }, { @@ -330,7 +372,9 @@ func TestEventf(t *testing.T) { Count: 1, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1" +`, expectUpdate: false, }, { @@ -359,7 +403,9 @@ func TestEventf(t *testing.T) { Count: 2, Type: v1.EventTypeNormal, }, - expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`, + expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1" +`, expectUpdate: true, }, } @@ -377,23 +423,36 @@ func TestEventf(t *testing.T) { }, OnPatch: OnPatchFactory(testCache, patchEvent), } - eventBroadcaster := NewBroadcasterForTests(0) + logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true))) + logSink := logger.GetSink().(ktesting.Underlier) + ctx := klog.NewContext(context.Background(), logger) + eventBroadcaster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx)) + defer eventBroadcaster.Shutdown() sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) clock := testclocks.NewFakeClock(time.Now()) recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) for index, item := range table { clock.Step(1 * time.Second) + //nolint:logcheck // Intentionally testing StartLogging here. logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { t.Errorf("Expected '%v', got '%v'", e, a) } logCalled <- struct{}{} }) + oldEnd := len(logSink.GetBuffer().String()) + structuredLogWatcher := eventBroadcaster.StartStructuredLogging(0) recorder.Eventf(item.obj, item.eventtype, item.reason, item.messageFmt, item.elements...) <-logCalled + // We don't get notified by the structured test logger directly. + // Instead, we periodically check what new output it has produced. + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, item.expectStructuredLog, logSink.GetBuffer().String()[oldEnd:], "new structured log output") + }, time.Minute, time.Millisecond) + // validate event if item.expectUpdate { actualEvent := <-patchEvent @@ -403,6 +462,7 @@ func TestEventf(t *testing.T) { validateEvent(strconv.Itoa(index), actualEvent, item.expect, t) } logWatcher.Stop() + structuredLogWatcher.Stop() } sinkWatcher.Stop() } @@ -561,8 +621,9 @@ func TestLotsOfEvents(t *testing.T) { }, } - eventBroadcaster := NewBroadcasterForTests(0) + eventBroadcaster := newBroadcasterForTests(t) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) + //nolint:logcheck // Intentionally using StartLogging here to get notified. logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { loggerCalled <- struct{}{} }) @@ -658,7 +719,7 @@ func TestEventfNoNamespace(t *testing.T) { }, OnPatch: OnPatchFactory(testCache, patchEvent), } - eventBroadcaster := NewBroadcasterForTests(0) + eventBroadcaster := newBroadcasterForTests(t) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) clock := testclocks.NewFakeClock(time.Now()) @@ -953,7 +1014,7 @@ func TestMultiSinkCache(t *testing.T) { OnPatch: OnPatchFactory(testCache2, patchEvent2), } - eventBroadcaster := NewBroadcasterForTests(0) + eventBroadcaster := newBroadcasterForTests(t) clock := testclocks.NewFakeClock(time.Now()) recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) @@ -971,6 +1032,9 @@ func TestMultiSinkCache(t *testing.T) { validateEvent(strconv.Itoa(index), actualEvent, item.expect, t) } } + // Stop before creating more events, otherwise the On* callbacks above + // get stuck writing to the channel that we don't read from anymore. + sinkWatcher.Stop() // Another StartRecordingToSink call should start to record events with new clean cache. sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2) @@ -988,6 +1052,5 @@ func TestMultiSinkCache(t *testing.T) { } } - sinkWatcher.Stop() sinkWatcher2.Stop() } diff --git a/tools/record/main_test.go b/tools/record/main_test.go index 58f81f74..04df262c 100644 --- a/tools/record/main_test.go +++ b/tools/record/main_test.go @@ -17,10 +17,11 @@ limitations under the License. package record import ( - "os" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - os.Exit(m.Run()) + goleak.VerifyTestMain(m) }