mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 18:00:08 +00:00
Merge pull request #13632 from jiangyaoguo/create-new-cache-for-every-event-sink
Auto commit by PR queue bot
This commit is contained in:
commit
915bc04488
@ -104,6 +104,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
|
|||||||
// The default math/rand package functions aren't thread safe, so create a
|
// The default math/rand package functions aren't thread safe, so create a
|
||||||
// new Rand object for each StartRecording call.
|
// new Rand object for each StartRecording call.
|
||||||
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
|
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
var eventCache *historyCache = NewEventCache()
|
||||||
return eventBroadcaster.StartEventWatcher(
|
return eventBroadcaster.StartEventWatcher(
|
||||||
func(event *api.Event) {
|
func(event *api.Event) {
|
||||||
// Make a copy before modification, because there could be multiple listeners.
|
// Make a copy before modification, because there could be multiple listeners.
|
||||||
@ -111,7 +112,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
|
|||||||
eventCopy := *event
|
eventCopy := *event
|
||||||
event = &eventCopy
|
event = &eventCopy
|
||||||
|
|
||||||
previousEvent := getEvent(event)
|
previousEvent := eventCache.getEvent(event)
|
||||||
updateExistingEvent := previousEvent.Count > 0
|
updateExistingEvent := previousEvent.Count > 0
|
||||||
if updateExistingEvent {
|
if updateExistingEvent {
|
||||||
event.Count = previousEvent.Count + 1
|
event.Count = previousEvent.Count + 1
|
||||||
@ -122,7 +123,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
|
|||||||
|
|
||||||
tries := 0
|
tries := 0
|
||||||
for {
|
for {
|
||||||
if recordEvent(sink, event, updateExistingEvent) {
|
if recordEvent(sink, event, updateExistingEvent, eventCache) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
tries++
|
tries++
|
||||||
@ -156,7 +157,7 @@ func isKeyNotFoundError(err error) bool {
|
|||||||
// was successfully recorded or discarded, false if it should be retried.
|
// was successfully recorded or discarded, false if it should be retried.
|
||||||
// If updateExistingEvent is false, it creates a new event, otherwise it updates
|
// If updateExistingEvent is false, it creates a new event, otherwise it updates
|
||||||
// existing event.
|
// existing event.
|
||||||
func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) bool {
|
func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool, eventCache *historyCache) bool {
|
||||||
var newEvent *api.Event
|
var newEvent *api.Event
|
||||||
var err error
|
var err error
|
||||||
if updateExistingEvent {
|
if updateExistingEvent {
|
||||||
@ -169,7 +170,7 @@ func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) boo
|
|||||||
newEvent, err = sink.Create(event)
|
newEvent, err = sink.Create(event)
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
addOrUpdateEvent(newEvent)
|
eventCache.addOrUpdateEvent(newEvent)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
@ -271,78 +270,81 @@ func TestEventf(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logCalled := make(chan struct{})
|
||||||
|
createEvent := make(chan *api.Event)
|
||||||
|
updateEvent := make(chan *api.Event)
|
||||||
|
testEvents := testEventSink{
|
||||||
|
OnCreate: func(event *api.Event) (*api.Event, error) {
|
||||||
|
createEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
||||||
|
updateEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
eventBroadcaster := NewBroadcaster()
|
||||||
|
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
||||||
|
|
||||||
|
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
var wg sync.WaitGroup
|
|
||||||
// We expect only one callback
|
|
||||||
wg.Add(1)
|
|
||||||
testEvents := testEventSink{
|
|
||||||
OnCreate: func(event *api.Event) (*api.Event, error) {
|
|
||||||
defer wg.Done()
|
|
||||||
returnEvent, _ := validateEvent(event, item.expect, t)
|
|
||||||
if item.expectUpdate {
|
|
||||||
t.Errorf("Expected event update(), got event create()")
|
|
||||||
}
|
|
||||||
return returnEvent, nil
|
|
||||||
},
|
|
||||||
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
|
||||||
defer wg.Done()
|
|
||||||
returnEvent, _ := validateEvent(event, item.expect, t)
|
|
||||||
if !item.expectUpdate {
|
|
||||||
t.Errorf("Expected event create(), got event update()")
|
|
||||||
}
|
|
||||||
return returnEvent, nil
|
|
||||||
},
|
|
||||||
}
|
|
||||||
eventBroadcaster := NewBroadcaster()
|
|
||||||
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
|
||||||
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
|
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
|
||||||
wg.Add(1)
|
|
||||||
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
|
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
|
||||||
defer wg.Done()
|
|
||||||
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
|
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
|
||||||
t.Errorf("Expected '%v', got '%v'", e, a)
|
t.Errorf("Expected '%v', got '%v'", e, a)
|
||||||
}
|
}
|
||||||
|
logCalled <- struct{}{}
|
||||||
})
|
})
|
||||||
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
|
|
||||||
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
|
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
|
||||||
|
|
||||||
wg.Wait()
|
<-logCalled
|
||||||
sinkWatcher.Stop()
|
|
||||||
|
// validate event
|
||||||
|
if item.expectUpdate {
|
||||||
|
actualEvent := <-updateEvent
|
||||||
|
validateEvent(actualEvent, item.expect, t)
|
||||||
|
} else {
|
||||||
|
actualEvent := <-createEvent
|
||||||
|
validateEvent(actualEvent, item.expect, t)
|
||||||
|
}
|
||||||
logWatcher1.Stop()
|
logWatcher1.Stop()
|
||||||
logWatcher2.Stop()
|
logWatcher2.Stop()
|
||||||
}
|
}
|
||||||
|
sinkWatcher.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.T) (*api.Event, error) {
|
func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.T) (*api.Event, error) {
|
||||||
|
recvEvent := *actualEvent
|
||||||
expectCompression := expectedEvent.Count > 1
|
expectCompression := expectedEvent.Count > 1
|
||||||
|
t.Logf("expectedEvent.Count is %d\n", expectedEvent.Count)
|
||||||
// Just check that the timestamp was set.
|
// Just check that the timestamp was set.
|
||||||
if actualEvent.FirstTimestamp.IsZero() || actualEvent.LastTimestamp.IsZero() {
|
if recvEvent.FirstTimestamp.IsZero() || recvEvent.LastTimestamp.IsZero() {
|
||||||
t.Errorf("timestamp wasn't set: %#v", *actualEvent)
|
t.Errorf("timestamp wasn't set: %#v", recvEvent)
|
||||||
}
|
}
|
||||||
actualFirstTimestamp := actualEvent.FirstTimestamp
|
actualFirstTimestamp := recvEvent.FirstTimestamp
|
||||||
actualLastTimestamp := actualEvent.LastTimestamp
|
actualLastTimestamp := recvEvent.LastTimestamp
|
||||||
if actualFirstTimestamp.Equal(actualLastTimestamp) {
|
if actualFirstTimestamp.Equal(actualLastTimestamp) {
|
||||||
if expectCompression {
|
if expectCompression {
|
||||||
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent)
|
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, recvEvent)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if expectedEvent.Count == 1 {
|
if expectedEvent.Count == 1 {
|
||||||
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent)
|
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, recvEvent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Temp clear time stamps for comparison because actual values don't matter for comparison
|
// Temp clear time stamps for comparison because actual values don't matter for comparison
|
||||||
actualEvent.FirstTimestamp = expectedEvent.FirstTimestamp
|
recvEvent.FirstTimestamp = expectedEvent.FirstTimestamp
|
||||||
actualEvent.LastTimestamp = expectedEvent.LastTimestamp
|
recvEvent.LastTimestamp = expectedEvent.LastTimestamp
|
||||||
// Check that name has the right prefix.
|
// Check that name has the right prefix.
|
||||||
if n, en := actualEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
|
if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
|
||||||
t.Errorf("Name '%v' does not contain prefix '%v'", n, en)
|
t.Errorf("Name '%v' does not contain prefix '%v'", n, en)
|
||||||
}
|
}
|
||||||
actualEvent.Name = expectedEvent.Name
|
recvEvent.Name = expectedEvent.Name
|
||||||
if e, a := expectedEvent, actualEvent; !reflect.DeepEqual(e, a) {
|
if e, a := expectedEvent, &recvEvent; !reflect.DeepEqual(e, a) {
|
||||||
t.Errorf("diff: %s", util.ObjectGoPrintDiff(e, a))
|
t.Errorf("diff: %s", util.ObjectGoPrintDiff(e, a))
|
||||||
}
|
}
|
||||||
actualEvent.FirstTimestamp = actualFirstTimestamp
|
recvEvent.FirstTimestamp = actualFirstTimestamp
|
||||||
actualEvent.LastTimestamp = actualLastTimestamp
|
recvEvent.LastTimestamp = actualLastTimestamp
|
||||||
return actualEvent, nil
|
return actualEvent, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -526,42 +528,323 @@ func TestEventfNoNamespace(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logCalled := make(chan struct{})
|
||||||
|
createEvent := make(chan *api.Event)
|
||||||
|
updateEvent := make(chan *api.Event)
|
||||||
|
testEvents := testEventSink{
|
||||||
|
OnCreate: func(event *api.Event) (*api.Event, error) {
|
||||||
|
createEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
||||||
|
updateEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
eventBroadcaster := NewBroadcaster()
|
||||||
|
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
||||||
|
|
||||||
|
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
called := make(chan struct{})
|
|
||||||
testEvents := testEventSink{
|
|
||||||
OnCreate: func(event *api.Event) (*api.Event, error) {
|
|
||||||
returnEvent, _ := validateEvent(event, item.expect, t)
|
|
||||||
if item.expectUpdate {
|
|
||||||
t.Errorf("Expected event update(), got event create()")
|
|
||||||
}
|
|
||||||
called <- struct{}{}
|
|
||||||
return returnEvent, nil
|
|
||||||
},
|
|
||||||
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
|
||||||
returnEvent, _ := validateEvent(event, item.expect, t)
|
|
||||||
if !item.expectUpdate {
|
|
||||||
t.Errorf("Expected event create(), got event update()")
|
|
||||||
}
|
|
||||||
called <- struct{}{}
|
|
||||||
return returnEvent, nil
|
|
||||||
},
|
|
||||||
}
|
|
||||||
eventBroadcaster := NewBroadcaster()
|
|
||||||
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
|
||||||
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
|
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
|
||||||
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
|
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
|
||||||
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
|
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
|
||||||
t.Errorf("Expected '%v', got '%v'", e, a)
|
t.Errorf("Expected '%v', got '%v'", e, a)
|
||||||
}
|
}
|
||||||
called <- struct{}{}
|
logCalled <- struct{}{}
|
||||||
})
|
})
|
||||||
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
|
|
||||||
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
|
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
|
||||||
|
|
||||||
<-called
|
<-logCalled
|
||||||
<-called
|
|
||||||
sinkWatcher.Stop()
|
// validate event
|
||||||
|
if item.expectUpdate {
|
||||||
|
actualEvent := <-updateEvent
|
||||||
|
validateEvent(actualEvent, item.expect, t)
|
||||||
|
} else {
|
||||||
|
actualEvent := <-createEvent
|
||||||
|
validateEvent(actualEvent, item.expect, t)
|
||||||
|
}
|
||||||
|
|
||||||
logWatcher1.Stop()
|
logWatcher1.Stop()
|
||||||
logWatcher2.Stop()
|
logWatcher2.Stop()
|
||||||
}
|
}
|
||||||
|
sinkWatcher.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiSinkCache(t *testing.T) {
|
||||||
|
testPod := &api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
SelfLink: "/api/version/pods/foo",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
testPod2 := &api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
SelfLink: "/api/version/pods/foo",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "differentUid",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
testRef, err := api.GetPartialReference(testPod, "spec.containers[2]")
|
||||||
|
testRef2, err := api.GetPartialReference(testPod2, "spec.containers[3]")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
table := []struct {
|
||||||
|
obj runtime.Object
|
||||||
|
reason string
|
||||||
|
messageFmt string
|
||||||
|
elements []interface{}
|
||||||
|
expect *api.Event
|
||||||
|
expectLog string
|
||||||
|
expectUpdate bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
obj: testRef,
|
||||||
|
reason: "Started",
|
||||||
|
messageFmt: "some verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
APIVersion: "version",
|
||||||
|
FieldPath: "spec.containers[2]",
|
||||||
|
},
|
||||||
|
Reason: "Started",
|
||||||
|
Message: "some verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 1,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): reason: 'Started' some verbose message: 1`,
|
||||||
|
expectUpdate: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testPod,
|
||||||
|
reason: "Killed",
|
||||||
|
messageFmt: "some other verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
APIVersion: "version",
|
||||||
|
},
|
||||||
|
Reason: "Killed",
|
||||||
|
Message: "some other verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 1,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:""}): reason: 'Killed' some other verbose message: 1`,
|
||||||
|
expectUpdate: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testRef,
|
||||||
|
reason: "Started",
|
||||||
|
messageFmt: "some verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
APIVersion: "version",
|
||||||
|
FieldPath: "spec.containers[2]",
|
||||||
|
},
|
||||||
|
Reason: "Started",
|
||||||
|
Message: "some verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 2,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): reason: 'Started' some verbose message: 1`,
|
||||||
|
expectUpdate: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testRef2,
|
||||||
|
reason: "Started",
|
||||||
|
messageFmt: "some verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "differentUid",
|
||||||
|
APIVersion: "version",
|
||||||
|
FieldPath: "spec.containers[3]",
|
||||||
|
},
|
||||||
|
Reason: "Started",
|
||||||
|
Message: "some verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 1,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): reason: 'Started' some verbose message: 1`,
|
||||||
|
expectUpdate: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testRef,
|
||||||
|
reason: "Started",
|
||||||
|
messageFmt: "some verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
APIVersion: "version",
|
||||||
|
FieldPath: "spec.containers[2]",
|
||||||
|
},
|
||||||
|
Reason: "Started",
|
||||||
|
Message: "some verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 3,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): reason: 'Started' some verbose message: 1`,
|
||||||
|
expectUpdate: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testRef2,
|
||||||
|
reason: "Stopped",
|
||||||
|
messageFmt: "some verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "differentUid",
|
||||||
|
APIVersion: "version",
|
||||||
|
FieldPath: "spec.containers[3]",
|
||||||
|
},
|
||||||
|
Reason: "Stopped",
|
||||||
|
Message: "some verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 1,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): reason: 'Stopped' some verbose message: 1`,
|
||||||
|
expectUpdate: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testRef2,
|
||||||
|
reason: "Stopped",
|
||||||
|
messageFmt: "some verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "differentUid",
|
||||||
|
APIVersion: "version",
|
||||||
|
FieldPath: "spec.containers[3]",
|
||||||
|
},
|
||||||
|
Reason: "Stopped",
|
||||||
|
Message: "some verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 2,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): reason: 'Stopped' some verbose message: 1`,
|
||||||
|
expectUpdate: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
createEvent := make(chan *api.Event)
|
||||||
|
updateEvent := make(chan *api.Event)
|
||||||
|
testEvents := testEventSink{
|
||||||
|
OnCreate: func(event *api.Event) (*api.Event, error) {
|
||||||
|
createEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
||||||
|
updateEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
createEvent2 := make(chan *api.Event)
|
||||||
|
updateEvent2 := make(chan *api.Event)
|
||||||
|
testEvents2 := testEventSink{
|
||||||
|
OnCreate: func(event *api.Event) (*api.Event, error) {
|
||||||
|
createEvent2 <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
||||||
|
updateEvent2 <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBroadcaster := NewBroadcaster()
|
||||||
|
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
|
||||||
|
|
||||||
|
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
||||||
|
for _, item := range table {
|
||||||
|
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
|
||||||
|
|
||||||
|
// validate event
|
||||||
|
if item.expectUpdate {
|
||||||
|
actualEvent := <-updateEvent
|
||||||
|
validateEvent(actualEvent, item.expect, t)
|
||||||
|
} else {
|
||||||
|
actualEvent := <-createEvent
|
||||||
|
validateEvent(actualEvent, item.expect, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Another StartRecordingToSink call should start to record events with new clean cache.
|
||||||
|
sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2)
|
||||||
|
for _, item := range table {
|
||||||
|
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
|
||||||
|
|
||||||
|
// validate event
|
||||||
|
if item.expectUpdate {
|
||||||
|
actualEvent := <-updateEvent2
|
||||||
|
validateEvent(actualEvent, item.expect, t)
|
||||||
|
} else {
|
||||||
|
actualEvent := <-createEvent2
|
||||||
|
validateEvent(actualEvent, item.expect, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sinkWatcher.Stop()
|
||||||
|
sinkWatcher2.Stop()
|
||||||
}
|
}
|
||||||
|
@ -47,15 +47,17 @@ type historyCache struct {
|
|||||||
cache *lru.Cache
|
cache *lru.Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
var previousEvents = historyCache{cache: lru.New(maxLruCacheEntries)}
|
func NewEventCache() *historyCache {
|
||||||
|
return &historyCache{cache: lru.New(maxLruCacheEntries)}
|
||||||
|
}
|
||||||
|
|
||||||
// addOrUpdateEvent creates a new entry for the given event in the previous events hash table if the event
|
// addOrUpdateEvent creates a new entry for the given event in the previous events hash table if the event
|
||||||
// doesn't already exist, otherwise it updates the existing entry.
|
// doesn't already exist, otherwise it updates the existing entry.
|
||||||
func addOrUpdateEvent(newEvent *api.Event) history {
|
func (eventCache *historyCache) addOrUpdateEvent(newEvent *api.Event) history {
|
||||||
key := getEventKey(newEvent)
|
key := getEventKey(newEvent)
|
||||||
previousEvents.Lock()
|
eventCache.Lock()
|
||||||
defer previousEvents.Unlock()
|
defer eventCache.Unlock()
|
||||||
previousEvents.cache.Add(
|
eventCache.cache.Add(
|
||||||
key,
|
key,
|
||||||
history{
|
history{
|
||||||
Count: newEvent.Count,
|
Count: newEvent.Count,
|
||||||
@ -63,20 +65,20 @@ func addOrUpdateEvent(newEvent *api.Event) history {
|
|||||||
Name: newEvent.Name,
|
Name: newEvent.Name,
|
||||||
ResourceVersion: newEvent.ResourceVersion,
|
ResourceVersion: newEvent.ResourceVersion,
|
||||||
})
|
})
|
||||||
return getEventFromCache(key)
|
return eventCache.getEventFromCache(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getEvent returns the entry corresponding to the given event, if one exists, otherwise a history object
|
// getEvent returns the entry corresponding to the given event, if one exists, otherwise a history object
|
||||||
// with a count of 0 is returned.
|
// with a count of 0 is returned.
|
||||||
func getEvent(event *api.Event) history {
|
func (eventCache *historyCache) getEvent(event *api.Event) history {
|
||||||
key := getEventKey(event)
|
key := getEventKey(event)
|
||||||
previousEvents.RLock()
|
eventCache.RLock()
|
||||||
defer previousEvents.RUnlock()
|
defer eventCache.RUnlock()
|
||||||
return getEventFromCache(key)
|
return eventCache.getEventFromCache(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getEventFromCache(key string) history {
|
func (eventCache *historyCache) getEventFromCache(key string) history {
|
||||||
value, ok := previousEvents.cache.Get(key)
|
value, ok := eventCache.cache.Get(key)
|
||||||
if ok {
|
if ok {
|
||||||
historyValue, ok := value.(history)
|
historyValue, ok := value.(history)
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
|
|
||||||
func TestAddOrUpdateEventNoExisting(t *testing.T) {
|
func TestAddOrUpdateEventNoExisting(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
|
eventCache := NewEventCache()
|
||||||
eventTime := unversioned.Now()
|
eventTime := unversioned.Now()
|
||||||
event := api.Event{
|
event := api.Event{
|
||||||
Reason: "my reasons are many",
|
Reason: "my reasons are many",
|
||||||
@ -46,7 +47,7 @@ func TestAddOrUpdateEventNoExisting(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
result := addOrUpdateEvent(&event)
|
result := eventCache.addOrUpdateEvent(&event)
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
compareEventWithHistoryEntry(&event, &result, t)
|
compareEventWithHistoryEntry(&event, &result, t)
|
||||||
@ -54,6 +55,7 @@ func TestAddOrUpdateEventNoExisting(t *testing.T) {
|
|||||||
|
|
||||||
func TestAddOrUpdateEventExisting(t *testing.T) {
|
func TestAddOrUpdateEventExisting(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
|
eventCache := NewEventCache()
|
||||||
event1Time := unversioned.Unix(2324, 2342)
|
event1Time := unversioned.Unix(2324, 2342)
|
||||||
event2Time := unversioned.Now()
|
event2Time := unversioned.Now()
|
||||||
event1 := api.Event{
|
event1 := api.Event{
|
||||||
@ -100,9 +102,9 @@ func TestAddOrUpdateEventExisting(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
addOrUpdateEvent(&event1)
|
eventCache.addOrUpdateEvent(&event1)
|
||||||
result1 := addOrUpdateEvent(&event2)
|
result1 := eventCache.addOrUpdateEvent(&event2)
|
||||||
result2 := getEvent(&event1)
|
result2 := eventCache.getEvent(&event1)
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
compareEventWithHistoryEntry(&event2, &result1, t)
|
compareEventWithHistoryEntry(&event2, &result1, t)
|
||||||
@ -111,6 +113,7 @@ func TestAddOrUpdateEventExisting(t *testing.T) {
|
|||||||
|
|
||||||
func TestGetEventNoExisting(t *testing.T) {
|
func TestGetEventNoExisting(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
|
eventCache := NewEventCache()
|
||||||
event := api.Event{
|
event := api.Event{
|
||||||
Reason: "to be or not to be",
|
Reason: "to be or not to be",
|
||||||
Message: "do I exist",
|
Message: "do I exist",
|
||||||
@ -129,7 +132,7 @@ func TestGetEventNoExisting(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
existingEvent := getEvent(&event)
|
existingEvent := eventCache.getEvent(&event)
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if existingEvent.Count != 0 {
|
if existingEvent.Count != 0 {
|
||||||
@ -139,6 +142,7 @@ func TestGetEventNoExisting(t *testing.T) {
|
|||||||
|
|
||||||
func TestGetEventExisting(t *testing.T) {
|
func TestGetEventExisting(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
|
eventCache := NewEventCache()
|
||||||
eventTime := unversioned.Now()
|
eventTime := unversioned.Now()
|
||||||
event := api.Event{
|
event := api.Event{
|
||||||
Reason: "do I exist",
|
Reason: "do I exist",
|
||||||
@ -158,10 +162,10 @@ func TestGetEventExisting(t *testing.T) {
|
|||||||
FirstTimestamp: eventTime,
|
FirstTimestamp: eventTime,
|
||||||
LastTimestamp: eventTime,
|
LastTimestamp: eventTime,
|
||||||
}
|
}
|
||||||
addOrUpdateEvent(&event)
|
eventCache.addOrUpdateEvent(&event)
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
existingEvent := getEvent(&event)
|
existingEvent := eventCache.getEvent(&event)
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
compareEventWithHistoryEntry(&event, &existingEvent, t)
|
compareEventWithHistoryEntry(&event, &existingEvent, t)
|
||||||
|
Loading…
Reference in New Issue
Block a user