diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 30086e7b..19dff653 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -188,11 +188,11 @@ }, { "ImportPath": "k8s.io/api", - "Rev": "cf720200d547" + "Rev": "96378037bc4d" }, { "ImportPath": "k8s.io/apimachinery", - "Rev": "bff0c42413f8" + "Rev": "65c6e8495981" }, { "ImportPath": "k8s.io/klog", diff --git a/go.mod b/go.mod index d3f0a4a4..0a8ea681 100644 --- a/go.mod +++ b/go.mod @@ -26,8 +26,8 @@ require ( golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a golang.org/x/time v0.0.0-20161028155119-f51c12702a4d google.golang.org/appengine v1.5.0 // indirect - k8s.io/api v0.0.0-20190531132107-cf720200d547 - k8s.io/apimachinery v0.0.0-20190529005236-bff0c42413f8 + k8s.io/api v0.0.0-20190529212817-96378037bc4d + k8s.io/apimachinery v0.0.0-20190531131812-65c6e8495981 k8s.io/klog v0.3.1 k8s.io/utils v0.0.0-20190221042446-c2654d5206da sigs.k8s.io/yaml v1.1.0 @@ -37,6 +37,6 @@ replace ( golang.org/x/sync => golang.org/x/sync v0.0.0-20181108010431-42b317875d0f golang.org/x/sys => golang.org/x/sys v0.0.0-20190209173611-3b5209105503 golang.org/x/tools => golang.org/x/tools v0.0.0-20190313210603-aa82965741a9 - k8s.io/api => k8s.io/api v0.0.0-20190531132107-cf720200d547 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190529005236-bff0c42413f8 + k8s.io/api => k8s.io/api v0.0.0-20190529212817-96378037bc4d + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190531131812-65c6e8495981 ) diff --git a/go.sum b/go.sum index d495efd0..ebb34bc4 100644 --- a/go.sum +++ b/go.sum @@ -91,8 +91,8 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -k8s.io/api v0.0.0-20190531132107-cf720200d547/go.mod h1:QMoKrFuqE2O+M+xCzcomo1EH5b0DgFDaKvisrkBaMfk= -k8s.io/apimachinery v0.0.0-20190529005236-bff0c42413f8/go.mod h1:I4A+glKBHiTgiEjQiCCQfCAIcIMFGt291SmsvcrFzJA= +k8s.io/api v0.0.0-20190529212817-96378037bc4d/go.mod h1:QTFMweSwWjQ8mUX2YYyJaX0B/iQR1UBvNgLXG21bT5c= +k8s.io/apimachinery v0.0.0-20190531131812-65c6e8495981/go.mod h1:I4A+glKBHiTgiEjQiCCQfCAIcIMFGt291SmsvcrFzJA= k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68= k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI= diff --git a/tools/events/event_broadcaster.go b/tools/events/event_broadcaster.go index 6978742d..49a38e92 100644 --- a/tools/events/event_broadcaster.go +++ b/tools/events/event_broadcaster.go @@ -90,20 +90,20 @@ func (e *EventSinkImpl) Patch(event *v1beta1.Event, data []byte) (*v1beta1.Event // NewBroadcaster Creates a new event broadcaster. func NewBroadcaster(sink EventSink) EventBroadcaster { - return newBroadcaster(sink, defaultSleepDuration) + return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*v1beta1.Event{}) } // NewBroadcasterForTest Creates a new event broadcaster for test purposes. -func newBroadcaster(sink EventSink, sleepDuration time.Duration) EventBroadcaster { +func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*v1beta1.Event) EventBroadcaster { return &eventBroadcasterImpl{ Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), - eventCache: map[eventKey]*v1beta1.Event{}, + eventCache: eventCache, sleepDuration: sleepDuration, sink: sink, } } -// TODO: add test for refreshExistingEventSeries +// refreshExistingEventSeries refresh events TTL func (e *eventBroadcasterImpl) refreshExistingEventSeries() { // TODO: Investigate whether lock contention won't be a problem e.mu.Lock() @@ -117,19 +117,23 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() { } } -// TODO: add test for finishSeries +// finishSeries checks if a series has ended and either: +// - write final count to the apiserver +// - delete a singleton event (i.e. series field is nil) from the cache func (e *eventBroadcasterImpl) finishSeries() { // TODO: Investigate whether lock contention won't be a problem e.mu.Lock() defer e.mu.Unlock() for isomorphicKey, event := range e.eventCache { - eventSeries := event.Series - if eventSeries != nil { - if eventSeries.LastObservedTime.Time.Add(finishTime).Before(time.Now()) { + eventSerie := event.Series + if eventSerie != nil { + if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) { if _, retry := recordEvent(e.sink, event); !retry { delete(e.eventCache, isomorphicKey) } } + } else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) { + delete(e.eventCache, isomorphicKey) } } } diff --git a/tools/events/eventseries_test.go b/tools/events/eventseries_test.go index f4418922..f7c13024 100644 --- a/tools/events/eventseries_test.go +++ b/tools/events/eventseries_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/api/events/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/scheme" ref "k8s.io/client-go/tools/reference" ) @@ -154,7 +155,7 @@ func TestEventSeriesf(t *testing.T) { return event, nil }, } - eventBroadcaster := newBroadcaster(&testEvents, 0) + eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*v1beta1.Event{}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest") eventBroadcaster.StartRecordingToSink(stopCh) recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1}) @@ -206,3 +207,143 @@ func validateEventSerie(messagePrefix string, expectedUpdate bool, actualEvent * } } + +func TestFinishSeries(t *testing.T) { + hostname, _ := os.Hostname() + testPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/api/version/pods/foo", + Name: "foo", + Namespace: "baz", + UID: "bar", + }, + } + regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]") + if err != nil { + t.Fatal(err) + } + related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]") + if err != nil { + t.Fatal(err) + } + LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)} + + createEvent := make(chan *v1beta1.Event, 10) + updateEvent := make(chan *v1beta1.Event, 10) + patchEvent := make(chan *v1beta1.Event, 10) + testEvents := testEventSeriesSink{ + OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + updateEvent <- event + return event, nil + }, + OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + // event we receive is already patched, usually the sink uses it + // only to retrieve the name and namespace, here we'll use it directly + patchEvent <- event + return event, nil + }, + } + cache := map[eventKey]*v1beta1.Event{} + eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) + cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") + nonFinishedEvent := cachedEvent.DeepCopy() + nonFinishedEvent.ReportingController = "nonFinished-controller" + cachedEvent.Series = &v1beta1.EventSeries{ + Count: 10, + LastObservedTime: LastObservedTime, + } + cache[getKey(cachedEvent)] = cachedEvent + cache[getKey(nonFinishedEvent)] = nonFinishedEvent + eventBroadcaster.finishSeries() + select { + case actualEvent := <-patchEvent: + t.Logf("validating event affected by patch request") + eventBroadcaster.mu.Lock() + defer eventBroadcaster.mu.Unlock() + if len(cache) != 1 { + t.Errorf("cache should be empty, but instead got a size of %v", len(cache)) + } + if !actualEvent.Series.LastObservedTime.Equal(&cachedEvent.Series.LastObservedTime) { + t.Errorf("series was expected be seen with LastObservedTime %v, but instead got %v ", cachedEvent.Series.LastObservedTime, actualEvent.Series.LastObservedTime) + } + // check that we emitted only one event + if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 { + t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent)) + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + } +} + +func TestRefreshExistingEventSeries(t *testing.T) { + hostname, _ := os.Hostname() + testPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/api/version/pods/foo", + Name: "foo", + Namespace: "baz", + UID: "bar", + }, + } + regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]") + if err != nil { + t.Fatal(err) + } + related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]") + if err != nil { + t.Fatal(err) + } + LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)} + + createEvent := make(chan *v1beta1.Event, 10) + updateEvent := make(chan *v1beta1.Event, 10) + patchEvent := make(chan *v1beta1.Event, 10) + testEvents := testEventSeriesSink{ + OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + updateEvent <- event + return event, nil + }, + OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + // event we receive is already patched, usually the sink uses it + //only to retrieve the name and namespace, here we'll use it directly. + patchEvent <- event + return event, nil + }, + } + cache := map[eventKey]*v1beta1.Event{} + eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) + cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") + cachedEvent.Series = &v1beta1.EventSeries{ + Count: 10, + LastObservedTime: LastObservedTime, + } + cacheKey := getKey(cachedEvent) + cache[cacheKey] = cachedEvent + + eventBroadcaster.refreshExistingEventSeries() + select { + case <-patchEvent: + t.Logf("validating event affected by patch request") + eventBroadcaster.mu.Lock() + defer eventBroadcaster.mu.Unlock() + if len(cache) != 1 { + t.Errorf("cache should be with same size, but instead got a size of %v", len(cache)) + } + // check that we emitted only one event + if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 { + t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent)) + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + } +}