From e4ef28f0961e346d45e3334ca641e3612f499588 Mon Sep 17 00:00:00 2001 From: jiangyaoguo Date: Mon, 7 Sep 2015 00:04:29 +0800 Subject: [PATCH] generate new event cache for every event sink --- pkg/client/record/event.go | 9 +- pkg/client/record/event_test.go | 419 +++++++++++++++++++++---- pkg/client/record/events_cache.go | 26 +- pkg/client/record/events_cache_test.go | 18 +- 4 files changed, 381 insertions(+), 91 deletions(-) diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index 8e9d3d24e83..06b0f61b81d 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -104,6 +104,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin // The default math/rand package functions aren't thread safe, so create a // new Rand object for each StartRecording call. randGen := rand.New(rand.NewSource(time.Now().UnixNano())) + var eventCache *historyCache = NewEventCache() return eventBroadcaster.StartEventWatcher( func(event *api.Event) { // Make a copy before modification, because there could be multiple listeners. @@ -111,7 +112,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin eventCopy := *event event = &eventCopy - previousEvent := getEvent(event) + previousEvent := eventCache.getEvent(event) updateExistingEvent := previousEvent.Count > 0 if updateExistingEvent { event.Count = previousEvent.Count + 1 @@ -122,7 +123,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin tries := 0 for { - if recordEvent(sink, event, updateExistingEvent) { + if recordEvent(sink, event, updateExistingEvent, eventCache) { break } tries++ @@ -156,7 +157,7 @@ func isKeyNotFoundError(err error) bool { // was successfully recorded or discarded, false if it should be retried. // If updateExistingEvent is false, it creates a new event, otherwise it updates // existing event. -func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) bool { +func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool, eventCache *historyCache) bool { var newEvent *api.Event var err error if updateExistingEvent { @@ -169,7 +170,7 @@ func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) boo newEvent, err = sink.Create(event) } if err == nil { - addOrUpdateEvent(newEvent) + eventCache.addOrUpdateEvent(newEvent) return true } diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index 5234513c466..33fc1f3a010 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -21,7 +21,6 @@ import ( "reflect" "strconv" "strings" - "sync" "testing" "k8s.io/kubernetes/pkg/api" @@ -271,78 +270,81 @@ func TestEventf(t *testing.T) { }, } + logCalled := make(chan struct{}) + createEvent := make(chan *api.Event) + updateEvent := make(chan *api.Event) + testEvents := testEventSink{ + OnCreate: func(event *api.Event) (*api.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *api.Event) (*api.Event, error) { + updateEvent <- event + return event, nil + }, + } + eventBroadcaster := NewBroadcaster() + sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) + + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) for _, item := range table { - var wg sync.WaitGroup - // We expect only one callback - wg.Add(1) - testEvents := testEventSink{ - OnCreate: func(event *api.Event) (*api.Event, error) { - defer wg.Done() - returnEvent, _ := validateEvent(event, item.expect, t) - if item.expectUpdate { - t.Errorf("Expected event update(), got event create()") - } - return returnEvent, nil - }, - OnUpdate: func(event *api.Event) (*api.Event, error) { - defer wg.Done() - returnEvent, _ := validateEvent(event, item.expect, t) - if !item.expectUpdate { - t.Errorf("Expected event create(), got event update()") - } - return returnEvent, nil - }, - } - eventBroadcaster := NewBroadcaster() - sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful - wg.Add(1) logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { - defer wg.Done() if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { t.Errorf("Expected '%v', got '%v'", e, a) } + logCalled <- struct{}{} }) - recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...) - wg.Wait() - sinkWatcher.Stop() + <-logCalled + + // validate event + if item.expectUpdate { + actualEvent := <-updateEvent + validateEvent(actualEvent, item.expect, t) + } else { + actualEvent := <-createEvent + validateEvent(actualEvent, item.expect, t) + } logWatcher1.Stop() logWatcher2.Stop() } + sinkWatcher.Stop() } func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.T) (*api.Event, error) { + recvEvent := *actualEvent expectCompression := expectedEvent.Count > 1 + t.Logf("expectedEvent.Count is %d\n", expectedEvent.Count) // Just check that the timestamp was set. - if actualEvent.FirstTimestamp.IsZero() || actualEvent.LastTimestamp.IsZero() { - t.Errorf("timestamp wasn't set: %#v", *actualEvent) + if recvEvent.FirstTimestamp.IsZero() || recvEvent.LastTimestamp.IsZero() { + t.Errorf("timestamp wasn't set: %#v", recvEvent) } - actualFirstTimestamp := actualEvent.FirstTimestamp - actualLastTimestamp := actualEvent.LastTimestamp + actualFirstTimestamp := recvEvent.FirstTimestamp + actualLastTimestamp := recvEvent.LastTimestamp if actualFirstTimestamp.Equal(actualLastTimestamp) { if expectCompression { - t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent) + t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, recvEvent) } } else { if expectedEvent.Count == 1 { - t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent) + t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, recvEvent) } } // Temp clear time stamps for comparison because actual values don't matter for comparison - actualEvent.FirstTimestamp = expectedEvent.FirstTimestamp - actualEvent.LastTimestamp = expectedEvent.LastTimestamp + recvEvent.FirstTimestamp = expectedEvent.FirstTimestamp + recvEvent.LastTimestamp = expectedEvent.LastTimestamp // Check that name has the right prefix. - if n, en := actualEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) { + if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) { t.Errorf("Name '%v' does not contain prefix '%v'", n, en) } - actualEvent.Name = expectedEvent.Name - if e, a := expectedEvent, actualEvent; !reflect.DeepEqual(e, a) { + recvEvent.Name = expectedEvent.Name + if e, a := expectedEvent, &recvEvent; !reflect.DeepEqual(e, a) { t.Errorf("diff: %s", util.ObjectGoPrintDiff(e, a)) } - actualEvent.FirstTimestamp = actualFirstTimestamp - actualEvent.LastTimestamp = actualLastTimestamp + recvEvent.FirstTimestamp = actualFirstTimestamp + recvEvent.LastTimestamp = actualLastTimestamp return actualEvent, nil } @@ -526,42 +528,323 @@ func TestEventfNoNamespace(t *testing.T) { }, } + logCalled := make(chan struct{}) + createEvent := make(chan *api.Event) + updateEvent := make(chan *api.Event) + testEvents := testEventSink{ + OnCreate: func(event *api.Event) (*api.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *api.Event) (*api.Event, error) { + updateEvent <- event + return event, nil + }, + } + eventBroadcaster := NewBroadcaster() + sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) + + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) + for _, item := range table { - called := make(chan struct{}) - testEvents := testEventSink{ - OnCreate: func(event *api.Event) (*api.Event, error) { - returnEvent, _ := validateEvent(event, item.expect, t) - if item.expectUpdate { - t.Errorf("Expected event update(), got event create()") - } - called <- struct{}{} - return returnEvent, nil - }, - OnUpdate: func(event *api.Event) (*api.Event, error) { - returnEvent, _ := validateEvent(event, item.expect, t) - if !item.expectUpdate { - t.Errorf("Expected event create(), got event update()") - } - called <- struct{}{} - return returnEvent, nil - }, - } - eventBroadcaster := NewBroadcaster() - sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { t.Errorf("Expected '%v', got '%v'", e, a) } - called <- struct{}{} + logCalled <- struct{}{} }) - recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...) - <-called - <-called - sinkWatcher.Stop() + <-logCalled + + // validate event + if item.expectUpdate { + actualEvent := <-updateEvent + validateEvent(actualEvent, item.expect, t) + } else { + actualEvent := <-createEvent + validateEvent(actualEvent, item.expect, t) + } + logWatcher1.Stop() logWatcher2.Stop() } + sinkWatcher.Stop() +} + +func TestMultiSinkCache(t *testing.T) { + testPod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + SelfLink: "/api/version/pods/foo", + Name: "foo", + Namespace: "baz", + UID: "bar", + }, + } + testPod2 := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + SelfLink: "/api/version/pods/foo", + Name: "foo", + Namespace: "baz", + UID: "differentUid", + }, + } + testRef, err := api.GetPartialReference(testPod, "spec.containers[2]") + testRef2, err := api.GetPartialReference(testPod2, "spec.containers[3]") + if err != nil { + t.Fatal(err) + } + table := []struct { + obj runtime.Object + reason string + messageFmt string + elements []interface{} + expect *api.Event + expectLog string + expectUpdate bool + }{ + { + obj: testRef, + reason: "Started", + messageFmt: "some verbose message: %v", + elements: []interface{}{1}, + expect: &api.Event{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "baz", + }, + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + Namespace: "baz", + UID: "bar", + APIVersion: "version", + FieldPath: "spec.containers[2]", + }, + Reason: "Started", + Message: "some verbose message: 1", + Source: api.EventSource{Component: "eventTest"}, + Count: 1, + }, + expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): reason: 'Started' some verbose message: 1`, + expectUpdate: false, + }, + { + obj: testPod, + reason: "Killed", + messageFmt: "some other verbose message: %v", + elements: []interface{}{1}, + expect: &api.Event{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "baz", + }, + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + Namespace: "baz", + UID: "bar", + APIVersion: "version", + }, + Reason: "Killed", + Message: "some other verbose message: 1", + Source: api.EventSource{Component: "eventTest"}, + Count: 1, + }, + expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:""}): reason: 'Killed' some other verbose message: 1`, + expectUpdate: false, + }, + { + obj: testRef, + reason: "Started", + messageFmt: "some verbose message: %v", + elements: []interface{}{1}, + expect: &api.Event{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "baz", + }, + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + Namespace: "baz", + UID: "bar", + APIVersion: "version", + FieldPath: "spec.containers[2]", + }, + Reason: "Started", + Message: "some verbose message: 1", + Source: api.EventSource{Component: "eventTest"}, + Count: 2, + }, + expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): reason: 'Started' some verbose message: 1`, + expectUpdate: true, + }, + { + obj: testRef2, + reason: "Started", + messageFmt: "some verbose message: %v", + elements: []interface{}{1}, + expect: &api.Event{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "baz", + }, + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + Namespace: "baz", + UID: "differentUid", + APIVersion: "version", + FieldPath: "spec.containers[3]", + }, + Reason: "Started", + Message: "some verbose message: 1", + Source: api.EventSource{Component: "eventTest"}, + Count: 1, + }, + expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): reason: 'Started' some verbose message: 1`, + expectUpdate: false, + }, + { + obj: testRef, + reason: "Started", + messageFmt: "some verbose message: %v", + elements: []interface{}{1}, + expect: &api.Event{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "baz", + }, + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + Namespace: "baz", + UID: "bar", + APIVersion: "version", + FieldPath: "spec.containers[2]", + }, + Reason: "Started", + Message: "some verbose message: 1", + Source: api.EventSource{Component: "eventTest"}, + Count: 3, + }, + expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): reason: 'Started' some verbose message: 1`, + expectUpdate: true, + }, + { + obj: testRef2, + reason: "Stopped", + messageFmt: "some verbose message: %v", + elements: []interface{}{1}, + expect: &api.Event{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "baz", + }, + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + Namespace: "baz", + UID: "differentUid", + APIVersion: "version", + FieldPath: "spec.containers[3]", + }, + Reason: "Stopped", + Message: "some verbose message: 1", + Source: api.EventSource{Component: "eventTest"}, + Count: 1, + }, + expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): reason: 'Stopped' some verbose message: 1`, + expectUpdate: false, + }, + { + obj: testRef2, + reason: "Stopped", + messageFmt: "some verbose message: %v", + elements: []interface{}{1}, + expect: &api.Event{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "baz", + }, + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + Namespace: "baz", + UID: "differentUid", + APIVersion: "version", + FieldPath: "spec.containers[3]", + }, + Reason: "Stopped", + Message: "some verbose message: 1", + Source: api.EventSource{Component: "eventTest"}, + Count: 2, + }, + expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): reason: 'Stopped' some verbose message: 1`, + expectUpdate: true, + }, + } + + createEvent := make(chan *api.Event) + updateEvent := make(chan *api.Event) + testEvents := testEventSink{ + OnCreate: func(event *api.Event) (*api.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *api.Event) (*api.Event, error) { + updateEvent <- event + return event, nil + }, + } + + createEvent2 := make(chan *api.Event) + updateEvent2 := make(chan *api.Event) + testEvents2 := testEventSink{ + OnCreate: func(event *api.Event) (*api.Event, error) { + createEvent2 <- event + return event, nil + }, + OnUpdate: func(event *api.Event) (*api.Event, error) { + updateEvent2 <- event + return event, nil + }, + } + + eventBroadcaster := NewBroadcaster() + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) + + sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) + for _, item := range table { + recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...) + + // validate event + if item.expectUpdate { + actualEvent := <-updateEvent + validateEvent(actualEvent, item.expect, t) + } else { + actualEvent := <-createEvent + validateEvent(actualEvent, item.expect, t) + } + } + + // Another StartRecordingToSink call should start to record events with new clean cache. + sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2) + for _, item := range table { + recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...) + + // validate event + if item.expectUpdate { + actualEvent := <-updateEvent2 + validateEvent(actualEvent, item.expect, t) + } else { + actualEvent := <-createEvent2 + validateEvent(actualEvent, item.expect, t) + } + } + + sinkWatcher.Stop() + sinkWatcher2.Stop() } diff --git a/pkg/client/record/events_cache.go b/pkg/client/record/events_cache.go index 8054e229ea0..dc4b6e08f6e 100644 --- a/pkg/client/record/events_cache.go +++ b/pkg/client/record/events_cache.go @@ -47,15 +47,17 @@ type historyCache struct { cache *lru.Cache } -var previousEvents = historyCache{cache: lru.New(maxLruCacheEntries)} +func NewEventCache() *historyCache { + return &historyCache{cache: lru.New(maxLruCacheEntries)} +} // addOrUpdateEvent creates a new entry for the given event in the previous events hash table if the event // doesn't already exist, otherwise it updates the existing entry. -func addOrUpdateEvent(newEvent *api.Event) history { +func (eventCache *historyCache) addOrUpdateEvent(newEvent *api.Event) history { key := getEventKey(newEvent) - previousEvents.Lock() - defer previousEvents.Unlock() - previousEvents.cache.Add( + eventCache.Lock() + defer eventCache.Unlock() + eventCache.cache.Add( key, history{ Count: newEvent.Count, @@ -63,20 +65,20 @@ func addOrUpdateEvent(newEvent *api.Event) history { Name: newEvent.Name, ResourceVersion: newEvent.ResourceVersion, }) - return getEventFromCache(key) + return eventCache.getEventFromCache(key) } // getEvent returns the entry corresponding to the given event, if one exists, otherwise a history object // with a count of 0 is returned. -func getEvent(event *api.Event) history { +func (eventCache *historyCache) getEvent(event *api.Event) history { key := getEventKey(event) - previousEvents.RLock() - defer previousEvents.RUnlock() - return getEventFromCache(key) + eventCache.RLock() + defer eventCache.RUnlock() + return eventCache.getEventFromCache(key) } -func getEventFromCache(key string) history { - value, ok := previousEvents.cache.Get(key) +func (eventCache *historyCache) getEventFromCache(key string) history { + value, ok := eventCache.cache.Get(key) if ok { historyValue, ok := value.(history) if ok { diff --git a/pkg/client/record/events_cache_test.go b/pkg/client/record/events_cache_test.go index e1d4703858e..78dcb2d84cb 100644 --- a/pkg/client/record/events_cache_test.go +++ b/pkg/client/record/events_cache_test.go @@ -25,6 +25,7 @@ import ( func TestAddOrUpdateEventNoExisting(t *testing.T) { // Arrange + eventCache := NewEventCache() eventTime := unversioned.Now() event := api.Event{ Reason: "my reasons are many", @@ -46,7 +47,7 @@ func TestAddOrUpdateEventNoExisting(t *testing.T) { } // Act - result := addOrUpdateEvent(&event) + result := eventCache.addOrUpdateEvent(&event) // Assert compareEventWithHistoryEntry(&event, &result, t) @@ -54,6 +55,7 @@ func TestAddOrUpdateEventNoExisting(t *testing.T) { func TestAddOrUpdateEventExisting(t *testing.T) { // Arrange + eventCache := NewEventCache() event1Time := unversioned.Unix(2324, 2342) event2Time := unversioned.Now() event1 := api.Event{ @@ -100,9 +102,9 @@ func TestAddOrUpdateEventExisting(t *testing.T) { } // Act - addOrUpdateEvent(&event1) - result1 := addOrUpdateEvent(&event2) - result2 := getEvent(&event1) + eventCache.addOrUpdateEvent(&event1) + result1 := eventCache.addOrUpdateEvent(&event2) + result2 := eventCache.getEvent(&event1) // Assert compareEventWithHistoryEntry(&event2, &result1, t) @@ -111,6 +113,7 @@ func TestAddOrUpdateEventExisting(t *testing.T) { func TestGetEventNoExisting(t *testing.T) { // Arrange + eventCache := NewEventCache() event := api.Event{ Reason: "to be or not to be", Message: "do I exist", @@ -129,7 +132,7 @@ func TestGetEventNoExisting(t *testing.T) { } // Act - existingEvent := getEvent(&event) + existingEvent := eventCache.getEvent(&event) // Assert if existingEvent.Count != 0 { @@ -139,6 +142,7 @@ func TestGetEventNoExisting(t *testing.T) { func TestGetEventExisting(t *testing.T) { // Arrange + eventCache := NewEventCache() eventTime := unversioned.Now() event := api.Event{ Reason: "do I exist", @@ -158,10 +162,10 @@ func TestGetEventExisting(t *testing.T) { FirstTimestamp: eventTime, LastTimestamp: eventTime, } - addOrUpdateEvent(&event) + eventCache.addOrUpdateEvent(&event) // Act - existingEvent := getEvent(&event) + existingEvent := eventCache.getEvent(&event) // Assert compareEventWithHistoryEntry(&event, &existingEvent, t)