Merge pull request #120729 from pohly/events-context

k8s.io/client-go/tools/[events|record]: support context

Kubernetes-commit: f936f69cf994146650c287a95fa436d1d7352835
This commit is contained in:
Kubernetes Publisher 2023-10-04 15:45:16 +02:00
commit 0a782d6adb
10 changed files with 352 additions and 136 deletions

View File

@ -81,27 +81,27 @@ type EventSinkImpl struct {
}
// Create takes the representation of a event and creates it. Returns the server's representation of the event, and an error, if there is any.
func (e *EventSinkImpl) Create(event *eventsv1.Event) (*eventsv1.Event, error) {
func (e *EventSinkImpl) Create(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) {
if event.Namespace == "" {
return nil, fmt.Errorf("can't create an event with empty namespace")
}
return e.Interface.Events(event.Namespace).Create(context.TODO(), event, metav1.CreateOptions{})
return e.Interface.Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{})
}
// Update takes the representation of a event and updates it. Returns the server's representation of the event, and an error, if there is any.
func (e *EventSinkImpl) Update(event *eventsv1.Event) (*eventsv1.Event, error) {
func (e *EventSinkImpl) Update(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) {
if event.Namespace == "" {
return nil, fmt.Errorf("can't update an event with empty namespace")
}
return e.Interface.Events(event.Namespace).Update(context.TODO(), event, metav1.UpdateOptions{})
return e.Interface.Events(event.Namespace).Update(ctx, event, metav1.UpdateOptions{})
}
// Patch applies the patch and returns the patched event, and an error, if there is any.
func (e *EventSinkImpl) Patch(event *eventsv1.Event, data []byte) (*eventsv1.Event, error) {
func (e *EventSinkImpl) Patch(ctx context.Context, event *eventsv1.Event, data []byte) (*eventsv1.Event, error) {
if event.Namespace == "" {
return nil, fmt.Errorf("can't patch an event with empty namespace")
}
return e.Interface.Events(event.Namespace).Patch(context.TODO(), event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
return e.Interface.Events(event.Namespace).Patch(ctx, event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
}
// NewBroadcaster Creates a new event broadcaster.
@ -124,13 +124,13 @@ func (e *eventBroadcasterImpl) Shutdown() {
}
// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
func (e *eventBroadcasterImpl) refreshExistingEventSeries(ctx context.Context) {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
if event.Series != nil {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
if recordedEvent, retry := recordEvent(ctx, e.sink, event); !retry {
if recordedEvent != nil {
e.eventCache[isomorphicKey] = recordedEvent
}
@ -142,7 +142,7 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
func (e *eventBroadcasterImpl) finishSeries(ctx context.Context) {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
@ -150,7 +150,7 @@ func (e *eventBroadcasterImpl) finishSeries() {
eventSerie := event.Series
if eventSerie != nil {
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
if _, retry := recordEvent(e.sink, event); !retry {
if _, retry := recordEvent(ctx, e.sink, event); !retry {
delete(e.eventCache, isomorphicKey)
}
}
@ -161,13 +161,13 @@ func (e *eventBroadcasterImpl) finishSeries() {
}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorderLogger {
hostname, _ := os.Hostname()
reportingInstance := reportingController + "-" + hostname
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
}
func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
func (e *eventBroadcasterImpl) recordToSink(ctx context.Context, event *eventsv1.Event, clock clock.Clock) {
// Make a copy before modification, because there could be multiple listeners.
eventCopy := event.DeepCopy()
go func() {
@ -197,7 +197,7 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C
}()
if evToRecord != nil {
// TODO: Add a metric counting the number of recording attempts
e.attemptRecording(evToRecord)
e.attemptRecording(ctx, evToRecord)
// We don't want the new recorded Event to be reflected in the
// client's cache because server-side mutations could mess with the
// aggregation mechanism used by the client.
@ -205,40 +205,45 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C
}()
}
func (e *eventBroadcasterImpl) attemptRecording(event *eventsv1.Event) *eventsv1.Event {
func (e *eventBroadcasterImpl) attemptRecording(ctx context.Context, event *eventsv1.Event) {
tries := 0
for {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
return recordedEvent
if _, retry := recordEvent(ctx, e.sink, event); !retry {
return
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
return nil
klog.FromContext(ctx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event)
return
}
// Randomize sleep so that various clients won't all be
// synced up if the master goes down.
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
// synced up if the master goes down. Give up when
// the context is canceled.
select {
case <-ctx.Done():
return
case <-time.After(wait.Jitter(e.sleepDuration, 0.25)):
}
}
}
func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
func recordEvent(ctx context.Context, sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
var newEvent *eventsv1.Event
var err error
isEventSeries := event.Series != nil
if isEventSeries {
patch, patchBytesErr := createPatchBytesForSeries(event)
if patchBytesErr != nil {
klog.Errorf("Unable to calculate diff, no merge is possible: %v", patchBytesErr)
klog.FromContext(ctx).Error(patchBytesErr, "Unable to calculate diff, no merge is possible")
return nil, false
}
newEvent, err = sink.Patch(event, patch)
newEvent, err = sink.Patch(ctx, event, patch)
}
// Update can fail because the event may have been removed and it no longer exists.
if !isEventSeries || (isEventSeries && util.IsKeyNotFoundError(err)) {
// Making sure that ResourceVersion is empty on creation
event.ResourceVersion = ""
newEvent, err = sink.Create(event)
newEvent, err = sink.Create(ctx, event)
}
if err == nil {
return newEvent, false
@ -248,7 +253,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event)
return nil, false
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
@ -260,9 +265,9 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
if isEventSeries {
return nil, true
}
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event)
}
return nil, false
case *errors.UnexpectedObjectError:
@ -271,7 +276,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
default:
// This case includes actual http transport errors. Go ahead and retry.
}
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)")
return nil, true
}
@ -307,29 +312,38 @@ func getKey(event *eventsv1.Event) eventKey {
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// The return value can be ignored or used to stop recording, if desired.
// TODO: this function should also return an error.
//
// Deprecated: use StartLogging instead.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
stopWatcher, err := e.StartEventWatcher(
func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
klog.Errorf("unexpected type, expected eventsv1.Event")
return
}
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
})
logger := klog.Background().V(int(verbosity))
stopWatcher, err := e.StartLogging(logger)
if err != nil {
klog.Errorf("failed to start event watcher: '%v'", err)
logger.Error(err, "Failed to start event watcher")
return func() {}
}
return stopWatcher
}
// StartLogging starts sending events received from this EventBroadcaster to the structured logger.
// To adjust verbosity, use the logger's V method (i.e. pass `logger.V(3)` instead of `logger`).
// The returned function can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logger klog.Logger) (func(), error) {
return e.StartEventWatcher(
func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
logger.Error(nil, "unexpected type, expected eventsv1.Event")
return
}
logger.Info("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
})
}
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error) {
watcher, err := e.Watch()
if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
return nil, err
}
go func() {
@ -345,37 +359,42 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime
return watcher.Stop, nil
}
func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) error {
func (e *eventBroadcasterImpl) startRecordingEvents(ctx context.Context) error {
eventHandler := func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
klog.Errorf("unexpected type, expected eventsv1.Event")
klog.FromContext(ctx).Error(nil, "unexpected type, expected eventsv1.Event")
return
}
e.recordToSink(event, clock.RealClock{})
e.recordToSink(ctx, event, clock.RealClock{})
}
stopWatcher, err := e.StartEventWatcher(eventHandler)
if err != nil {
return err
}
go func() {
<-stopCh
<-ctx.Done()
stopWatcher()
}()
return nil
}
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
// Deprecated: use StartRecordingToSinkWithContext instead.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
go wait.Until(e.finishSeries, finishTime, stopCh)
err := e.startRecordingEvents(stopCh)
err := e.StartRecordingToSinkWithContext(wait.ContextForChannel(stopCh))
if err != nil {
klog.Errorf("unexpected type, expected eventsv1.Event")
return
klog.Background().Error(err, "Failed to start recording to sink")
}
}
// StartRecordingToSinkWithContext starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSinkWithContext(ctx context.Context) error {
go wait.UntilWithContext(ctx, e.refreshExistingEventSeries, refreshTime)
go wait.UntilWithContext(ctx, e.finishSeries, finishTime)
return e.startRecordingEvents(ctx)
}
type eventBroadcasterAdapterImpl struct {
coreClient typedv1core.EventsGetter
coreBroadcaster record.EventBroadcaster
@ -409,14 +428,14 @@ func (e *eventBroadcasterAdapterImpl) StartRecordingToSink(stopCh <-chan struct{
}
}
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorderLogger {
if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
}
return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
}
func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorder {
func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorderLogger {
return e.coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: name})
}

