Merge pull request #124635 from pohly/event-broadcaster-shutdown-fix

client-go/tools/record: fix and test Broadcaster shutdown + logging

Kubernetes-commit: f4ea903712482c4a0021cc9c8d742673a213833e
This commit is contained in:
Kubernetes Publisher 2024-05-28 07:03:09 -07:00
commit b92b563c44
5 changed files with 92 additions and 25 deletions

1
go.mod
View File

@ -19,6 +19,7 @@ require (
github.com/peterbourgon/diskv v2.0.1+incompatible
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
go.uber.org/goleak v1.3.0
golang.org/x/net v0.25.0
golang.org/x/oauth2 v0.20.0
golang.org/x/term v0.20.0

2
go.sum
View File

@ -96,6 +96,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=

View File

@ -203,8 +203,8 @@ func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
// - The context was nil.
// - The context was context.Background() to begin with.
//
// Both cases get checked here.
haveCtxCancelation := ctx.Done() == nil
// Both cases get checked here: we have cancelation if (and only if) there is a channel.
haveCtxCancelation := ctx.Done() != nil
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)

View File

@ -19,6 +19,7 @@ package record
import (
"context"
"encoding/json"
stderrors "errors"
"fmt"
"net/http"
"strconv"
@ -26,6 +27,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -34,6 +38,8 @@ import (
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
ref "k8s.io/client-go/tools/reference"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/clock"
testclocks "k8s.io/utils/clock/testing"
)
@ -104,13 +110,38 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event)
}
}
// newBroadcasterForTests creates a new broadcaster which produces per-test log
// output if StartStructuredLogging is used. Will be shut down automatically
// after the test.
func newBroadcasterForTests(tb testing.TB) EventBroadcaster {
_, ctx := ktesting.NewTestContext(tb)
caster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx))
tb.Cleanup(caster.Shutdown)
return caster
}
func TestBroadcasterShutdown(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
// Start a broadcaster with background activity.
caster := NewBroadcaster(WithContext(ctx))
caster.StartStructuredLogging(0)
// Stop it.
cancel(stderrors.New("time to stop"))
// Ensure that the broadcaster goroutine is not left running.
goleak.VerifyNone(t)
}
func TestNonRacyShutdown(t *testing.T) {
// Attempt to simulate previously racy conditions, and ensure that no race
// occurs: Nominally, calling "Eventf" *followed by* shutdown from the same
// thread should be a safe operation, but it's not if we launch recorder.Action
// in a goroutine.
caster := NewBroadcasterForTests(0)
caster := newBroadcasterForTests(t)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, caster, clock)
@ -151,14 +182,15 @@ func TestEventf(t *testing.T) {
t.Fatal(err)
}
table := []struct {
obj k8sruntime.Object
eventtype string
reason string
messageFmt string
elements []interface{}
expect *v1.Event
expectLog string
expectUpdate bool
obj k8sruntime.Object
eventtype string
reason string
messageFmt string
elements []interface{}
expect *v1.Event
expectLog string
expectStructuredLog string
expectUpdate bool
}{
{
obj: testRef,
@ -186,7 +218,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: false,
},
{
@ -214,7 +248,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="" kind="Pod" apiVersion="v1" type="Normal" reason="Killed" message="some other verbose message: 1"
`,
expectUpdate: false,
},
{
@ -243,7 +279,9 @@ func TestEventf(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: true,
},
{
@ -272,7 +310,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: false,
},
{
@ -301,7 +341,9 @@ func TestEventf(t *testing.T) {
Count: 3,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
`,
expectUpdate: true,
},
{
@ -330,7 +372,9 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1"
`,
expectUpdate: false,
},
{
@ -359,7 +403,9 @@ func TestEventf(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1"
`,
expectUpdate: true,
},
}
@ -377,23 +423,36 @@ func TestEventf(t *testing.T) {
},
OnPatch: OnPatchFactory(testCache, patchEvent),
}
eventBroadcaster := NewBroadcasterForTests(0)
logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
logSink := logger.GetSink().(ktesting.Underlier)
ctx := klog.NewContext(context.Background(), logger)
eventBroadcaster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx))
defer eventBroadcaster.Shutdown()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for index, item := range table {
clock.Step(1 * time.Second)
//nolint:logcheck // Intentionally testing StartLogging here.
logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
t.Errorf("Expected '%v', got '%v'", e, a)
}
logCalled <- struct{}{}
})
oldEnd := len(logSink.GetBuffer().String())
structuredLogWatcher := eventBroadcaster.StartStructuredLogging(0)
recorder.Eventf(item.obj, item.eventtype, item.reason, item.messageFmt, item.elements...)
<-logCalled
// We don't get notified by the structured test logger directly.
// Instead, we periodically check what new output it has produced.
assert.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, item.expectStructuredLog, logSink.GetBuffer().String()[oldEnd:], "new structured log output")
}, time.Minute, time.Millisecond)
// validate event
if item.expectUpdate {
actualEvent := <-patchEvent
@ -403,6 +462,7 @@ func TestEventf(t *testing.T) {
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
}
logWatcher.Stop()
structuredLogWatcher.Stop()
}
sinkWatcher.Stop()
}
@ -561,8 +621,9 @@ func TestLotsOfEvents(t *testing.T) {
},
}
eventBroadcaster := NewBroadcasterForTests(0)
eventBroadcaster := newBroadcasterForTests(t)
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
//nolint:logcheck // Intentionally using StartLogging here to get notified.
logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
loggerCalled <- struct{}{}
})
@ -658,7 +719,7 @@ func TestEventfNoNamespace(t *testing.T) {
},
OnPatch: OnPatchFactory(testCache, patchEvent),
}
eventBroadcaster := NewBroadcasterForTests(0)
eventBroadcaster := newBroadcasterForTests(t)
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
clock := testclocks.NewFakeClock(time.Now())
@ -953,7 +1014,7 @@ func TestMultiSinkCache(t *testing.T) {
OnPatch: OnPatchFactory(testCache2, patchEvent2),
}
eventBroadcaster := NewBroadcasterForTests(0)
eventBroadcaster := newBroadcasterForTests(t)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
@ -971,6 +1032,9 @@ func TestMultiSinkCache(t *testing.T) {
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
}
}
// Stop before creating more events, otherwise the On* callbacks above
// get stuck writing to the channel that we don't read from anymore.
sinkWatcher.Stop()
// Another StartRecordingToSink call should start to record events with new clean cache.
sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2)
@ -988,6 +1052,5 @@ func TestMultiSinkCache(t *testing.T) {
}
}
sinkWatcher.Stop()
sinkWatcher2.Stop()
}

View File

@ -17,10 +17,11 @@ limitations under the License.
package record
import (
"os"
"testing"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
os.Exit(m.Run())
goleak.VerifyTestMain(m)
}