diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 0a6bf122..168bb348 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -288,7 +288,7 @@ }, { "ImportPath": "k8s.io/apimachinery", - "Rev": "8ca64af22337" + "Rev": "0233d01ce572" }, { "ImportPath": "k8s.io/gengo", diff --git a/go.mod b/go.mod index dcf2b311..d7f24555 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( golang.org/x/time v0.0.0-20161028155119-f51c12702a4d google.golang.org/appengine v1.5.0 // indirect k8s.io/api v0.0.0-20190820101039-d651a1528133 - k8s.io/apimachinery v0.0.0-20190823012420-8ca64af22337 + k8s.io/apimachinery v0.0.0-20190826114657-0233d01ce572 k8s.io/klog v0.4.0 k8s.io/utils v0.0.0-20190801114015-581e00157fb1 sigs.k8s.io/yaml v1.1.0 @@ -39,5 +39,5 @@ replace ( golang.org/x/sys => golang.org/x/sys v0.0.0-20190209173611-3b5209105503 golang.org/x/text => golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db k8s.io/api => k8s.io/api v0.0.0-20190820101039-d651a1528133 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190823012420-8ca64af22337 + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190826114657-0233d01ce572 ) diff --git a/go.sum b/go.sum index 8e37be86..39dc41c8 100644 --- a/go.sum +++ b/go.sum @@ -145,7 +145,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= k8s.io/api v0.0.0-20190820101039-d651a1528133/go.mod h1:AlhL1I0Xqh5Tyz0HsxjEhy+iKci9l1Qy3UMDFW7iG3A= -k8s.io/apimachinery v0.0.0-20190823012420-8ca64af22337/go.mod h1:EZoIMuAgG/4v58YL+bz0kqnivqupk28fKYxFCa5e6X8= +k8s.io/apimachinery v0.0.0-20190826114657-0233d01ce572/go.mod h1:EZoIMuAgG/4v58YL+bz0kqnivqupk28fKYxFCa5e6X8= k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= diff --git a/tools/events/event_broadcaster.go b/tools/events/event_broadcaster.go index 3acfe68b..1459922b 100644 --- a/tools/events/event_broadcaster.go +++ b/tools/events/event_broadcaster.go @@ -111,7 +111,9 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() { for isomorphicKey, event := range e.eventCache { if event.Series != nil { if recordedEvent, retry := recordEvent(e.sink, event); !retry { - e.eventCache[isomorphicKey] = recordedEvent + if recordedEvent != nil { + e.eventCache[isomorphicKey] = recordedEvent + } } } } diff --git a/tools/events/eventseries_test.go b/tools/events/eventseries_test.go index c930c189..42ea1cd7 100644 --- a/tools/events/eventseries_test.go +++ b/tools/events/eventseries_test.go @@ -30,6 +30,7 @@ import ( k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/scheme" + restclient "k8s.io/client-go/rest" ref "k8s.io/client-go/tools/reference" ) @@ -299,51 +300,72 @@ func TestRefreshExistingEventSeries(t *testing.T) { t.Fatal(err) } LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)} - createEvent := make(chan *v1beta1.Event, 10) updateEvent := make(chan *v1beta1.Event, 10) patchEvent := make(chan *v1beta1.Event, 10) - testEvents := testEventSeriesSink{ - OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { - createEvent <- event - return event, nil - }, - OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { - updateEvent <- event - return event, nil - }, - OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { - // event we receive is already patched, usually the sink uses it - //only to retrieve the name and namespace, here we'll use it directly. - patchEvent <- event - return event, nil - }, - } - cache := map[eventKey]*v1beta1.Event{} - eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) - cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") - cachedEvent.Series = &v1beta1.EventSeries{ - Count: 10, - LastObservedTime: LastObservedTime, - } - cacheKey := getKey(cachedEvent) - cache[cacheKey] = cachedEvent - eventBroadcaster.refreshExistingEventSeries() - select { - case <-patchEvent: - t.Logf("validating event affected by patch request") - eventBroadcaster.mu.Lock() - defer eventBroadcaster.mu.Unlock() - if len(cache) != 1 { - t.Errorf("cache should be with same size, but instead got a size of %v", len(cache)) + table := []struct { + patchFunc func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) + }{ + { + patchFunc: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + // event we receive is already patched, usually the sink uses it + //only to retrieve the name and namespace, here we'll use it directly. + patchEvent <- event + return event, nil + }, + }, + { + patchFunc: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + // we simulate an apiserver error here + patchEvent <- nil + return nil, &restclient.RequestConstructionError{} + }, + }, + } + for _, item := range table { + testEvents := testEventSeriesSink{ + OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + updateEvent <- event + return event, nil + }, + OnPatch: item.patchFunc, } - // check that we emitted only one event - if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 { - t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent)) + cache := map[eventKey]*v1beta1.Event{} + eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) + cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") + cachedEvent.Series = &v1beta1.EventSeries{ + Count: 10, + LastObservedTime: LastObservedTime, + } + cacheKey := getKey(cachedEvent) + cache[cacheKey] = cachedEvent + + eventBroadcaster.refreshExistingEventSeries() + select { + case <-patchEvent: + t.Logf("validating event affected by patch request") + eventBroadcaster.mu.Lock() + defer eventBroadcaster.mu.Unlock() + if len(cache) != 1 { + t.Errorf("cache should be with same size, but instead got a size of %v", len(cache)) + } + // check that we emitted only one event + if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 { + t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent)) + } + cacheEvent, exists := cache[cacheKey] + + if cacheEvent == nil || !exists { + t.Errorf("expected event to exist and not being nil, but instead event: %v and exists: %v", cacheEvent, exists) + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("timeout after %v", wait.ForeverTestTimeout) } - case <-time.After(wait.ForeverTestTimeout): - t.Fatalf("timeout after %v", wait.ForeverTestTimeout) } }