View File

@ -25,6 +25,7 @@ import (
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
)
func TestRecordEventToSink(t *testing.T) {
@ -78,11 +79,12 @@ func TestRecordEventToSink(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
kubeClient := fake.NewSimpleClientset()
eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
for _, ev := range tc.eventsToRecord {
recordEvent(eventSink, &ev)
recordEvent(ctx, eventSink, &ev)
}
recordedEvents, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})

View File

@ -40,12 +40,33 @@ type recorderImpl struct {
clock clock.Clock
}
var _ EventRecorder = &recorderImpl{}
func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
recorder.eventf(klog.Background(), regarding, related, eventtype, reason, action, note, args...)
}
type recorderImplLogger struct {
*recorderImpl
logger klog.Logger
}
var _ EventRecorderLogger = &recorderImplLogger{}
func (recorder *recorderImplLogger) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
recorder.eventf(recorder.logger, regarding, related, eventtype, reason, action, note, args...)
}
func (recorder *recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger {
return &recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger}
}
func (recorder *recorderImpl) eventf(logger klog.Logger, regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
timestamp := metav1.MicroTime{Time: time.Now()}
message := fmt.Sprintf(note, args...)
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
logger.Error(err, "Could not construct reference, will not report event", "object", regarding, "eventType", eventtype, "reason", reason, "message", message)
return
}
@ -53,11 +74,11 @@ func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.O
if related != nil {
refRelated, err = reference.GetReference(recorder.scheme, related)
if err != nil {
klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
logger.V(9).Info("Could not construct reference", "object", related, "err", err)
}
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
logger.Error(nil, "Unsupported event type", "eventType", eventtype)
return
}
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)

