diff --git a/staging/src/k8s.io/client-go/tools/events/BUILD b/staging/src/k8s.io/client-go/tools/events/BUILD index e290abd8a89..51adc931278 100644 --- a/staging/src/k8s.io/client-go/tools/events/BUILD +++ b/staging/src/k8s.io/client-go/tools/events/BUILD @@ -38,6 +38,7 @@ go_test( "//staging/src/k8s.io/api/events/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/tools/reference:go_default_library", ], diff --git a/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go b/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go index a6687a8cc4a..45ed84f6d01 100644 --- a/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go +++ b/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go @@ -67,20 +67,20 @@ type eventBroadcasterImpl struct { // NewBroadcaster Creates a new event broadcaster. func NewBroadcaster(sink EventSink) EventBroadcaster { - return newBroadcaster(sink, defaultSleepDuration) + return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*v1beta1.Event{}) } // NewBroadcasterForTest Creates a new event broadcaster for test purposes. -func newBroadcaster(sink EventSink, sleepDuration time.Duration) EventBroadcaster { +func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*v1beta1.Event) EventBroadcaster { return &eventBroadcasterImpl{ Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), - eventCache: map[eventKey]*v1beta1.Event{}, + eventCache: eventCache, sleepDuration: sleepDuration, sink: sink, } } -// TODO: add test for refreshExistingEventSeries +// refreshExistingEventSeries refresh events TTL func (e *eventBroadcasterImpl) refreshExistingEventSeries() { // TODO: Investigate whether lock contention won't be a problem e.mu.Lock() @@ -94,19 +94,23 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() { } } -// TODO: add test for finishSeries +// finishSeries checks if a series has ended and either: +// - write final count to the apiserver +// - delete a singleton event (i.e. series field is nil) from the cache func (e *eventBroadcasterImpl) finishSeries() { // TODO: Investigate whether lock contention won't be a problem e.mu.Lock() defer e.mu.Unlock() for isomorphicKey, event := range e.eventCache { - eventSeries := event.Series - if eventSeries != nil { - if eventSeries.LastObservedTime.Time.Add(finishTime).Before(time.Now()) { + eventSerie := event.Series + if eventSerie != nil { + if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) { if _, retry := recordEvent(e.sink, event); !retry { delete(e.eventCache, isomorphicKey) } } + } else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) { + delete(e.eventCache, isomorphicKey) } } } diff --git a/staging/src/k8s.io/client-go/tools/events/eventseries_test.go b/staging/src/k8s.io/client-go/tools/events/eventseries_test.go index f4418922eec..f7c1302458e 100644 --- a/staging/src/k8s.io/client-go/tools/events/eventseries_test.go +++ b/staging/src/k8s.io/client-go/tools/events/eventseries_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/api/events/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/scheme" ref "k8s.io/client-go/tools/reference" ) @@ -154,7 +155,7 @@ func TestEventSeriesf(t *testing.T) { return event, nil }, } - eventBroadcaster := newBroadcaster(&testEvents, 0) + eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*v1beta1.Event{}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest") eventBroadcaster.StartRecordingToSink(stopCh) recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1}) @@ -206,3 +207,143 @@ func validateEventSerie(messagePrefix string, expectedUpdate bool, actualEvent * } } + +func TestFinishSeries(t *testing.T) { + hostname, _ := os.Hostname() + testPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/api/version/pods/foo", + Name: "foo", + Namespace: "baz", + UID: "bar", + }, + } + regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]") + if err != nil { + t.Fatal(err) + } + related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]") + if err != nil { + t.Fatal(err) + } + LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)} + + createEvent := make(chan *v1beta1.Event, 10) + updateEvent := make(chan *v1beta1.Event, 10) + patchEvent := make(chan *v1beta1.Event, 10) + testEvents := testEventSeriesSink{ + OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + updateEvent <- event + return event, nil + }, + OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + // event we receive is already patched, usually the sink uses it + // only to retrieve the name and namespace, here we'll use it directly + patchEvent <- event + return event, nil + }, + } + cache := map[eventKey]*v1beta1.Event{} + eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) + cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") + nonFinishedEvent := cachedEvent.DeepCopy() + nonFinishedEvent.ReportingController = "nonFinished-controller" + cachedEvent.Series = &v1beta1.EventSeries{ + Count: 10, + LastObservedTime: LastObservedTime, + } + cache[getKey(cachedEvent)] = cachedEvent + cache[getKey(nonFinishedEvent)] = nonFinishedEvent + eventBroadcaster.finishSeries() + select { + case actualEvent := <-patchEvent: + t.Logf("validating event affected by patch request") + eventBroadcaster.mu.Lock() + defer eventBroadcaster.mu.Unlock() + if len(cache) != 1 { + t.Errorf("cache should be empty, but instead got a size of %v", len(cache)) + } + if !actualEvent.Series.LastObservedTime.Equal(&cachedEvent.Series.LastObservedTime) { + t.Errorf("series was expected be seen with LastObservedTime %v, but instead got %v ", cachedEvent.Series.LastObservedTime, actualEvent.Series.LastObservedTime) + } + // check that we emitted only one event + if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 { + t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent)) + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + } +} + +func TestRefreshExistingEventSeries(t *testing.T) { + hostname, _ := os.Hostname() + testPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/api/version/pods/foo", + Name: "foo", + Namespace: "baz", + UID: "bar", + }, + } + regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]") + if err != nil { + t.Fatal(err) + } + related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]") + if err != nil { + t.Fatal(err) + } + LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)} + + createEvent := make(chan *v1beta1.Event, 10) + updateEvent := make(chan *v1beta1.Event, 10) + patchEvent := make(chan *v1beta1.Event, 10) + testEvents := testEventSeriesSink{ + OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + updateEvent <- event + return event, nil + }, + OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + // event we receive is already patched, usually the sink uses it + //only to retrieve the name and namespace, here we'll use it directly. + patchEvent <- event + return event, nil + }, + } + cache := map[eventKey]*v1beta1.Event{} + eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) + cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") + cachedEvent.Series = &v1beta1.EventSeries{ + Count: 10, + LastObservedTime: LastObservedTime, + } + cacheKey := getKey(cachedEvent) + cache[cacheKey] = cachedEvent + + eventBroadcaster.refreshExistingEventSeries() + select { + case <-patchEvent: + t.Logf("validating event affected by patch request") + eventBroadcaster.mu.Lock() + defer eventBroadcaster.mu.Unlock() + if len(cache) != 1 { + t.Errorf("cache should be with same size, but instead got a size of %v", len(cache)) + } + // check that we emitted only one event + if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 { + t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent)) + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + } +}