diff --git a/staging/src/k8s.io/client-go/tools/events/BUILD b/staging/src/k8s.io/client-go/tools/events/BUILD index 9a4c73a2188..a265bdc6ed0 100644 --- a/staging/src/k8s.io/client-go/tools/events/BUILD +++ b/staging/src/k8s.io/client-go/tools/events/BUILD @@ -42,6 +42,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/reference:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go b/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go index 3acfe68bed3..1459922b373 100644 --- a/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go +++ b/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go @@ -111,7 +111,9 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() { for isomorphicKey, event := range e.eventCache { if event.Series != nil { if recordedEvent, retry := recordEvent(e.sink, event); !retry { - e.eventCache[isomorphicKey] = recordedEvent + if recordedEvent != nil { + e.eventCache[isomorphicKey] = recordedEvent + } } } } diff --git a/staging/src/k8s.io/client-go/tools/events/eventseries_test.go b/staging/src/k8s.io/client-go/tools/events/eventseries_test.go index c930c189fab..42ea1cd72ec 100644 --- a/staging/src/k8s.io/client-go/tools/events/eventseries_test.go +++ b/staging/src/k8s.io/client-go/tools/events/eventseries_test.go @@ -30,6 +30,7 @@ import ( k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/scheme" + restclient "k8s.io/client-go/rest" ref "k8s.io/client-go/tools/reference" ) @@ -299,51 +300,72 @@ func TestRefreshExistingEventSeries(t *testing.T) { t.Fatal(err) } LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)} - createEvent := make(chan *v1beta1.Event, 10) updateEvent := make(chan *v1beta1.Event, 10) patchEvent := make(chan *v1beta1.Event, 10) - testEvents := testEventSeriesSink{ - OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { - createEvent <- event - return event, nil - }, - OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { - updateEvent <- event - return event, nil - }, - OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { - // event we receive is already patched, usually the sink uses it - //only to retrieve the name and namespace, here we'll use it directly. - patchEvent <- event - return event, nil - }, - } - cache := map[eventKey]*v1beta1.Event{} - eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) - cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") - cachedEvent.Series = &v1beta1.EventSeries{ - Count: 10, - LastObservedTime: LastObservedTime, - } - cacheKey := getKey(cachedEvent) - cache[cacheKey] = cachedEvent - eventBroadcaster.refreshExistingEventSeries() - select { - case <-patchEvent: - t.Logf("validating event affected by patch request") - eventBroadcaster.mu.Lock() - defer eventBroadcaster.mu.Unlock() - if len(cache) != 1 { - t.Errorf("cache should be with same size, but instead got a size of %v", len(cache)) + table := []struct { + patchFunc func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) + }{ + { + patchFunc: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + // event we receive is already patched, usually the sink uses it + //only to retrieve the name and namespace, here we'll use it directly. + patchEvent <- event + return event, nil + }, + }, + { + patchFunc: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + // we simulate an apiserver error here + patchEvent <- nil + return nil, &restclient.RequestConstructionError{} + }, + }, + } + for _, item := range table { + testEvents := testEventSeriesSink{ + OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + updateEvent <- event + return event, nil + }, + OnPatch: item.patchFunc, } - // check that we emitted only one event - if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 { - t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent)) + cache := map[eventKey]*v1beta1.Event{} + eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) + cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") + cachedEvent.Series = &v1beta1.EventSeries{ + Count: 10, + LastObservedTime: LastObservedTime, + } + cacheKey := getKey(cachedEvent) + cache[cacheKey] = cachedEvent + + eventBroadcaster.refreshExistingEventSeries() + select { + case <-patchEvent: + t.Logf("validating event affected by patch request") + eventBroadcaster.mu.Lock() + defer eventBroadcaster.mu.Unlock() + if len(cache) != 1 { + t.Errorf("cache should be with same size, but instead got a size of %v", len(cache)) + } + // check that we emitted only one event + if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 { + t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent)) + } + cacheEvent, exists := cache[cacheKey] + + if cacheEvent == nil || !exists { + t.Errorf("expected event to exist and not being nil, but instead event: %v and exists: %v", cacheEvent, exists) + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("timeout after %v", wait.ForeverTestTimeout) } - case <-time.After(wait.ForeverTestTimeout): - t.Fatalf("timeout after %v", wait.ForeverTestTimeout) } }