View File

@ -34,6 +34,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
ref "k8s.io/client-go/tools/reference"
"k8s.io/klog/v2/ktesting"
)
type testEventSeriesSink struct {
@ -43,7 +44,7 @@ type testEventSeriesSink struct {
}
// Create records the event for testing.
func (t *testEventSeriesSink) Create(e *eventsv1.Event) (*eventsv1.Event, error) {
func (t *testEventSeriesSink) Create(ctx context.Context, e *eventsv1.Event) (*eventsv1.Event, error) {
if t.OnCreate != nil {
return t.OnCreate(e)
}
@ -51,7 +52,7 @@ func (t *testEventSeriesSink) Create(e *eventsv1.Event) (*eventsv1.Event, error)
}
// Update records the event for testing.
func (t *testEventSeriesSink) Update(e *eventsv1.Event) (*eventsv1.Event, error) {
func (t *testEventSeriesSink) Update(ctx context.Context, e *eventsv1.Event) (*eventsv1.Event, error) {
if t.OnUpdate != nil {
return t.OnUpdate(e)
}
@ -59,7 +60,7 @@ func (t *testEventSeriesSink) Update(e *eventsv1.Event) (*eventsv1.Event, error)
}
// Patch records the event for testing.
func (t *testEventSeriesSink) Patch(e *eventsv1.Event, p []byte) (*eventsv1.Event, error) {
func (t *testEventSeriesSink) Patch(ctx context.Context, e *eventsv1.Event, p []byte) (*eventsv1.Event, error) {
if t.OnPatch != nil {
return t.OnPatch(e, p)
}
@ -135,7 +136,9 @@ func TestEventSeriesf(t *testing.T) {
},
}
stopCh := make(chan struct{})
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
createEvent := make(chan *eventsv1.Event)
updateEvent := make(chan *eventsv1.Event)
@ -163,7 +166,7 @@ func TestEventSeriesf(t *testing.T) {
// Don't call StartRecordingToSink, as we don't need neither refreshing event
// series nor finishing them in this tests and additional events updated would
// race with our expected ones.
err = broadcaster.startRecordingEvents(stopCh)
err = broadcaster.startRecordingEvents(ctx)
if err != nil {
t.Fatal(err)
}
@ -184,7 +187,6 @@ func TestEventSeriesf(t *testing.T) {
validateEvent(strconv.Itoa(index), false, actualEvent, item.expect, t)
}
}
close(stopCh)
}
// TestEventSeriesWithEventSinkImplRace verifies that when Events are emitted to
@ -256,6 +258,7 @@ func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *event
}
func TestFinishSeries(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
hostname, _ := os.Hostname()
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -295,7 +298,7 @@ func TestFinishSeries(t *testing.T) {
}
cache := map[eventKey]*eventsv1.Event{}
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImplLogger)
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{Time: time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
nonFinishedEvent := cachedEvent.DeepCopy()
nonFinishedEvent.ReportingController = "nonFinished-controller"
@ -305,7 +308,7 @@ func TestFinishSeries(t *testing.T) {
}
cache[getKey(cachedEvent)] = cachedEvent
cache[getKey(nonFinishedEvent)] = nonFinishedEvent
eventBroadcaster.finishSeries()
eventBroadcaster.finishSeries(ctx)
select {
case actualEvent := <-patchEvent:
t.Logf("validating event affected by patch request")
@ -327,6 +330,7 @@ func TestFinishSeries(t *testing.T) {
}
func TestRefreshExistingEventSeries(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
hostname, _ := os.Hostname()
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -381,7 +385,7 @@ func TestRefreshExistingEventSeries(t *testing.T) {
}
cache := map[eventKey]*eventsv1.Event{}
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImplLogger)
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{Time: time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
cachedEvent.Series = &eventsv1.EventSeries{
Count: 10,
@ -390,7 +394,7 @@ func TestRefreshExistingEventSeries(t *testing.T) {
cacheKey := getKey(cachedEvent)
cache[cacheKey] = cachedEvent
eventBroadcaster.refreshExistingEventSeries()
eventBroadcaster.refreshExistingEventSeries(ctx)
select {
case <-patchEvent:
t.Logf("validating event affected by patch request")

View File

@ -20,6 +20,7 @@ import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
)
// FakeRecorder is used as a fake during tests. It is thread safe. It is usable
@ -29,6 +30,8 @@ type FakeRecorder struct {
Events chan string
}
var _ EventRecorderLogger = &FakeRecorder{}
// Eventf emits an event
func (f *FakeRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
if f.Events != nil {
@ -36,6 +39,10 @@ func (f *FakeRecorder) Eventf(regarding runtime.Object, related runtime.Object,
}
}
func (f *FakeRecorder) WithLogger(logger klog.Logger) EventRecorderLogger {
return f
}
// NewFakeRecorder creates new fake event recorder with event channel with
// buffer of given size.
func NewFakeRecorder(bufferSize int) *FakeRecorder {

View File

@ -17,39 +17,30 @@ limitations under the License.
package events
import (
"context"
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/runtime"
internalevents "k8s.io/client-go/tools/internal/events"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
)
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Eventf constructs an event from the given information and puts it in the queue for sending.
// 'regarding' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'related' is the secondary object for more complex actions. E.g. when regarding object triggers
// a creation or deletion of related object.
// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
// to automate handling of events, so imagine people writing switch statements to handle them.
// You want to make that easy.
// 'action' explains what happened with regarding/what action did the ReportingController
// (ReportingController is a type of a Controller reporting an Event, e.g. k8s.io/node-controller, k8s.io/kubelet.)
// take in regarding's name; it should be in UpperCamelCase format (starting with a capital letter).
// 'note' is intended to be human readable.
Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
}
type EventRecorder = internalevents.EventRecorder
type EventRecorderLogger = internalevents.EventRecorderLogger
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// StartRecordingToSink starts sending events received from the specified eventBroadcaster.
// Deprecated: use StartRecordingToSinkWithContext instead.
StartRecordingToSink(stopCh <-chan struct{})
// StartRecordingToSink starts sending events received from the specified eventBroadcaster.
StartRecordingToSinkWithContext(ctx context.Context) error
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder
NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorderLogger
// StartEventWatcher enables you to watch for emitted events without usage
// of StartRecordingToSink. This lets you also process events in a custom way (e.g. in tests).
@ -59,8 +50,14 @@ type EventBroadcaster interface {
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured
// logging function. The return value can be ignored or used to stop recording, if desired.
// Deprecated: use StartLogging instead.
StartStructuredLogging(verbosity klog.Level) func()
// StartLogging starts sending events received from this EventBroadcaster to the structured logger.
// To adjust verbosity, use the logger's V method (i.e. pass `logger.V(3)` instead of `logger`).
// The returned function can be ignored or used to stop recording, if desired.
StartLogging(logger klog.Logger) (func(), error)
// Shutdown shuts down the broadcaster
Shutdown()
}
@ -70,9 +67,9 @@ type EventBroadcaster interface {
// It is assumed that EventSink will return the same sorts of errors as
// client-go's REST client.
type EventSink interface {
Create(event *eventsv1.Event) (*eventsv1.Event, error)
Update(event *eventsv1.Event) (*eventsv1.Event, error)
Patch(oldEvent *eventsv1.Event, data []byte) (*eventsv1.Event, error)
Create(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error)
Update(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error)
Patch(ctx context.Context, oldEvent *eventsv1.Event, data []byte) (*eventsv1.Event, error)
}
// EventBroadcasterAdapter is a auxiliary interface to simplify migration to
@ -85,10 +82,10 @@ type EventBroadcasterAdapter interface {
StartRecordingToSink(stopCh <-chan struct{})
// NewRecorder creates a new Event Recorder with specified name.
NewRecorder(name string) EventRecorder
NewRecorder(name string) EventRecorderLogger
// DeprecatedNewLegacyRecorder creates a legacy Event Recorder with specific name.
DeprecatedNewLegacyRecorder(name string) record.EventRecorder
DeprecatedNewLegacyRecorder(name string) record.EventRecorderLogger
// Shutdown shuts down the broadcaster.
Shutdown()

View File

@ -0,0 +1,59 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package internal is needed to break an import cycle: record.EventRecorderAdapter
// needs this interface definition to implement it, but event.NewEventBroadcasterAdapter
// needs record.NewBroadcaster. Therefore this interface cannot be in event/interfaces.go.
package internal
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
)
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Eventf constructs an event from the given information and puts it in the queue for sending.
// 'regarding' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'related' is the secondary object for more complex actions. E.g. when regarding object triggers
// a creation or deletion of related object.
// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
// to automate handling of events, so imagine people writing switch statements to handle them.
// You want to make that easy.
// 'action' explains what happened with regarding/what action did the ReportingController
// (ReportingController is a type of a Controller reporting an Event, e.g. k8s.io/node-controller, k8s.io/kubelet.)
// take in regarding's name; it should be in UpperCamelCase format (starting with a capital letter).
// 'note' is intended to be human readable.
Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
}
// EventRecorderLogger extends EventRecorder such that a logger can
// be set for methods in EventRecorder. Normally, those methods
// uses the global default logger to record errors and debug messages.
// If that is not desired, use WithLogger to provide a logger instance.
type EventRecorderLogger interface {
EventRecorder
// WithLogger replaces the context used for logging. This is a cheap call
// and meant to be used for contextual logging:
// recorder := ...
// logger := klog.FromContext(ctx)
// recorder.WithLogger(logger).Eventf(...)
WithLogger(logger klog.Logger) EventRecorderLogger
}

View File

@ -29,6 +29,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
internalevents "k8s.io/client-go/tools/internal/events"
"k8s.io/client-go/tools/record/util"
ref "k8s.io/client-go/tools/reference"
"k8s.io/klog/v2"
@ -110,6 +111,21 @@ type EventRecorder interface {
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
// EventRecorderLogger extends EventRecorder such that a logger can
// be set for methods in EventRecorder. Normally, those methods
// uses the global default logger to record errors and debug messages.
// If that is not desired, use WithLogger to provide a logger instance.
type EventRecorderLogger interface {
EventRecorder
// WithLogger replaces the context used for logging. This is a cheap call
// and meant to be used for contextual logging:
// recorder := ...
// logger := klog.FromContext(ctx)
// recorder.WithLogger(logger).Eventf(...)
WithLogger(logger klog.Logger) EventRecorderLogger
}
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// StartEventWatcher starts sending events received from this EventBroadcaster to the given
@ -131,7 +147,7 @@ type EventBroadcaster interface {
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger
// Shutdown shuts down the broadcaster. Once the broadcaster is shut
// down, it will only try to record an event in a sink once before
@ -142,12 +158,14 @@ type EventBroadcaster interface {
// EventRecorderAdapter is a wrapper around a "k8s.io/client-go/tools/record".EventRecorder
// implementing the new "k8s.io/client-go/tools/events".EventRecorder interface.
type EventRecorderAdapter struct {
recorder EventRecorder
recorder EventRecorderLogger
}
var _ internalevents.EventRecorder = &EventRecorderAdapter{}
// NewEventRecorderAdapter returns an adapter implementing the new
// "k8s.io/client-go/tools/events".EventRecorder interface.
func NewEventRecorderAdapter(recorder EventRecorder) *EventRecorderAdapter {
func NewEventRecorderAdapter(recorder EventRecorderLogger) *EventRecorderAdapter {
return &EventRecorderAdapter{
recorder: recorder,
}
@ -158,28 +176,76 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re
a.recorder.Eventf(regarding, eventtype, reason, note, args...)
}
func (a *EventRecorderAdapter) WithLogger(logger klog.Logger) internalevents.EventRecorderLogger {
return &EventRecorderAdapter{
recorder: a.recorder.WithLogger(logger),
}
}
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration)
func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
c := config{
sleepDuration: defaultSleepDuration,
}
for _, opt := range opts {
opt(&c)
}
eventBroadcaster := &eventBroadcasterImpl{
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: c.sleepDuration,
options: c.CorrelatorOptions,
}
ctx := c.Context
if ctx == nil {
ctx = context.Background()
} else {
// Calling Shutdown is not required when a context was provided:
// when the context is canceled, this goroutine will shut down
// the broadcaster.
go func() {
<-ctx.Done()
eventBroadcaster.Broadcaster.Shutdown()
}()
}
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
return eventBroadcaster
}
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration)
return NewBroadcaster(WithSleepDuration(sleepDuration))
}
func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
eventBroadcaster := newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration)
eventBroadcaster.options = options
return eventBroadcaster
return NewBroadcaster(WithCorrelatorOptions(options))
}
func newEventBroadcaster(broadcaster *watch.Broadcaster, sleepDuration time.Duration) *eventBroadcasterImpl {
eventBroadcaster := &eventBroadcasterImpl{
Broadcaster: broadcaster,
sleepDuration: sleepDuration,
func WithCorrelatorOptions(options CorrelatorOptions) BroadcasterOption {
return func(c *config) {
c.CorrelatorOptions = options
}
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(context.Background())
return eventBroadcaster
}
// WithContext sets a context for the broadcaster. Canceling the context will
// shut down the broadcaster, Shutdown doesn't need to be called. The context
// can also be used to provide a logger.
func WithContext(ctx context.Context) BroadcasterOption {
return func(c *config) {
c.Context = ctx
}
}
func WithSleepDuration(sleepDuration time.Duration) BroadcasterOption {
return func(c *config) {
c.sleepDuration = sleepDuration
}
}
type BroadcasterOption func(*config)
type config struct {
CorrelatorOptions
context.Context
sleepDuration time.Duration
}
type eventBroadcasterImpl struct {
@ -220,12 +286,12 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve
}
tries := 0
for {
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
if recordEvent(e.cancelationCtx, sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
break
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event)
break
}
@ -237,7 +303,7 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve
}
select {
case <-e.cancelationCtx.Done():
klog.Errorf("Unable to write event '%#v' (broadcaster is shut down)", event)
klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (broadcaster is shut down)", "event", event)
return
case <-time.After(delay):
}
@ -248,7 +314,7 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve
// was successfully recorded or discarded, false if it should be retried.
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// existing event.
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
func recordEvent(ctx context.Context, sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
var newEvent *v1.Event
var err error
if updateExistingEvent {
@ -271,13 +337,13 @@ func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEv
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event)
return true
case *errors.StatusError:
if errors.IsAlreadyExists(err) || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event)
}
return true
case *errors.UnexpectedObjectError:
@ -286,7 +352,7 @@ func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEv
default:
// This case includes actual http transport errors. Go ahead and retry.
}
klog.Errorf("Unable to write event: '%#v': '%v'(may retry after sleeping)", event, err)
klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)", "event", event)
return false
}
@ -299,12 +365,15 @@ func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...int
})
}
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// StartStructuredLogging starts sending events received from this EventBroadcaster to a structured logger.
// The logger is retrieved from a context if the broadcaster was constructed with a context, otherwise
// the global default is used.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
loggerV := klog.FromContext(e.cancelationCtx).V(int(verbosity))
return e.StartEventWatcher(
func(e *v1.Event) {
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
loggerV.Info("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
})
}
@ -313,26 +382,32 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher, err := e.Watch()
if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
klog.FromContext(e.cancelationCtx).Error(err, "Unable start event watcher (will not retry!)")
}
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() {
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
for {
select {
case <-e.cancelationCtx.Done():
watcher.Stop()
return
case watchEvent := <-watcher.ResultChan():
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
}
eventHandler(event)
}
eventHandler(event)
}
}()
return watcher
}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger {
return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
}
type recorderImpl struct {
@ -342,15 +417,17 @@ type recorderImpl struct {
clock clock.PassiveClock
}
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
var _ EventRecorder = &recorderImpl{}
func (recorder *recorderImpl) generateEvent(logger klog.Logger, object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
logger.Error(err, "Could not construct reference, will not report event", "object", object, "eventType", eventtype, "reason", reason, "message", message)
return
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
logger.Error(nil, "Unsupported event type", "eventType", eventtype)
return
}
@ -367,16 +444,16 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
// outgoing events anyway).
sent, err := recorder.ActionOrDrop(watch.Added, event)
if err != nil {
klog.Errorf("unable to record event: %v (will not retry!)", err)
logger.Error(err, "Unable to record event (will not retry!)")
return
}
if !sent {
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
logger.Error(nil, "Unable to record event: too many queued events, dropped event", "event", event)
}
}
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, nil, eventtype, reason, message)
recorder.generateEvent(klog.Background(), object, nil, eventtype, reason, message)
}
func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
@ -384,7 +461,7 @@ func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, m
}
func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
recorder.generateEvent(klog.Background(), object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
@ -408,3 +485,26 @@ func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map
Type: eventtype,
}
}
type recorderImplLogger struct {
*recorderImpl
logger klog.Logger
}
var _ EventRecorderLogger = &recorderImplLogger{}
func (recorder recorderImplLogger) Event(object runtime.Object, eventtype, reason, message string) {
recorder.recorderImpl.generateEvent(recorder.logger, object, nil, eventtype, reason, message)
}
func (recorder recorderImplLogger) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder recorderImplLogger) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(recorder.logger, object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger {
return recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger}
}

View File

@ -112,7 +112,7 @@ func TestNonRacyShutdown(t *testing.T) {
caster := NewBroadcasterForTests(0)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, caster, clock)
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, caster, clock)
var wg sync.WaitGroup
wg.Add(100)
@ -381,7 +381,7 @@ func TestEventf(t *testing.T) {
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for index, item := range table {
clock.Step(1 * time.Second)
logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
@ -407,7 +407,7 @@ func TestEventf(t *testing.T) {
sinkWatcher.Stop()
}
func recorderWithFakeClock(eventSource v1.EventSource, eventBroadcaster EventBroadcaster, clock clock.Clock) EventRecorder {
func recorderWithFakeClock(t *testing.T, eventSource v1.EventSource, eventBroadcaster EventBroadcaster, clock clock.Clock) EventRecorder {
return &recorderImpl{scheme.Scheme, eventSource, eventBroadcaster.(*eventBroadcasterImpl).Broadcaster, clock}
}
@ -662,7 +662,7 @@ func TestEventfNoNamespace(t *testing.T) {
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for index, item := range table {
clock.Step(1 * time.Second)
@ -955,7 +955,7 @@ func TestMultiSinkCache(t *testing.T) {
eventBroadcaster := NewBroadcasterForTests(0)
clock := testclocks.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
for index, item := range table {

View File

@ -20,6 +20,7 @@ import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
)
// FakeRecorder is used as a fake during tests. It is thread safe. It is usable
@ -31,6 +32,8 @@ type FakeRecorder struct {
IncludeObject bool
}
var _ EventRecorderLogger = &FakeRecorder{}
func objectString(object runtime.Object, includeObject bool) string {
if !includeObject {
return ""
@ -68,6 +71,10 @@ func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[st
f.writeEvent(object, annotations, eventtype, reason, messageFmt, args...)
}
func (f *FakeRecorder) WithLogger(logger klog.Logger) EventRecorderLogger {
return f
}
// NewFakeRecorder creates new fake event recorder with event channel with
// buffer of given size.
func NewFakeRecorder(bufferSize int) *FakeRecorder {